
Streaming Kafka Ingest

Streams data from Kafka and writes to a data sink

Kafka organizes streaming data into topics, which are subdivided into numbered, independent partitions. This example walks through creating a simple Java Kafka producer that publishes data to a Kafka broker to be consumed by the KX Insights Stream Processor.
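
The producer itself is not reproduced in this section. As a minimal sketch, a Java producer for this example could look like the following; the broker address localhost:9092, the TradeProducer class name, and the sample field values are assumptions, while the trades topic and the time, sym, bid, and ask fields come from the pipeline below.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TradeProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; point this at your own bootstrap server
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One JSON trade record; the time, sym, bid and ask fields match
            // what the Stream Processor pipeline below expects to decode
            String trade = "{\"time\":\"2021-10-01D12:00:00.000000000\","
                + "\"sym\":\"AAPL\",\"bid\":170.1,\"ask\":170.15}";
            producer.send(new ProducerRecord<>("trades", trade)).get();
        }
    }
}

Calling get() on the returned future blocks until the broker acknowledges the record, which keeps the example deterministic; a real producer would typically send asynchronously and handle delivery results in a callback.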

Use TLS in production

For production deployments, it is recommended that Kafka be configured with TLS encryption. For a guide to setting up TLS with Kafka and the KX Stream Processor, refer to the Kafka with TLS guide.
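
As a rough illustration of what TLS involves on the client side, a TLS-enabled producer typically adds a few properties to the configuration in the sketch above; the truststore path and password here are placeholders.

// Placeholder truststore path and password; supply your own
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/etc/kafka/secrets/truststore.jks");
props.put("ssl.truststore.password", "changeit");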

Deploying the Stream Processor

Below is an example spec.q Stream Processor application that processes the trades topic produced by the example producer above. Each message is decoded from JSON and converted to a table, and the stream is windowed into 5-second buckets before being printed to the console for demonstration. The pipeline could instead publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.

.qsp.run
  .qsp.read.fromKafka["trades"]                     / subscribe to the trades topic
  .qsp.decode.json[]                                / parse each message as JSON
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]    / cast fields and convert each record to a one-row table
  .qsp.window.timer[00:00:05]                       / batch records into 5-second buckets
  .qsp.write.toConsole[]                            / print each batch for demonstration

To run an ingestion pipeline on the KX Insights platform, first deploy the base system. Next, download the kafka_assembly.yaml assembly file by following the download instructions, then deploy the assembly with the following command.

kubectl apply -f kafka_assembly.yaml

Uninstalling the assembly

To remove the assembly, delete the Kubernetes assembly resource.

kubectl delete assembly.insights.kx.com/kafka-assembly