Skip to content

Streaming Kafka Ingest

Streams data from Kafka and writes to a data sink

Setting up an example Kafka broker

The example below uses a sample Kafka broker provided as a how to guide.

Deploying the Stream Processor

Below is an example spec.q Stream Processor application that processes the trades topic that is being produced by the example producer above. The stream is decoded and windowed into 5 second buckets before being converted to a table and printed to the console for demonstration. This could also publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.

.qsp.run
  .qsp.read.fromKafka["trades"]
  .qsp.decode.json[]
  .qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
  .qsp.window.timer[00:00:05]
  .qsp.write.toConsole[]

To deploy the pipeline above to Docker, add a worker to the docker-compose.yaml configuration.

services:
    .. # Indicates excerpt from from previous `docker-compose.yaml` examples
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.6.1
    networks:
      - app
    depends_on:
      - kafka
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092

The final configuration for the docker-compose.yaml should be one of the following depending on the number of workers configured. For simple, one-off jobs, a single Worker may be sufficient. For health checks and parallelism or production deployments, it is recommended that a Controller is deployed to monitor any Workers.

Full configuration:

version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.6.1
    networks:
      - app
    depends_on:
      - kafka
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.6.1
    command: ["-p", "6000"]
    networks:
      - app
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_MIN_WORKERS=3
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.6.1
    networks:
      - app
    deploy:
      replicas: 3
    depends_on:
      - kafka
      - controller
    volumes:
      - .:/opt/kx/app/data
    environment:
      - KDB_LICENSE_B64
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_PARENT_HOST=controller:6000
      - KXI_SP_KAFKA_BROKERS=kafka:9092

To run the configuration, execute the following.

docker-compose up

To deploy the above pipeline in Kubernetes, first follow the setup for Kubernetes guide. The pipeline can be deployed using a port forwarded Coordinator service.

Multiple Workers

The Stream Processor will automatically scale the number of Workers to balance the available Kafka partitions across Workers. When deploying the Kafka broker chart, add --set numPartitions=<number> to increase the parallelism of the Kafka ingestion. The Stream Processor will not create more Workers than the specified maxWorkers.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name     : "kafka",
        type     : "spec",
        settings : {
            minWorkers : 1,
            maxWorkers : 10
        },
        config   : { content: $spec },
        env : { KXI_SP_KAFKA_BROKERS: "kafka:9092" }
    }' | jq -asR .)"