Batch S3 Ingest
This page describes how to read bulk data from S3 and write it to an Insights Enterprise database.
To do this you will download and deploy a sample workload on a running Insights Enterprise system. It shows how data can be ingested into the database via the Stream Processor in a memory-efficient way, by writing data directly to the database.
The example assumes the following prerequisites:
- access to the KX Downloads Portal at https://portal.dl.kx.com
- a username and bearer token for the portal saved in your terminal session as $KX_USER and $KX_DL_BEARER respectively (see the example exports below)
- a running Insights Enterprise system with the hostname URL stored as $KX_HOSTNAME
- the relevant version of the sample saved as $KX_VERSION
- the CLI configured for your deployment with authentication credentials; refer to the CLI documentation for more information
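For example, you could set these variables for your current session as follows. The values shown are placeholders only; substitute your own portal credentials, deployment URL, and the sample version listed on the portal.
export KX_USER=jdoe                              # portal username (placeholder)
export KX_DL_BEARER=xxxxxxxx-xxxx-xxxx           # portal bearer token (placeholder)
export KX_HOSTNAME=https://insights.example.com  # Insights Enterprise URL (placeholder)
export KX_VERSION=1.0.0                          # sample version from the portal (placeholder)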
Downloading the sample
You can download the sample with the command below.
curl -s --fail-with-body -D /dev/stderr -u ${KX_USER}:${KX_DL_BEARER} -L -OJ https://portal.dl.kx.com/assets/raw/assembly-samples/${KX_VERSION}/s3-ingest.yaml
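If the download succeeds, the pipeline definition is saved to your working directory; you can confirm it is present before continuing.
ls -l s3-ingest.yaml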
AWS credential discovery
The S3 reader supports custom AWS credentials. For more information, see the reader configuration.
The pipeline definition, contained in the s3-ingest.yaml file, is shown in the block below.
It does the following:
- reads data from a public S3 bucket
- uses a CSV decoder to format the data using the schema object
- renames some columns
- transforms the data to match the database schema by adding an empty fees column and reordering the columns
- writes the data to the greentrips table in a database called s3-ingest; this value corresponds to the metadata.name field in the s3-ingest.yaml file
- directWrite is enabled to specify that data should be written directly to the database instead of streamed
pipelines:
  transform:
    base: q
    env:
      - name: KX_KURL_DISABLE_AUTO_REGISTER
        value: "1"
    spec: |-
      schema:(!) . flip (
        (`pickup_datetime ; "p");
        (`dropoff_datetime ; "p");
        (`vendor ; "s");
        (`passengers ; "h");
        (`distance ; "e");
        (`rate_type ; "*");
        (`store_and_fwd ; "*");
        (`pickup_borough ; "*");
        (`pickup_zone ; "*");
        (`pickup_service_zone ; "*");
        (`dropoff_borough ; "*");
        (`dropoff_zone ; "*");
        (`dropoff_service_zone ; "*");
        (`payment_type ; "s");
        (`fare ; "e");
        (`extra ; "e");
        (`tax ; "e");
        (`tip ; "e");
        (`toll ; "e");
        (`improvement_surcharge ; "*");
        (`congestion_surcharge ; "*");
        (`total ; "e"));

      exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;

      .qsp.run
        .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
        .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
        .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
        .qsp.map[{ :`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x }]
        .qsp.v2.write.toDatabase[`greentrips; .qsp.use `directWrite`database!(1b; "s3-ingest")]
Deploying
Next, authenticate with Insights Enterprise and deploy the sample.
kxi auth login
kxi assembly deploy -f s3-ingest.yaml
This deploys a database, a stream, and an SP pipeline. The pipeline reads a taxi CSV dataset from Amazon S3 and ingests it into the deployed database. Once deployed, the pipeline begins ingesting data and writing it to the database.
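Optionally, you can confirm the assembly was accepted by listing the deployed assemblies. This assumes your version of the kxi CLI provides the assembly list subcommand; check kxi assembly --help if yours differs.
kxi assembly list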
Checking progress
The ingestion may take a few minutes to complete. You can check the progress using the Insights Enterprise REST APIs. First, obtain the token generated when you authenticated against the system.
export KX_TOKEN=$(kxi auth print-token)
Note
If this token expires, you can regenerate it by running kxi auth login again and re-exporting it with the command above.
You can check the progress of the SP pipeline using the details API.
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details
The pipeline state is RUNNING, indicating that it is processing data.
[
  {
    "id": "s3-ingest-transform",
    "name": "transform",
    "group": "g-785606756774",
    "mode": "default",
    "created": "2024-11-22T16:05:56.000000000",
    "lastSeen": "2024-11-22T16:06:44.826631122",
    "state": "RUNNING",
    "error": "",
    "metrics": {
      "eventRate": 0,
      "bytesRate": 0,
      "latency": null,
      "inputRate": 0,
      "outputRate": 0
    },
    "logCounts": {
      "trace": 0,
      "debug": 0,
      "info": 0,
      "warn": 0,
      "error": 0,
      "fatal": 0
    },
    "packageName": "__UNPACKAGED__",
    "reportedMetadata": []
  }
]
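Rather than re-running the request by hand, you can poll the same endpoint until the pipeline reaches a terminal state. This is a minimal sketch; it assumes jq is installed and that the response remains a single-element array as shown above.
while true; do
  state=$(curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details | jq -r '.[0].state')
  echo "pipeline state: $state"
  [ "$state" = "FINISHED" ] && break
  sleep 30
done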
You can also check the status of the ingest at the database.
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .
This has a status of pending while the SP is still writing data, and updates to progressing once the SP has finished writing.
[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "pending",
    "details": [],
    "tbls": [],
    "dates": [],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": null,
      "cmdTotal": null,
      "subCurrent": "",
      "subIndex": null,
      "subTotal": null
    },
    "error": [],
    "updated": "2024-11-22T16:06:44.819755661"
  }
]
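A similar loop can watch the ingest status until it reports completed; again this is a sketch that assumes jq is available and a single ingest entry in the response.
while true; do
  status=$(curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq -r '.[0].status')
  echo "ingest status: $status"
  [ "$status" = "completed" ] && break
  sleep 30
done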
When it has finished loading the data, the SP and ingest status APIs return FINISHED and completed respectively.
[
  {
    "id": "s3-ingest-transform",
    "name": "transform",
    "group": "g-785606756774",
    "mode": "default",
    "created": "2024-11-22T16:05:56.000000000",
    "lastSeen": "2024-11-22T16:38:31.378461195",
    "state": "FINISHED",
    "error": "",
    "metrics": {
      "eventRate": 0,
      "bytesRate": 0,
      "latency": null,
      "inputRate": 0,
      "outputRate": 0
    },
    "logCounts": {
      "trace": 0,
      "debug": 0,
      "info": 0,
      "warn": 0,
      "error": 0,
      "fatal": 0
    },
    "packageName": "__UNPACKAGED__",
    "reportedMetadata": []
  }
]
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .
[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "transform",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "completed",
    "details": {
      "kxSessionInfo.3922c758-5450-4877-a61b-9c9ef01d4511": {
        "pipelineName": "transform",
        "pipelineID": "s3-ingest-transform",
        "workerName": "s3-ingest-transform-1-spwork",
        "operatorID": "database.writer_greentrips",
        "ingestStartTime": "2024-11-22T16:06:44.809727654",
        "ingestEndTime": "2024-11-22T16:08:34.247927644"
      },
      "subsessions": [
        "3922c758-5450-4877-a61b-9c9ef01d4511"
      ],
      "dates": [
        "2021-12-01",
        ..
        "2021-12-31"
      ],
      "tables": [
        "greentrips"
      ]
    },
    "tbls": [
      "greentrips"
    ],
    "dates": [
      "2021-12-01",
      ..
      "2021-12-31"
    ],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": 31,
      "cmdTotal": 31,
      "subCurrent": "",
      "subIndex": 0,
      "subTotal": 0
    },
    "error": "",
    "updated": "2024-11-22T16:09:23.128889708"
  }
]
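To summarize the completed ingest, for example to confirm that the greentrips table was written and that all 31 daily partitions for December 2021 are present, you can filter the same response with jq:
curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq '.[0] | {status, tbls, dates: (.dates | length)}'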
Python
The pipeline can be updated to a Python version by replacing the pipelines: block with the code below.
The logic is identical to the q version defined above:
- reads the file from Amazon S3
- applies a CSV decoder using the schema object to format the data correctly
- renames and reorders the columns, and adds a missing fees column
- writes to the greentrips table in the s3-ingest database
- directWrite is enabled to specify that data should be written directly to the database instead of streamed
pipelines:
  transform:
    base: py
    env:
      - name: KX_KURL_DISABLE_AUTO_REGISTER
        value: "1"
    spec: |-
      import pykx as kx
      from kxi import sp

      schema = {
          'pickup_datetime': 'p',
          'dropoff_datetime': 'p',
          'vendor': 's',
          'passengers': 'h',
          'distance': 'e',
          'rate_type': '*',
          'store_and_fwd': '*',
          'pickup_borough': '*',
          'pickup_zone': '*',
          'pickup_service_zone': '*',
          'dropoff_borough': '*',
          'dropoff_zone': '*',
          'dropoff_service_zone': '*',
          'payment_type': 's',
          'fare': 'e',
          'extra': 'e',
          'tax': 'e',
          'tip': 'e',
          'toll': 'e',
          'improvement_surcharge': '*',
          'congestion_surcharge': '*',
          'total': 'e'
      }

      exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']

      def reorder(data):
          order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare', 'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
          data = kx.q.qsql.update(data, {'fees': '0e'})
          return kx.q.xcols(order, data)

      sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
          | sp.decode.csv(schema, exclude=exclude)
          | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
          | sp.map(reorder)
          | sp.write.to_database('greentrips', database='s3-ingest', directWrite=True, api_version=2))
The same instructions can be used to deploy the sample and check the status.
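If the q version of the assembly is still running, you would normally remove it before deploying the modified file. A minimal sketch, assuming your version of the kxi CLI provides the assembly teardown subcommand (check kxi assembly --help if yours differs):
kxi assembly teardown --name s3-ingest
kxi assembly deploy -f s3-ingest.yaml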