
Data ingest from parquet files

Initial data import

Parquet is an efficient columnar data storage format. This tutorial demonstrates how to ingest data from Apache Parquet files, stored in a Microsoft Azure storage bucket, into the kdb Insights Enterprise database. The parquet files used in this walkthrough contain the New York City taxi ride data for February 2022, provided by the NYC Taxi and Limousine Commission.

If you wish to ingest your own data stored in Parquet format, you can use the pipeline created in this walkthrough as a template.

Prerequisites

  • kdb Insights Enterprise is installed
  • A copy of the February 2022 New York City taxi Parquet files stored in an accessible Microsoft Azure storage bucket.
  • A Kubernetes secret to access the Microsoft Azure storage bucket containing the Parquet files. The Parquet Reader q API documentation includes details on how to set up this secret; a minimal creation sketch follows this list.

    Suggested secret name

    The rest of this tutorial assumes the secret name is pqtcreds
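
The exact entries the secret must contain for Azure authentication are specified in the Parquet Reader documentation referenced above. As a minimal sketch, the snippet below creates a pqtcreds secret with the official Kubernetes Python client; the key names and the insights namespace are illustrative placeholders, not the documented contract.

    # Minimal sketch: create the pqtcreds secret with the Kubernetes Python client.
    # The key names below are placeholders; use the exact keys required by the
    # Parquet Reader documentation for Azure authentication.
    from kubernetes import client, config

    config.load_kube_config()                  # uses your current kubectl context
    api = client.CoreV1Api()

    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name="pqtcreds"),
        string_data={                          # placeholder keys -- check the docs
            "azure_storage_account": "<storage-account-name>",
            "azure_storage_shared_key": "<storage-account-key>",
        },
    )
    # "insights" is an assumed namespace; use the one your kdb Insights install runs in
    api.create_namespaced_secret(namespace="insights", body=secret)

Equivalently, the secret can be created from the command line with kubectl create secret generic.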

The steps in the process are:

  1. Create and deploy the database
  2. Create and deploy a pipeline
  3. Query the data

Create and deploy the database

  1. Select Build a Database on the Overview page of the kdb Insights Enterprise UI.

    Detailed database creation walkthrough

    A detailed walkthrough of how to create and deploy a database is available here.

  2. Name the database taxidb.

  3. Open the Schema Settings tab and click the code view button on the right-hand side.

  4. Paste the following JSON schema into the code editor (if you are ingesting your own Parquet files, a sketch for drafting a matching column list follows this procedure):

    [
        {
            "name": "taxitable",
            "type": "partitioned",
            "primaryKeys": [],
            "prtnCol": "lpep_dropoff_datetime",
            "sortColsDisk": ["VendorID"],
            "sortColsMem": ["VendorID"],
            "sortColsOrd": ["VendorID"],
            "columns": [
                {"name": "VendorID", "type": "long", "attrDisk": "parted", "attrMem": "", "attrOrd": "parted", "foreign": ""},
                {"name": "lpep_pickup_datetime", "type": "timestamp", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "lpep_dropoff_datetime", "type": "timestamp", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "store_and_fwd_flag", "type": "string", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "RatecodeID", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "PULocationID", "type": "long", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "DOLocationID", "type": "long", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "passenger_count", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "trip_distance", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "fare_amount", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "extra", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "mta_tax", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "tip_amount", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "tolls_amount", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "ehail_fee", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "improvement_surcharge", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "total_amount", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "payment_type", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "trip_type", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""},
                {"name": "congestion_surcharge", "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""}
            ]
        }
    ]
    
  5. Apply the JSON.

  6. Save the database.

  7. Deploy the database.

When the database status changes to Active, it is ready to use.
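
The schema above is specific to the NYC taxi files. If you are adapting this walkthrough to your own Parquet data, a quick way to get a starting point is to read the Parquet schema and emit a draft column list in the same JSON shape. The sketch below assumes pyarrow is installed and that sample.parquet is a local copy of one of your files; the Arrow-to-kdb type mapping is a rough guess, and the partition, sort, and attribute settings still need to be chosen by hand.

    # Sketch: draft the "columns" section of the kdb Insights JSON schema from a parquet file.
    # Assumes pyarrow is installed and "sample.parquet" is a local copy of one of your files.
    import json
    import pyarrow.parquet as pq
    import pyarrow.types as pat

    def kdb_type(arrow_type):
        """Rough Arrow-to-kdb type mapping; adjust to your data."""
        if pat.is_timestamp(arrow_type):
            return "timestamp"
        if pat.is_floating(arrow_type):
            return "float"
        if pat.is_integer(arrow_type):
            return "long"
        if pat.is_string(arrow_type) or pat.is_large_string(arrow_type):
            return "string"
        return ""  # fill in unsupported/unknown types manually

    schema = pq.read_schema("sample.parquet")
    columns = [
        {"name": f.name, "type": kdb_type(f.type),
         "attrDisk": "", "attrMem": "", "attrOrd": "", "foreign": ""}
        for f in schema
    ]
    print(json.dumps(columns, indent=4))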

Create and deploy a Parquet reader pipeline

To create the pipeline:

  1. Hover over Pipelines in the menu on the left pane and click the + symbol to add a pipeline.

  2. Name this pipeline taxipipeline.

  3. Add a Parquet Reader node:

    1. Type parquet in the search box
    2. Drag and drop the Parquet node into the pipeline pane.

    The screen will update as illustrated:

    Drag and drop parquet node

  4. Configure the Parquet Reader node:

    1. Click on the Parquet node in the pipeline pane.
    2. Input the configuration parameters, described in the following table, in the Configure Parquet Node screen:

    | Variable | Value | Details |
    | --- | --- | --- |
    | Path Type | Azure | Select from the dropdown. |
    | Parquet URL | URL of your storage bucket | For example, ms:///trip_data/taxi=green/year=2022/month=02/green_tripdata*.parquet. The * in the URL matches any string in a directory or file name; glob patterns provide a way to specify a path that can match one or more files (see the sketch after this procedure). |
    | Decode Modality | Table | The format to which the parquet files are decoded. Select from the dropdown. |
    | Use Watching | No | Watch for new storage bucket objects. Unchecked here because this pipeline ingests a fixed set of parquet files. |
    | Use Authentication | Yes | Use a Kubernetes secret to authenticate with the cloud provider. |
    | Kubernetes Secret | pqtcreds | Name of the Kubernetes secret. |
    | Use Certificates | No | |

    Configure the parquet node

  5. Add a Database Writer node to the pipeline:

    1. Type writers in the search box.
    2. Drag and drop the kdb Insights Database node into the pipeline pane.
    3. Connect the Parquet reader node to the database writer node by dragging the dot that represents the data output terminal on the Parquet node onto the dot that represents the data input terminal on the kdb Insights Database node.

    Your screen updates as illustrated:

    Drag and drop a _kdb Insights Database_ node

  6. Configure the Database Writer to point to the database and table created previously. Use the variables described in the following table to complete the Configure kdb Insights Database Node screen.

    | Variable | Value | Details |
    | --- | --- | --- |
    | Database | taxidb | The kdb Insights database to write the data to. |
    | Table | taxitable | The table to write the data to in the selected database. |
    | Write Direct to HDB | Yes | When enabled, data is written directly to the database. |
    | Deduplicate Stream | No | When enabled, data is deduplicated. Useful when multiple publishers generate the same data. |
    | Set Timeout Value | No | |
    | Overwrite | Yes | Overwrites the content of each date partition with the new batch-ingested table data. |

    Write Direct to HDB

    See here for more details on the kdb Insights Database writer.

    Configure the _kdb Insights Database_ node

  7. Configure the pipeline memory. You must allocate sufficient memory to your pipeline to ensure it can ingest the parquet files.

    Memory Allocation

    Parquet files can have varying levels of compression. When a file is read by the pipeline, the data held in memory is uncompressed, so the level of compression determines the memory allocation required. If you're unsure of the memory required, allocate an amount that is 8 times the size of the largest parquet file you plan to ingest (see the sketch after this procedure).

    1. Click the pipeline settings tab.

    2. Scroll down to locate the memory settings and set the values as shown below:

    set pipeline memory

  8. Start the pipeline by clicking Save & Deploy.

  9. The pipeline can be torn down once all the parquet files are processed.
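
Before deploying, it can help to confirm which objects the glob in the Parquet URL actually matches and what the 8x memory rule of thumb works out to. The sketch below assumes the azure-storage-blob package, a connection string exposed as AZURE_STORAGE_CONNECTION_STRING, and a hypothetical container named taxidata holding the trip_data/ path that the ms:/// URL points at.

    # Sketch: list the blobs the reader's glob would match and estimate pipeline memory.
    # The connection string and container name ("taxidata") are placeholders for your
    # own storage account.
    import fnmatch
    import os
    from azure.storage.blob import ContainerClient

    container = ContainerClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"], "taxidata"
    )

    pattern = "trip_data/taxi=green/year=2022/month=02/green_tripdata*.parquet"
    matches = [
        b for b in container.list_blobs(name_starts_with="trip_data/")
        if fnmatch.fnmatch(b.name, pattern)
    ]

    for b in matches:
        print(f"{b.name}  {b.size / 2**20:.1f} MiB")

    # Memory rule of thumb from this tutorial: 8x the largest file to be ingested
    if matches:
        largest = max(b.size for b in matches)
        print(f"suggested memory limit: {8 * largest / 2**30:.2f} GiB")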

Query the data

  1. Create a new Query window

  2. Adjust the time range to any date-time range in February 2022, and click 'Get Data'

    Query View

  3. Data is displayed in the console as shown below:

    Data in console

The steps above show how easy it is to populate a database with a fixed set of parquet files using a Parquet Reader node.

Daily delivery of parquet files from vendor

You can extend the walkthrough to ingest parquet files as they are delivered to an object storage bucket. One use case is ingesting the previous day's data files once they are copied into the bucket at a particular time each morning.

You can turn on the Watching feature on your Parquet Reader node to ensure new files that match the file pattern are ingested as they are delivered to the bucket.

This example continues to use the New York City taxi ride data. The assumption is that a vendor is pushing parquet files to your object storage location daily starting from March 1st 2022.

Configuration

  1. Create a copy of the pipeline above by clicking the Duplicate button
  2. Update the following values in the Parquet Reader node:

    | Variable | Value | Details |
    | --- | --- | --- |
    | Parquet URL | URL of the storage bucket | For example, ms:///trip_data/taxi=green/year=2022/month=03/green_tripdata*.parquet. The * in the URL matches any string in a directory or file name; glob patterns provide a way to specify a path that can match one or more files. |
    | Use Watching | Yes | Watch for new storage bucket objects. Checked here because the pipeline polls for new parquet files. |

    Note

    The URL chosen means that any files for March 2022 will be ingested as they arrive in the bucket. The pipeline could be left running for a month.

  3. Update the following variable in the Database Writer node:

    | Variable | Value | Details |
    | --- | --- | --- |
    | Write Direct to HDB | No | This option is not supported when using the file watcher. |

This pipeline shows how you can add new data to the database as it arrives in the storage bucket. To test the watcher without waiting for a delivery, you can push a file into the watched path yourself, as sketched below.
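
For example, the following sketch copies a single day's file into the watched path to exercise the running pipeline. The connection string, container name (taxidata), and file names are placeholders; any object that lands under month=03/ and matches green_tripdata*.parquet is picked up.

    # Sketch: push one day's parquet file into the watched path to exercise the pipeline.
    # The connection string, container name ("taxidata"), and file names are placeholders.
    import os
    from azure.storage.blob import BlobClient

    blob = BlobClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"],
        container_name="taxidata",
        blob_name="trip_data/taxi=green/year=2022/month=03/green_tripdata_2022-03-01.parquet",
    )

    with open("green_tripdata_2022-03-01.parquet", "rb") as f:
        blob.upload_blob(f, overwrite=True)  # the watcher ingests it once it matches the glob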