Batch S3 Ingest

Read bulk data from S3 and write to downstream consumers

Arbitrary data files can be read from S3, transformed, and written to downstream consumers by the Stream Processor. This example shows how the deployment configuration differs between Docker and Kubernetes.

Configuration

The following ingest.q file will read bulk CSV data from S3 and write to a downstream consumer using IPC.

schema: flip $[;()]@' (!) . flip (
    (`vendor                ; "j");
    (`pickup                ; "p");
    (`dropoff               ; "p");
    (`store_and_fwd         ; "b");
    (`ratecode              ; "j");
    (`pu_location           ; "j");
    (`do_location           ; "j");
    (`passengers            ; "j");
    (`distance              ; "f");
    (`fare                  ; "f");
    (`extra                 ; "f");
    (`mta_tax               ; "f");
    (`tip                   ; "f");
    (`tolls                 ; "f");
    (`ehail_fee             ; "f");
    (`improvement_surcharge ; "f");
    (`total                 ; "f");
    (`payment_type          ; "h");
    (`trip_type             ; "h");
    (`congestion_surcharge  ; "f")
    );

.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"; "us-west-1"]
    .qsp.decode.csv[schema]
    .qsp.map[{(`greentrips; x)}]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!`:consumer:5000`function`.u.upd]

To run this example using Python, a few modifications are required to the deployment commands below. For Docker deployments, change the image field to point to portal.dl.kx.com/kxi-sp-python:1.9.0. For Kubernetes deployments, add a base: "py" configuration option to the REST request.

from kxi import sp

schema = {
    'vendor': 'j',
    'pickup': 'p',
    'dropoff': 'p',
    'store_and_fwd': 'b',
    'ratecode': 'j',
    'pu_location': 'j',
    'do_location': 'j',
    'passengers': 'j',
    'distance': 'f',
    'fare': 'f',
    'extra': 'f',
    'mta_tax': 'f',
    'tip': 'f',
    'tolls': 'f',
    'ehail_fee': 'f',
    'improvement_surcharge': 'f',
    'total': 'f',
    'payment_type': 'h',
    'trip_type': 'h',
    'congestion_surcharge': 'f',
}

sp.run(sp.read.from_s3('s3://nyc-tlc/trip data/green_tripdata_2020-01.csv')
    | sp.decode.csv(schema)
    | sp.map(lambda x: ('greentrips', x))
    | sp.write.to_process(handle=':consumer:5000', mode='function', target='.u.upd'))
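The single-character values in the schema are q type characters. As a reading aid, the sketch below maps the five codes used in this example to their q type names. The `Q_TYPE_NAMES` table and the `describe_schema` helper are hypothetical names introduced here for illustration; they are not part of the `kxi.sp` API.

```python
# q type characters that appear in the schema above, with their q type names.
Q_TYPE_NAMES = {
    "b": "boolean",
    "h": "short",
    "j": "long",
    "f": "float",
    "p": "timestamp",
}


def describe_schema(schema):
    """Return (column, q type name) pairs in schema order.

    Raises ValueError for a type character this table does not cover.
    """
    described = []
    for column, code in schema.items():
        if code not in Q_TYPE_NAMES:
            raise ValueError(
                f"unrecognized type character {code!r} for column {column!r}"
            )
        described.append((column, Q_TYPE_NAMES[code]))
    return described


# A few columns taken from the schema above.
example = {
    "vendor": "j",
    "pickup": "p",
    "store_and_fwd": "b",
    "fare": "f",
    "payment_type": "h",
}

for column, type_name in describe_schema(example):
    print(f"{column}: {type_name}")
```

Running a check like this before deployment can catch a mistyped type character early, before the pipeline is submitted.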

To run in a single worker

To run the ingestion locally to a table, replace .qsp.write.toProcess with .qsp.write.toVariable using upsert mode. This will write the content to a table in the local process.

.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"; "us-west-1"]
    .qsp.decode.csv[schema]
    .qsp.write.toVariable[`output; `upsert]

Deployment

A standalone Stream Processor can be deployed to Docker using only the worker image, with one worker writing to another for consumption. For this example, assume the consumer.q file contains the following.

.qsp.run .qsp.read.fromCallback[`.u.upd] .qsp.write.toConsole[]

Configuration:

AWS credential discovery

The S3 reader supports custom AWS credentials. For more information, see the reader configuration.

This docker-compose.yaml defines an ingest worker that reads data using a single worker process.

version: "3.3"
services:
  ingest:
    image: portal.dl.kx.com/kxi-sp-worker:1.9.0
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/ingest.q

This docker-compose.yaml defines an ingest worker to read data from S3 and a consumer worker that writes the content to the console.

version: "3.3"
services:
  ingest:
    image: portal.dl.kx.com/kxi-sp-worker:1.9.0
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/ingest.q
  consumer:
    image: portal.dl.kx.com/kxi-sp-worker:1.9.0
    volumes:
      - .:/app
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/app/consumer.q
    command: ["-p", "5000"]

The deployment directory should now contain the standalone ingest.q file, consumer.q and docker-compose.yaml.

Running:

Start the ingestion pipeline by bringing up the Docker Compose configuration.

docker-compose up

To run an ingestion pipeline in Kubernetes, first follow the Kubernetes setup guide. The ingestion pipeline can then be deployed through a port-forwarded Coordinator process.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.q)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        config     : { content : $spec }
    }' | jq -asR .)"
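For readers who prefer to build the request body in a script, the following is a minimal sketch of what the `jq` pipeline above produces: a JSON object holding the pipeline name, type and spec content, which the second `jq -asR .` call then re-encodes as a single JSON string. The helper name `build_create_body` is hypothetical. Placing `base` as a top-level field next to `name` and `type` is an assumption based on the note about Python deployments; check the Coordinator API reference before relying on it.

```python
import json


def build_create_body(spec_text, name="s3-ingest", base=None):
    """Build the body that the curl command above passes with -d.

    spec_text: contents of the pipeline spec file (for example ingest.q).
    base: optional; "py" for a Python spec. Its placement as a top-level
          field is an assumption, not confirmed by this page.
    """
    payload = {"name": name, "type": "spec"}
    if base is not None:
        payload["base"] = base
    payload["config"] = {"content": spec_text}

    # First jq call: emit the JSON object as text.
    inner = json.dumps(payload, indent=2) + "\n"
    # Second jq call (-a -s -R): re-encode that whole text as one
    # ASCII-only JSON string.
    return json.dumps(inner, ensure_ascii=True)


# Example with a placeholder spec; a real call would read the spec file.
body = build_create_body(".qsp.run .qsp.read.fromCallback[`.u.upd] .qsp.write.toConsole[]")
print(body)
```

The returned string can be sent as the POST body to the `/pipeline/create` endpoint with any HTTP client, in place of the `curl` and `jq` command.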