# Kafka setup

A guide to setting up a Kafka broker with a simple Java finance producer.
Kafka organizes streaming data into topics, which are subdivided into independent, numbered partitions. This example walks through creating a simple Java Kafka producer that publishes data to a Kafka broker for consumption by the kdb Insights Stream Processor.
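To make the topic and partition model concrete, the sketch below uses the standard `kafka-clients` Java `AdminClient` to create the `trades` and `quotes` topics used later in this guide. The partition count and replication factor here are arbitrary choices for illustration, and brokers with default settings will also auto-create topics on first use.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes a broker reachable at kafka:9092, as in the deployments below
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Three partitions per topic, replication factor 1 (single-broker example)
            admin.createTopics(Arrays.asList(
                new NewTopic("trades", 3, (short) 1),
                new NewTopic("quotes", 3, (short) 1)
            )).all().get(); // block until the broker confirms creation
        }
    }
}
```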
**Use TLS in production**

It is recommended that production deployments of Kafka be configured with TLS encryption. For a guide on setting up TLS with Kafka and the kdb Insights Stream Processor, refer to the Kafka with TLS guide.
## Deploying a Kafka broker
This example uses the `bitnami/kafka` images for deploying to both Docker and Kubernetes.

To deploy a broker within a Docker configuration, add a ZooKeeper and a Kafka container to a `docker-compose.yaml`.
version: "3.6"
networks:
app:
driver: bridge
services:
zookeeper:
image: "bitnami/zookeeper:latest"
networks:
- app
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
networks:
- app
depends_on:
- zookeeper
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
This will deploy a single broker in a Docker Compose configuration. Next, a producer and SP worker will be added to the configuration to process a data feed.
To install a Kafka broker on Kubernetes, add the `bitnami/kafka` Helm repository. This deployment will launch a three-node broker cluster.
**Kafka may restart several times**

While the Kafka brokers are initializing, they may restart several times before they determine which node will be the leader. Once the brokers elect a leader, they will stabilize.
```bash
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install kafka \
    --set replicaCount=3 \
    --set livenessProbe.initialDelaySeconds=60 \
    --set readinessProbe.initialDelaySeconds=60 \
    bitnami/kafka
```
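While waiting for the cluster to stabilize, a quick connectivity check along the following lines can confirm that all three brokers have registered and a controller has been elected. This is a sketch; the bootstrap address assumes the in-cluster `kafka:9092` service name created by the chart.

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // Blocks until the brokers respond; prints the node count and elected controller
            System.out.println("nodes: " + cluster.nodes().get().size());
            System.out.println("controller: " + cluster.controller().get());
        }
    }
}
```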
## Deploying a producer

### Market producer

A sample Java-based market data generator is provided below for creating sample data for this Kafka example.
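As a rough sketch of the generator's shape, a minimal version might look like the following. The `MarketProducer` class name and the `trades`/`quotes` array encodings match what is used elsewhere in this guide, while the ticker, random-walk pricing, and sizes are illustrative assumptions.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MarketProducer {
    static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address and publish interval come from the environment (see the table below)
        props.put("bootstrap.servers",
            System.getenv().getOrDefault("KX_KAFKA_BROKERS", "localhost:9092"));
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        long waitMs = Long.parseLong(
            System.getenv().getOrDefault("KX_KAFKA_WAIT_TIME", "1000"));

        Random rnd = new Random();
        double price = 123.45; // illustrative random-walk starting price
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                price += rnd.nextGaussian() * 0.1;
                String time = LocalDateTime.now().format(TS);
                // Encode each record as a JSON array: [time, ticker, price, size]
                String trade = String.format("[\"%s\", \"ABC\", %.2f, %d]",
                    time, price, 1 + rnd.nextInt(100));
                producer.send(new ProducerRecord<>("trades", trade));
                // [time, ticker, bid, bidSize, ask, askSize]
                String quote = String.format("[\"%s\", \"ABC\", %.2f, %d, %.2f, %d]",
                    time, price - 0.01, 10, price + 0.01, 20);
                producer.send(new ProducerRecord<>("quotes", quote));
                Thread.sleep(waitMs);
            }
        }
    }
}
```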
**Compiling the producer:** The example code was written using Kafka 2.7.1 and Java 1.8 or newer. To compile the JAR file, run `javac` with the classpath set to include the Kafka client libraries. Download the latest Kafka package from the Kafka downloads page. Make sure to use the prebuilt package and not the `src` package.
```bash
mkdir -p build
# This command assumes that the Kafka package was downloaded and unzipped into the
# `market-producer` directory as `kafka`
javac -cp 'kafka/libs/*:.' *.java -d build
jar cf market-generator.jar -C build .
```
The producer outputs two sample topics: `trades` and `quotes`. For each topic, the data is encoded as JSON arrays.
`trades`:

```json
# [time, ticker, price, size]
["2021-10-08 16:16:21", "ABC", 123.45, 10]
```

`quotes`:

```json
# [time, ticker, bid, bidSize, ask, askSize]
["2021-10-08 16:16:21", "ABC", 123.45, 10, 124.56, 20]
```
If desired, column names can also be sent along with the data when performance is not critical, by setting the `KX_INCLUDE_JSON_COLUMNS` variable to `"true"`:

`trades`:

```json
[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "price": 123.45, "size": 10}]
```

`quotes`:

```json
[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "bid": 123.45, "bid_size": 10, "ask": 124.56, "ask_size": 20}]
```
**Set `KX_INCLUDE_JSON_COLUMNS` when using data with the kdb Insights UI**

If ingesting this data using the kdb Insights Enterprise UI, make sure to enable column names by setting `KX_INCLUDE_JSON_COLUMNS="true"`.
### Producer configuration

The producer is configured through environment variables. The table below outlines the variables that can be used to configure the producer.
| variable | description |
| --- | --- |
| `KX_INCLUDE_JSON_COLUMNS` | Include column names when publishing JSON data (default `false`). |
| `KX_KAFKA_BROKERS` | The host and port of the Kafka broker to connect to. |
| `KX_KAFKA_ACKS` | Indicates if the producer should wait for acknowledgements before sending more data (default `0`). |
| `KX_KAFKA_RETRIES` | Indicates if failed messages should be retried or dropped (default `0`). |
| `KX_KAFKA_WAIT_TIME` | Number of milliseconds to wait between publishes (default `1000`). Increase to reduce throughput. |
| `KX_KAFKA_SAMPLE_ITERATIONS` | Number of publish iterations to sample for performance before logging (default `10`). |
| `KX_KAFKA_TRUSTSTORE_LOCATION` | Path to the TLS `truststore.jks` file when using TLS encryption. |
| `KX_KAFKA_KEYSTORE_LOCATION` | Path to the TLS `keystore.jks` file when using TLS encryption. |
| `KX_KAFKA_USE_SSL` | Indicates if TLS/SSL encryption should be used for communication (default `false`). |
| `KX_KAFKA_CERTIFICATE_PASSWORD` | Password for the TLS certificate, if used. |
| `KX_KAFKA_SSL_CERTIFICATE_VERIFICATION` | Indicates if TLS/SSL certificate verification should be used (default `true`). |
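As a rough sketch of how these variables could map onto standard Kafka client settings: the property keys below are real Kafka producer configuration names, but the mapping itself is an assumption about this producer's internals rather than its exact implementation.

```java
import java.util.Properties;

public class ProducerConfigFromEnv {
    // Sketch: translate the KX_KAFKA_* environment variables into Kafka client properties
    static Properties fromEnv() {
        Properties props = new Properties();
        props.put("bootstrap.servers", env("KX_KAFKA_BROKERS", "localhost:9092"));
        props.put("acks", env("KX_KAFKA_ACKS", "0"));
        props.put("retries", env("KX_KAFKA_RETRIES", "0"));
        if (Boolean.parseBoolean(env("KX_KAFKA_USE_SSL", "false"))) {
            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", env("KX_KAFKA_TRUSTSTORE_LOCATION", ""));
            props.put("ssl.keystore.location", env("KX_KAFKA_KEYSTORE_LOCATION", ""));
            props.put("ssl.keystore.password", env("KX_KAFKA_CERTIFICATE_PASSWORD", ""));
            if (!Boolean.parseBoolean(env("KX_KAFKA_SSL_CERTIFICATE_VERIFICATION", "true"))) {
                // An empty algorithm disables hostname verification in the Kafka client
                props.put("ssl.endpoint.identification.algorithm", "");
            }
        }
        return props;
    }

    static String env(String name, String def) {
        return System.getenv().getOrDefault(name, def);
    }
}
```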
### Containerized Kafka producer

With the Kafka release downloaded to a folder titled `kafka` and `market-generator.jar` in the same folder, the following `Dockerfile` configuration will run the producer in a container.
```dockerfile
FROM openjdk:17-jdk-alpine3.13
WORKDIR /opt/kx
COPY kafka kafka
COPY market-generator.jar market-generator.jar
# Quote the classpath so the `*` wildcard is expanded by `java`, not the shell
CMD ["sh", "-c", "java -cp 'market-generator.jar:kafka/libs/*' MarketProducer"]
```
### Deploying the producer

To add the producer to the Docker deployment, add a producer service to the configuration that uses the `Dockerfile` described above.
```yaml
services:
  .. # Indicates an excerpt from the previous `docker-compose.yaml` example
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
```
To deploy the producer to a Kubernetes pod, the `Dockerfile` above needs to be built and published to a container registry so it can be pulled into the Kubernetes cluster. This example pushes to an example Docker Hub repository called `myorg`.
```bash
docker build --tag myorg/kx-kafka-producer:1.0.0 .
docker push myorg/kx-kafka-producer:1.0.0
```
Create the following `producer.yaml` deployment configuration. Remember to replace `myorg` with the repository used in the command above.
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: kx-kafka-producer
  labels:
    app.kubernetes.io/name: Kafka_Producer
    app.kubernetes.io/component: kx-kafka-producer
spec:
  containers:
    - name: producer
      image: myorg/kx-kafka-producer:1.0.0
      env:
        - name: KX_KAFKA_BROKERS
          value: "kafka:9092"
```
Deploy the pod by applying the pod configuration with `kubectl`.

```bash
kubectl apply -f producer.yaml
```
### Uninstalling the producer

To remove the producer deployment, delete the pod configuration with `kubectl`.

```bash
kubectl delete pod kx-kafka-producer
```
Now that a Kafka broker and producer have been deployed, follow one of the guides below to connect Kafka with the kdb Insights Stream Processor.