Kubernetes quickstart
An end-to-end workflow example with the Stream Processor using a Kubernetes deployment
Note
This example assumes a Kafka broker has been set up, accessible at kafka:9092, along with a trades topic. Data will be sent to the broker within the example using standard Kafka command line tools. For more information about setting up an example broker, refer to the Kafka tutorial.
Note
This example also assumes a kdb+ Tickerplant is running within the cluster, accessible at tp:5000, configured to accept a trade table with the following columns: time: timestamp, sym: symbol, price: float, and size: long.
Before running, follow the setup for Kubernetes. This will result in the Stream Processor Coordinator service running in your Kubernetes cluster, ready to accept jobs.
1) Port-forward the Coordinator for local REST access
kubectl port-forward sp-kxi-sp-0 5000
This command will continue to run. In another terminal, confirm that the Coordinator can be accessed by querying for running pipelines.
curl localhost:5000/pipelines
[]
The empty array [] indicates that no pipelines are currently registered.
2) Create a pipeline specification
Create a specification of the stream analytic pipeline that should be run.
This pipeline subscribes to the Kafka topic trades and decodes the incoming data from JSON (in this example, JSON dictionary events are expected). Decoded data is batched into 5-second micro-batches based on event time, so that the downstream Tickerplant and RDB process the data more efficiently. When emitted, each micro-batch is converted to the kdb+ schema that the Tickerplant expects. Finally, each micro-batch is written to the Tickerplant for logging and further publishing to downstream RDBs.
// spec.q
.qsp.run
    .qsp.read.fromKafka[.qsp.use (!) . flip ( // read data from the `trades` topic
        (`topic   ; `trades);
        (`brokers ; "kafka:9092");
        (`offset  ; `end))]
    .qsp.map[.j.k]
    .qsp.map[{ update "P"$time from enlist`time`sym`price`size!x }] // convert record to table to batch
    .qsp.window.tumbling[00:00:05; `time] // buffer received data into 5 second batches
    .qsp.map[{ value flip update `$sym, "j"$size from x }] // convert types to kdb+ schema
    .qsp.write.toProcess[.qsp.use (!) . flip ( // write micro-batch out to kdb+ tickerplant
        (`handle ; `:tp:5000);
        (`target ; `.u.upd);
        (`params ; enlist `trade))]
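The 5-second tumbling window in the specification groups events by event time rather than arrival time. As a rough illustration of the idea only (not of how .qsp.window.tumbling is implemented), the sketch below maps an event time to the start of the window that contains it. The helper name window_start is hypothetical, event times are given as whole seconds since midnight, and windows are assumed to be aligned to multiples of the window size.

```shell
#!/usr/bin/env bash
# window_start EVENT_SECONDS [SIZE_SECONDS]
# Hypothetical helper: print the start of the tumbling window that contains
# the given event time. Assumes windows are aligned to multiples of the size.
window_start() {
  local t=$1 size=${2:-5}
  echo $(( t - t % size ))
}

window_start 32400   # 09:00:00 -> 32400
window_start 32403   # 09:00:03 -> 32400 (same window as 09:00:00)
window_start 32405   # 09:00:05 -> 32405 (next window)
```

Events that map to the same window start would be emitted together as one micro-batch.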
3) Deploy the job
To deploy the streaming program into the cluster, issue a REST call to /pipeline/create on the Coordinator.
jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka-in",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" }
}' | jq -asR .)" | jq -r .id)
The above call starts the streaming job and negotiates with the Kafka brokers for the optimal number of Workers to deploy, based on the number of partitions in the topic. If the topic has more than 10 partitions, 10 Workers (maxWorkers) will be created, each assigned one or more partitions. If there are fewer than 10 partitions, 1 Worker will be created per partition, down to a minimum of 1 (minWorkers).
The Coordinator will create all necessary Kubernetes resources and workloads to run the job.
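The scaling rule described above amounts to clamping the partition count between minWorkers and maxWorkers. The sketch below is only an illustration of that arithmetic, assuming the description is complete; worker_count is a hypothetical helper, not part of the Coordinator API.

```shell
#!/usr/bin/env bash
# worker_count PARTITIONS MIN_WORKERS MAX_WORKERS
# Hypothetical helper: estimate the number of Workers for a topic, assuming
# the count is the partition count clamped to [MIN_WORKERS, MAX_WORKERS].
worker_count() {
  local partitions=$1 min_workers=$2 max_workers=$3
  local n=$partitions
  if (( n < min_workers )); then n=$min_workers; fi
  if (( n > max_workers )); then n=$max_workers; fi
  echo "$n"
}

worker_count 4 1 10    # 4 partitions  -> 4
worker_count 25 1 10   # 25 partitions -> 10
worker_count 0 1 10    # below minimum -> 1
```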
4) Get pipeline status from REST
To check whether the pipeline is up and running, query the /pipeline/status/<id> endpoint on the Coordinator.
curl "localhost:5000/pipeline/status/$jobname"
{"state":"RUNNING","error":""}
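A pipeline may take a moment to start, so it can be convenient to poll the status endpoint until it reports RUNNING. The following is a minimal sketch under a few assumptions: fetch_status and wait_for_running are hypothetical helper names, the Coordinator is port-forwarded to localhost:5000 as in step 1, and the response is formatted exactly as in the output shown above (no spaces around the colon).

```shell
#!/usr/bin/env bash
# fetch_status ID: print the raw status JSON for a pipeline.
# Assumes the Coordinator is reachable on localhost:5000 (see step 1).
fetch_status() {
  curl -s "localhost:5000/pipeline/status/$1"
}

# wait_for_running ID [ATTEMPTS] [DELAY_SECONDS]
# Poll until the status response contains "state":"RUNNING".
# Returns 0 once the pipeline is running, 1 if the attempts run out.
wait_for_running() {
  local id=$1 attempts=${2:-30} delay=${3:-2} i
  for (( i = 0; i < attempts; i++ )); do
    case "$(fetch_status "$id")" in
      *'"state":"RUNNING"'*) return 0 ;;
    esac
    sleep "$delay"
  done
  return 1
}
```

For example, wait_for_running "$jobname" && echo "pipeline is running" blocks for up to about a minute with the default settings.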
5) Send some data to the Workers
If the Kafka trades topic is not already hooked up to a producer, send some data through the topic using the Kafka console producer. This example publishes data with the following JSON format to the topic.
cat trade.json
{ "time": "2021.01.01D09:00:00", "sym": "ABCD", "price": 10.00, "size": 100 }
kafka-console-producer.sh --broker-list kafka:9092 --topic trades < trade.json
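To exercise the pipeline with more than a single event, a small generator can write several records to trade.json, one JSON object per line, since the console producer sends each input line as a separate message. This is a sketch only: make_trades is a hypothetical helper, the symbol and price are fixed placeholder values, and the count should stay at 60 or below so that the seconds field remains valid.

```shell
#!/usr/bin/env bash
# make_trades N
# Hypothetical helper: print N sample trade events (N <= 60), one JSON object
# per line, with event times one second apart starting at 09:00:00.
make_trades() {
  local n=$1 i
  for (( i = 0; i < n; i++ )); do
    printf '{ "time": "2021.01.01D09:00:%02d", "sym": "ABCD", "price": 10.00, "size": %d }\n' \
      "$i" "$(( 100 + i ))"
  done
}
```

Running make_trades 10 > trade.json before the producer command above yields events that span more than one 5-second window.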
6) Query state
With data now flowing from Kafka through the Stream Processor and into downstream RDBs, the data is available for query using regular RDB query logic.
7) Teardown pipeline
To remove the running pipeline, issue a REST call to /pipeline/teardown/<id> on the Coordinator.
curl -X POST "localhost:5000/pipeline/teardown/$jobname"
8) Next steps
From here, learn more about the declarative pipeline API through operators, examine other data sources and sinks in readers and writers, or read more about running and configuring pipelines.