Skip to content

Batch S3 Ingest

This page describes how to read bulk data from S3 and write to an Insights Enterprise Database

To do this you will download and deploy a sample workload on a running Insights Enterprise system. It will show how data can be ingested into the database via the Stream Processor in a memory-efficient by writing data directly to the database.

The example assumes the following as a prerequisites;

  • access to the KX Downloads Portal https://portal.dl.kx.com
  • a username and bearer token for the portal saved in your terminal session as $KX_USER and $KX_DL_BEARER respectively
  • a running Insights Enterprise system with the hostname URL stored as $KX_HOSTNAME
  • the relevant version of the sample from here saved as $KX_VERSION
  • you have the CLI configured for your deployment with authentication credentials

For more information on the CLI:

Downloading the sample

You can download the sample with the command below.

curl -s --fail-with-body -D /dev/stderr -u ${KX_USER}:${KX_DL_BEARER} -L -OJ https://portal.dl.kx.com/assets/raw/assembly-samples/${KX_VERSION}/s3-ingest.yaml

AWS credential discovery

The S3 reader supports custom AWS credentials. For more information, see the reader configuration.

The pipeline definition, contained in the s3-ingest.yaml file is contained in the following block. It does the following;

  • reads data from a public S3 bucket
  • uses a CSV decoder to format the data using the schema object
  • renames some columns
  • transforms the data to match the database schema by adding an empty fees column and reordering
  • writes the data to the greentrips schema in the database using a database called s3-ingest
  • this value corresponds to the metadata.name field in the s3-ingest.yaml file
  • directWrite is enabled to specify that data should be written directly to the database instead of streamed
      pipelines:
        transform:
          base: q
          env:
            - name: KX_KURL_DISABLE_AUTO_REGISTER
              value: "1"
          spec: |-
            schema:(!) . flip (
                (`pickup_datetime       ; "p");
                (`dropoff_datetime      ; "p");
                (`vendor                ; "s");
                (`passengers            ; "h");
                (`distance              ; "e");
                (`rate_type             ; "*");
                (`store_and_fwd         ; "*");
                (`pickup_borough        ; "*");
                (`pickup_zone           ; "*");
                (`pickup_service_zone   ; "*");
                (`dropoff_borough       ; "*");
                (`dropoff_zone          ; "*");
                (`dropoff_service_zone  ; "*");
                (`payment_type          ; "s");
                (`fare                  ; "e");
                (`extra                 ; "e");
                (`tax                   ; "e");
                (`tip                   ; "e");
                (`toll                  ; "e");
                (`improvement_surcharge ; "*");
                (`congestion_surcharge  ; "*");
                (`total                 ; "e"));

            exclude:`rate_type`store_and_fwd`pickup_borough`pickup_zone`pickup_service_zone`dropoff_borough`dropoff_zone`dropoff_service_zone`improvement_surcharge`congestion_surcharge;

            .qsp.run
                .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
                .qsp.decode.csv[schema; .qsp.use enlist[`exclude]!enlist exclude]
                .qsp.transform.renameColumns[`pickup_datetime`dropoff_datetime`toll!`pickup`dropoff`tolls]
                .qsp.map[{ :`vendor`pickup`dropoff`passengers`distance`fare`extra`tax`tip`tolls`fees`total`payment_type xcols update fees:0e from x }]
                .qsp.v2.write.toDatabase[`greentrips; .qsp.use `directWrite`database!(1b; "s3-ingest")]

Deploying

Then authenticate with Insights Enterprise and deploy the sample.

kxi auth login
kxi assembly deploy -f s3-ingest.yaml

This deploys a database, a stream, and an SP pipeline. It contains a taxi CSV dataset from Amazon S3 and ingests it into the deployed database. Once deployed, the pipeline will begin to ingest data and write to the database.

Checking progress

The ingestion may take a few minutes to complete. You can check the progress using the Insights Enterprise REST APIs. Firstly you should obtain the token generated when you authenticated against the system.

export KX_TOKEN=$(kxi auth print-token)

Note

If this token expires, you can regenerate it by running kxi auth login again and re-storing with the command above.

You can check the progress of the SP pipeline using the details API.

curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/streamprocessor/details

The pipeline state is RUNNING indicating that it is processing data.

[
  {
    "id": "s3-ingest-transform",
    "name": "transform",
    "group": "g-785606756774",
    "mode": "default",
    "created": "2024-11-22T16:05:56.000000000",
    "lastSeen": "2024-11-22T16:06:44.826631122",
    "state": "RUNNING",
    "error": "",
    "metrics": {
      "eventRate": 0,
      "bytesRate": 0,
      "latency": null,
      "inputRate": 0,
      "outputRate": 0
    },
    "logCounts": {
      "trace": 0,
      "debug": 0,
      "info": 0,
      "warn": 0,
      "error": 0,
      "fatal": 0
    },
    "packageName": "__UNPACKAGED__",
    "reportedMetadata": []
  }
]

You can also check the state of the ingest at the database.

curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .

This has a state of pending while SP is still writing data and updates to progressing once SP has finished writing.

[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "pending",
    "details": [],
    "tbls": [],
    "dates": [],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": null,
      "cmdTotal": null,
      "subCurrent": "",
      "subIndex": null,
      "subTotal": null
    },
    "error": [],
    "updated": "2024-11-22T16:06:44.819755661"
  }
]

When it has finished loading the data, the SP and ingest status APIs will return FINISHED and completed respectively.

[
  {
    "id": "s3-ingest-transform",
    "name": "transform",
    "group": "g-785606756774",
    "mode": "default",
    "created": "2024-11-22T16:05:56.000000000",
    "lastSeen": "2024-11-22T16:38:31.378461195",
    "state": "FINISHED",
    "error": "",
    "metrics": {
      "eventRate": 0,
      "bytesRate": 0,
      "latency": null,
      "inputRate": 0,
      "outputRate": 0
    },
    "logCounts": {
      "trace": 0,
      "debug": 0,
      "info": 0,
      "warn": 0,
      "error": 0,
      "fatal": 0
    },
    "packageName": "__UNPACKAGED__",
    "reportedMetadata": []
  }
]
curl -H "Authorization: Bearer $KX_TOKEN" ${KX_HOSTNAME}/servicegateway/database/s3-ingest/ingest | jq .
[
  {
    "name": "s3-ingest-transform-0",
    "pipeline": "transform",
    "database": "s3-ingest",
    "updtype": "ingest",
    "status": "completed",
    "details": {
      "kxSessionInfo.3922c758-5450-4877-a61b-9c9ef01d4511": {
        "pipelineName": "transform",
        "pipelineID": "s3-ingest-transform",
        "workerName": "s3-ingest-transform-1-spwork",
        "operatorID": "database.writer_greentrips",
        "ingestStartTime": "2024-11-22T16:06:44.809727654",
        "ingestEndTime": "2024-11-22T16:08:34.247927644"
      },
      "subsessions": [
        "3922c758-5450-4877-a61b-9c9ef01d4511"
      ],
      "dates": [
        "2021-12-01",
        ..
        "2021-12-31"
      ],
      "tables": [
        "greentrips"
      ]
    },
    "tbls": [
      "greentrips"
    ],
    "dates": [
      "2021-12-01",
      ..
      "2021-12-31"
    ],
    "progress": {
      "cmdCurrent": "",
      "cmdIndex": 31,
      "cmdTotal": 31,
      "subCurrent": "",
      "subIndex": 0,
      "subTotal": 0
    },
    "error": "",
    "updated": "2024-11-22T16:09:23.128889708"
  }
]

Python

The pipeline can be updated to a Python version by replacing the pipelines: block with the code below.

The logic is identical to the q version defined above;

  • reads file from Amazon S3
  • applies a CSV decoder using the schema object to format correctly
  • renames, reorders and adds a missing fees column
  • writes to the greentrips table in the s3-ingest database
  • directWrite is enabled to specify that data should be written directly to the database instead of streamed
      pipelines:
        transform:
          base: py
          env:
            - name: KX_KURL_DISABLE_AUTO_REGISTER
              value: "1"
          spec: |-
            import pykx as kx
            from kxi import sp

            schema = {
                'pickup_datetime': 'p',
                'dropoff_datetime': 'p',
                'vendor': 's',
                'passengers': 'h',
                'distance': 'e',
                'rate_type': '*',
                'store_and_fwd': '*',
                'pickup_borough': '*',
                'pickup_zone': '*',
                'pickup_service_zone': '*',
                'dropoff_borough': '*',
                'dropoff_zone': '*',
                'dropoff_service_zone': '*',
                'payment_type': 's',
                'fare': 'e',
                'extra': 'e',
                'tax': 'e',
                'tip': 'e',
                'toll': 'e',
                'improvement_surcharge': '*',
                'congestion_surcharge': '*',
                'total': 'e'
            }

            exclude = ['rate_type', 'store_and_fwd', 'pickup_borough', 'pickup_zone', 'pickup_service_zone', 'dropoff_borough', 'dropoff_zone', 'dropoff_service_zone', 'improvement_surcharge', 'congestion_surcharge']

            def reorder(data):
                order = ['vendor', 'pickup', 'dropoff', 'passengers', 'distance', 'fare', 'extra', 'tax', 'tip', 'tolls', 'fees', 'total', 'payment_type']
                data = kx.q.qsql.update(data, {'fees': '0e'})
                return kx.q.xcols(order, data)

            sp.run(sp.read.from_amazon_s3('s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv', region='eu-west-1')
                | sp.decode.csv(schema, exclude=exclude)
                | sp.transform.rename_columns({'pickup_datetime': 'pickup', 'dropoff_datetime': 'dropoff', 'toll': 'tolls'})
                | sp.map(reorder)
                | sp.write.to_database('greentrips', database='s3-ingest', directWrite=True, api_version=2))

The same instructions can be used to deploy the sample and check the status.

Further Reading