Docker quickstart
An end-to-end workflow example with the Stream Processor using a Docker deployment
Pull the Stream Processor images
Pull down the kxi-sp-controller
and kxi-sp-worker
images required to run the Stream Processor.
docker login portal.dl.kx.com -u username -p password
Create a pipeline specification
Create a specification of the stream analytic pipeline to run.
The pipeline below will
- listen for data coming into
upd
(a function which will be created at startup of the pipeline) by using the callback reader - window incoming data into five-second periods
- store the maximum value from the window in a named operator
The value will also be written to the console logs.
spec.q
:
.qsp.run
.qsp.read.fromCallback[`upd] /read data from the upd function
.qsp.window.timer[00:00:05] /buffer rcvd data into 5-second buckets
.qsp.map[{[op;md;data]
.qsp.set[op;md] exec max val from data /store and emit max value for each window
}; .qsp.use`name`state!(`maxval; 0f)] /name this stateful operator for reference later
.qsp.write.toConsole[] /write output to console for debugging
Create a Docker Compose file
Wrap the kxi-sp-controller
and kxi-sp-worker
images into a single Docker Compose file. This is a starter for this example that can be customized as needed. Alternatively, you can use Docker directly, or with any other image orchestration.
In this example, the Worker uses an ephemeral host port binding. This is to avoid host port conflicts when scaling later.
docker-compose.yaml
:
version: "3.3"
services:
controller:
image: portal.dl.kx.com/kxi-sp-controller:1.10.0
ports:
- 6000:6000
environment:
- KDB_LICENSE_B64 # Which kdb+ license to use, see note below
command: ["-p", "6000"]
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.10.0
ports:
- 5000
volumes:
- .:/app # Bind in the spec.q file
environment:
- KXI_SP_SPEC=/app/spec.q # Point to the bound spec.q file
- KXI_SP_PARENT_HOST=controller:6000 # Point to the parent Controller
- KDB_LICENSE_B64
command: ["-p", "5000"]
Provide a license
A license for kdb+ Cloud Edition is required and is provided through the environment variable KDB_LICENSE_B64
. It can be generated from a valid kc.lic
file with base64 encoding. In a *nix based system, we can create the environment variable with the following command.
export KDB_LICENSE_B64=$(base64 path-to/kc.lic)
The kc.lic
used must be for kdb+ Cloud Edition. A regular kc.lic
for On-Demand kdb+ will signal a licensing error during startup.
Start the containers
$ docker-compose up -d
Get pipeline status from REST
To know whether the pipeline is up and running, query the /status
endpoint on the Controller.
curl localhost:6000/status
"RUNNING"
Send some data to the Worker
Now the pipeline is up and running, find the port of the Worker. The advertised port on the host is available in the 0.0.0.0:59978
section below. Since this binding is dynamic in this example, the port here may be different than yours. If connecting to the Worker from within Docker Compose, the address would be example_worker_1:5000
. Alternatively, the host port binding could be preconfigured in the Docker Compose file.
docker ps
CONTAINER ID ... STATUS PORTS NAMES
54972266d382 ... Up About a minute 0.0.0.0:59978->5000/tcp example_worker_1
4f0bb8f22086 ... Up About a minute 0.0.0.0:6000->6000/tcp, :::6000->6000/tcp example_controller_1
With the port of the Worker, connect a q process and send it some data at a few different timestamps to see the output written to the console.
h: hopen 59978
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
Stop the pipeline
After seeing the output, stop the pipeline by stopping the Docker Compose so that the pipeline can be scaled up.
docker-compose stop
Scale up to multiple Workers
Now, to run the same example as before with more Workers, set the expected Worker count in the Docker Compose file.
controller:
..
environment:
..
- KXI_SP_MIN_WORKERS=5
..
When set, start up the Docker Compose in the same way as before, but scale the Workers to five instances.
docker-compose up -d --scale worker=5
To see the Workers’ status, use the REST API on the Controller. Additionally, tools such as jq
could be used to format the output for readability.
curl localhost:6000/workers | jq
[
{
"id": null,
"name": "worker-9cb142671cc9",
"address": "55c0b30757bc:5000",
"partitions": null,
"startTime": "2021-06-22T22:52:53.436Z"
},
{
"id": null,
"name": "worker-e5c9489a95f0",
"address": "d12413705efc:5000",
"partitions": null,
"startTime": "2021-06-22T22:52:57.074Z"
},
..
]
Send some more data
With the scaled pipeline up and running, use the same method as above (docker ps
) to select one of the Workers to send data to.
Selecting workers
Since this example uses the callback reader
, Workers must be selected logically externally to the pipeline. If using a reader with a partition abstraction, such as Kafka, the Controller will assign partitions to readers automatically.
h1:hopen 50762
h2:hopen 50765
h3:hopen 50769
h4:hopen 50950
h5:hopen 51230
neg[h1](`upd; ([] timestamp: .z.p; part: 0i; val: 5?1f))
neg[h2](`upd; ([] timestamp: .z.p; part: 1i; val: 5?1f))
neg[h3](`upd; ([] timestamp: .z.p; part: 2i; val: 5?1f))
neg[h4](`upd; ([] timestamp: .z.p; part: 4i; val: 5?1f))
neg[h5](`upd; ([] timestamp: .z.p; part: 5i; val: 5?1f))
Inspect aggregated performance
Performance metrics for each individual Worker, and aggregated metrics for the scaled pipeline as a whole, can be obtained through the REST API on the Controller. The reported metrics are normalized to per-second values.
curl localhost:6000/metrics | jq
[
{
"name": "worker-9cb142671cc9",
"ts": "2021-06-22T23:07:32.343Z",
"eventRate": 6.00009, # events measured as count of each batch sent
"bytesRate": 105.8016, # bytes per second processed
"latency": 0.2131 # processing latency, in milliseconds
},
..
{
"name": "_total",
"ts": "2021-06-22T23:07:34.213Z",
"eventRate": 6.00009,
"bytesRate": 105.8016,
"latency": 0.2131
}
]
Query state
With the scaled pipeline running, and some data in one worker, use the REST API or q IPC to get the state of the Worker.
$ curl localhost:50762/state/maxval
0.7148779
q)`::50762 (`.qsp.get; `maxval; ::)
0.7148779
Next steps
- Learn more about the declarative pipeline API through operators.
- Examine other data sources and sinks in readers and writers, and operators.
- More configuration options
- More ways to run