Kubernetes
Kubernetes-based workflow
This quickstart closely follows the Stream Processor quickstart guide for deploying a Stream Processor specification to Kubernetes. Before running it, complete the Kubernetes setup described in the Stream Processor documentation. That setup leaves a Stream Processor Coordinator service running on your Kubernetes cluster, ready to accept jobs.
Note
This example assumes a Kafka broker has been set up, accessible at kafka:9092, along with a data topic. Data will be sent to the broker within the example using standard Kafka command line tools.
1) Port-forward the Coordinator for local REST access
kubectl port-forward sp-coordinator-kxi-sp-0 5000
This command will continue to run. In another terminal, confirm that the Coordinator can be accessed by querying for running pipelines:
curl localhost:5000/pipelines
[]
The return of an empty array [] indicates that no pipelines are currently registered.
2) Create a pipeline specification
Create a specification containing the machine learning functionality to be run.
This pipeline operates as follows:
- Subscribe to the Kafka topic data.
- Decode the incoming JSON data.
- Fit a sequential K-Means clustering model on the first 20 data points provided, then make predictions on all subsequent batches, evolving the cluster centers over time.
- Publish the results to the console.
// spec.q
.qsp.run
    .qsp.read.fromKafka[(!) . flip (
        (`topic   ; `data);
        (`brokers ; "kafka:9092"))]
    .qsp.map[.j.k]
    .qsp.ml.sequentialKMeans[
        `x`y`z;
        .qsp.use (!) . flip (
            (`k          ; 5);
            (`bufferSize ; 20))]
    .qsp.write.toConsole[]
# spec.py
import kxi.sp as sp
import pykx as kx
# Feature columns and buffer size match the q specification and the sample data (x, y, z; 20 points)
sp.run(sp.read.from_kafka('data', 'kafka:9092')
    | sp.map(kx.q('.j.k'))
    | sp.ml.sequential_k_means(['x', 'y', 'z'], centroids=5, buffer_size=20)
    | sp.write.to_console())
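To make the clustering step more concrete, the following is a minimal, standalone sketch of one common form of sequential (online) K-Means. It is not the Stream Processor implementation: the class name `SequentialKMeans`, the choice to seed the centers from the first `k` buffered points, and the running-mean update are all assumptions made for illustration. It only shows the general idea of buffering an initial set of points and then moving the nearest center toward each new point.

```python
import math


class SequentialKMeans:
    """Toy online k-means: buffer points, initialise centres, then update per point."""

    def __init__(self, k, buffer_size):
        self.k = k
        self.buffer_size = buffer_size
        self.buffer = []   # points held until the model is initialised
        self.centers = []  # current cluster centres
        self.counts = []   # number of points assigned to each centre

    def _nearest(self, point):
        # Index of the centre with the smallest Euclidean distance to the point.
        distances = [math.dist(point, c) for c in self.centers]
        return distances.index(min(distances))

    def _update(self, point):
        # Assign the point, then move the winning centre toward it (running mean).
        i = self._nearest(point)
        self.counts[i] += 1
        rate = 1.0 / self.counts[i]
        self.centers[i] = [c + rate * (p - c) for c, p in zip(self.centers[i], point)]
        return i

    def process(self, batch):
        """Return one cluster label per point, or None while still buffering."""
        labels = []
        for point in batch:
            if self.centers:
                labels.append(self._update(point))
                continue
            self.buffer.append(list(point))
            labels.append(None)
            if len(self.buffer) >= self.buffer_size:
                # Assumption: seed the centres with the first k buffered points,
                # then fold the remaining buffered points into the model.
                self.centers = [list(p) for p in self.buffer[:self.k]]
                self.counts = [1] * self.k
                for p in self.buffer[self.k:]:
                    self._update(p)
        return labels
```

With `k=5` and `buffer_size=20`, this toy model would return `None` for the first 20 points and a cluster index for every point after that, while the centers keep drifting toward the incoming data.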
3) Deploy the job using the SP ML Worker image
To deploy the program to the cluster, issue a REST call to /pipeline/create on the Coordinator.
Note
To publish the Python specification rather than the q specification with the curl command below:
- Change spec.q to spec.py in the command.
- Change the base image from q-ml to py-ml.
jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name     : "ml-example",
        type     : "spec",
        base     : "q-ml",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "1" }
    }' | jq -asR .)" | jq -r .id)
The above call starts the streaming job with 1 worker. The Coordinator creates all the Kubernetes resources and workloads needed to run the job.
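For readers who prefer to build the request body programmatically, here is a small Python sketch of what the jq pipeline in the create command produces. The helper name `build_create_body` and its default arguments are hypothetical. The double encoding mirrors the trailing `jq -asR .` step, which turns the whole JSON document into a single JSON string; the exact whitespace of the inner document will differ from jq's output.

```python
import json


def build_create_body(spec_text, name="ml-example", base="q-ml", workers=1):
    """Build a request body for /pipeline/create (hypothetical helper)."""
    payload = {
        "name": name,
        "type": "spec",
        "base": base,  # use "py-ml" when submitting spec.py
        "config": {"content": spec_text},
        # The shell command passes the worker counts as strings.
        "settings": {"minWorkers": str(workers), "maxWorkers": str(workers)},
    }
    # `jq -asR .` re-encodes the JSON document as one JSON string,
    # so the payload is serialised twice here to mirror that step.
    return json.dumps(json.dumps(payload))
```

The returned string could then be sent as the POST body with any HTTP client, reading the `id` field from the response as the shell command does.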
4) Get pipeline status from REST
To check whether the pipeline is up and running, query the /pipeline/status/ endpoint with the pipeline ID returned by the create call:
curl "localhost:5000/pipeline/status/$jobname"
{"state":"RUNNING","error":""}
Once the state is RUNNING, the pipeline can be interacted with in the same way as any other Stream Processor pipeline.
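Because a pipeline may take some time to start, it can be convenient to poll the status endpoint rather than query it by hand. The sketch below assumes the Coordinator is port-forwarded to localhost:5000 and that the response has the `state` and `error` fields shown in the status output; the function names, retry count, and delay are illustrative choices, not part of the Stream Processor API.

```python
import json
import time
import urllib.request


def fetch_status(job_id, base_url="http://localhost:5000"):
    """GET /pipeline/status/<job_id> and decode the JSON response."""
    with urllib.request.urlopen(f"{base_url}/pipeline/status/{job_id}") as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_until_running(job_id, fetch=fetch_status, attempts=30, delay=2.0):
    """Poll until the pipeline reports RUNNING, an error appears, or attempts run out."""
    for _ in range(attempts):
        status = fetch(job_id)
        if status.get("error"):
            raise RuntimeError(f"pipeline {job_id} failed: {status['error']}")
        if status.get("state") == "RUNNING":
            return status
        time.sleep(delay)
    raise TimeoutError(f"pipeline {job_id} not RUNNING after {attempts} attempts")
```

The `fetch` parameter is injectable so the polling logic can be exercised without a live Coordinator.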
5) Send some data to the Workers
If the Kafka data topic is not hooked up to a producer, send some data through the topic using the Kafka console producer. This example expects data in the following JSON format to be published to the data topic.
cat data.json
{"x": 0.1, "y": 0.2, "z": 100}
Publish the data to the data topic:
kafka-console-producer.sh --broker-list kafka:9092 --topic data < data.json
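The model needs more than a single record to fit its clusters, so a larger data.json is useful when testing. The following sketch generates one JSON object per line, which suits the console producer because it sends each input line as a separate message. The value ranges, record count, and function names are arbitrary assumptions for illustration; only the x, y, and z field names come from the example.

```python
import json
import random


def make_records(n, seed=0):
    """Generate n synthetic records with numeric x, y, z fields (arbitrary ranges)."""
    rng = random.Random(seed)
    return [
        {"x": rng.random(), "y": rng.random(), "z": rng.uniform(0, 100)}
        for _ in range(n)
    ]


def to_json_lines(records):
    """Serialise records as newline-delimited JSON, one message per line."""
    return "".join(json.dumps(record) + "\n" for record in records)


def write_data_file(path="data.json", n=100):
    """Write n synthetic records to path for use with the console producer."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(to_json_lines(make_records(n)))
```

Calling `write_data_file()` produces a data.json that can be piped into the producer command unchanged.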
6) Teardown pipeline
To remove the running pipeline, issue a REST call to /pipeline/teardown on the Coordinator.
curl -X POST "localhost:5000/pipeline/teardown/$jobname"