Kafka with TLS
Deploy a Kafka reader with TLS certificates
For more information about TLS, such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.
To read the topic trades using TLS, use a Kubernetes TLS secret to load the client certificate and key into a Worker.
Create a secret named kafka-certs.
kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem
Optionally create a secret for the certificate password.
kubectl create secret generic kafka-pass --from-literal=password=iamsecure
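A certificate and key that do not belong together are a common cause of TLS handshake failures, so it can help to check the pair before creating the secret. The following is a minimal sketch, assuming openssl is installed; the function name pair_matches is hypothetical, and cert.pem and key.pem are the files passed to kubectl above. If the key is password protected, openssl will prompt for the password.

```shell
# Sketch: succeed only if the certificate and the private key share a public key.
# Assumes openssl is available; pair_matches is an illustrative helper name.
pair_matches() {
    # Public key embedded in the certificate, in PEM form
    cert_pub=$(openssl x509 -in "$1" -noout -pubkey)
    # Public key derived from the private key, in PEM form
    key_pub=$(openssl pkey -in "$2" -pubout)
    [ -n "$cert_pub" ] && [ "$cert_pub" = "$key_pub" ]
}

# Usage (not run here):
#   pair_matches cert.pem key.pem && echo "certificate and key match"
```

The comparison is on the public key only, so it says nothing about expiry or about whether the broker trusts the certificate.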
Example Kafka streaming program. The deployment request reads it from a file named spec.q.
.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]
Deploy the program. The Stream Processor Coordinator must be deployed and accessible.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"
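Before posting, it may be useful to print the request body and confirm that the secret names match the ones created earlier. This is a sketch only, assuming jq is installed; build_payload is a hypothetical helper that reuses the same jq template as the request above and sends nothing to the Coordinator.

```shell
# Sketch: build the pipeline request body from a spec file without sending it.
# Assumes jq is available; build_payload is an illustrative helper name.
build_payload() {
    jq -n --arg spec "$(cat "$1")" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }'
}

# Usage (not run here):
#   build_payload spec.q | jq '.kubeConfig.secrets'
```

The output is the inner JSON document; the request above additionally wraps it as a JSON string with jq -asR before posting.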
Turn off certificate verification for self-signed certificates
When using self-signed certificates, the CA root is not installed in the Worker image, so certificate verification fails. In this case, KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES must be set to "false". This is not recommended for production installations.
The variable can be set in the deployment request under an env field.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"
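To judge whether the verification override is likely to be needed, one rough check is whether a certificate names itself as its own issuer. The sketch below assumes openssl is installed and that the certificate in question is available as a PEM file; is_self_issued is a hypothetical helper. It compares names only and does not verify the signature, so treat the result as a hint rather than proof.

```shell
# Sketch: succeed if the certificate's subject and issuer names are identical.
# Assumes openssl is available; is_self_issued is an illustrative helper name.
# This is a name comparison only, not a cryptographic verification.
is_self_issued() {
    subject=$(openssl x509 -in "$1" -noout -subject -nameopt RFC2253 | sed 's/^subject=//')
    issuer=$(openssl x509 -in "$1" -noout -issuer -nameopt RFC2253 | sed 's/^issuer=//')
    [ -n "$subject" ] && [ "$subject" = "$issuer" ]
}

# Usage (not run here):
#   is_self_issued cert.pem && echo "certificate appears to be self-signed"
```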