Kafka with TLS
Deploy a Kafka reader with TLS certificates
For more information about TLS, such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.
To read a topic trades using TLS, a Kubernetes TLS secret can be used to load the client certificate and key into a Worker.
Create a secret named kafka-certs.
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
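kubectl expects PEM-encoded input for a TLS secret and stores the two files under the keys tls.crt and tls.key, which is why the streaming program later reads /tls.crt and /tls.key. As a minimal sketch, a pre-flight check can catch a missing or non-PEM file before the secret is created. The helper name is_pem is hypothetical and is not part of kubectl.

```shell
# Hypothetical pre-flight check (not part of kubectl): succeed only if the
# file exists, is non-empty, and contains a PEM "-----BEGIN " header line.
is_pem() {
  [ -s "$1" ] && grep -q -e '^-----BEGIN ' "$1"
}

# Usage, with the file names from the command above:
#   is_pem cert.pem && is_pem key.pem &&
#     kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
```

The check only looks for a PEM header; it does not confirm that the key matches the certificate.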
Optionally, create a secret for the private key password.
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
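If the password is kept in a file rather than typed on the command line, kubectl can load it with --from-file=password=<file>. The file content is stored byte for byte, and the streaming program reads the mounted file as raw bytes, so a trailing newline would become part of the password. The sketch below strips it first; the helper name and the file names are assumptions for illustration.

```shell
# Hypothetical helper: copy a password file, dropping any trailing newlines,
# so the secret holds exactly the password bytes.
normalize_password_file() {
  # $(cat ...) discards trailing newlines; printf '%s' adds none back.
  # umask 077 keeps the new file readable by the current user only.
  ( umask 077; printf '%s' "$(cat "$1")" > "$2" )
}

# Usage (file names are assumptions):
#   normalize_password_file kafka-pass.raw kafka-pass.txt
#   kubectl create secret generic kafka-pass --from-file=password=kafka-pass.txt
```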
Example Kafka streaming program. Save it as spec.q, the file name the deployment request reads.
.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers ; "kafka:9092");
        (`topic   ; "trades");
        (`options ; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"
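Because the request only succeeds when the Coordinator is reachable, a script may want to wait for it before posting. The following is a rough sketch, not a documented health check: the helper name, the retry count and the delay are assumptions, and it treats any HTTP response from the base URL as a sign that the Coordinator is listening.

```shell
# Hypothetical helper: poll a URL until curl gets any HTTP response.
# Arguments: URL, number of attempts (default 10), delay in seconds (default 3).
wait_for_coordinator() {
  url="$1"; attempts="${2:-10}"; delay="${3:-3}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Without -f, curl exits 0 on any HTTP status, so this tests reachability only
    if curl -s -o /dev/null --max-time 5 "$url"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage, with the address from the request above:
#   wait_for_coordinator http://localhost:5000 && echo "Coordinator reachable"
```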
Turn off certificate verification for self-signed certificates
When using self-signed certificates, the CA root is not installed in the Worker image, so certificate verification fails. In this case, KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES must be set to "false". This is not recommended for production installations.
The variable can be set in the deployment request under an env field.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"
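Since turning verification off is not recommended for production, a deployment script could limit the setting to non-production targets. The sketch below is one possible approach under stated assumptions: the helper name and the environment names dev and test are hypothetical, and it assumes that leaving the variable out keeps verification at its default.

```shell
# Hypothetical helper: print the JSON "env" object for a deployment request.
# The environment names (dev, test) are assumptions for illustration.
kafka_env_json() {
  case "$1" in
    dev|test)
      # Self-signed certificates expected: turn verification off
      printf '%s\n' '{ "KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES": "false" }'
      ;;
    *)
      # Any other target: omit the variable so the default applies
      printf '%s\n' '{}'
      ;;
  esac
}

# Possible usage with jq (--argjson passes the object in as JSON):
#   jq -n --argjson env "$(kafka_env_json dev)" '{ env: $env }'
```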