Streaming Kafka Ingest
Streams data from Kafka and writes to a data sink
Setting up an example Kafka broker
The example below uses a sample Kafka broker provided as a how to guide.
Deploying the Stream Processor
Below is an example spec.q
Stream Processor application that processes the trades
topic that is being produced by the example producer above. The stream is decoded and windowed into 5 second buckets before being converted to a table and printed to the console for demonstration. This could also publish to a kdb+ tickerplant, write a message back to Kafka, or send the data to one of the other available data sinks.
.qsp.run
.qsp.read.fromKafka["trades"]
.qsp.decode.json[]
.qsp.map[{enlist `time`sym`bid`ask!"PSff"$'x}]
.qsp.window.timer[00:00:05]
.qsp.write.toConsole[]
To deploy the pipeline above to Docker, add a worker to the docker-compose.yaml
configuration.
services:
.. # Indicates excerpt from from previous `docker-compose.yaml` examples
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.6.1
networks:
- app
depends_on:
- kafka
volumes:
- .:/opt/kx/app/data
environment:
- KDB_LICENSE_B64
- KXI_SP_SPEC=/opt/kx/app/data/spec.q
- KXI_SP_KAFKA_BROKERS=kafka:9092
The final configuration for the docker-compose.yaml
should be one of the following
depending on the number of workers configured. For simple, one-off jobs, a single Worker
may be sufficient. For health checks and parallelism or production deployments, it is
recommended that a Controller is deployed to monitor any Workers.
Full configuration:
version: "3.6"
networks:
app:
driver: bridge
services:
zookeeper:
image: "bitnami/zookeeper:latest"
networks:
- app
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
networks:
- app
depends_on:
- zookeeper
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
producer:
build:
context: .
dockerfile: Dockerfile
networks:
- app
depends_on:
- kafka
environment:
- KX_KAFKA_BROKERS=kafka:9092
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.6.1
networks:
- app
depends_on:
- kafka
volumes:
- .:/opt/kx/app/data
environment:
- KDB_LICENSE_B64
- KXI_SP_SPEC=/opt/kx/app/data/spec.q
- KXI_SP_KAFKA_BROKERS=kafka:9092
version: "3.6"
networks:
app:
driver: bridge
services:
zookeeper:
image: "bitnami/zookeeper:latest"
networks:
- app
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
networks:
- app
depends_on:
- zookeeper
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_NUM_PARTITIONS=3
producer:
build:
context: .
dockerfile: Dockerfile
networks:
- app
depends_on:
- kafka
environment:
- KX_KAFKA_BROKERS=kafka:9092
controller:
image: registry.dl.kx.com/kxi-sp-controller:1.6.1
command: ["-p", "6000"]
networks:
- app
volumes:
- .:/opt/kx/app/data
environment:
- KDB_LICENSE_B64
- KXI_SP_MIN_WORKERS=3
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.6.1
networks:
- app
deploy:
replicas: 3
depends_on:
- kafka
- controller
volumes:
- .:/opt/kx/app/data
environment:
- KDB_LICENSE_B64
- KXI_SP_SPEC=/opt/kx/app/data/spec.q
- KXI_SP_PARENT_HOST=controller:6000
- KXI_SP_KAFKA_BROKERS=kafka:9092
To run the configuration, execute the following.
docker-compose up
To deploy the above pipeline in Kubernetes, first follow the setup for Kubernetes guide. The pipeline can be deployed using a port forwarded Coordinator service.
Multiple Workers
The Stream Processor will automatically scale the number of Workers to balance
the available Kafka partitions across Workers. When deploying the Kafka broker
chart, add --set numPartitions=<number>
to increase the parallelism of the
Kafka ingestion. The Stream Processor will not create more Workers than the specified maxWorkers
.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka",
type : "spec",
settings : {
minWorkers : 1,
maxWorkers : 10
},
config : { content: $spec },
env : { KXI_SP_KAFKA_BROKERS: "kafka:9092" }
}' | jq -asR .)"