
Streaming Kafka Ingest

This page describes how to read data from Kafka and visualize it in a kdb Insights View.

Apache Kafka is an event streaming platform that seamlessly integrates with kdb Insights Enterprise, enabling real-time data processing through pipelines connected to Kafka data sources. For this example, we have provided a Kafka subway feed that generates live alerts for NYC Subway trains, including arrival times, station coordinates, direction, and route details.

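This example shows how to use the CLI to download the sample package, deploy it, view its output, check the progress of the pipeline, and tear the package down.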

Before you begin, you must complete the prerequisites described in the following section.

Streaming subway data

Refer to the Streaming subway data to a data grid in Views walkthrough for instructions on setting up and deploying a package to ingest and view Kafka data using the web interface.

Prerequisites

  1. Configure a username and bearer token for the KX Downloads Portal as environment variables. To do this:

    • Click the KX Downloads Portal link and follow the steps to log in.
    • Click your login name in the top right-hand corner, then click Token Management.
    • Click Add New Token.
    • Copy the value in the Bearer field and save it for later use.
    • Use the following commands to set the environment variables, where KX_USER is the username you used to log in to the KX Downloads Portal and KX_DL_BEARER is the bearer token you just generated.
        export KX_USER=<USERNAME>
        export KX_DL_BEARER=<TOKEN>
    
  2. A running kdb Insights Enterprise system with the hostname URL configured as the value for the KX_HOSTNAME environment variable.

    export KX_HOSTNAME=<URL>
    
  3. The CLI configured for your deployment with authentication credentials.
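
Before moving on, it can help to confirm that the variables named in the prerequisites are actually set in the current shell. The following is a minimal sketch, not part of the sample package; the helper name missing_vars is hypothetical.

```python
import os

# Environment variables named in the prerequisites above.
REQUIRED_VARS = ("KX_USER", "KX_DL_BEARER", "KX_HOSTNAME")


def missing_vars(env, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty in env."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_vars(os.environ)
    if missing:
        print("Set these variables before continuing:", ", ".join(missing))
    else:
        print("All prerequisite variables are set.")
```

Run the script in the same terminal session in which you exported the variables, because exported values are only visible to processes started from that session.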

Downloading the package

  1. Run the following command to set the KX_VERSION environment variable in your terminal.

    export KX_VERSION=<VERSION>
    
  2. Download the sample package with the command below.

    curl -s --fail-with-body -D /dev/stderr -u "${KX_USER}:${KX_DL_BEARER}" -L -OJ "https://portal.dl.kx.com/assets/raw/package-samples/${KX_VERSION}/kafka-ingest-${KX_VERSION}.kxi"
    
  3. Run the following command to unpack and view the contents of the kafka-ingest sample package.

    export PKG=kafka-ingest
    kxi package unpack $PKG-$KX_VERSION.kxi
    

    The kafka-ingest sample package contains the files listed below and described in the following table:

    kafka-ingest/
    ├── manifest.yaml
    ├── pipelines
    │   └── subway.yaml
    ├── src
    │   ├── ml.py
    │   ├── ml.q
    │   ├── subway.py
    │   └── subway.q
    └── views
        └── subway.yaml
    
    3 directories, 7 files
    
    Artefact      | Description
    manifest.yaml | Used by the CLI for package management. Do not modify the contents of this file.
    pipelines     | The pipeline configuration file, subway.yaml.
    src           | Pipeline spec files:
                  |   subway.q: the default pipeline spec, written in q, located at kafka-ingest/src/subway.q in the unpacked package.
                  |   subway.py: the equivalent Python pipeline spec, located at kafka-ingest/src/subway.py.
                  |   ml.q and ml.py: ML examples in q and Python, included in the package for reference.
    views         | The view configuration file, subway.yaml.
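
To confirm that the unpack step produced the layout shown in the tree listing, the files can be checked programmatically. This is a sketch only; the helper name missing_files is hypothetical, and the expected paths are copied from the listing above rather than from the manifest.

```python
from pathlib import Path

# Relative paths taken from the tree listing of the unpacked package.
EXPECTED_FILES = (
    "manifest.yaml",
    "pipelines/subway.yaml",
    "src/ml.py",
    "src/ml.q",
    "src/subway.py",
    "src/subway.q",
    "views/subway.yaml",
)


def missing_files(package_dir, expected=EXPECTED_FILES):
    """Return the expected relative paths that are not regular files under package_dir."""
    root = Path(package_dir)
    return [rel for rel in expected if not (root / rel).is_file()]


if __name__ == "__main__":
    missing = missing_files("kafka-ingest")
    if missing:
        print("Missing from the unpacked package:", ", ".join(missing))
    else:
        print("Package layout matches the listing.")
```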

The q and Python versions of the subway pipeline spec are shown below.

kafka-ingest/src/subway.q

subway: ([] trip_id: `symbol$(); arrival_time: `timestamp$(); stop_id: `symbol$(); stop_sequence: `long$(); stop_name: `symbol$(); stop_lat: `float$(); stop_lon: `float$(); route_id: `long$(); trip_headsign: `symbol$(); direction_id: `symbol$(); route_short_name: `symbol$(); route_long_name: `symbol$(); route_desc: `char$(); route_type: `long$(); route_url: `symbol$(); route_color: `symbol$())

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers ; "kafka.trykdb.kx.com:443");
        (`topic   ; "subway");
        (`options; (!) . flip (
            (`security.protocol ; "SASL_SSL");
            (`sasl.username     ; "demo");
            (`sasl.password     ; "demo");
            (`sasl.mechanism    ; "SCRAM-SHA-512"))))]
    .qsp.decode.json[]
    .qsp.map[{ enlist x }]
    .qsp.transform.schema[subway]
    .qsp.write.toSubscriber[`subway;`trip_id]

kafka-ingest/src/subway.py

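# Imports assumed by this listing; the excerpt itself does not show them.
import pykx
from kxi import sp

schema = {'trip_id': pykx.SymbolAtom,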
        'arrival_time': pykx.TimestampAtom,
        'stop_id': pykx.SymbolAtom,
        'stop_sequence': pykx.ShortAtom,
        'stop_name': pykx.SymbolAtom,
        'stop_lat': pykx.FloatAtom,
        'stop_lon': pykx.FloatAtom,
        'route_id': pykx.ShortAtom,
        'trip_headsign': pykx.SymbolAtom,
        'direction_id': pykx.SymbolAtom,
        'route_short_name': pykx.SymbolAtom,
        'route_long_name': pykx.SymbolAtom,
        'route_desc': pykx.List,
        'route_type': pykx.ShortAtom,
        'route_url': pykx.SymbolAtom,
        'route_color': pykx.SymbolAtom}

sp.run(sp.read.from_kafka('subway',
                        brokers='kafka.trykdb.kx.com:443',
                        options={'sasl.username': 'demo',
                                'sasl.password': 'demo',
                                'sasl.mechanism': 'SCRAM-SHA-512',
                                'security.protocol': 'SASL_SSL'})
    | sp.decode.json()
    | sp.transform.schema(schema)
    | sp.write.to_subscriber('subway', 'trip_id'))

The subway pipeline spec file can be summarised as follows:

Object/Node         | q Object                | Python Object          | Description
Schema              | table                   | pykx.Dict              | The schema definition used for type conversion when parsing incoming data.
Read from Kafka     | .qsp.read.fromKafka     | sp.read.from_kafka     | Set up the Kafka reader with the topic, broker, and security configuration.
Decode JSON         | .qsp.decode.json        | sp.decode.json         | Decode the incoming data with the JSON decoder.
Transform Schema    | .qsp.transform.schema   | sp.transform.schema    | Parse the incoming data using the defined schema.
Write to Subscriber | .qsp.write.toSubscriber | sp.write.to_subscriber | Write the data to a subscriber named subway, with trip_id as the key column.
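
To make the Decode JSON and Transform Schema steps more concrete, the sketch below mimics them on a single message in plain Python. It does not use the Stream Processor API: the casts stand in for a subset of the schema columns, and the sample message is hypothetical, so the field values and timestamp format of the real feed may differ.

```python
import json
from datetime import datetime

# Plain-Python casts standing in for a few of the schema column types.
# Only a subset of the subway columns is shown.
CASTS = {
    "trip_id": str,
    "arrival_time": datetime.fromisoformat,
    "stop_sequence": int,
    "stop_lat": float,
    "stop_lon": float,
}


def decode_and_cast(raw, casts=CASTS):
    """Decode one JSON message, then cast each listed field to its target type."""
    record = json.loads(raw)  # analogous to the Decode JSON node
    # Analogous to the Transform Schema node: keep the schema columns, cast each one.
    return {col: cast(record[col]) for col, cast in casts.items()}


# Hypothetical message, invented for illustration only.
sample = (
    b'{"trip_id": "T1", "arrival_time": "2025-03-03T12:22:29", '
    b'"stop_sequence": "4", "stop_lat": "40.75", "stop_lon": "-73.99"}'
)
row = decode_and_cast(sample)
print(row)
```

In the real pipeline the schema step performs this conversion on whole tables rather than on one dictionary at a time.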

Deploying the package

Next, authenticate with kdb Insights Enterprise and deploy the package to begin reading the subway data from Kafka and writing it to the subscriber/web-socket.

  1. Run the following command to authenticate with kdb Insights Enterprise:

    kxi auth login
    
  2. Perform the following setup for Python (skip to step 3 for q):

    Unpack the sample package.

    kxi package unpack $PKG-$KX_VERSION.kxi
    

    Replace the values of base and spec in pipelines/subway.yaml to use the Python spec file instead of the q one.

    # base: q
    base: py
    
    # spec: src/subway.q
    spec: src/subway.py
    
  3. Run the following command to deploy the package:

    kxi pm push $PKG
    kxi pm deploy $PKG
    

    Cleaning up resources

    It may be necessary to tear down any previously installed version of the package and clean up its resources before deploying a new version, as described in the Teardown section.
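
The Python setup in step 2 can also be scripted instead of edited by hand. The sketch below rewrites the base and spec entries line by line; it assumes those entries appear in pipelines/subway.yaml exactly as `base: q` and `spec: src/subway.q`, and the helper name use_python_spec is hypothetical.

```python
import re
from pathlib import Path

# Match whole lines only, so commented-out entries such as "# base: q" are left alone.
BASE_RE = re.compile(r"^([ \t]*base:[ \t]*)q[ \t]*$", re.MULTILINE)
SPEC_RE = re.compile(r"^([ \t]*spec:[ \t]*)src/subway\.q[ \t]*$", re.MULTILINE)


def use_python_spec(yaml_text):
    """Return yaml_text with the q base and spec entries switched to Python."""
    text = BASE_RE.sub(r"\g<1>py", yaml_text)
    return SPEC_RE.sub(r"\g<1>src/subway.py", text)


if __name__ == "__main__":
    # Path of the pipeline configuration inside the unpacked package.
    path = Path("kafka-ingest/pipelines/subway.yaml")
    if path.is_file():
        path.write_text(use_python_spec(path.read_text()))
        print(f"Updated {path}")
    else:
        print(f"{path} not found; unpack the package first.")
```

The substitution is idempotent, so running the script twice leaves the file unchanged after the first run.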

Viewing output

The subway View is included in the kafka-ingest sample package. To view data for all inbound trains on the 6, 7 or 8th Avenue Express routes:

  1. Log in to the kdb Insights Enterprise Web Interface.
  2. Navigate to the Views tab and click on the subway View.

Subway View

Checking progress

To check the progress of the running pipeline, use the kdb Insights Enterprise REST APIs.

  1. Get the token generated when you authenticated against the system.

    export KX_TOKEN=$(kxi auth print-token)
    

    Regenerate expired token

    If this token expires, regenerate it by running kxi auth login again, then store the new token with the command above.

  2. Check the progress of the SP pipeline using the details API, as follows:

    curl -H "Authorization: Bearer $KX_TOKEN" "${KX_HOSTNAME}/streamprocessor/details"
    

    A pipeline state of RUNNING indicates that it is processing data.

    [
    {
        "id":"kafka-ingest-subway",
        "name":"subway",
        "group":"g-794319749428",
        "mode":"default",
        "created":"2025-03-03T12:22:29.000000000",
        "lastSeen":"2025-03-03T12:27:11.389548559",
        "state":"RUNNING",
        "error":"",
        "metrics":{
            "eventRate":0,
            "bytesRate":0,
            "latency":null,
            "inputRate":1328.754,
            "outputRate":2.745616
        },
        "logCounts":{
            "trace":0,
            "debug":0,
            "info":157,
            "warn":3,
            "error":0,
            "fatal":0
        },
        "packageName":"kafka-ingest",
        "reportedMetadata":[
            {
                "id":1,
                "name":"kafka-ingest-subway-1-spwork",
                "reportedMetadata":[
                {
                    "operatorID":"subscriber_subway",
                    "plugin":"subscriber",
                    "cacheLimit":2000,
                    "keyCol":"trip_id",
                    "publishFrequency":null,
                    "table":"subway"
                }
                ],
                "pipeID":"kafka-ingest-subway"
            }
        ]
    }
    ]
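
When the response is long, it can be easier to extract only the pipeline states. The following sketch parses a response shaped like the one above and reports any pipeline that is not RUNNING; the helper names are hypothetical, and the sample is a trimmed copy of the response shown, not live output.

```python
import json


def pipeline_states(details_json):
    """Map each pipeline id in a details response to its reported state."""
    return {pipeline["id"]: pipeline["state"] for pipeline in json.loads(details_json)}


def not_running(details_json):
    """Return the ids of pipelines whose state is anything other than RUNNING."""
    states = pipeline_states(details_json)
    return sorted(pid for pid, state in states.items() if state != "RUNNING")


# Trimmed copy of the response shown above.
sample = '[{"id": "kafka-ingest-subway", "name": "subway", "state": "RUNNING", "error": ""}]'

print(pipeline_states(sample))
print(not_running(sample))
```

To use it against a live system, save the output of the curl command to a file and pass the file contents to pipeline_states.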
    

Teardown

You must tear down a previously installed version of the package and clean up its resources before you can deploy a new version.

  1. To tear down the package and its resources, run the command below.

    kxi pm teardown --rm-data $PKG
    
