Kafka with TLS
Deploy a Kafka reader with TLS certificates
For more information about TLS, such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.
To read from the topic trades using TLS, mount the certificates described above into a Worker so that the Stream Processor can load them. The following example program specification could be used to ingest from Kafka.
Example program called spec.q:
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`brokers ; "kafka:9092");
    (`topic   ; "trades");
    (`options ; (!) . flip (
      (`security.protocol        ; "SSL");
      (`ssl.ca.location          ; "/certs/ca-cert.pem");
      (`ssl.certificate.location ; "/certs/cert.pem");
      (`ssl.key.location         ; "/certs/key.pem");
      (`ssl.key.password         ; "iamsecure"))))]
  .qsp.decode.json[]
  .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
  .qsp.window.timer[00:00:10]
  .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Example docker-compose.yml:
version: "3.6"
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: "bitnami/kafka:latest"
    depends_on:
      - zookeeper
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_TLS_TYPE=JKS
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,CLIENT:SSL
      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,CLIENT://:9092
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092
      - KAFKA_CERTIFICATE_PASSWORD=iamsecure
    volumes:
      - ./keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.8.1
    environment:
      - KDB_LICENSE_B64
    command: ["-p", "6000"]
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.8.1
    environment:
      - KXI_SP_SPEC=/opt/kx/app/data/spec.q
      - KXI_SP_KAFKA_BROKERS=kafka:9092
      - KXI_SP_KAFKA_SSL_CA_LOCATION=/opt/kx/app/data/ca-cert.pem
      - KXI_SP_KAFKA_SSL_KEY_LOCATION=/opt/kx/app/data/key.pem
      - KXI_SP_KAFKA_SSL_CERT_LOCATION=/opt/kx/app/data/cert.pem
      - KXI_SP_KAFKA_SSL_KEY_PASSWORD=iamsecure
    volumes:
      - .:/opt/kx/app/data
    command: ["-p", "5000"]
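The compose file above bind-mounts several files from the current directory, and a missing bind-mount source is typically created by Docker as an empty directory rather than reported as an error. The following is a minimal pre-flight sketch that checks the files named in the example before the stack is started. The function name check_tls_files is hypothetical and not part of the Stream Processor; the file list is taken from the example above and should be adjusted if your names differ.

```shell
# check_tls_files DIR
# Hypothetical helper (not part of the Stream Processor): succeeds only if
# every file the example docker-compose.yml expects in DIR exists and is
# non-empty. DIR defaults to the current directory.
check_tls_files() {
    dir="${1:-.}"
    status=0
    for f in spec.q ca-cert.pem cert.pem key.pem keystore.jks truststore.jks; do
        if [ ! -s "$dir/$f" ]; then
            echo "missing or empty: $dir/$f" >&2
            status=1
        fi
    done
    return "$status"
}

# Example: only start the stack when the check passes.
# check_tls_files . && docker-compose up -d
```

The check reports every missing file in one pass instead of stopping at the first, so a single run shows everything that still needs to be generated.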
To read a topic trades using TLS, a Kubernetes TLS secret can be used to load the client certificate and key into a Worker.
Create a secret named kafka-certs.
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
Optionally create a secret for the certificate password.
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
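Before creating the secrets above, it can help to confirm that cert.pem and key.pem belong together and that the key password is correct, since a mismatch would otherwise only surface later as a TLS handshake failure. The following is a minimal sketch, assuming the openssl command-line tool is installed locally. The function name pair_matches is hypothetical, and the comparison relies on both commands printing the public key in the same PEM form, which is the usual case for RSA keys.

```shell
# pair_matches CERT KEY [PASSWORD]
# Hypothetical helper: returns 0 when the certificate and the private key
# share the same public key, 1 when they differ, and 2 when either file
# cannot be read (for example, a wrong key password).
pair_matches() {
    # Public key embedded in the certificate.
    cert_pub=$(openssl x509 -in "$1" -noout -pubkey 2>/dev/null) || return 2
    # Public key derived from the private key; the password is ignored
    # by openssl when the key is not encrypted.
    key_pub=$(openssl pkey -in "$2" -pubout -passin "pass:${3:-}" 2>/dev/null) || return 2
    [ "$cert_pub" = "$key_pub" ]
}

# Example, using the file names and password from this page:
# pair_matches cert.pem key.pem iamsecure && echo "certificate and key match"
```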
Example Kafka streaming program.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`brokers ; "kafka:9092");
    (`topic   ; "trades");
    (`options ; (!) . flip (
      (`security.protocol        ; "SSL");
      (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
      (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
      (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
  .qsp.decode.json[]
  .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
  .qsp.window.timer[00:00:10]
  .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"
Turn off certificate verification for self-signed certificates
When using self-signed certificates, the CA root will not be installed in the worker image, so certificate verification fails. In this case, KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES must be set to "false". This is not recommended for production installations.
The variable can be added to the deployment request under an env field.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"