
Streaming Kafka Ingest

Refer to the example overview for details about this example.

Below is an example spec.q Stream Processor application that processes the trades topic produced by the example producer above. Each message is decoded from JSON and converted into a one-row table, and the rows are collected into 5-second windows before being written to the trades table with .qsp.write.toDatabase. Instead of writing to the database, the pipeline could publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.

.qsp.run
  .qsp.read.fromKafka["trades"]
  .qsp.decode.json[]
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
  .qsp.window.timer[00:00:05]
  .qsp.write.toDatabase["trades"; "equities"]
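
The map step is where each decoded message becomes a typed row: in q, "PSff"$'x casts the four fields to timestamp, symbol, float and float. The stdlib-only Python sketch below mirrors that cast for a single message so the shape of the data is easy to see. It assumes each Kafka message is a JSON array laid out as [time, sym, bid, ask] with an ISO 8601 timestamp, which is what the positional indexing in the map functions implies; decode_trade is a hypothetical helper, not part of the Stream Processor API.

```python
import json
from datetime import datetime


def decode_trade(message: bytes) -> dict:
    """Decode one JSON trade message and cast each field to a typed column.

    Hypothetical helper; assumes the payload is [iso_time, sym, bid, ask].
    """
    time_str, sym, bid, ask = json.loads(message)
    return {
        "time": datetime.fromisoformat(time_str),  # like q's "P" cast (timestamp)
        "sym": str(sym),                           # like "S" (symbol)
        "bid": float(bid),                         # like "f" (float)
        "ask": float(ask),                         # like "f" (float)
    }


row = decode_trade(b'["2021-06-01T09:30:00.125", "AAPL", 124.5, 124.6]')
print(row)
```

datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 onward, which is one reason the Python pipeline uses dateutil.parser.isoparse instead.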
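
The equivalent pipeline using the Python API:

from kxi import sp
import pykx as kx
import pandas as pd
import dateutil.parser  # import the submodule so dateutil.parser.isoparse is available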
from datetime import timedelta

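def parse_data(data):
    # data is one decoded JSON message laid out as [time, sym, bid, ask]
    # Return a one-row DataFrame with a typed PyKX atom for each column
    return pd.DataFrame([{
        "time": kx.TimestampAtom(dateutil.parser.isoparse(data[0])),
        "sym": kx.SymbolAtom(data[1]),
        "bid": kx.FloatAtom(float(data[2])),
        "ask": kx.FloatAtom(float(data[3]))
    }])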

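sp.run(sp.read.from_kafka("trades")               # consume the "trades" topic
  | sp.decode.json()                              # parse each message as JSON
  | sp.map(parse_data)                            # convert each message to a one-row table
  | sp.window.timer(timedelta(seconds=5))         # collect rows into 5-second windows
  | sp.write.to_database("trades", "equities"))   # write each window to the trades table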
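
Of the pipeline steps, the 5-second timer window is the least obvious. The sketch below is a rough mental model only, under the assumption that .qsp.window.timer / sp.window.timer batches records by processing (wall-clock) time rather than by the time field inside each message. TimerWindow is a hypothetical class, not the kxi implementation, and it only checks the clock when push or tick is called, whereas a real timer fires on its own schedule.

```python
import time
from typing import Callable, List, Optional


class TimerWindow:
    """Buffer records and release them as one batch every `period` seconds.

    Hypothetical helper (not part of kxi): approximates processing-time
    windowing. The clock is injectable so the behaviour is deterministic
    in tests; by default it uses time.monotonic.
    """

    def __init__(self, period: float, clock: Callable[[], float] = time.monotonic):
        self.period = period
        self.clock = clock
        self.window_start = clock()
        self.buffer: List[dict] = []

    def tick(self) -> Optional[List[dict]]:
        """Emit the buffered batch if the period has elapsed, else return None."""
        now = self.clock()
        if now - self.window_start < self.period:
            return None
        batch, self.buffer = self.buffer, []
        self.window_start = now  # the next window starts at the flush time
        return batch or None  # an elapsed but empty window emits nothing

    def push(self, record: dict) -> Optional[List[dict]]:
        """Flush any elapsed window first, then buffer the new record."""
        batch = self.tick()
        self.buffer.append(record)
        return batch


# Drive the window with a fake clock instead of waiting in real time.
fake_now = [0.0]
window = TimerWindow(5.0, clock=lambda: fake_now[0])
for t, sym in [(0.5, "AAPL"), (3.0, "MSFT"), (6.0, "AAPL")]:
    fake_now[0] = t
    batch = window.push({"sym": sym})
    if batch:
        print(f"t={t}: emitted {[r['sym'] for r in batch]}")
```

Resetting window_start to the flush time keeps the sketch simple but lets windows drift slightly; a scheduler-driven timer would advance it by exactly one period instead.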

To run an ingestion pipeline using KX Insights Enterprise, first deploy the base system. Next, download the kafka_assembly.yaml assembly by following the download instructions, then deploy it with the following command.

kubectl apply -f kafka_assembly.yaml

Uninstalling the assembly

To remove the assembly, delete the Kubernetes assembly resource.

kubectl delete asm kafka-assembly