Batch S3 Ingest
Read bulk data from S3 and write to downstream consumers
Arbitrary data files can be read from S3, transformed, and written to downstream consumers from the Stream Processor. This example illustrates the different deployment configurations required when deploying to Docker or Kubernetes.
Configuration
The following ingest.q file reads bulk CSV data from S3 and writes it to a downstream consumer using IPC.
schema: flip $[;()]@' (!) . flip (
    (`vendor                ; "j");
    (`pickup                ; "p");
    (`dropoff               ; "p");
    (`store_and_fwd         ; "b");
    (`ratecode              ; "j");
    (`pu_location           ; "j");
    (`do_location           ; "j");
    (`passengers            ; "j");
    (`distance              ; "f");
    (`fare                  ; "f");
    (`extra                 ; "f");
    (`mta_tax               ; "f");
    (`tip                   ; "f");
    (`tolls                 ; "f");
    (`ehail_fee             ; "f");
    (`improvement_surcharge ; "f");
    (`total                 ; "f");
    (`payment_type          ; "h");
    (`trip_type             ; "h");
    (`congestion_surcharge  ; "f")
    );
.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"; "us-west-1"]
    .qsp.decode.csv[schema]
    .qsp.map[{(`greentrips; x)}]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!`:consumer:5000`function`.u.upd]
To run this example using Python, a few modifications are required to the deployment commands below. For Docker deployments, change the image field to point to registry.dl.kx.com/kxi-sp-python:1.8.1. For Kubernetes deployments, add a base: "py" configuration option to the REST request.
from kxi import sp
schema = {
    'vendor': 'j',
    'pickup': 'p',
    'dropoff': 'p',
    'store_and_fwd': 'b',
    'ratecode': 'j',
    'pu_location': 'j',
    'do_location': 'j',
    'passengers': 'j',
    'distance': 'f',
    'fare': 'f',
    'extra': 'f',
    'mta_tax': 'f',
    'tip': 'f',
    'tolls': 'f',
    'ehail_fee': 'f',
    'improvement_surcharge': 'f',
    'total': 'f',
    'payment_type': 'h',
    'trip_type': 'h',
    'congestion_surcharge': 'f',
}
sp.run(sp.read.from_s3('s3://nyc-tlc/trip data/green_tripdata_2020-01.csv')
    | sp.decode.csv(schema)
    | sp.map(lambda x: ('greentrips', x))
    | sp.write.to_process(handle=':consumer:5000', mode='function', target='.u.upd'))
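The single-character codes in the schema are standard kdb+ type characters. As a reading aid, the sketch below spells out the five codes that appear in this example and rejects anything else. It is plain Python and not part of the Stream Processor API; the names `Q_TYPE_NAMES` and `describe_schema` are hypothetical helpers introduced here for illustration.

```python
# kdb+ type characters used by the schema in this example.
Q_TYPE_NAMES = {
    "b": "boolean",
    "h": "short",
    "j": "long",
    "f": "float",
    "p": "timestamp",
}


def describe_schema(schema):
    """Return {column: q type name}; raise if a code is not in the table above."""
    unknown = {col: code for col, code in schema.items() if code not in Q_TYPE_NAMES}
    if unknown:
        raise ValueError(f"unrecognised type characters: {unknown}")
    return {col: Q_TYPE_NAMES[code] for col, code in schema.items()}


# A three-column excerpt of the schema defined above.
excerpt = {"vendor": "j", "pickup": "p", "fare": "f"}
print(describe_schema(excerpt))
# prints {'vendor': 'long', 'pickup': 'timestamp', 'fare': 'float'}
```

Running a check like this before deployment can catch a mistyped type code early, rather than at decode time inside the worker.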
To run in a single worker
To run the ingestion locally to a table, replace .qsp.write.toProcess with .qsp.write.toVariable using upsert mode. This writes the content to a table in the local process.
.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"; "us-west-1"]
    .qsp.decode.csv[schema]
    .qsp.write.toVariable[`output; `upsert]
Deployment
A standalone Stream Processor can be deployed to Docker using just a worker image, writing to another worker for consumption. For this example, assume the consumer.q file is as follows.
.qsp.run .qsp.read.fromCallback[`.u.upd] .qsp.write.toConsole[]
Configuration:
AWS credential discovery
The S3 reader supports custom AWS credentials. For more information, see the reader configuration.
This docker-compose.yaml defines an ingest worker that reads data using a single worker process.
version: "3.3"
services:
  ingest:
    image: registry.dl.kx.com/kxi-sp-worker:1.8.1
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/ingest.q
This docker-compose.yaml defines an ingest worker to read data from S3 and a consumer worker that writes the content to the console.
version: "3.3"
services:
  ingest:
    image: registry.dl.kx.com/kxi-sp-worker:1.8.1
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/ingest.q
  consumer:
    image: registry.dl.kx.com/kxi-sp-worker:1.8.1
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/consumer.q
    command: ["-p", "5000"]
The deployment directory should now contain the standalone ingest.q file, consumer.q and docker-compose.yaml.
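Before starting the containers, it can help to confirm that the directory really holds those three files and that KDB_LICENSE_B64 is set on the host, since the compose file lists the variable without a value and Docker Compose then passes it through from the host environment. The following is a minimal sketch of such a check; `preflight` and `REQUIRED_FILES` are hypothetical names introduced here and are not part of any KX tooling.

```python
import os
from pathlib import Path

# Files the two-worker deployment described above expects to find.
REQUIRED_FILES = ("ingest.q", "consumer.q", "docker-compose.yaml")


def preflight(directory=".", environ=None):
    """Return a list of problems found; an empty list means nothing is missing."""
    environ = os.environ if environ is None else environ
    root = Path(directory)
    problems = [
        f"missing file: {name}"
        for name in REQUIRED_FILES
        if not (root / name).is_file()
    ]
    # An unset or empty license variable would reach the containers empty.
    if not environ.get("KDB_LICENSE_B64"):
        problems.append("KDB_LICENSE_B64 is not set in the environment")
    return problems


if __name__ == "__main__":
    for problem in preflight():
        print(problem)
```

For the single-worker variant, consumer.q is not needed, so the file list would be shortened accordingly.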
Running:
Run the ingestion pipeline by running the Docker configuration.
docker-compose up
To run an ingestion pipeline in Kubernetes, first follow the setup for Kubernetes guide. The ingestion pipeline can then be deployed through a port-forwarded Coordinator process.
AWS credential discovery
The S3 reader supports custom AWS credentials. For more information, see the reader configuration.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat ingest.q)" \
'{
name : "s3-ingest",
type : "spec",
config : { content : $spec }
}' | jq -asR .)"
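The request body in the command above is encoded twice: the first jq call builds a JSON object holding the pipeline spec, and the second (`jq -asR .`) wraps that whole document in a JSON string. The sketch below reproduces the same shape with the Python standard library, which may be convenient when jq is not available. It only constructs the request and does not send it; `build_create_request` is a hypothetical helper, and the resulting body matches the jq output up to whitespace only.

```python
import json
import urllib.request


def build_create_request(spec_text, name="s3-ingest",
                         url="http://localhost:5000/pipeline/create"):
    """Build (but do not send) a POST request mirroring the curl command."""
    payload = {
        "name": name,
        "type": "spec",
        "config": {"content": spec_text},
    }
    # First dumps: the JSON object. Second dumps: that object as a JSON string,
    # which is what the trailing `jq -asR .` produces.
    body = json.dumps(json.dumps(payload))
    return urllib.request.Request(url, data=body.encode("utf-8"), method="POST")


if __name__ == "__main__":
    # Placeholder spec text; in practice this would be the contents of ingest.q.
    request = build_create_request("placeholder spec")
    print(request.get_method(), request.full_url)
```

Passing the returned object to `urllib.request.urlopen` would submit it, assuming the Coordinator port forward on localhost:5000 is active.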