Skip to content


Kubernetes based workflow

This quickstart follows closely the Stream Processor quickstart guide for deployment of an Stream Processor specification to Kubernetes. Before running follow the setup for Kubernetes provided with the Stream Processor documentation. This will result in the generation of a Stream Processor Coordinator service running on your Kubernetes cluster, ready to accept jobs.


This example assumes a Kafka broker has been set up, accessible at `kafka:9092`, along with a `data` topic. Data will be sent to the broker within the example using standard Kafka command line tools.

1) Port-forward the Coordinator for local REST access

kubectl port-forward sp-coordinator-kxi-sp-0 5000

This command will continue to run. In another terminal, confirm that the Coordinator can be accessed by querying for running pipelines

curl localhost:5000/pipelines


The return of an empty array [] indicates that no pipelines are currently registered.

2) Create a pipeline specification

Create a specification containing machine learning functionality that should be run.

This pipeline operates as follows:

  1. Subscribe to a Kafka topic data
  2. Decode the incoming JSON data.
  3. Fit a sequential K-Means clustering model on the first 20 data points provided and make predictions on all subsequent batches evolving the cluster centers over time.
  4. Publishes the results to the console.
// spec.q[(!) . flip (
        (`topic   ; `data);
        (`brokers ; "kafka:9092"))][.j.k][
        .qsp.use (!) . flip (
            (`k; 5);
            (`bufferSize; 20))]


import kxi.sp as sp
import pykx as kx'data', 'kafka:9092')
    |['x', 'x1', 'x2'], centroids=5, buffer_size=10)
    | sp.write.to_console())

3) Deploy the job using the SP ML Worker image

To deploy the program to the cluster issue a REST call to /pipeline/create on the Coordinator.


To change the below curl to publish the Python specification rather than the q specification defined below a user must

  1. Change spec.q to in the below command
  2. Change the base image from q-ml to py-ml
jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
        name     : "ml-example",
        type     : "spec",
        base     : "q-ml",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "1" }
    }' | jq -asR .)" | jq -r .id)

The above call will start the streaming job with 1 worker. The Coordinator will create all necessary Kubernetes resources and workloads to run the job

4) Get pipeline status from REST

To know whether the pipeline is up and running, query the /pipeline/status/ endpoint on the Coordinator.

curl "localhost:5000/pipeline/status/$jobname"


On receipt of this message the pipeline can now be interacted with in line with the stream-processor more generally.

5) Send some data to the Workers

If the Kafka trade topic is not hooked up to a producer, send some data through the topic using the Kafka console producer. This example expects data with the following JSON format to be published to the topic data.

cat data.json

{ x: 0.1, y: 0.2, z:100 }

Publish to the trades data --broker-list kafka:9092 --topic data < data.json

6) Teardown pipeline

To remove the running pipeline, issue a REST call to /pipeline/teardown on the Coordinator.

curl -X POST "localhost:5000/pipeline/teardown/$jobname"