
Batch S3 Ingest

This page describes how to read bulk data from S3 and write it to a kdb Insights Enterprise database.

In this walkthrough, you download a sample workload and deploy it to a running kdb Insights Enterprise system using the Stream Processor.

The sample deploys a database, stream, and pipeline that reads a taxi CSV dataset from Amazon S3 and writes it directly to the database for memory efficiency.

The example assumes the following prerequisites:

  • access to the KX Downloads Portal
  • a username and bearer token for the portal saved in your terminal session as $KX_USER and $KX_DL_BEARER respectively
  • a running kdb Insights Enterprise system with the hostname URL stored as $KX_HOSTNAME
  • the relevant version of the sample (from the KX Downloads Portal) saved as $KX_VERSION; example exports for these variables are sketched after this list
  • the CLI configured for your deployment with authentication credentials
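For example, a minimal sketch of setting these variables in your terminal session. The values shown are placeholders only; substitute your own portal credentials, hostname, and sample version.

# Placeholder values - replace with your own portal credentials,
# Insights Enterprise hostname, and sample version
export KX_USER="jane.doe@example.com"
export KX_DL_BEARER="<bearer-token-from-downloads-portal>"
export KX_HOSTNAME="https://insights.example.com"
export KX_VERSION="1.0.0"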

For more information on the CLI, refer to the kdb Insights CLI documentation.

Downloading the sample

You can download the sample with the command below.

curl -s --fail-with-body -D /dev/stderr -u ${KX_USER}:${KX_DL_BEARER} -L -OJ https://portal.dl.kx.com/assets/raw/package-samples/${KX_VERSION}/s3-ingest-${KX_VERSION}.kxi

Unpack the package to view its contents.

export PKG=s3-ingest
kxi package unpack $PKG-$KX_VERSION.kxi

The default pipeline spec is written in q and is located under s3-ingest/src/s3-ingest.q in the unpacked package. The Python pipeline spec is located under s3-ingest/src/s3-ingest.py.
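To confirm the layout on disk, you can list the unpacked source and pipeline directories (this assumes the package unpacked into your current working directory).

# List the spec files and pipeline configuration in the unpacked package
ls s3-ingest/src s3-ingest/pipelines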

The pipeline does the following:

  • reads data from a public S3 bucket
  • uses a CSV decoder to format the data using the schema object
  • renames some columns
  • transforms the data to match the database schema by adding an empty fees column and reordering the columns
  • writes the data to the greentrips table in a database called s3-ingest
  • enables directWrite so that data is written directly to the database instead of being streamed

s3-ingest/src/s3-ingest.q

schema:(!) . flip (
  (`pickup_datetime       ; "p");
  (`dropoff_datetime      ; "p");
  (`vendor                ; "s");
  (`passengers            ; "h");
  (`distance              ; "e");
  (`rate_type             ; "*");
  (`store_and_fwd         ; "*");
  (`pickup_borough        ; "*");
  (`pickup_zone           ; "*");
  (`pickup_service_zone   ; "*");
  (`dropoff_borough       ; "*");
  (`dropoff_zone          ; "*");
  (`dropoff_service_zone  ; "*");
  (`payment_type          ; "s");
  (`fare                  ; "e");
  (`extra                 ; "e");
  (`tax                   ; "e");
  (`tip                   ; "e");
  (`toll                  ; "e");
  (`improvement_surcharge ; "*");
  (`congestion_surcharge  ; "*");
  (`total                 ; "e"));

exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;

.qsp.run
  .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
  .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
  .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
  .qsp.map[{ :`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x }]
  .qsp.v2.write.toDatabase[`greentrips; .qsp.use `directWrite`database!(1b; "s3-ingest")]

s3-ingest/src/s3-ingest.py

from kxi import sp
import pykx as kx

schema = {
  'pickup_datetime': 'p',
  'dropoff_datetime': 'p',
  'vendor': 's',
  'passengers': 'h',
  'distance': 'e',
  'rate_type': '*',
  'store_and_fwd': '*',
  'pickup_borough': '*',
  'pickup_zone': '*',
  'pickup_service_zone': '*',
  'dropoff_borough': '*',
  'dropoff_zone': '*',
  'dropoff_service_zone': '*',
  'payment_type': 's',
  'fare': 'e',
  'extra': 'e',
  'tax': 'e',
  'tip': 'e',
  'toll': 'e',
  'improvement_surcharge': '*',
  'congestion_surcharge': '*',
  'total': 'e'
}


exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']


def reorder(data):
  order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare', 'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
  data = kx.q.qsql.update(data, {'fees': '0e'})
  return kx.q.xcols(order, data)


sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
  | sp.decode.csv(schema, exclude=exclude)
  | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
  | sp.map(reorder)
  | sp.write.to_database(table='greentrips', database='s3-ingest', directWrite=True, api_version=2))

Deploying

Authenticate with kdb Insights Enterprise and deploy the sample package.

Cleaning up resources

Make sure to tear down any previously installed version of the package and clean up resources before deploying a new version.

Once deployed, the pipeline starts to ingest data and write to the database.

export PKG=s3-ingest
kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG

Unpack the sample package and switch the pipeline to use the Python spec instead of the q spec.

export PKG=s3-ingest
kxi package unpack $PKG-$KX_VERSION.kxi

Replace the values of base and spec in s3-ingest/pipelines/transport.yaml to use the Python spec file instead of the q one.

#base: q
base: py
#spec: src/spec.q
spec: src/spec.py

Then authenticate, push, and deploy the package.

kxi auth login
kxi pm push $PKG
kxi pm deploy $PKG

Checking progress

The ingestion may take a few minutes to complete. You can check progress using the kdb Insights Enterprise REST APIs. First, obtain the token generated when you authenticated against the system.

export KX_TOKEN=$(kxi auth print-token)

Note

If this token expires, you can regenerate it by running kxi auth login again and storing it again with the command above.

You can check the progress of the SP pipeline using the details API.

curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details

The pipeline state is RUNNING, indicating that it is processing data.

[
   {
      "id":"s3-ingest-transform",
      "name":"transform",
      "group":"g-793720690864",
      "mode":"default",
      "created":"2025-02-24T13:58:11.000000000",
      "lastSeen":"2025-02-24T13:59:50.525017500",
      "state":"RUNNING",
      "error":"",
      "metrics":{
         "eventRate":0,
         "bytesRate":0,
         "latency":null,
         "inputRate":0,
         "outputRate":0
      },
      "logCounts":{
         "trace":0,
         "debug":0,
         "info":0,
         "warn":0,
         "error":0,
         "fatal":0
      },
      "packageName":"s3-ingest",
      "reportedMetadata":[

      ]
   }
]
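Rather than re-running this call by hand, you can poll it until the pipeline finishes. The following is a minimal sketch that assumes jq is installed and that the pipeline of interest is the first (or only) entry in the response.

# Poll the SP details API until the pipeline reports FINISHED
while true; do
  state=$(curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details | jq -r '.[0].state')
  echo "pipeline state: $state"
  [ "$state" = "FINISHED" ] && break
  sleep 30
done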

You can also check the state of the ingest at the database.

curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .

This has a status of pending while the SP is still writing data.

[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "pending",
    "details": [],
    "tbls": [],
    "dates": [],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": null,
      "cmdTotal": null,
      "subCurrent": "",
      "subIndex": null,
      "subTotal": null
    },
    "error": [],
    "updated": "2025-02-24T13:58:11.819755661"
  }
]
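To pull out just the status and progress fields from this response, you can filter it with jq, for example:

# Show only the ingest status and progress for the first entry
curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest \
  | jq '.[0] | {status, progress}'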

The value of status updates to processing when the SP has finished writing and triggered the database ingest.

[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "transform",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "processing",
    "details": {
      "": null,
      "pipeline": "transform",
      "kxSessionInfo.6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714": {
        "pipelineName": "transform",
        "pipelineID": "s3-ingest-transform",
        "workerName": "s3-ingest-transform-1-spwork",
        "operatorID": "database.writer_greentrips",
        "ingestStartTime": "2025-02-24T13:59:48.022041801",
        "ingestEndTime": "2025-02-24T14:01:29.039912981"
      },
      "subsessions": [
        "6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714"
      ],
      "dates": [
        "2021-12-01",
        "2021-12-02",
        "2021-12-03",
        "2021-12-04",
        "2021-12-05",
        "2021-12-06",
        "2021-12-07",
        ....
      ],
      "tables": [
        "greentrips"
      ]
    },
    "tbls": [
      "greentrips"
    ],
    "dates": [
      "2021-12-01",
      "2021-12-02",
      "2021-12-03",
      "2021-12-04",
      "2021-12-05",
      "2021-12-06",
      "2021-12-07",
      ....
    ],
    "progress": {
      "cmdCurrent": "2021.12.28",
      "cmdIndex": 27,
      "cmdTotal": 31,
      "subCurrent": "greentrips",
      "subIndex": 0,
      "subTotal": 1
    },
    "error": [],
    "updated": "2025-02-24T14:01:29.135005039"
  }
]

When it has finished loading the data, the SP and ingest status APIs return the values of FINISHED and completed, respectively.

[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "transform",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "completed",
    "details": {
      "": null,
      "pipeline": "transform",
      "kxSessionInfo.6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714": {
        "pipelineName": "transform",
        "pipelineID": "s3-ingest-transform",
        "workerName": "s3-ingest-transform-1-spwork",
        "operatorID": "database.writer_greentrips",
        "ingestStartTime": "2025-02-24T13:59:48.022041801",
        "ingestEndTime": "2025-02-24T14:01:29.039912981"
      },
      "subsessions": [
        "6dd0db7e-7c2d-6e58-ff57-ad6c2e46d714"
      ],
      "dates": [
        "2021-12-01",
        "2021-12-02",
        "2021-12-03",
        "2021-12-04",
        "2021-12-05",
        "2021-12-06",
        "2021-12-07",
        ....
      ],
      "tables": [
        "greentrips"
      ]
    },
    "tbls": [
      "greentrips"
    ],
    "dates": [
      "2021-12-01",
      "2021-12-02",
      "2021-12-03",
      "2021-12-04",
      "2021-12-05",
      "2021-12-06",
      "2021-12-07",
      ....
    ],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": 31,
      "cmdTotal": 31,
      "subCurrent": "",
      "subIndex": 0,
      "subTotal": 0
    },
    "error": "",
    "updated": "2025-02-24T14:02:35.978496551"
  }
]
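If you want to block until the ingest completes, a similar polling sketch against the ingest endpoint works. Again, this assumes jq is installed and that the ingest of interest is the first entry in the response.

# Poll the database ingest API until the status is completed
while true; do
  status=$(curl -s -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq -r '.[0].status')
  echo "ingest status: $status"
  [ "$status" = "completed" ] && break
  sleep 30
done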

Querying data

Once the ingest has finished, you can query the greentrips table to retrieve the persisted data.

This requires an auth token, as described in the Checking progress section.

Query the greentrips table.

curl -H "Authorization: Bearer $KX_TOKEN" -X POST "$KX_HOSTNAME/servicegateway/data" \
  --header "Content-Type: application/json" \
  --header "Accepted: application/json" \
  -d '{"table": "greentrips", "startTS": "2021-12-01T00:00:00.0", "endTS": "2021-12-01T00:30:00.0"}' | jq .

This returns a JSON object containing the result payload.

{
  "header": {
    "auditID": "a5a4e59c-35a2-46bf-af92-876cfd6f7330",
    "rcvTS": "2025-03-05T15:27:52.922000000",
    "corr": "9741abb1-4fb4-4aeb-b2e6-8d0940593a12",
    "logCorr": "a5a4e59c-35a2-46bf-af92-876cfd6f7330",
    "http": "json",
    "api": ".kxi.getData",
    "agg": ":10.7.150.80:5070",
    "refVintage": -9223372036854776000,
    "rc": 0,
    "ac": 0,
    "ai": "",
    "limitApplied": false
  },
  "payload": [
    {
      "vendor": "VeriFone Inc.",
      "pickup": "2021-12-01T00:00:00.000000000",
      "dropoff": "2021-12-01T00:24:53.000000000",
      "passengers": 1,
      "distance": 8.43,
      "fare": 28,
      "extra": 0.5,
      "tax": 0.5,
      "tip": 7.33,
      "tolls": 0,
      "fees": 0,
      "total": 37.88,
      "payment_type": "Credit card"
    },
    {
      "vendor": "VeriFone Inc.",
      "pickup": "2021-12-01T00:00:00.000000000",
      "dropoff": "2021-12-01T09:47:02.000000000",
      "passengers": 6,
      "distance": 2.07,
      "fare": 12.5,
      "extra": 0,
      "tax": 0.5,
      "tip": 0,
      "tolls": 0,
      "fees": 0,
      "total": 15.8,
      "payment_type": "Cash"
    },
    ..
    {
      "vendor": "VeriFone Inc.",
      "pickup": "2021-12-01T00:29:59.000000000",
      "dropoff": "2021-12-01T00:47:12.000000000",
      "passengers": 1,
      "distance": 3.33,
      "fare": 14.5,
      "extra": 0.5,
      "tax": 0.5,
      "tip": 6.21,
      "tolls": 6.55,
      "fees": 0,
      "total": 31.06,
      "payment_type": "Credit card"
    }
  ]
}
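As a quick sanity check, you can pipe the same query through jq to count how many rows were returned for that window (jq assumed to be installed).

# Count the rows returned for the first half hour of 2021-12-01
curl -s -H "Authorization: Bearer $KX_TOKEN" -X POST "$KX_HOSTNAME/servicegateway/data" \
  --header "Content-Type: application/json" \
  --header "Accepted: application/json" \
  -d '{"table": "greentrips", "startTS": "2021-12-01T00:00:00.0", "endTS": "2021-12-01T00:30:00.0"}' \
  | jq '.payload | length'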

For more information on querying data, refer to Database Query Interface.

Teardown

Tear down the package and data using the command below.

kxi pm teardown --rm-data $PKG

Further Reading