Kafka with TLS
Deploy a Kafka reader with TLS certificates
For more information about TLS such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.
To read from a topic trades using TLS, the certificates above can be mounted into a
Worker and loaded by the Stream Processor. The following is an example program specification
that could be used to ingest from Kafka.
Example program called spec.q:
.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.ca.location          ; "/certs/ca-cert.pem");
            (`ssl.certificate.location ; "/certs/cert.pem");
            (`ssl.key.location         ; "/certs/key.pem");
            (`ssl.key.password         ; "iamsecure"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Example docker-compose.yml:
version: "3.6"
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_TLS_TYPE=JKS
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
      - KAFKA_CERTIFICATE_PASSWORD=iamsecure
    volumes:
      - ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.6.1
    environment:
      - KDB_LICENSE_B64
    command: ["-p", "6000"]
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.6.1
    environment:
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
      - KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
      - KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
      - KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
      - KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
    volumes:
      - .:/opt/kx/app/data
    command: ["-p", "5000"]
To read a topic trades using TLS, a
Kubernetes TLS secret
can be used to load the client certificate and key into a Worker.
Create a secret named kafka-certs.
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
Optionally create a secret for the certificate password.
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
Example Kafka streaming program.
.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"
Turn off certificate verification for self-signed certificates
When using self-signed certificates the CA root will not be installed in the worker image.
This will result in a certificate verification failure. In this case,
KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES must be set to "false". This is not recommended
for production installations.
This can be added to the deployment request under an env field.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"