Data ingest from Parquet files
Initial data import
Parquet is an efficient data storage format. This tutorial demonstrates how to ingest data from Apache Parquet files stored in a Microsoft Azure storage bucket into the kdb Insights Enterprise database. The Parquet files used in this walkthrough are the New York City taxi ride data from February 2022, provided by the NYC Taxi and Limousine Commission.
If you wish to ingest your own data stored in Parquet format, you can use the pipeline created in this walkthrough as a template.
Prerequisites
- kdb Insights Enterprise is installed
- A copy of the February 2022 New York City taxi Parquet files stored in an accessible Microsoft Azure storage bucket.
- A Kubernetes secret to access the Microsoft Azure storage bucket containing the Parquet files. The Parquet Reader q API documentation includes details on how to set up this secret.
Suggested secret name
The rest of this tutorial assumes the secret name is `pqtcreds`.
The steps in the process are:
- Create and deploy the database
- Create and deploy a Parquet reader pipeline
- Query the data
Create and deploy the database
- Select Build a Database in the Overview page of the kdb Insights Enterprise UI.
Detailed database creation walkthrough
A detailed walkthrough of how to create and deploy a database is available here.
- Name the database `taxidb`.
- Open the Schema Settings tab and click on the right-hand side to open the code editor.
- Paste the following JSON schema into the code editor (an illustrative q-language view of this schema is shown at the end of this section):
[ { "name": "taxitable", "type": "partitioned", "primaryKeys": [], "prtnCol": "lpep_dropoff_datetime", "sortColsDisk": [ "VendorID" ], "sortColsMem": [ "VendorID" ], "sortColsOrd": [ "VendorID" ], "columns": [ { "type": "long", "attrDisk": "parted", "attrOrd": "parted", "name": "VendorID", "attrMem": "", "foreign": "" }, { "type": "timestamp", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "lpep_pickup_datetime", "foreign": "" }, { "type": "timestamp", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "lpep_dropoff_datetime", "foreign": "" }, { "type": "string", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "store_and_fwd_flag", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "RatecodeID", "foreign": "" }, { "type": "long", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "PULocationID", "foreign": "" }, { "type": "long", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "DOLocationID", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "passenger_count", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "trip_distance", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "fare_amount", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "extra", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "mta_tax", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "tip_amount", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "tolls_amount", "foreign": "" }, { "type": "", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "ehail_fee", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "improvement_surcharge", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "total_amount", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "payment_type", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "trip_type", "foreign": "" }, { "type": "float", "attrDisk": "", "attrMem": "", "attrOrd": "", "name": "congestion_surcharge", "foreign": "" } ] } ]
- Apply the JSON.
- Save the database.
- Deploy the database.
When the database status changes to Active, it is ready to use.
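For reference, the JSON schema above corresponds roughly to the following empty q table. This is only an illustrative sketch of the column names and types (string columns are lists of character vectors, and ehail_fee is left untyped to match the schema); the database builds the table from the JSON, not from q code.

```q
// Illustrative only: an empty q table matching the taxitable JSON schema
taxitable:([]
  VendorID:`long$();
  lpep_pickup_datetime:`timestamp$();
  lpep_dropoff_datetime:`timestamp$();
  store_and_fwd_flag:();               / string column: list of character vectors
  RatecodeID:`float$();
  PULocationID:`long$();
  DOLocationID:`long$();
  passenger_count:`float$();
  trip_distance:`float$();
  fare_amount:`float$();
  extra:`float$();
  mta_tax:`float$();
  tip_amount:`float$();
  tolls_amount:`float$();
  ehail_fee:();                        / untyped in the schema above
  improvement_surcharge:`float$();
  total_amount:`float$();
  payment_type:`float$();
  trip_type:`float$();
  congestion_surcharge:`float$())
```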
Create and deploy a Parquet reader pipeline
To create the pipeline:
- Hover over Pipelines in the menu on the left pane and click the + symbol to add a pipeline.
- Name this pipeline `taxipipeline`.
- Add a Parquet Reader node:
    - Type `parquet` in the search box.
    - Drag and drop the Parquet node into the pipeline pane.
    The screen will update as illustrated:
- Configure the Parquet Reader node:
    - Click on the Parquet node in the pipeline pane.
    - Input the configuration parameters, described in the following table, in the Configure Parquet Node screen.

| Variable | Value | Details |
| --- | --- | --- |
| Path Type | Azure | Select from the dropdown. |
| Parquet URL | URL of your storage bucket | For example, ms:// /trip_data/taxi=green/year=2022/month=02/green_tripdata*.parquet. Note the `*` present in the URL: `*` matches any string in a directory or file name. Glob patterns provide a way to specify a path that can match one or more files (see the glob-matching sketch after these steps). |
| Decode Modality | Table | The format to which the Parquet file is decoded; select from the dropdown. |
| Use Watching | No | Watch for new storage bucket objects. This is unchecked here as the pipeline is intended to pick up a fixed set of Parquet files. |
| Use Authentication | Yes | Use a Kubernetes secret with the cloud provider. |
| Kubernetes Secret | pqtcreds | Name of the Kubernetes secret. |
| Use Certificates | No | |
- Add a Database Writer node to the pipeline:
    - Type `writers` in the search box.
    - Drag and drop the kdb Insights Database node into the pipeline pane.
    - Connect the Parquet Reader node to the database writer node by clicking and dragging the dot that represents the data output terminal on the Parquet node to the dot that represents the data input terminal on the kdb Insights Database node.
    Your screen will be updated as illustrated:
- Configure the Database Writer to point to the database and table created previously. Use the variables described in the following table to complete the Configure kdb Insights Database Node screen.

| Variable | Value | Details |
| --- | --- | --- |
| Database | taxidb | kdb Insights Database to write the data to. |
| Table | taxitable | Table to write the data to in the selected database. |
| Write Direct to HDB | Yes | When enabled, data is written directly to the database. |
| Deduplicate Stream | No | When enabled, data is deduplicated. Useful if running multiple publishers that are generating the same data. |
| Set Timeout Value | No | |
| Overwrite | Yes | Overwrites content within each date partition with the new batch ingest table data. |

See here for more details on the kdb Insights Database writer.
- Configure the pipeline memory. To ensure your pipeline has sufficient memory to ingest Parquet files, you must allocate a memory size that is 8 times the size of the biggest Parquet file ingested; for example, if the largest file is 1 GB, allocate at least 8 GB.
    - Click the pipeline settings tab.
    - Scroll down to locate the memory settings and set the values as shown below:
- Start the pipeline by clicking Save & Deploy.
- The pipeline can be torn down once all the Parquet files are processed.
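The `*` wildcard in the Parquet URL is a glob pattern. As a quick, illustrative sketch of how such a pattern selects files (using q's `like` operator, which supports a similar `*` wildcard; the file names below are made up for illustration), consider:

```q
// Which file names match the pattern green_tripdata*.parquet?
files:("green_tripdata_2022-02.parquet";"green_tripdata_2022-02_part2.parquet";"yellow_tripdata_2022-02.parquet")
files like "green_tripdata*.parquet"
// 110b - the two green trip data files match; the yellow file does not
```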
Query the data
- Create a new Query window.
- Adjust the time range to any date-time range in February 2022, and click 'Get Data'.
- Data is displayed in the console as shown below.
The steps above show how easy it is to populate a database with a fixed set of Parquet files using a Parquet Reader node.
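You can also query the ingested data programmatically. The following qSQL is a sketch that counts the ingested rides per day in February 2022; it assumes you run it from the Q tab of the Query window (or any process with access to taxitable) and uses the virtual date partition column:

```q
// Count of ingested taxi rides per date partition for February 2022
select rides:count i by date from taxitable where date within 2022.02.01 2022.02.28
```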
Daily delivery of Parquet files from a vendor
We can extend the walkthrough further to facilitate ingestion of Parquet files as they are delivered to an object store bucket. One use case for this would be to ingest the data files from the previous day once they are copied into the bucket at a particular time each morning.
You can turn on the Watching feature on your Parquet Reader node to ensure new files that match the file pattern are ingested as they are delivered to the bucket.
This example continues to use the New York City taxi ride data. The assumption is that a vendor is pushing Parquet files to your object storage location daily, starting from March 1st 2022.
Configuration
- Create a copy of the pipeline above by clicking the Duplicate button.
- Update the following values in the Parquet Reader node:

| Variable | Value | Details |
| --- | --- | --- |
| Parquet URL | URL of the storage bucket | For example, ms:// /trip_data/taxi=green/year=2022/month=03/green_tripdata*.parquet. Note the `*` present in the URL: `*` matches any string in a directory or file name. Glob patterns provide a way to specify a path that can match one or more files. |
| Use Watching | Yes | Watch for new storage bucket objects. This is checked here as the pipeline is intended to poll for new Parquet files. |

Note
The URL chosen means that any files for March 2022 will be ingested as they arrive in the bucket. The pipeline could be left running for a month.
- Update the following variable in the Database Writer node:

| Variable | Value | Details |
| --- | --- | --- |
| Write Direct to HDB | No | This option is not supported when using the File Watcher. |
This pipeline shows how you can add new data to the database as it arrives in the storage bucket.
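To check that the watching pipeline is picking up each day's delivery, a query similar to the one used earlier can be run periodically. Again this is a sketch, assuming access to taxitable from the Q tab of the Query window:

```q
// Per-day ingest check for the March 2022 deliveries: row count and latest drop-off ingested so far
select rides:count i, latest_dropoff:max lpep_dropoff_datetime by date from taxitable where date within 2022.03.01 2022.03.31
```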