Kafka with TLS

Deploy a Kafka reader with TLS certificates

The TLS setup page showed how to create the keys and certificates needed to configure the broker and the Stream Processor (SP) client. The next step is to deploy Kafka and a Stream Processor pipeline.

When deploying with Docker Compose, mount the certificates created during TLS setup into a Worker so that the Stream Processor can load them and read from the trades topic over TLS. The following example program specification ingests from Kafka.

Example program called spec.q:

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.ca.location          ; "/opt/kx/app/data/ca-cert.pem");
            (`ssl.certificate.location ; "/opt/kx/app/data/cert.pem");
            (`ssl.key.location         ; "/opt/kx/app/data/key.pem");
            (`ssl.key.password         ; "iamsecure"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]

Example docker-compose.yml:

version: "3.6"

services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_TLS_TYPE=JKS
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
      - KAFKA_CERTIFICATE_PASSWORD=iamsecure
    volumes:
      - ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
  controller:
    image: portal.dl.kx.com/kxi-sp-controller:1.13.0
    environment:
      - KDB_LICENSE_B64
    command: ["-p", "6000"]
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    environment:
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
      - KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
      - KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
      - KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
      - KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
    volumes:
      - .:/opt/kx/app/data
    command: ["-p", "5000"]
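The Compose file above bind-mounts keystore.jks and truststore.jks into the broker and the current directory into the Worker, so a missing file tends to surface only as a TLS or startup error later on. A minimal pre-flight sketch, assuming all of these files sit next to docker-compose.yml; check_tls_files is a hypothetical helper, not part of any KX or Kafka tooling:

```shell
# check_tls_files DIR FILE...
# Succeeds only if every FILE in DIR is a readable, non-empty regular file.
# Hypothetical helper for illustration; not part of KX or Kafka tooling.
check_tls_files() {
    dir="$1"
    shift
    status=0
    for name in "$@"; do
        path="$dir/$name"
        if [ ! -f "$path" ] || [ ! -s "$path" ] || [ ! -r "$path" ]; then
            # Report each problem file on stderr and keep checking the rest.
            echo "missing or empty: $path" >&2
            status=1
        fi
    done
    return "$status"
}

# Typical use, from the directory that holds docker-compose.yml:
#   check_tls_files . keystore.jks truststore.jks ca-cert.pem cert.pem key.pem spec.q
```

Running the check before starting the services lists every missing file at once instead of failing on the first one.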

On Kubernetes, a TLS secret can supply the client certificate and key to a Worker that reads the trades topic over TLS.

Create a secret named kafka-certs. kubectl stores the certificate and key in the secret under the keys tls.crt and tls.key.

kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem

Optionally, create a secret for the private key password.

kubectl create secret generic kafka-pass --from-literal=password=iamsecure
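The literal form above stores exactly the characters given. If the password is kept in a file instead, note that the streaming program in this section reads the mounted password byte for byte with read1, so a trailing newline in that file would become part of the password. A small sketch of writing the file safely, assuming the same example password; the temporary file path is illustrative only:

```shell
# Write the password without a trailing newline; `echo` would append one.
pass_file="$(mktemp)"
printf '%s' 'iamsecure' > "$pass_file"

# Count the bytes stored: 9 for "iamsecure", 10 if a newline slipped in.
pass_bytes=$(wc -c < "$pass_file" | tr -d ' ')
echo "password file holds $pass_bytes bytes"

# The secret can then be created from the file, keeping the key name `password`:
#   kubectl create secret generic kafka-pass --from-file=password="$pass_file"
```

Remove the temporary file once the secret has been created.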

Example Kafka streaming program that reads the certificate, key, and password from the mounted secrets:

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]

Deploy the program. The Stream Processor Coordinator must already be deployed and reachable, here at localhost:5000.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"

Turn off certificate verification for self-signed certificates

When using self-signed certificates, the CA root is not installed in the Worker image, so certificate verification fails. In this case, set KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES to "false". This is not recommended for production installations.

Add the variable to the deployment request under an env field.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"
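Because disabling verification is not recommended for production, one option is to keep it on by default and emit the override only when explicitly requested. A minimal sketch, assuming a hypothetical ALLOW_SELF_SIGNED switch and helper name sp_env_json, neither of which is part of the Stream Processor:

```shell
# sp_env_json: print the JSON object to use as the request's `env` field.
# ALLOW_SELF_SIGNED is a hypothetical opt-in switch used only in this sketch.
sp_env_json() {
    if [ "${ALLOW_SELF_SIGNED:-no}" = "yes" ]; then
        # Opt-in only: turns off certificate verification in the Worker.
        printf '{ "KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES": "false" }\n'
    else
        # Default: no override, so verification stays enabled.
        printf '{}\n'
    fi
}

# The result can be passed to jq as JSON rather than as a string, for example:
#   jq -n --argjson env "$(sp_env_json)" '{ env: $env }'
```

With this arrangement the same deployment script serves both test and production environments, and verification is disabled only when ALLOW_SELF_SIGNED=yes is set for that run.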