Batch S3 Ingest
This page describes how to read bulk data from S3 and write it to a kdb Insights Enterprise database.
In this walkthrough, you download a sample workload and deploy it to a running kdb Insights Enterprise system using the Stream Processor. The sample deploys a database, a stream, and a pipeline that reads a taxi CSV dataset from Amazon S3 and writes it directly to the database for memory efficiency.
The example assumes the following prerequisites:
- access to the KX Downloads Portal
- a username and bearer token for the portal saved in your terminal session as $KX_USER and $KX_DL_BEARER respectively
- a running kdb Insights Enterprise system with the hostname URL stored as $KX_HOSTNAME
- the relevant version of the sample saved as $KX_VERSION
- the CLI configured for your deployment with authentication credentials
For more information on configuring the CLI, refer to the CLI documentation.
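For example, you might set these variables as follows; the values shown are placeholders, so substitute your own:
export KX_USER=jdoe                              # portal username (placeholder)
export KX_DL_BEARER=xxxxxxxx                     # portal bearer token (placeholder)
export KX_HOSTNAME=https://insights.example.com  # deployment URL (placeholder)
export KX_VERSION=1.0.0                          # sample version (placeholder)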
Downloading the sample
You can download the sample with the command below.
curl -s --fail-with-body -D /dev/stderr -u ${KX_USER}:${KX_DL_BEARER} -L -OJ https://portal.dl.kx.com/assets/raw/package-samples/${KX_VERSION}/s3-ingest-${KX_VERSION}.kxi
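You can check that the archive arrived before unpacking it:
ls -lh s3-ingest-${KX_VERSION}.kxi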
Unpack the package to inspect its contents.
export PKG=s3-ingest
kxi package unpack $PKG-$KX_VERSION.kxi
The default pipeline spec is written in q and is located at s3-ingest/src/s3-ingest.q in the unpacked package. The Python pipeline spec is located at s3-ingest/src/s3-ingest.py.
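To confirm both specs are present after unpacking, list the source directory; you should see s3-ingest.q and s3-ingest.py among the files:
ls s3-ingest/src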
The pipeline does the following:
- reads data from a public S3 bucket
- uses a CSV decoder to format the data using the schema object
- renames some columns
- transforms the data to match the database schema by adding an empty fees column and reordering the columns
- writes the data to the greentrips table in a database called s3-ingest; directWrite is enabled to specify that data should be written directly to the database instead of streamed
s3-ingest/src/s3-ingest.q
schema:(!) . flip (
(`pickup_datetime ; "p");
(`dropoff_datetime ; "p");
(`vendor ; "s");
(`passengers ; "h");
(`distance ; "e");
(`rate_type ; "*");
(`store_and_fwd ; "*");
(`pickup_borough ; "*");
(`pickup_zone ; "*");
(`pickup_service_zone ; "*");
(`dropoff_borough ; "*");
(`dropoff_zone ; "*");
(`dropoff_service_zone ; "*");
(`payment_type ; "s");
(`fare ; "e");
(`extra ; "e");
(`tax ; "e");
(`tip ; "e");
(`toll ; "e");
(`improvement_surcharge ; "*");
(`congestion_surcharge ; "*");
(`total ; "e"));
exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;
/ build and run the pipeline: read CSV from S3, decode, rename,
/ reorder and add the fees column, then write directly to the database
.qsp.run
  .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
  .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
  .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
  .qsp.map[{:`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x}]
  .qsp.v2.write.toDatabase[`greentrips; .qsp.use `directWrite`database!(1b; "s3-ingest")]
s3-ingest/src/s3-ingest.py
from kxi import sp
import pykx as kx

# column names and types for the CSV decoder
schema = {
    'pickup_datetime': 'p',
    'dropoff_datetime': 'p',
    'vendor': 's',
    'passengers': 'h',
    'distance': 'e',
    'rate_type': '*',
    'store_and_fwd': '*',
    'pickup_borough': '*',
    'pickup_zone': '*',
    'pickup_service_zone': '*',
    'dropoff_borough': '*',
    'dropoff_zone': '*',
    'dropoff_service_zone': '*',
    'payment_type': 's',
    'fare': 'e',
    'extra': 'e',
    'tax': 'e',
    'tip': 'e',
    'toll': 'e',
    'improvement_surcharge': '*',
    'congestion_surcharge': '*',
    'total': 'e'
}

exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone',
           'pickup_service_zone', 'dropoff_borough', 'dropoff_zone',
           'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']

def reorder(data):
    # add an empty fees column and reorder to match the database schema
    order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare',
             'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
    data = kx.q.qsql.update(data, {'fees': '0e'})
    return kx.q.xcols(order, data)

sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
    | sp.decode.csv(schema, exclude=exclude)
    | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
    | sp.map(reorder)
    | sp.write.to_database(table='greentrips', database='s3-ingest', directWrite=True, api_version=2))
Deploying
Authenticate with kdb Insights Enterprise and deploy the sample package.
Cleaning up resources
Make sure to tear down any previously installed version of the package and clean up resources before deploying a new version.
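You can do this with the same command used in the Teardown section below:
kxi pm teardown --rm-data $PKG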
Once deployed, the pipeline starts to ingest data and write to the database.
export PKG=s3-ingest
kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG
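To confirm the push and deploy succeeded, you can list the packages known to the package manager; this assumes your version of the CLI provides the list subcommand:
kxi pm list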
To use the Python spec instead of the q one, unpack the sample package and switch the pipeline configuration as follows.
export PKG=s3-ingest
kxi package unpack $PKG-$KX_VERSION.kxi
Replace the values of base and spec in s3-ingest/pipelines/transport.yaml to use the Python spec file instead of the q one.
#base: q
base: py
#spec: src/spec.q
spec: src/spec.py
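If you prefer to script this edit, a one-liner along these lines works (GNU sed; it replaces the values in place rather than commenting out the old lines, and assumes the file contains exactly the base and spec values shown above):
sed -i -e 's|^base: q$|base: py|' -e 's|^spec: src/spec.q$|spec: src/spec.py|' s3-ingest/pipelines/transport.yaml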
kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG
Checking progress
The ingestion may take a few minutes to complete. You can check the progress using the kdb Insights Enterprise REST APIs. First, obtain the token generated when you authenticated against the system.
export KX_TOKEN=$(kxi auth print-token)
Note
If this token expires, you can regenerate it by running kxi auth login again and re-exporting it with the command above.
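For convenience, you can chain re-authentication and token refresh in one line:
kxi auth login && export KX_TOKEN=$(kxi auth print-token)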
You can check the progress of the SP pipeline using the details API.
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details
The pipeline state is RUNNING, indicating that it is processing data.
[
{
"id":"s3-ingest-transform",
"name":"transform",
"group":"g-793720690864",
"mode":"default",
"created":"2025-02-24T13:58:11.000000000",
"lastSeen":"2025-02-24T13:59:50.525017500",
"state":"RUNNING",
"error":"",
"metrics":{
"eventRate":0,
"bytesRate":0,
"latency":null,
"inputRate":0,
"outputRate":0
},
"logCounts":{
"trace":0,
"debug":0,
"info":0,
"warn":0,
"error":0,
"fatal":0
},
"packageName":"s3-ingest",
"reportedMetadata":[
]
}
]
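To extract just the pipeline state from this response, you can filter it with jq, for example:
curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details | jq -r '.[] | select(.id == "s3-ingest-transform") | .state'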
You can also check the state of the ingest at the database.
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .
This has a status of pending while the SP is still writing data.
[
{
"name": "s3-ingest-transform-0",
"pipeline": "",
"database": "s3-ingest",
"updtype": "ingest",
"status": "pending",
"details": [],
"tbls": [],
"dates": [],
"progress": {
"cmdCurrent": "",
"cmdIndex": null,
"cmdTotal": null,
"subCurrent": "",
"subIndex": null,
"subTotal": null
},
"error": [],
"updated": "2025-02-24T13:58:11.819755661"
}
]
The value of status updates to processing when the SP has finished writing and triggered the database ingest.
[
{
"name": "s3-ingest-transform-0",
"pipeline": "transform",
"database": "s3-ingest",
"updtype": "ingest",
"status": "processing",
"details": {
"": null,
"pipeline": "transform",
"kxSessionInfo.6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714": {
"pipelineName": "transform",
"pipelineID": "s3-ingest-transform",
"workerName": "s3-ingest-transform-1-spwork",
"operatorID": "database.writer_greentrips",
"ingestStartTime": "2025-02-24T13:59:48.022041801",
"ingestEndTime": "2025-02-24T14:01:29.039912981"
},
"subsessions": [
"6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714"
],
"dates": [
"2021-12-01",
"2021-12-02",
"2021-12-03",
"2021-12-04",
"2021-12-05",
"2021-12-06",
"2021-12-07",
....
],
"tables": [
"greentrips"
]
},
"tbls": [
"greentrips"
],
"dates": [
"2021-12-01",
"2021-12-02",
"2021-12-03",
"2021-12-04",
"2021-12-05",
"2021-12-06",
"2021-12-07",
....
],
"progress": {
"cmdCurrent": "2021.12.28",
"cmdIndex": 27,
"cmdTotal": 31,
"subCurrent": "greentrips",
"subIndex": 0,
"subTotal": 1
},
"error": [],
"updated": "2025-02-24T14:01:29.135005039"
}
]
When the data has finished loading, the SP details and ingest status APIs return the values FINISHED and completed, respectively.
[
{
"name": "s3-ingest-transform-0",
"pipeline": "transform",
"database": "s3-ingest",
"updtype": "ingest",
"status": "completed",
"details": {
"": null,
"pipeline": "transform",
"kxSessionInfo.6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714": {
"pipelineName": "transform",
"pipelineID": "s3-ingest-transform",
"workerName": "s3-ingest-transform-1-spwork",
"operatorID": "database.writer_greentrips",
"ingestStartTime": "2025-02-24T13:59:48.022041801",
"ingestEndTime": "2025-02-24T14:01:29.039912981"
},
"subsessions": [
"6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714"
],
"dates": [
"2021-12-01",
"2021-12-02",
"2021-12-03",
"2021-12-04",
"2021-12-05",
"2021-12-06",
"2021-12-07",
....
],
"tables": [
"greentrips"
]
},
"tbls": [
"greentrips"
],
"dates": [
"2021-12-01",
"2021-12-02",
"2021-12-03",
"2021-12-04",
"2021-12-05",
"2021-12-06",
"2021-12-07",
....
],
"progress": {
"cmdCurrent": "",
"cmdIndex": 31,
"cmdTotal": 31,
"subCurrent": "",
"subIndex": 0,
"subTotal": 0
},
"error": "",
"updated": "2025-02-24T14:02:35.978496551"
}
]
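If you want to wait for completion without re-running the request by hand, a minimal polling loop like the following works (a sketch; tune the sleep interval to taste, and note it reads the status field from the ingest API response shown above):
while true; do
  status=$(curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq -r '.[0].status')
  echo "ingest status: $status"
  [ "$status" = "completed" ] && break
  sleep 30
done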
Querying data
Once the ingest has finished, you can query the greentrips table to retrieve the persisted data. This requires an auth token, obtained as in the Checking progress section above.
Query the greentrips table.
curl -H "Authorization: Bearer $KX_TOKEN" -X POST "$KX_HOSTNAME/servicegateway/data" \
--header "Content-Type: application/json" \
--header "Accepted: application/json" \
-d '{table: "greentrips",startTS: "2021-12-01T00:00:00.0", endTS: "2021-12-01T00:30:00.0" }' | jq .
This returns a JSON object containing the result payload.
{
"header": {
"auditID": "a5a4e59c-35a2-46bf-af92-876cfd6f7330",
"rcvTS": "2025-03-05T15:27:52.922000000",
"corr": "9741abb1-4fb4-4aeb-b2e6-8d0940593a12",
"logCorr": "a5a4e59c-35a2-46bf-af92-876cfd6f7330",
"http": "json",
"api": ".kxi.getData",
"agg": ":10.7.150.80:5070",
"refVintage": -9223372036854776000,
"rc": 0,
"ac": 0,
"ai": "",
"limitApplied": false
},
"payload": [
{
"vendor": "VeriFone Inc.",
"pickup": "2021-12-01T00:00:00.000000000",
"dropoff": "2021-12-01T00:24:53.000000000",
"passengers": 1,
"distance": 8.43,
"fare": 28,
"extra": 0.5,
"tax": 0.5,
"tip": 7.33,
"tolls": 0,
"fees": 0,
"total": 37.88,
"payment_type": "Credit card"
},
{
"vendor": "VeriFone Inc.",
"pickup": "2021-12-01T00:00:00.000000000",
"dropoff": "2021-12-01T09:47:02.000000000",
"passengers": 6,
"distance": 2.07,
"fare": 12.5,
"extra": 0,
"tax": 0.5,
"tip": 0,
"tolls": 0,
"fees": 0,
"total": 15.8,
"payment_type": "Cash"
},
..
{
"vendor": "VeriFone Inc.",
"pickup": "2021-12-01T00:29:59.000000000",
"dropoff": "2021-12-01T00:47:12.000000000",
"passengers": 1,
"distance": 3.33,
"fare": 14.5,
"extra": 0.5,
"tax": 0.5,
"tip": 6.21,
"tolls": 6.55,
"fees": 0,
"total": 31.06,
"payment_type": "Credit card"
}
]
}
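To sanity-check how many rows came back without reading the whole payload, pipe the same request through jq:
curl -s -H "Authorization: Bearer $KX_TOKEN" -X POST "$KX_HOSTNAME/servicegateway/data" \
    --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    -d '{"table": "greentrips", "startTS": "2021-12-01T00:00:00.0", "endTS": "2021-12-01T00:30:00.0"}' | jq '.payload | length'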
For more information on querying data, refer to Database Query Interface.
Teardown
Tear down the package and data using the command below.
kxi pm teardown --rm-data $PKG