Docker quickstart

An end-to-end workflow example with the Stream Processor using a Docker deployment

Pull the Stream Processor images

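Log in to the registry that hosts the kxi-sp-controller and kxi-sp-worker images required to run the Stream Processor. Once logged in, Docker Compose pulls both images automatically the first time the containers are started.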

docker login registry.dl.kx.com -u username -p password

Create a pipeline specification

Create a specification of the stream analytic pipeline to run.

The pipeline below will

  • listen for data coming into upd (a function which will be created at startup of the pipeline) by using the callback reader
  • window incoming data into five-second periods
  • store the maximum value from the window in a named operator

The value will also be written to the console logs.

spec.q:

.qsp.run
  .qsp.read.fromCallback[`upd]              /read data from the upd function
  .qsp.window.timer[00:00:05]               /buffer rcvd data into 5-second buckets
  .qsp.map[{[op;md;data]
    .qsp.set[op;md] exec max val from data  /store and emit max value for each window
    }; .qsp.use`name`state!(`maxval; 0f)]   /name this stateful operator for reference later
  .qsp.write.toConsole[]                    /write output to console for debugging
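
To make the windowing step concrete, the following Python sketch models what the pipeline computes: it groups values into five-second buckets by arrival time and keeps the maximum of each bucket. It is a simplified illustration under stated assumptions, not the Stream Processor's implementation; the helper name window_maxima and the sample events are made up for this example.

```python
from collections import defaultdict

WINDOW_SECONDS = 5  # mirrors the 00:00:05 timer window in spec.q


def window_maxima(events, window_seconds=WINDOW_SECONDS):
    """Group (arrival_seconds, value) pairs into fixed windows and return each window's max.

    Hypothetical helper: a simplified model of the window and map steps.
    """
    buckets = defaultdict(list)
    for arrival, value in events:
        window_start = int(arrival // window_seconds) * window_seconds
        buckets[window_start].append(value)
    return [(start, max(values)) for start, values in sorted(buckets.items())]


# Made-up events: (seconds since start, val)
events = [(0.5, 0.2), (1.0, 0.9), (4.9, 0.4), (5.1, 0.7), (9.0, 0.1), (12.0, 0.3)]
maxima = window_maxima(events)

# In this model the named operator holds only the most recent window's maximum,
# starting from the initial state 0f declared in spec.q.
latest_state = maxima[-1][1] if maxima else 0.0
print(maxima, latest_state)
```

In this model, each emitted pair corresponds to one line written to the console, and latest_state corresponds to the value later read back from the maxval operator.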

Create a Docker Compose file

Wrap the kxi-sp-controller and kxi-sp-worker images into a single Docker Compose file. The file below is a starting point for this example and can be customized as needed. Alternatively, run the images with Docker directly or with any other container orchestrator.

In this example, the Worker uses an ephemeral host port binding. This is to avoid host port conflicts when scaling later.

docker-compose.yaml:

version: "3.3"
services:
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.1.0
    ports:
      - 6000:6000
    environment:
      - KDB_LICENSE_B64                        # Which kdb+ license to use, see note below
    command: ["-p", "6000"]

  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    ports:
      - 5000
    volumes:
      - .:/app                                 # Bind in the spec.q file
    environment:
      - KXI_SP_SPEC=/app/spec.q                # Point to the bound spec.q file
      - KXI_SP_PARENT_HOST=controller:6000     # Point to the parent Controller
      - KDB_LICENSE_B64
    command: ["-p", "5000"]

Provide a license

A license for kdb+ Cloud Edition is required and is provided through the environment variable KDB_LICENSE_B64. Its value is the base64 encoding of a valid kc.lic file. On a *nix-based system, create the environment variable with the following command.

export KDB_LICENSE_B64=$(base64 path-to/kc.lic)

The kc.lic used must be for kdb+ Cloud Edition. A regular kc.lic for On-Demand kdb+ will signal a licensing error during startup.
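
Some base64 implementations wrap their output across several lines. Where a single-line value is wanted, the encoding can also be produced with a short Python sketch like the one below. The function names are hypothetical, and whether line breaks matter to the Stream Processor is not stated here, so treat this as an optional alternative.

```python
import base64
from pathlib import Path


def encode_license_bytes(data):
    """Return base64 text for the given bytes, with no line breaks."""
    return base64.b64encode(data).decode("ascii")


def encode_license(path):
    """Read a license file (for example path-to/kc.lic) and return its base64 text."""
    return encode_license_bytes(Path(path).read_bytes())
```

The returned string can then be exported as KDB_LICENSE_B64 in the shell that runs Docker Compose.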

Start the containers

docker-compose up -d

Get pipeline status from REST

To check whether the pipeline is up and running, query the /status endpoint on the Controller.

curl localhost:6000/status
"RUNNING"
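
Because the containers take a moment to start, a script may need to wait for the RUNNING status rather than check once. The Python sketch below polls the endpoint shown above. It assumes the response body is a JSON string, as the quoted output suggests; the function names, retry count, and delay are illustrative choices.

```python
import json
import time
import urllib.request


def fetch_status(url="http://localhost:6000/status"):
    """Fetch the Controller status; the body is assumed to be a JSON string."""
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_until_running(fetch=fetch_status, attempts=30, delay=1.0):
    """Poll until the status is RUNNING; return True on success, False if attempts run out."""
    for _ in range(attempts):
        try:
            if fetch() == "RUNNING":
                return True
        except OSError:
            pass  # Controller is not accepting connections yet
        time.sleep(delay)
    return False
```

Passing the fetch function as a parameter keeps the polling logic independent of the network, which makes it easy to exercise with a stub.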

Send some data to the Worker

Now that the pipeline is up and running, find the port of the Worker. The host port appears in the PORTS column of the docker ps output below (59978 in 0.0.0.0:59978->5000/tcp). Because this binding is dynamic, your port will likely differ. When connecting to the Worker from within the Docker Compose network, the address is example_worker_1:5000. Alternatively, a fixed host port binding can be configured in the Docker Compose file.

docker ps
CONTAINER ID   ...  STATUS              PORTS                                       NAMES
54972266d382   ...  Up About a minute   0.0.0.0:59978->5000/tcp                     example_worker_1
4f0bb8f22086   ...  Up About a minute   0.0.0.0:6000->6000/tcp, :::6000->6000/tcp   example_controller_1
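
When the port lookup needs to be scripted, the PORTS text can be parsed directly. The Python sketch below extracts the host port mapped to a given container port from a string in the format shown above; host_port is a hypothetical helper, and it assumes TCP mappings only.

```python
import re


def host_port(ports_field, container_port=5000):
    """Return the host port mapped to container_port, or None if there is no mapping."""
    match = re.search(r":(\d+)->%d/tcp" % container_port, ports_field)
    return int(match.group(1)) if match else None


print(host_port("0.0.0.0:59978->5000/tcp"))  # Worker mapping from the listing above
```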

With the port of the Worker, connect a q process and send it some data at a few different timestamps to see the output written to the console.

h: hopen 59978
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))

Stop the pipeline

After seeing the output, stop the containers with Docker Compose so that the pipeline can be scaled up.

docker-compose stop

Scale up to multiple Workers

Now, to run the same example as before with more Workers, set the expected Worker count in the Docker Compose file.

  controller:
    ..
    environment:
      ..
      - KXI_SP_MIN_WORKERS=5
    ..

With this set, start Docker Compose in the same way as before, but scale the Worker service to five instances.

docker-compose up -d --scale worker=5

To see the status of the Workers, use the REST API on the Controller. Tools such as jq can format the output for readability.

curl localhost:6000/workers | jq
[
  {
    "id": null,
    "name": "worker-9cb142671cc9",
    "address": "55c0b30757bc:5000",
    "partitions": null,
    "startTime": "2021-06-22T22:52:53.436Z"
  },
  {
    "id": null,
    "name": "worker-e5c9489a95f0",
    "address": "d12413705efc:5000",
    "partitions": null,
    "startTime": "2021-06-22T22:52:57.074Z"
  },

 ..
]
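
To confirm that all scaled Workers have registered, the /workers response can be checked programmatically. The Python sketch below assumes the response has the shape shown above; the helper names are hypothetical, and the sample text contains only the two entries visible in the output.

```python
import json


def worker_addresses(workers_json):
    """Map each Worker name to its address from the /workers response text."""
    return {worker["name"]: worker["address"] for worker in json.loads(workers_json)}


def all_workers_registered(workers_json, expected):
    """Return True once at least `expected` Workers appear in the response."""
    return len(json.loads(workers_json)) >= expected


# Sample copied from the output above (two of the five entries).
sample = """[
  {"id": null, "name": "worker-9cb142671cc9", "address": "55c0b30757bc:5000",
   "partitions": null, "startTime": "2021-06-22T22:52:53.436Z"},
  {"id": null, "name": "worker-e5c9489a95f0", "address": "d12413705efc:5000",
   "partitions": null, "startTime": "2021-06-22T22:52:57.074Z"}
]"""
print(worker_addresses(sample))
```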

Send some more data

With the scaled pipeline up and running, use the same method as above (docker ps) to select one of the Workers to send data to.

Selecting workers

Since this example uses the callback reader, the choice of which Worker receives each message must be made outside the pipeline. With a reader that has a partition abstraction, such as Kafka, the Controller assigns partitions to readers automatically.

h1:hopen 50762
h2:hopen 50765
h3:hopen 50769
h4:hopen 50950
h5:hopen 51230

neg[h1](`upd; ([] timestamp: .z.p; part: 0i; val: 5?1f))
neg[h2](`upd; ([] timestamp: .z.p; part: 1i; val: 5?1f))
neg[h3](`upd; ([] timestamp: .z.p; part: 2i; val: 5?1f))
neg[h4](`upd; ([] timestamp: .z.p; part: 3i; val: 5?1f))
neg[h5](`upd; ([] timestamp: .z.p; part: 4i; val: 5?1f))
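
One simple way to select a Worker externally is to route each partition number to a fixed Worker. The Python sketch below uses modulo routing over the host ports from the example above. This policy is an assumption chosen for illustration; the callback reader does not prescribe any particular routing scheme.

```python
def pick_worker(part, worker_ports):
    """Choose a Worker port for a partition number using modulo routing (hypothetical policy)."""
    if not worker_ports:
        raise ValueError("no Workers available")
    return worker_ports[part % len(worker_ports)]


# Host ports from the example above; yours will differ.
worker_ports = [50762, 50765, 50769, 50950, 51230]
print(pick_worker(3, worker_ports))
```

Routing by a stable key sends all data for one partition to the same Worker, so that Worker's window state reflects the whole partition.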

Inspect aggregated performance

Performance metrics for each individual Worker, and aggregated metrics for the scaled pipeline as a whole, can be obtained through the REST API on the Controller. The reported metrics are normalized to per-second values.

curl localhost:6000/metrics | jq
[
  {
    "name": "worker-9cb142671cc9",
    "ts": "2021-06-22T23:07:32.343Z",
    "eventRate": 6.00009,                 # events measured as count of each batch sent
    "bytesRate": 105.8016,                # bytes per second processed
    "latency": 0.2131                     # processing latency, in milliseconds
  },

  ..

  {
    "name": "_total",
    "ts": "2021-06-22T23:07:34.213Z",
    "eventRate": 6.00009,
    "bytesRate": 105.8016,
    "latency": 0.2131
  }
]
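
For monitoring scripts, it can help to separate the per-Worker entries from the aggregate entry. The Python sketch below assumes the aggregate is the entry named _total, as in the output above; split_metrics is a hypothetical helper. The sample omits the # annotations, which are explanatory and not valid JSON.

```python
import json


def split_metrics(metrics_json):
    """Separate per-Worker entries from the aggregate "_total" entry."""
    entries = json.loads(metrics_json)
    per_worker = {e["name"]: e for e in entries if e["name"] != "_total"}
    total = next((e for e in entries if e["name"] == "_total"), None)
    return per_worker, total


# Sample values copied from the output above.
sample = """[
  {"name": "worker-9cb142671cc9", "ts": "2021-06-22T23:07:32.343Z",
   "eventRate": 6.00009, "bytesRate": 105.8016, "latency": 0.2131},
  {"name": "_total", "ts": "2021-06-22T23:07:34.213Z",
   "eventRate": 6.00009, "bytesRate": 105.8016, "latency": 0.2131}
]"""
per_worker, total = split_metrics(sample)
print(sorted(per_worker), total["eventRate"])
```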

Query state

With the scaled pipeline running and some data sent to one Worker, use the REST API or q IPC to get the state of that Worker.

$ curl localhost:50762/state/maxval
0.7148779
q)`::50762 (`.qsp.get; `maxval; ::)
0.7148779
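
Each Worker holds its own state, so an overall maximum has to be combined on the client side. The Python sketch below queries several Workers and takes the largest value. It assumes every Worker exposes /state/maxval on its host port as shown above and that the body parses as a JSON number; the function names are hypothetical.

```python
import json
import urllib.request


def fetch_state(port, name="maxval", host="localhost"):
    """Fetch a named operator's state from one Worker over REST."""
    url = "http://%s:%d/state/%s" % (host, port, name)
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


def overall_max(ports, fetch=fetch_state):
    """Return the largest per-Worker value, skipping Workers that report nothing numeric."""
    values = [fetch(port) for port in ports]
    numeric = [v for v in values if isinstance(v, (int, float)) and not isinstance(v, bool)]
    return max(numeric) if numeric else None
```

For example, overall_max([50762, 50765]) would query two Workers and return the larger of their stored values.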

Next steps