Kafka with TLS
Deploy a Kafka reader with TLS certificates
The TLS setup page showed how to create the keys and certificates for configuring the broker and SP client. The next step is to deploy Kafka and a Stream Processor pipeline.
To read from a topic trades
using TLS, the certificates above can be mounted into a
Worker and loaded by the Stream Processor. The following is an example program specification
that could be used to ingest from Kafka.
Example program called spec.q
:
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers ; "kafka:9092");
(`topic ; "trades");
(`options; (!) . flip (
(`security.protocol ; "SSL");
(`ssl.ca.location ; "/certs/ca-cert.pem");
(`ssl.certificate.location ; "/certs/cert.pem");
(`ssl.key.location ; "/certs/key.pem");
(`ssl.key.password ; "iamsecure"))))]
.qsp.decode.json[]
.qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
.qsp.window.timer[00:00:10]
.qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Example docker-compose.yml
:
version: "3.6"
services:
zookeeper:
image: "bitnami/zookeeper:latest"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: "bitnami/kafka:latest"
depends_on:
- zookeeper
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_TLS_TYPE=JKS
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
- KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
- KAFKA_CERTIFICATE_PASSWORD=iamsecure
volumes:
- ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
controller:
image: portal.dl.kx.com/kxi-sp-controller:1.13.0
environment:
- KDB_LICENSE_B64
command: ["-p", "6000"]
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.13.0
environment:
- KXI_SP_SPEC=/opt/kx/app/data/spec.q
- KXI_SP_KAFKA_BROKERS=kafka:9092
- KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
- KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
- KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
- KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
volumes:
- .:/opt/kx/app/data
command: ["-p", "5000"]
To read a topic trades
using TLS, a
Kubernetes TLS secret
can be used to load the client certificate and key into a Worker.
Create a secret named kafka-certs
.
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
Optionally create a secret for the certificate password.
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
Example Kafka streaming program.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers ; "kafka:9092");
(`topic ; "trades");
(`options; (!) . flip (
(`security.protocol ; "SSL");
(`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
(`ssl.key.location ; .qsp.configPath["kafka-certs"],"/tls.key");
(`ssl.key.password ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
.qsp.decode.json[]
.qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
.qsp.window.timer[00:00:10]
.qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
}' | jq -asR .)"
Turn off certificate verification for self-signed certificates
When using self-signed certificates the CA root will not be installed in the worker image.
This will result in a certificate verification failure. In this case,
KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES
must be set to "false"
. This is not recommended
for production installations.
This can be added to the deployment request under an env
field.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
env : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
}' | jq -asR .)"