Batch S3 Ingest
This page describes how to read bulk data from S3 and write it to an Insights SDK Database.
Arbitrary data files can be read from S3, transformed, and written to downstream consumers by the Stream Processor. This example illustrates how to ingest data into an Insights Database running in Docker.
Prerequisites
Follow the instructions to download the Docker bundle and docker login portal.dl.kx.com.
After extracting the tarball, place your kdb+ license in the kxi-db/lic directory.
Building the walkthrough
Ensure you are in the kxi-db directory when running these commands.
First, create a Docker Compose file compose-batch-ingest.yaml.
---
# include the base database bundle services
include:
  - ./compose.yaml

services:
  # define an SP controller service
  kxi-controller:
    image: ${kxi_img_spc}
    hostname: kxi-controller
    ports:
      - ${kxi_port_spc}:${kxi_port_spc}
    volumes:
      - ${kxi_dir_logs}:/mnt/logs
      - ${kxi_dir_sp}:/sp
      - ${kxi_dir_lic}:/opt/kx/lic
      - ${kxi_dir_config}:/mnt/config
    networks: [kx]
    environment:
      - KDB_LICENSE_B64
    command: ["-p", '${kxi_port_spc}']

  # define an SP worker service
  kxi-worker:
    image: ${kxi_img_spw}
    hostname: kxi-sp
    ports:
      - ${kxi_port_spw}:${kxi_port_spw}
    volumes:
      - ${kxi_dir_logs}:/mnt/logs
      - ${kxi_dir_sp}:/sp
      - ${kxi_dir_lic}:/opt/kx/lic
      - ${kxi_dir_config}:/mnt/config
      - ${kxi_dir_db}:/mnt/db
    networks: [kx]
    environment:
      - KXI_SP_SPEC=/mnt/config/${kxi_sp_spec} # Point to the bound spec.q file
      - KXI_SP_PARENT_HOST=kxi-controller:${kxi_port_spc} # Point to the parent controller service
      - RT_TOPIC_PREFIX=rt-
      - RT_REPLICAS=1
      - KXI_SP_ORDINAL=1
      - KXI_SP_GROUP=grp
    command: ["-p", '${kxi_port_spw}']
Details about the environment variables referenced above are available here.
Create a spec file called config/ingest.q containing the pipeline definition. It does the following:

- reads data from a public S3 bucket
- uses a CSV decoder to format the data using the schema object
- renames some columns
- transforms the data to match the database schema by adding an empty fees column and reordering
- writes the data to the taxi schema in the database using a target of kxi-sm:10001
    - the target value corresponds to the SM service's endpoint
    - directWrite is enabled to specify that data should be written directly to the database instead of streamed
schema:(!) . flip (
  (`pickup_datetime ; "p");
  (`dropoff_datetime ; "p");
  (`vendor ; "s");
  (`passengers ; "h");
  (`distance ; "e");
  (`rate_type ; "*");
  (`store_and_fwd ; "*");
  (`pickup_borough ; "*");
  (`pickup_zone ; "*");
  (`pickup_service_zone ; "*");
  (`dropoff_borough ; "*");
  (`dropoff_zone ; "*");
  (`dropoff_service_zone ; "*");
  (`payment_type ; "s");
  (`fare ; "e");
  (`extra ; "e");
  (`tax ; "e");
  (`tip ; "e");
  (`toll ; "e");
  (`improvement_surcharge ; "*");
  (`congestion_surcharge ; "*");
  (`total ; "e"));

exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;

.qsp.run
  .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
  .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
  .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
  .qsp.map[{ :`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x }]
  .qsp.v2.write.toDatabase[`taxi; .qsp.use `directWrite`target!(1b; "kxi-sm:10001")]
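To sanity-check the transformation in the map step before deploying, you can reproduce it on a small hand-built batch in a local q session. This is a minimal sketch using a hypothetical two-row batch as it would look after the CSV decode and column renames; it is not part of the pipeline spec.

/ hypothetical two-row batch with the post-rename column names
batch:([] pickup:2#.z.p; dropoff:2#.z.p; vendor:`CMT`VTS; passengers:1 2h; distance:1.8 1.4e;
  fare:7.5 7e; extra:3 3.5e; tax:0.5 0.5e; tip:2.8 2e; tolls:0 0e; total:14.1 13.3e; payment_type:2#`card)
/ add an empty fees column, then reorder the columns to match the taxi table schema
order:`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type
order xcols update fees:0e from batch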
Create a variable file called batch-ingest.env.
# images
kxi_registry=portal.dl.kx.com
kxi_sp_release=1.12.0
kxi_img_spc=${kxi_registry}/kxi-sp-controller:${kxi_sp_release}
kxi_img_spw=${kxi_registry}/kxi-sp-worker:${kxi_sp_release}
# networking
kxi_port_spc=6000
kxi_port_spw=7000
# paths
kxi_dir_lic="./lic"
kxi_dir_config="./config"
kxi_dir_sp="./data/sp"
kxi_dir_logs="./data/logs"
kxi_dir_db="./data/db"
kxi_sp_spec="ingest.q"
Read more about the Stream Processor q API.
Create a spec file called config/ingest.py containing the pipeline definition. It does the following:

- reads data from a public S3 bucket
- uses a CSV decoder to format the data using the schema object
- renames some columns
- transforms the data to match the database schema by adding an empty fees column and reordering
- writes the data to the taxi schema in the database using a target of kxi-sm:10001
    - the target value corresponds to the SM service's endpoint
    - directWrite is enabled to specify that data should be written directly to the database instead of streamed
import pykx as kx
from kxi import sp

schema = {
    'pickup_datetime': 'p',
    'dropoff_datetime': 'p',
    'vendor': 's',
    'passengers': 'h',
    'distance': 'e',
    'rate_type': '*',
    'store_and_fwd': '*',
    'pickup_borough': '*',
    'pickup_zone': '*',
    'pickup_service_zone': '*',
    'dropoff_borough': '*',
    'dropoff_zone': '*',
    'dropoff_service_zone': '*',
    'payment_type': 's',
    'fare': 'e',
    'extra': 'e',
    'tax': 'e',
    'tip': 'e',
    'toll': 'e',
    'improvement_surcharge': '*',
    'congestion_surcharge': '*',
    'total': 'e'
}

exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']

def reorder(data):
    order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare', 'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
    data = kx.q.qsql.update(data, {'fees': '0e'})
    return kx.q.xcols(order, data)

sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
    | sp.decode.csv(schema, exclude=exclude)
    | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
    | sp.map(reorder)
    | sp.write.to_database('taxi', target='kxi-sm:10001', directWrite=True, api_version=2))
Create a variable file called batch-ingest.env.
# images
kxi_registry=portal.dl.kx.com
kxi_sp_release=1.12.0
kxi_img_spc=${kxi_registry}/kxi-sp-controller:${kxi_sp_release}
kxi_img_spw=${kxi_registry}/kxi-sp-python:${kxi_sp_release}
# networking
kxi_port_spc=6000
kxi_port_spw=7000
# paths
kxi_dir_lic="./lic"
kxi_dir_config="./config"
kxi_dir_sp="./data/sp"
kxi_dir_logs="./data/logs"
kxi_dir_db="./data/db"
kxi_sp_spec="ingest.py"
Read more about the Stream Processor Python API.
Running the ingest
Run the deployment using the following command.
docker compose -f compose-batch-ingest.yaml --env-file batch-ingest.env up
You can check the status of the pipeline using the following command.
curl http://localhost:6000/details
This outputs a JSON payload of the form
{
  "state": "RUNNING",
  "error": "",
  "metrics": {
    "eventRate": 0,
    "bytesRate": 0,
    "latency": -1.7976931348623157e+308,
    "inputRate": 8746320,
    "outputRate": 41618.86
  },
  "logCounts": {
    "trace": 0,
    "debug": 0,
    "info": 63,
    "warn": 0,
    "error": 0,
    "fatal": 0
  },
  "readerMetadata": []
}
The pipeline has completed when the state field is set to FINISHED. This indicates that all of the data has been written to the database and successfully ingested.
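If you prefer to check the state programmatically, the same endpoint can be queried from a q session. This is a minimal sketch assuming the controller remains reachable on localhost:6000 as configured above.

/ fetch pipeline details from the SP controller and parse the JSON payload
details:.j.k .Q.hg "http://localhost:6000/details"
details`state              / "RUNNING" while the batch is being read and written
"FINISHED"~details`state   / 1b once the pipeline has completed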
You can check the ingest session in the database using the following command.
curl http://localhost:10001/ingest
The session is marked with a status of pending while the SP pipeline is writing data.
[
  {
    "name": "pipeline-482f128ffe-0",
    "pipeline": "",
    "database": "kdb Insights",
    "updtype": "ingest",
    "status": "pending",
    "details": [],
    "tbls": [],
    "dates": [],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": null,
      "cmdTotal": null,
      "subCurrent": "",
      "subIndex": null,
      "subTotal": null
    },
    "error": [],
    "updated": "2024-11-22T12:12:25.518555535"
  }
]
This updates to processing when the database is ingesting the data.
[
  {
    "name": "pipeline-482f128ffe-0",
    "pipeline": "",
    "database": "kdb Insights",
    "updtype": "ingest",
    "status": "processing",
    "details": {
      "kxSessionInfo.752fb820-e0db-6c26-0803-c194e45ed5a8": {
        "pipelineName": [],
        "pipelineID": "pipeline-482f128ffe",
        "workerName": "spwork-kxi-sp",
        "operatorID": "database.writer_taxi",
        "ingestStartTime": "2024-11-22T12:12:17.580482856",
        "ingestEndTime": "2024-11-22T12:14:14.123753107"
      },
      "subsessions": [
        "752fb820-e0db-6c26-0803-c194e45ed5a8"
      ],
      "dates": [
        "2021-12-01",
        ..
        "2021-12-31"
      ],
      "tables": [
        "taxi"
      ]
    },
    "tbls": [
      "taxi"
    ],
    "dates": [
      "2021-12-01",
      "2021-12-02",
      "2021-12-03",
      "2021-12-04",
      "2021-12-05",
      "2021-12-06",
      "2021-12-07",
      "2021-12-08",
      "2021-12-09",
      "2021-12-10",
      "2021-12-11",
      "2021-12-12",
      "2021-12-13",
      "2021-12-14"
    ],
    "progress": {
      "cmdCurrent": "2021.12.14",
      "cmdIndex": 13,
      "cmdTotal": 31,
      "subCurrent": "taxi",
      "subIndex": 0,
      "subTotal": 1
    },
    "error": [],
    "updated": "2024-11-22T12:14:14.140172780"
  }
]
Once the database has finished ingesting the data, the status updates to completed and the data is available for querying.
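Rather than re-running the curl command by hand, you can poll the endpoint from a q session until the session reports completed. This is a minimal sketch assuming the SM service remains reachable on localhost:10001 as above and that a Unix-like shell is available for the sleep call.

/ poll the SM /ingest endpoint every 5 seconds until the first session reports completed
while[not "completed"~first (.j.k .Q.hg "http://localhost:10001/ingest")`status; system "sleep 5"]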
You can query the first 30 minutes of the dataset to verify this.
curl -X POST http://localhost:8080/data -H "Content-Type: application/json" -H "Accept: application/json" -d '{"table": "taxi", "startTS": "2021-12-01T00:00:00.0", "endTS": "2021-12-01T00:30:00.0"}'
{
  "header": {
    "rcvTS": "2024-11-22T12:15:22.941000000",
    "corr": "69e1aa73-f069-4854-a0b1-28c439c55e47",
    "logCorr": "69e1aa73-f069-4854-a0b1-28c439c55e47",
    "http": "json",
    "api": ".kxi.getData",
    "agg": ":172.18.0.7:5060",
    "refVintage": -9223372036854776000,
    "rc": 0,
    "ac": 0,
    "ai": "",
    "limitApplied": false
  },
  "payload": [
    {
      "vendor": "Creative Mobile Tecnologies, LLC",
      "pickup": "2021-12-01T00:00:07.000000000",
      "dropoff": "2021-12-01T00:06:39.000000000",
      "passengers": 1,
      "distance": 1.8,
      "fare": 7.5,
      "extra": 3,
      "tax": 0.5,
      "tip": 2.8,
      "tolls": 0,
      "fees": 0,
      "total": 14.1,
      "payment_type": "Credit card"
    },
    {
      "vendor": "Creative Mobile Tecnologies, LLC",
      "pickup": "2021-12-01T00:00:19.000000000",
      "dropoff": "2021-12-01T00:07:24.000000000",
      "passengers": 1,
      "distance": 1.4,
      "fare": 7,
      "extra": 3.5,
      "tax": 0.5,
      "tip": 2,
      "tolls": 0,
      "fees": 0,
      "total": 13.3,
      "payment_type": "Credit card"
    },
    ..
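The same getData request can also be issued from a q session with .Q.hp, which is convenient if you want to inspect the result as q data rather than raw JSON. This is a minimal sketch assuming the gateway remains reachable on localhost:8080 as in the curl example; if the gateway requires an explicit Accept header, use the curl form above instead.

/ build the getData arguments and POST them to the gateway as JSON
args:`table`startTS`endTS!("taxi";"2021-12-01T00:00:00.0";"2021-12-01T00:30:00.0")
resp:.j.k .Q.hp["http://localhost:8080/data"; .h.ty`json; .j.j args]
resp`payload   / the taxi rows returned for the first 30 minutes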