Batch S3 Ingest
Read bulk data from S3 and write to downstream consumers
To run an ingestion pipeline using kdb Insights Enterprise, first deploy the base system. Next, download the s3-ingest.yaml assembly by following the download instructions.
Deploy the assembly with the following kdb Insights CLI command.
kxi assembly create --filepath s3-ingest.yaml
This sets up Data Access (DA) processes and a Storage Manager (SM) for a data warehouse. A Reliable Transport (RT) stream labeled green-taxi publishes data into the Storage Manager.
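Before continuing, you can verify that the assembly is up. The check below is a sketch: it assumes the assembly defined in s3-ingest.yaml is named s3-ingest, and the exact verbs and flags vary between kdb Insights CLI versions.
# Illustrative checks; exact commands depend on your CLI version
kxi assembly list
kxi assembly status --name s3-ingest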
Once the assembly is running, the ingest.q script can be submitted to the Stream Processor Coordinator.
AWS credential discovery
The S3 reader supports custom AWS credentials. For more information, see the reader configuration.
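The nyc-tlc bucket used in this walkthrough is a public dataset, so no credentials are needed here. If you point the reader at a private bucket, one common approach is to rely on standard AWS credential discovery by passing the usual AWS environment variables to the Worker in the pipeline create request (covered below). The request here is a sketch of that approach, not the documented credentials interface, and the key values are placeholders; in production, source them from a secret rather than inlining them.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.q)" \
    '{
        name   : "s3-ingest",
        type   : "spec",
        config : { content: $spec },
        env    : {
            RT_PUB_TOPIC: "s3-ingest-green-taxi",
            RT_TOPIC_PREFIX: "rt-",
            AWS_ACCESS_KEY_ID: "<access-key-id>",         # placeholder
            AWS_SECRET_ACCESS_KEY: "<secret-access-key>"  # placeholder
        }
    }' | jq -asR .)"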
The streaming program ingest.q
should be as follows:
/ Build an empty table whose columns have the types given by each
/ (name; type character) pair; the CSV decoder below takes its column
/ names and types from this table
schema: flip $[;()]@' (!) . flip (
    (`vendor                ; "j");
    (`pickup                ; "p");
    (`dropoff               ; "p");
    (`store_and_fwd         ; "b");
    (`ratecode              ; "j");
    (`pu_location           ; "j");
    (`do_location           ; "j");
    (`passengers            ; "j");
    (`distance              ; "f");
    (`fare                  ; "f");
    (`extra                 ; "f");
    (`mta_tax               ; "f");
    (`tip                   ; "f");
    (`tolls                 ; "f");
    (`ehail_fee             ; "f");
    (`improvement_surcharge ; "f");
    (`total                 ; "f");
    (`payment_type          ; "h");
    (`trip_type             ; "h");
    (`congestion_surcharge  ; "f")
    );

.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"; "us-west-1"]
    .qsp.decode.csv[schema]              / parse CSV rows into typed columns
    .qsp.map[{(`greentrips; x)}]         / tag each batch with its target table
    .qsp.write.toRT[]                    / publish to the RT stream
The streaming program ingest.py
should be as follows:
from kxi import sp

# Column names and kdb+ type characters for the CSV decoder
schema = {
    'vendor': 'j',
    'pickup': 'p',
    'dropoff': 'p',
    'store_and_fwd': 'b',
    'ratecode': 'j',
    'pu_location': 'j',
    'do_location': 'j',
    'passengers': 'j',
    'distance': 'f',
    'fare': 'f',
    'extra': 'f',
    'mta_tax': 'f',
    'tip': 'f',
    'tolls': 'f',
    'ehail_fee': 'f',
    'improvement_surcharge': 'f',
    'total': 'f',
    'payment_type': 'h',
    'trip_type': 'h',
    'congestion_surcharge': 'f',
}

sp.run(sp.read.from_s3('s3://nyc-tlc/trip data/green_tripdata_2020-01.csv')
    | sp.decode.csv(schema)                # parse CSV rows into typed columns
    | sp.map(lambda x: ('greentrips', x))  # tag each batch with its target table
    | sp.write.to_rt())                    # publish to the RT stream
Finally, submit a pipeline create request to the Coordinator, normally through the cluster's ingress path /streamprocessor/pipeline/create. The requests below instead use a port forward to the kxi-sp service; the first submits the q spec, the second the Python spec (note the base field, which selects the Python Worker).
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.q)" \
    '{
        name   : "s3-ingest",
        type   : "spec",
        config : { content: $spec },
        env    : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.py)" \
    '{
        name   : "s3-ingest",
        type   : "spec",
        base   : "py",
        config : { content: $spec },
        env    : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
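After submitting, you can confirm that the Coordinator accepted the pipeline and watch its state as it runs. A sketch using the same port forward, assuming the Coordinator's pipeline listing endpoint behaves as in current releases:
# List pipelines known to the Coordinator, including their current state
curl http://localhost:5000/pipelines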
Reliable Transport configuration
The requests above configure the Worker with RT-related environment variables so that the deployed pipeline can connect to other kdb Insights components through RT.
RT_TOPIC_PREFIX is set to the same value as the topicPrefix fields in the assembly file. This value must be consistent across the assembly and all components.
RT_PUB_TOPIC is set to the topic name of the streaming data. The topic is composed from fields set in the assembly, in the format <assembly-name>-<sequencer-topic-name>; here the s3-ingest assembly's green-taxi sequencer topic yields s3-ingest-green-taxi.
When the ingestion pipeline is complete, it terminates and cleans up its resources.
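Once the data has been ingested, the assembly itself can also be removed if it is no longer needed. A sketch, assuming the assembly is named s3-ingest and a CLI version with a teardown verb:
# Remove the assembly and the DA, SM, and RT resources it created
kxi assembly teardown --name s3-ingest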