Kafka setup

A guide to setting up a Kafka broker with a simple Java finance producer

Kafka organizes streaming data into topics, which are subdivided into independent, numbered partitions. This example walks through creating a simple Java Kafka producer that publishes data to a Kafka broker, where it can be consumed by the KX Insights Stream Processor.

Use TLS in production

For production deployments, configure Kafka with TLS encryption. For a guide on setting up TLS with Kafka and the KX Stream Processor, refer to the Kafka with TLS guide.

Deploying a Kafka broker

This example uses the bitnami/kafka images for deploying to both Docker and Kubernetes.

To deploy a broker with Docker Compose, add a ZooKeeper container and a Kafka container to a docker-compose.yaml file:

version: "3.6"
networks:
  app:
    driver: bridge
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    networks:
      - app
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    networks:
      - app
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181

This deploys a single broker in a Docker Compose configuration. Next, a producer and a Stream Processor (SP) worker are added to the configuration to process a data feed.

To install a Kafka broker on Kubernetes, add the bitnami Helm repository and install the bitnami/kafka chart. This deployment launches a three-node broker cluster.

Kafka may restart several times

While the Kafka brokers are initializing, they may restart several times before they determine which node will be the leader. Once the brokers elect a leader, they stabilize.

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install kafka \
   --set replicaCount=3  \
   --set livenessProbe.initialDelaySeconds=60 \
   --set readinessProbe.initialDelaySeconds=60 \
   bitnami/kafka

Deploying a producer

Market producer

A sample Java-based market data generator is provided below to create data for this Kafka example.

Download example code

Compiling the producer:

The example code was written against Kafka 2.7.1 and requires Java 1.8 or newer. To compile the JAR file, run javac with the classpath set to include the Kafka development libraries. Download the latest Kafka package from the Kafka downloads page, making sure to use the prebuilt binary package and not the src package.

mkdir -p build

# This command assumes that the Kafka package was downloaded and unzipped into the
# `market-producer` directory as `kafka`
javac -cp 'kafka/libs/*:.' *.java -d build
jar cf market-generator.jar -C build .

The producer publishes to two sample topics: trades and quotes. For each topic, the data is encoded as JSON arrays.

trades:

# [time, ticker, price, size]
["2021-10-08 16:16:21", "ABC", 123.45, 10]

quotes:

# [time, ticker, bid, bidSize, ask, askSize]
["2021-10-08 16:16:21", "ABC", 123.45, 10, 124.56, 20]

When performance is not critical, column names can also be sent along with the data by setting the KX_INCLUDE_JSON_COLUMNS variable to "true":

trades:

[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "price": 123.45, "size": 10}]

quotes:

[{"time": "2021-10-08 16:16:21", "ticker": "ABC", "bid": 123.45, "bid_size": 10, "ask": 124.56, "ask_size": 20}]
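The two encodings above can be sketched in plain Java. This is a minimal illustration using only the standard library, not the code from the downloadable example: the class name `RowEncoder` and its methods are hypothetical, and it assumes each row is held in an insertion-ordered map of column name to value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch (not taken from the example code): encodes one row
// either as a positional JSON array or as an array holding a single object
// with column names, depending on a flag such as KX_INCLUDE_JSON_COLUMNS.
class RowEncoder {

    // Render a value as a JSON literal: numbers as-is, everything else
    // as a quoted string with backslashes and double quotes escaped.
    static String literal(Object value) {
        if (value instanceof Number) {
            return value.toString();
        }
        String s = String.valueOf(value);
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Encode one row; the map's insertion order defines the column order.
    static String encode(Map<String, Object> row, boolean includeColumns) {
        StringBuilder sb = new StringBuilder(includeColumns ? "[{" : "[");
        boolean first = true;
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!first) {
                sb.append(", ");
            }
            first = false;
            if (includeColumns) {
                sb.append(literal(e.getKey())).append(": ");
            }
            sb.append(literal(e.getValue()));
        }
        return sb.append(includeColumns ? "}]" : "]").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> trade = new LinkedHashMap<>();
        trade.put("time", "2021-10-08 16:16:21");
        trade.put("ticker", "ABC");
        trade.put("price", 123.45);
        trade.put("size", 10);

        // parseBoolean returns false when the variable is unset (null).
        boolean includeColumns =
            Boolean.parseBoolean(System.getenv("KX_INCLUDE_JSON_COLUMNS"));
        System.out.println(encode(trade, includeColumns));
    }
}
```

The named form repeats every column name in every message, which is why the positional form is the lighter choice when throughput matters.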

Set KX_INCLUDE_JSON_COLUMNS when using data with the kdb Insights UI

If ingesting this data using the KX Insights UI, make sure to enable column names by setting KX_INCLUDE_JSON_COLUMNS="true".

Producer configuration

The producer exposes its configuration parameters through environment variables. The table below outlines the variables that can be used to configure the producer.

| variable | description |
| --- | --- |
| KX_INCLUDE_JSON_COLUMNS | Include column names when publishing JSON data (default 'false'). |
| KX_KAFKA_BROKERS | The host and port of the Kafka broker to connect to. |
| KX_KAFKA_ACKS | Indicates if the producer should wait for acknowledgements before sending more data (default '0'). |
| KX_KAFKA_RETRIES | Indicates if failed messages should be retried or dropped (default '0'). |
| KX_KAFKA_WAIT_TIME | Number of milliseconds to wait between publishes (default '1000'). Increase to reduce throughput. |
| KX_KAFKA_SAMPLE_ITERATIONS | Number of publish iterations to sample for performance before logging (default '10'). |
| KX_KAFKA_TRUSTSTORE_LOCATION | Path to TLS truststore.jks file when using TLS encryption. |
| KX_KAFKA_KEYSTORE_LOCATION | Path to TLS keystore.jks file when using TLS encryption. |
| KX_KAFKA_USE_SSL | If TLS/SSL encryption should be used for communication (default 'false'). |
| KX_KAFKA_CERTIFICATE_PASSWORD | Password for TLS certificate if used. |
| KX_KAFKA_SSL_CERTIFICATE_VERIFICATION | Indicates if TLS/SSL certificate verification should be used (default 'true'). |
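As a rough guide to how these variables might relate to the Kafka client, the sketch below maps them onto standard Kafka producer configuration keys (`bootstrap.servers`, `acks`, `retries`, and the `ssl.*` settings). The class `ProducerEnv` and the mapping itself are assumptions made for illustration, and the example code may wire these variables differently. In particular, the sketch assumes one password covers both stores and that disabling verification means disabling hostname verification only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: translates the documented environment variables into
// Kafka producer properties. The mapping is an assumption, not the
// behaviour of the downloadable example code.
class ProducerEnv {

    // Return the variable's value, or the fallback when unset or empty.
    static String get(Map<String, String> env, String name, String fallback) {
        String v = env.get(name);
        return (v == null || v.isEmpty()) ? fallback : v;
    }

    // Copy a variable into the properties only when it is set.
    static void copyIfSet(Map<String, String> env, String name,
                          Properties p, String key) {
        String v = get(env, name, null);
        if (v != null) {
            p.setProperty(key, v);
        }
    }

    static Properties toKafkaProperties(Map<String, String> env) {
        String brokers = get(env, "KX_KAFKA_BROKERS", null);
        if (brokers == null) {
            // The table documents no default for the broker address.
            throw new IllegalArgumentException("KX_KAFKA_BROKERS is not set");
        }
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", brokers);
        p.setProperty("acks", get(env, "KX_KAFKA_ACKS", "0"));
        p.setProperty("retries", get(env, "KX_KAFKA_RETRIES", "0"));

        if (Boolean.parseBoolean(get(env, "KX_KAFKA_USE_SSL", "false"))) {
            p.setProperty("security.protocol", "SSL");
            copyIfSet(env, "KX_KAFKA_TRUSTSTORE_LOCATION", p, "ssl.truststore.location");
            copyIfSet(env, "KX_KAFKA_KEYSTORE_LOCATION", p, "ssl.keystore.location");
            // Assumption: a single password is used for both stores.
            copyIfSet(env, "KX_KAFKA_CERTIFICATE_PASSWORD", p, "ssl.truststore.password");
            copyIfSet(env, "KX_KAFKA_CERTIFICATE_PASSWORD", p, "ssl.keystore.password");
            if (!Boolean.parseBoolean(
                    get(env, "KX_KAFKA_SSL_CERTIFICATE_VERIFICATION", "true"))) {
                // An empty value disables hostname verification in the Kafka client.
                p.setProperty("ssl.endpoint.identification.algorithm", "");
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>(System.getenv());
        // Broker address taken from the Docker Compose example in this guide.
        env.putIfAbsent("KX_KAFKA_BROKERS", "kafka:9092");
        Properties p = toKafkaProperties(env);
        // Print only non-secret settings.
        System.out.println("bootstrap.servers=" + p.getProperty("bootstrap.servers"));
        System.out.println("acks=" + p.getProperty("acks"));
        System.out.println("retries=" + p.getProperty("retries"));
    }
}
```

Passing the environment in as a map, rather than calling `System.getenv` inside the method, keeps the mapping easy to exercise with different settings.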

Containerized Kafka producer

With the Kafka release unpacked into a folder named kafka, and market-generator.jar placed alongside that folder, the following Dockerfile configuration runs the producer in a container.

FROM openjdk:17-jdk-alpine3.13
WORKDIR /opt/kx
COPY kafka kafka
COPY market-generator.jar market-generator.jar
CMD ["sh", "-c", "java -cp market-generator.jar:kafka/libs/* MarketProducer"]

Deploying the producer

To add the producer to the Docker deployment, add a producer service that uses the Dockerfile described above to the configuration:

services:
  # ... excerpt from the previous `docker-compose.yaml` example
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    networks:
      - app
    depends_on:
      - kafka
    environment:
      - KX_KAFKA_BROKERS=kafka:9092
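Note that `depends_on` only controls the order in which containers start; without a health check condition, it does not wait for the broker to accept connections. The sketch below shows one way to check from Java that the address in `KX_KAFKA_BROKERS` is reachable over TCP. The class `BrokerProbe` is hypothetical and not part of the example code, and it assumes the variable holds a single `host:port` value. The Kafka client also retries connections by itself, so a probe like this is mainly useful for diagnostics.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch: checks whether the broker named in KX_KAFKA_BROKERS
// accepts TCP connections. Assumes a single "host:port" value.
class BrokerProbe {

    // Host part: everything before the last colon.
    static String host(String broker) {
        return broker.substring(0, broker.lastIndexOf(':'));
    }

    // Port part: everything after the last colon.
    static int port(String broker) {
        return Integer.parseInt(broker.substring(broker.lastIndexOf(':') + 1));
    }

    // True if a TCP connection succeeds within timeoutMs milliseconds.
    static boolean isReachable(String broker, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host(broker), port(broker)), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String broker = System.getenv("KX_KAFKA_BROKERS");
        if (broker == null || broker.isEmpty()) {
            // Address used in the Docker Compose example in this guide.
            broker = "kafka:9092";
        }
        // Try a few times, pausing one second between attempts.
        for (int attempt = 1; attempt <= 5; attempt++) {
            if (isReachable(broker, 1000)) {
                System.out.println("Broker reachable at " + broker);
                return;
            }
            System.out.println("Attempt " + attempt + ": " + broker + " not reachable yet");
            Thread.sleep(1000);
        }
        System.out.println("Giving up on " + broker);
    }
}
```

A successful TCP connection only shows that the port is open; it does not confirm that the broker has finished initializing.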

To deploy the producer to a Kubernetes pod, the image defined by the Dockerfile above must be built and published to a container registry so that it can be pulled into the Kubernetes cluster. This example pushes the image to an example Docker Hub repository called myorg.

docker build --tag myorg/kx-kafka-producer:1.0.0 .
docker push myorg/kx-kafka-producer:1.0.0

Create the following producer.yaml deployment configuration. Remember to replace myorg with the repository used in the command above.

apiVersion: v1
kind: Pod
metadata:
  name: kx-kafka-producer
  labels:
    app.kubernetes.io/name: Kafka_Producer
    app.kubernetes.io/component: kx-kafka-producer
spec:
  containers:
  - name: producer
    image: myorg/kx-kafka-producer:1.0.0
    env:
    - name: KX_KAFKA_BROKERS
      value: "kafka:9092"

Deploy the pod by applying the pod configuration with kubectl.

kubectl apply -f producer.yaml

Uninstalling the producer

To remove the producer deployment, delete the pod with kubectl:

kubectl delete pod kx-kafka-producer

Now that a Kafka broker and producer have been deployed, follow the guide below to connect Kafka with the kdb Insights Stream Processor.

Ingest Kafka topics with the Stream Processor