Kafka with TLS

Deploy a Kafka reader with TLS certificates

For more information about TLS, such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.

To read from the topic trades using TLS in a Docker deployment, mount the certificates into a Worker so that the Stream Processor can load them. The following example program specification ingests from Kafka over TLS.

Example program called spec.q:

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.ca.location          ; "/opt/kx/app/data/ca-cert.pem");
            (`ssl.certificate.location ; "/opt/kx/app/data/cert.pem");
            (`ssl.key.location         ; "/opt/kx/app/data/key.pem");
            (`ssl.key.password         ; "iamsecure"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]

Example docker-compose.yml:

version: "3.6"

services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_TLS_TYPE=JKS
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
      - KAFKA_CERTIFICATE_PASSWORD=iamsecure
    volumes:
      - ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.8.1
    environment:
      - KDB_LICENSE_B64
    command: ["-p", "6000"]
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.8.1
    environment:
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
      - KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
      - KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
      - KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
      - KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
    volumes:
      - .:/opt/kx/app/data
    command: ["-p", "5000"]
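
The compose file above expects several files in the directory it is started from: the program, the client certificates, and the broker keystores. A minimal sketch of a preflight check follows. It assumes the file names used in the examples above, and check_tls_files is a hypothetical helper, not part of any KX tooling.

```shell
#!/bin/sh
# Hypothetical preflight helper: confirm that every file the compose file
# mounts is present and readable in the given directory (default: current).
# File names are taken from the spec.q and docker-compose.yml examples.
check_tls_files() {
    dir="${1:-.}"
    missing=0
    for f in spec.q ca-cert.pem cert.pem key.pem keystore.jks truststore.jks; do
        if [ ! -r "$dir/$f" ]; then
            echo "missing or unreadable: $dir/$f" >&2
            missing=1
        fi
    done
    # Returns 0 when all files were found, 1 otherwise.
    return "$missing"
}

# Possible usage, assuming Docker Compose is installed:
#   check_tls_files . && docker compose up
```

Running the check first reports every missing file at once, before any container starts.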

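To read from the topic trades using TLS in a Kubernetes deployment, use a Kubernetes TLS secret to load the client certificate and key into a Worker.

Create a secret named kafka-certs. A TLS secret stores the certificate and key under the keys tls.crt and tls.key.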

kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem

Optionally, create a secret for the password that protects the private key.

kubectl create secret generic kafka-pass --from-literal=password=iamsecure
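
The streaming program reads the secrets by key name (tls.crt, tls.key and password), so it can help to confirm those keys exist before deploying. The following is a sketch only. It assumes kubectl access to the cluster, and list_secret_keys and has_keys are hypothetical helper names.

```shell
#!/bin/sh
# Hypothetical helper: print one data key per line for the named secret.
# Requires kubectl to be configured for the target cluster and namespace.
list_secret_keys() {
    kubectl get secret "$1" \
        -o go-template='{{range $k, $v := .data}}{{$k}}{{"\n"}}{{end}}'
}

# Hypothetical helper: read key names on stdin and succeed only if every
# expected key passed as an argument appears as a whole line.
has_keys() {
    found="$(cat)"
    for key in "$@"; do
        printf '%s\n' "$found" | grep -qxF -- "$key" || {
            echo "secret is missing key: $key" >&2
            return 1
        }
    done
}

# Possible usage against the secrets created above:
#   list_secret_keys kafka-certs | has_keys tls.crt tls.key
#   list_secret_keys kafka-pass  | has_keys password
```

Keeping the key comparison separate from the kubectl call means it can be exercised without a cluster.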

Example Kafka streaming program, which reads the certificate, key, and password from the mounted secrets:

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]

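Deploy the program. The Stream Processor Coordinator must be deployed and accessible. The request below assumes the program is saved as spec.q and lists both secrets under kubeConfig. The first jq call embeds the contents of spec.q as an escaped JSON string, and the second (jq -asR .) encodes the resulting object as a single JSON string.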

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"

Turn off certificate verification for self-signed certificates

When using self-signed certificates, the CA root is not installed in the Worker image, so certificate verification fails. In this case, set KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES to "false". This is not recommended for production installations.

Add the variable to the deployment request under an env field:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"