Batch S3 Ingest

This page describes how to read bulk data from S3 and write it to an Insights SDK Database.

The Stream Processor can read arbitrary data files from S3, transform them, and write them to downstream consumers. This example illustrates how to ingest data into an Insights Database running in Docker.

Prerequisites

Follow the instructions to download the Docker bundle and run docker login portal.dl.kx.com to authenticate with the registry. After extracting the tarball, place your kdb+ license in the kxi-db/lic directory.

Building the walkthrough

Run all of the following commands from the kxi-db directory.

First, create a Docker Compose file named compose-batch-ingest.yaml:

---
# include the base database bundle services
include:
- ./compose.yaml

services:
  # define an SP controller service
  kxi-controller:
    image: ${kxi_img_spc}
    hostname: kxi-controller
    ports:
    - ${kxi_port_spc}:${kxi_port_spc}
    volumes:
    - ${kxi_dir_logs}:/mnt/logs
    - ${kxi_dir_sp}:/sp
    - ${kxi_dir_lic}:/opt/kx/lic
    - ${kxi_dir_config}:/mnt/config
    networks: [kx]
    environment:
    - KDB_LICENSE_B64
    command: ["-p", '${kxi_port_spc}']

  # define an SP worker service
  kxi-worker:
    image: ${kxi_img_spw}
    hostname: kxi-sp
    ports:
    - ${kxi_port_spw}:${kxi_port_spw}
    volumes:
    - ${kxi_dir_logs}:/mnt/logs
    - ${kxi_dir_sp}:/sp
    - ${kxi_dir_lic}:/opt/kx/lic
    - ${kxi_dir_config}:/mnt/config
    - ${kxi_dir_db}:/mnt/db
    networks: [kx]
    environment:
    - KXI_SP_SPEC=/mnt/config/${kxi_sp_spec}                # Point to the bound spec.q file
    - KXI_SP_PARENT_HOST=kxi-controller:${kxi_port_spc}     # Point to the parent controller service
    - RT_TOPIC_PREFIX=rt-
    - RT_REPLICAS=1
    - KXI_SP_ORDINAL=1
    - KXI_SP_GROUP=grp
    command: ["-p", '${kxi_port_spw}']

Details about the environment variables referenced above are available here.

To define the pipeline in q, create a spec file called config/ingest.q containing the pipeline definition. The pipeline does the following:

  • reads data from a public S3 bucket
  • uses a CSV decoder to parse the data according to the schema object, skipping the columns listed in exclude
  • renames some columns
  • transforms the data to match the database schema by adding a zero-filled fees column and reordering the columns
  • writes the data to the taxi table in the database using a target of kxi-sm:10001, which is the endpoint of the Storage Manager (SM) service
  • enables directWrite so that data is written directly to the database instead of being streamed

schema:(!) . flip (
    (`pickup_datetime       ; "p");
    (`dropoff_datetime      ; "p");
    (`vendor                ; "s");
    (`passengers            ; "h");
    (`distance              ; "e");
    (`rate_type             ; "*");
    (`store_and_fwd         ; "*");
    (`pickup_borough        ; "*");
    (`pickup_zone           ; "*");
    (`pickup_service_zone   ; "*");
    (`dropoff_borough       ; "*");
    (`dropoff_zone          ; "*");
    (`dropoff_service_zone  ; "*");
    (`payment_type          ; "s");
    (`fare                  ; "e");
    (`extra                 ; "e");
    (`tax                   ; "e");
    (`tip                   ; "e");
    (`toll                  ; "e");
    (`improvement_surcharge ; "*");
    (`congestion_surcharge  ; "*");
    (`total                 ; "e"));

exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;

.qsp.run
    .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
    .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
    .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
    .qsp.map[{ :`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x }]
    .qsp.v2.write.toDatabase[`taxi; .qsp.use `directWrite`target!(1b; "kxi-sm:10001")]

Create an environment variable file named batch-ingest.env:

# images
kxi_registry=portal.dl.kx.com
kxi_sp_release=1.12.0
kxi_img_spc=${kxi_registry}/kxi-sp-controller:${kxi_sp_release}
kxi_img_spw=${kxi_registry}/kxi-sp-worker:${kxi_sp_release}

# networking
kxi_port_spc=6000
kxi_port_spw=7000

# paths
kxi_dir_lic="./lic"
kxi_dir_config="./config"
kxi_dir_sp="./data/sp"
kxi_dir_logs="./data/logs"
kxi_dir_db="./data/db"
kxi_sp_spec="ingest.q"
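
Compose substitutes the values from batch-ingest.env into the ${...} placeholders in compose-batch-ingest.yaml. The Python sketch below approximates that substitution for two of the worker's environment entries. It is a simplified illustration built on string.Template, not Compose's actual implementation, and it only handles the plain ${name} form used in this walkthrough.

```python
from string import Template

# A subset of batch-ingest.env, copied from the walkthrough.
ENV_TEXT = '''
# networking
kxi_port_spc=6000
kxi_port_spw=7000

# paths
kxi_dir_config="./config"
kxi_sp_spec="ingest.q"
'''


def parse_env(text):
    """Parse KEY=VALUE lines, skipping blanks and comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, _, raw = line.partition('=')
        raw = raw.strip().strip('"')  # drop optional surrounding quotes
        # A value may reference variables defined on earlier lines.
        values[key.strip()] = Template(raw).substitute(values)
    return values


env = parse_env(ENV_TEXT)

# Two entries from the kxi-worker service in compose-batch-ingest.yaml.
worker_environment = {
    'KXI_SP_SPEC': Template('/mnt/config/${kxi_sp_spec}').substitute(env),
    'KXI_SP_PARENT_HOST': Template('kxi-controller:${kxi_port_spc}').substitute(env),
}
print(worker_environment)
```

To see the real result rather than this approximation, docker compose -f compose-batch-ingest.yaml --env-file batch-ingest.env config prints the Compose file with all variables resolved.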

Read more about the Stream Processor q API.

To define the pipeline in Python instead, create a spec file called config/ingest.py containing the pipeline definition. The pipeline does the following:

  • reads data from a public S3 bucket
  • uses a CSV decoder to parse the data according to the schema object, skipping the columns listed in exclude
  • renames some columns
  • transforms the data to match the database schema by adding a zero-filled fees column and reordering the columns
  • writes the data to the taxi table in the database using a target of kxi-sm:10001, which is the endpoint of the Storage Manager (SM) service
  • enables directWrite so that data is written directly to the database instead of being streamed

import pykx as kx
from kxi import sp

schema = {
    'pickup_datetime': 'p',
    'dropoff_datetime': 'p',
    'vendor': 's',
    'passengers': 'h',
    'distance': 'e',
    'rate_type': '*',
    'store_and_fwd': '*',
    'pickup_borough': '*',
    'pickup_zone': '*',
    'pickup_service_zone': '*',
    'dropoff_borough': '*',
    'dropoff_zone': '*',
    'dropoff_service_zone': '*',
    'payment_type': 's',
    'fare': 'e',
    'extra': 'e',
    'tax': 'e',
    'tip': 'e',
    'toll': 'e',
    'improvement_surcharge': '*',
    'congestion_surcharge': '*',
    'total': 'e'
}

exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']

def reorder(data):
    order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare', 'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
    data = kx.q.qsql.update(data, {'fees': '0e'})
    return kx.q.xcols(order, data)

sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
    | sp.decode.csv(schema, exclude=exclude)
    | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
    | sp.map(reorder)
    | sp.write.to_database('taxi', target='kxi-sm:10001', directWrite=True, api_version=2))
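
The rename, fees and reorder steps can be hard to picture from the pipeline alone. The sketch below reproduces their effect on a single decoded row using plain Python dictionaries. It is only an illustration: the real pipeline operates on kdb+ tables through pykx, the reshape helper is hypothetical, and the sample values are taken from the first row of the query output shown later on this page.

```python
# Column renames and target column order, copied from the pipeline above.
RENAMES = {'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'}
ORDER = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare',
         'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']


def reshape(record):
    """Rename columns, add a zero fees value and reorder the keys (hypothetical helper)."""
    renamed = {RENAMES.get(key, key): value for key, value in record.items()}
    renamed['fees'] = 0.0  # mirrors `update fees:0e`: a zero, not a null
    # Like xcols: listed columns first, any remaining columns afterwards.
    rest = [key for key in renamed if key not in ORDER]
    return {key: renamed[key] for key in ORDER + rest}


# One row as it would look after CSV decoding, with excluded columns dropped.
decoded = {
    'pickup_datetime': '2021-12-01T00:00:07',
    'dropoff_datetime': '2021-12-01T00:06:39',
    'vendor': 'Creative Mobile Tecnologies, LLC',
    'passengers': 1,
    'distance': 1.8,
    'payment_type': 'Credit card',
    'fare': 7.5,
    'extra': 3.0,
    'tax': 0.5,
    'tip': 2.8,
    'toll': 0.0,
    'total': 14.1,
}

row = reshape(decoded)
print(list(row))
```

The printed key order matches the column order that the taxi table expects, which is why the reorder step sits immediately before the database writer.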

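For the Python pipeline, create an environment variable file named batch-ingest.env that points to the Python worker image and the ingest.py spec: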

# images
kxi_registry=portal.dl.kx.com
kxi_sp_release=1.12.0
kxi_img_spc=${kxi_registry}/kxi-sp-controller:${kxi_sp_release}
kxi_img_spw=${kxi_registry}/kxi-sp-python:${kxi_sp_release}

# networking
kxi_port_spc=6000
kxi_port_spw=7000

# paths
kxi_dir_lic="./lic"
kxi_dir_config="./config"
kxi_dir_sp="./data/sp"
kxi_dir_logs="./data/logs"
kxi_dir_db="./data/db"
kxi_sp_spec="ingest.py"

Read more about the Stream Processor Python API.

Running the ingest

Run the deployment using the following command.

docker compose -f compose-batch-ingest.yaml --env-file batch-ingest.env up

You can check the status of the pipeline using the following command.

curl http://localhost:6000/details

This outputs a JSON payload of the following form:

{
  "state": "RUNNING",
  "error": "",
  "metrics": {
    "eventRate": 0,
    "bytesRate": 0,
    "latency": -1.7976931348623157e+308,
    "inputRate": 8746320,
    "outputRate": 41618.86
  },
  "logCounts": {
    "trace": 0,
    "debug": 0,
    "info": 63,
    "warn": 0,
    "error": 0,
    "fatal": 0
  },
  "readerMetadata": []
}

The pipeline has completed when the state field is set to FINISHED.

This indicates all of the data has been written to the database and successfully ingested.
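
Rather than re-running curl by hand, the check can be automated. The following is a minimal polling sketch using only the Python standard library. It assumes the controller is reachable on localhost:6000 as configured above; the function names, the polling interval and the treatment of a non-empty error field as a failure are illustrative assumptions, not part of the Stream Processor API.

```python
import json
import time
from urllib.request import urlopen

DETAILS_URL = 'http://localhost:6000/details'  # controller port from batch-ingest.env


def fetch_details(url=DETAILS_URL):
    """Return the parsed JSON payload from the /details endpoint."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_finish(fetch=fetch_details, interval=5.0, max_polls=120, sleep=time.sleep):
    """Poll until the state field is FINISHED, then return the last payload.

    Assumption: a non-empty error field means the pipeline has failed.
    """
    for _ in range(max_polls):
        details = fetch()
        if details.get('state') == 'FINISHED':
            return details
        if details.get('error'):
            raise RuntimeError('pipeline reported an error: %s' % details['error'])
        sleep(interval)
    raise TimeoutError('pipeline did not finish after %d polls' % max_polls)
```

Calling wait_for_finish() with no arguments polls the local controller every five seconds. The fetch and sleep parameters exist so that the loop can be exercised without a running deployment.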

You can check the ingest session in the database using the following command.

curl http://localhost:10001/ingest

The session is marked with a status of pending while the SP pipeline is writing data.

[
  {
    "name": "pipeline-482f128ffe-0",
    "pipeline": "",
    "database": "kdb Insights",
    "updtype": "ingest",
    "status": "pending",
    "details": [],
    "tbls": [],
    "dates": [],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": null,
      "cmdTotal": null,
      "subCurrent": "",
      "subIndex": null,
      "subTotal": null
    },
    "error": [],
    "updated": "2024-11-22T12:12:25.518555535"
  }
]

This updates to processing when the database is ingesting the data.

[
  {
    "name": "pipeline-482f128ffe-0",
    "pipeline": "",
    "database": "kdb Insights",
    "updtype": "ingest",
    "status": "processing",
    "details": {
      "kxSessionInfo.752fb820-e0db-6c26-0803-c194e45ed5a8": {
        "pipelineName": [],
        "pipelineID": "pipeline-482f128ffe",
        "workerName": "spwork-kxi-sp",
        "operatorID": "database.writer_taxi",
        "ingestStartTime": "2024-11-22T12:12:17.580482856",
        "ingestEndTime": "2024-11-22T12:14:14.123753107"
      },
      "subsessions": [
        "752fb820-e0db-6c26-0803-c194e45ed5a8"
      ],
      "dates": [
        "2021-12-01",
        ..
        "2021-12-31"
      ],
      "tables": [
        "taxi"
      ]
    },
    "tbls": [
      "taxi"
    ],
    "dates": [
      "2021-12-01",
      "2021-12-02",
      "2021-12-03",
      "2021-12-04",
      "2021-12-05",
      "2021-12-06",
      "2021-12-07",
      "2021-12-08",
      "2021-12-09",
      "2021-12-10",
      "2021-12-11",
      "2021-12-12",
      "2021-12-13",
      "2021-12-14"
    ],
    "progress": {
      "cmdCurrent": "2021.12.14",
      "cmdIndex": 13,
      "cmdTotal": 31,
      "subCurrent": "taxi",
      "subIndex": 0,
      "subTotal": 1
    },
    "error": [],
    "updated": "2024-11-22T12:14:14.140172780"
  }
]

Once the database has finished ingesting the data, the status updates to completed and the data is available for querying.
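
The session payloads above are verbose, so a one-line summary per session can be easier to watch during a long ingest. The sketch below is one hypothetical way to produce it from the fields shown in the examples. It assumes that cmdIndex counts from zero, which is an inference from the sample output (index 13 while processing the fourteenth date) and not something this page confirms.

```python
def summarize(session):
    """Return a short status line for one entry of the /ingest response."""
    progress = session.get('progress') or {}
    index = progress.get('cmdIndex')
    total = progress.get('cmdTotal')
    label = '%s: %s' % (session['name'], session['status'])
    if index is None or not total:
        return label  # no progress information yet, as in the pending state
    # Assumption: cmdIndex is zero-based, so add one for a human-readable step.
    return '%s (step %d of %d, %s)' % (label, index + 1, total, progress['cmdCurrent'])


# Trimmed copies of the two sample sessions shown above.
pending = {
    'name': 'pipeline-482f128ffe-0',
    'status': 'pending',
    'progress': {'cmdCurrent': '', 'cmdIndex': None, 'cmdTotal': None},
}
processing = {
    'name': 'pipeline-482f128ffe-0',
    'status': 'processing',
    'progress': {'cmdCurrent': '2021.12.14', 'cmdIndex': 13, 'cmdTotal': 31},
}

for session in (pending, processing):
    print(summarize(session))
```

In practice the list returned by http://localhost:10001/ingest would be parsed with json.loads and each element passed to summarize.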

You can query the first 30 minutes of the dataset to verify this.

curl -X POST http://localhost:8080/data -H "Content-Type: application/json" -H "Accept: application/json" -d '{"table": "taxi", "startTS": "2021-12-01T00:00:00.0", "endTS": "2021-12-01T00:30:00.0"}'

{
  "header": {
    "rcvTS": "2024-11-22T12:15:22.941000000",
    "corr": "69e1aa73-f069-4854-a0b1-28c439c55e47",
    "logCorr": "69e1aa73-f069-4854-a0b1-28c439c55e47",
    "http": "json",
    "api": ".kxi.getData",
    "agg": ":172.18.0.7:5060",
    "refVintage": -9223372036854776000,
    "rc": 0,
    "ac": 0,
    "ai": "",
    "limitApplied": false
  },
  "payload": [
    {
      "vendor": "Creative Mobile Tecnologies, LLC",
      "pickup": "2021-12-01T00:00:07.000000000",
      "dropoff": "2021-12-01T00:06:39.000000000",
      "passengers": 1,
      "distance": 1.8,
      "fare": 7.5,
      "extra": 3,
      "tax": 0.5,
      "tip": 2.8,
      "tolls": 0,
      "fees": 0,
      "total": 14.1,
      "payment_type": "Credit card"
    },
    {
      "vendor": "Creative Mobile Tecnologies, LLC",
      "pickup": "2021-12-01T00:00:19.000000000",
      "dropoff": "2021-12-01T00:07:24.000000000",
      "passengers": 1,
      "distance": 1.4,
      "fare": 7,
      "extra": 3.5,
      "tax": 0.5,
      "tip": 2,
      "tolls": 0,
      "fees": 0,
      "total": 13.3,
      "payment_type": "Credit card"
    },
    ..
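
The same query can be issued from Python, which avoids shell quoting problems because json.dumps always produces a valid JSON body. This is a minimal sketch using the standard library; it assumes the data endpoint is reachable on localhost:8080 as in the curl command, and the build_request and query helpers are hypothetical names chosen for illustration.

```python
import json
from urllib.request import Request, urlopen

DATA_URL = 'http://localhost:8080/data'  # same endpoint as the curl command


def build_request(table, start_ts, end_ts, url=DATA_URL):
    """Build the POST request for a time-bounded table query."""
    body = json.dumps({'table': table, 'startTS': start_ts, 'endTS': end_ts})
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    return Request(url, data=body.encode('utf-8'), headers=headers, method='POST')


def query(request):
    """Send the request and return the rows from the payload field."""
    with urlopen(request, timeout=30) as resp:
        reply = json.load(resp)
    return reply['payload']


request = build_request('taxi', '2021-12-01T00:00:00.0', '2021-12-01T00:30:00.0')
print(request.data.decode('utf-8'))
```

Passing request to query returns the list of rows once the ingest session has reached the completed status; before that point the response may not contain the new data.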