Skip to content

Kafka with TLS

Deploy a Kafka reader with TLS certificates

For more information about TLS such as creating certificates and configuring a Kafka broker, refer to the Kafka with TLS tutorial.

To read a topic trades using TLS, a Kubernetes TLS secret can be used to load the client certificate and key into a Worker.

Create a secret named kafka-certs.

kubectl create secret tls kafka-certs --cert=cert.pem --key=key.pem

Optionally create a secret for the certificate password.

kubectl create secret generic kafka-pass --from-literal=password=iamsecure

Example Kafka streaming program.

.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip (
        (`brokers                  ; "kafka:9092");
        (`topic                    ; "trades");
        (`options; (!) . flip (
            (`security.protocol        ; "SSL");
            (`ssl.certificate.location ; .qsp.configPath["kafka-certs"],"/tls.crt");
            (`ssl.key.location         ; .qsp.configPath["kafka-certs"],"/tls.key");
            (`ssl.key.password         ; "c"$read1 hsym `$.qsp.configPath["kafka-pass"],"/password"))))]
    .qsp.decode.json[]
    .qsp.map[{ flip `time`sym`bid`ask!"PSff"$'flip x }]
    .qsp.window.timer[00:00:10]
    .qsp.write.toProcess[.qsp.use `handle`mode`target!(`:localhost:4000; `function; `publish)]

Deploy the program. The Stream Processor Coordinator must be deployed and accessible.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] }
    }' | jq -asR .)"

Turn off certificate verification for self-signed certificates

When using self-signed certificates the CA root will not be installed in the worker image. This will result in a certificate verification failure. In this case, KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES must be set to "false". This is not recommended for production installations.

This can be added to the deployment request under an env field.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets: ["kafka-certs", "kafka-pass"] },
        env        : { KXI_SP_KAFKA_SSL_VERIFY_CERTIFICATES: "false" }
    }' | jq -asR .)"