
Streaming Kafka Ingest

Refer to the example overview for details about this example.

Below is an example spec.q Stream Processor application that processes the trades topic produced by the example producer above, followed by the equivalent Python specification. Each message is decoded from JSON and converted into a table row. The rows are then windowed into 5-second buckets and written to the trades table of the equities database. The pipeline could instead publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.
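To make the windowing step concrete, here is a rough pure-Python analogue of a 5-second timer window. Records are buffered as they arrive, and the buffer is emitted as one batch each time the timer period elapses. This is only an illustration under stated assumptions, not the Stream Processor API. The function timer_batches and its (arrival_seconds, record) input shape are hypothetical. Arrival times are assumed to be sorted and are passed in explicitly, whereas the real timer window is driven by the clock of the running pipeline. Empty intervals are skipped here for brevity.

```python
from typing import Any, Iterable, List, Tuple

def timer_batches(events: Iterable[Tuple[float, Any]], period: float = 5.0,
                  start: float = 0.0) -> List[List[Any]]:
    """Group (arrival_seconds, record) pairs into one batch per timer period.

    Hypothetical illustration: arrival times are passed in explicitly rather than
    read from a wall clock, and intervals with no records produce no batch.
    """
    batches: List[List[Any]] = []
    current: List[Any] = []
    next_flush = start + period  # the timer first fires one period after start
    for arrival, record in events:
        # Emit the buffer for every timer tick that passed before this record arrived.
        while arrival >= next_flush:
            if current:
                batches.append(current)
                current = []
            next_flush += period
        current.append(record)
    if current:
        # Flush whatever remains when the input ends.
        batches.append(current)
    return batches

print(timer_batches([(0.5, "a"), (1.0, "b"), (5.2, "c"), (12.0, "d")]))
# [['a', 'b'], ['c'], ['d']]
```

Each emitted batch corresponds to what the next pipeline stage (here, the database writer) would receive in one call.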

.qsp.run
  .qsp.read.fromKafka["trades"]
  .qsp.decode.json[]
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
  .qsp.window.timer[00:00:05]
  .qsp.write.toDatabase["trades"; "equities"]

from kxi import sp
import pykx as kx
import pandas as pd
from dateutil import parser
from datetime import timedelta

# Convert one decoded trade, a list of [time, sym, bid, ask], into a one-row table
def parse_data(data):
    return pd.DataFrame([{
        "time": kx.TimestampAtom(parser.isoparse(data[0])),
        "sym": kx.SymbolAtom(data[1]),
        "bid": kx.FloatAtom(float(data[2])),
        "ask": kx.FloatAtom(float(data[3]))
    }])

# Read from Kafka, decode JSON, parse each message, batch into 5-second windows
# and write the result to the trades table
sp.run(sp.read.from_kafka("trades")
    | sp.decode.json()
    | sp.map(parse_data)
    | sp.window.timer(timedelta(seconds=5))
    | sp.write.to_database("trades", "equities"))
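The decode and map steps can be tried out without a running pipeline. The sketch below assumes, as the indexing in parse_data implies, that each Kafka message is a JSON array of [time, sym, bid, ask]. The sample message and the helper parse_message are hypothetical, and plain Python types stand in for the PyKX atoms (TimestampAtom, SymbolAtom, FloatAtom) used above.

```python
import json
from datetime import datetime

def parse_message(raw: bytes) -> dict:
    """Decode one JSON-encoded trade and map it to a row (hypothetical helper)."""
    data = json.loads(raw)  # decode step: JSON array -> Python list
    return {
        "time": datetime.fromisoformat(data[0]),  # stand-in for kx.TimestampAtom
        "sym": str(data[1]),                      # stand-in for kx.SymbolAtom
        "bid": float(data[2]),                    # stand-in for kx.FloatAtom
        "ask": float(data[3]),
    }

# Hypothetical sample message; the real producer's format may differ.
row = parse_message(b'["2021-06-01T12:00:00.123", "AAPL", 100.5, 100.75]')
print(row["sym"], row["bid"], row["ask"])  # AAPL 100.5 100.75
```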

To run an ingestion pipeline using kdb Insights Enterprise, first deploy the base system. Next, download the kafka_assembly.yaml assembly by following the download instructions, then deploy it with the following kdb Insights CLI command:

kxi assembly create --filepath kafka_assembly.yaml

Uninstalling the assembly

Remove the assembly with the following kdb Insights CLI command:

kxi assembly teardown --name kafka_assembly