
Stream Processor

An example end-to-end workflow is presented here. See the configuration documentation for more configuration options, and the running documentation for more ways to run pipelines.

1) Pull the Stream Processor images

Log in to the KX container registry, then pull down the kxi-sp-controller and kxi-sp-worker images required to run the Stream Processor.

docker login registry.dl.kx.com -u username -p password
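
After logging in, the images referenced later in this example's Docker Compose file can be pulled explicitly. The 0.8.2 tag below matches that file; substitute the release you are using.

docker pull registry.dl.kx.com/kxi-sp-controller:0.8.2
docker pull registry.dl.kx.com/kxi-sp-worker:0.8.2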

2) Create a pipeline specification

Create a specification of the streaming analytics pipeline to run.

The pipeline below uses the callback reader to listen for data sent to upd (a function created when the pipeline starts up), windows incoming data into five-second buckets, then stores the maximum value from each window in a named stateful operator. The value is also written to the console logs.

// spec.q

.qsp.run
    .qsp.read.fromCallback[`upd]                    // read data from the 'upd' function
    .qsp.window.timer[00:00:05]                     // buffer received data into 5 second buckets
    .qsp.map[{[op;md;data]
        .qsp.set[op;md] exec max val from data      // store and emit the max value for each window
        }; .qsp.use`name`state!(`maxval; 0f)]       // explicitly name this stateful operator for reference later
    .qsp.write.toConsole[]                          // write output to console for debugging

3) Create and run a Docker Compose file

Wrap the kxi-sp-controller and kxi-sp-worker images into a single Docker Compose file. The file below is a starting point for this example and can be customized as needed. Alternatively, Docker could be used directly, or any other container orchestration tool. See the running documentation for examples of running with Docker alone.

In this example, the Worker uses an ephemeral host port binding. This is to avoid host port conflicts when scaling later. See https://docs.docker.com/compose/compose-file/compose-file-v3/#ports for more information.

# docker-compose.yaml

version: "3.3"
services:
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:0.8.2
    ports:
      - 6000:6000
    environment:
      - KDB_LICENSE_B64                        # Which kdb+ license to use, see note below
    command: ["-p", "6000"]

  worker:
    image: registry.dl.kx.com/kxi-sp-worker:0.8.2
    ports:
      - 5000
    volumes:
      - .:/app                                 # Bind in the spec.q file
    environment:
      - KXI_SP_SPEC=/app/spec.q                # Point to the bound spec.q file
      - KXI_SP_PARENT_HOST=controller:6000     # Point to the parent Controller
      - KDB_LICENSE_B64
    command: ["-p", "5000"]

Set license

A license for kdb+ Cloud Edition is required and is provided through the environment variable $KDB_LICENSE_B64. It can be generated by base64-encoding a valid kc.lic file. On a Unix-based system, create the environment variable with the following command.

Note

The kc.lic used must be for kdb+ Cloud Edition. A regular kc.lic for On-Demand kdb+ will surface a licensing error during startup.

export KDB_LICENSE_B64=$(base64 path-to/kc.lic)
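
As an optional sanity check (assuming a GNU/Linux environment where base64 supports --decode), the encoded value can be decoded and compared against the original license file:

echo $KDB_LICENSE_B64 | base64 --decode | cmp - path-to/kc.lic && echo "license encoded correctly"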

With the license set, start the containers.

$ docker-compose up -d

4) Get pipeline status from REST

To know whether the pipeline is up and running, query the /status endpoint on the Controller.

$ curl localhost:6000/status
# READY
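
For scripted setups, a simple shell loop (a convenience sketch, not part of the API) can poll until the Controller reports READY:

$ until curl -s localhost:6000/status | grep -q READY; do sleep 1; done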

5) Send some data to the Worker

Now that the pipeline is up and running, find the port of the Worker. The advertised host port appears in the PORTS column below (0.0.0.0:59978). Since the binding is dynamic in this example, your port may differ. If connecting to the Worker from within the Docker Compose network, the address would be example_worker_1:5000. Alternatively, the host port binding could be pre-configured in the Docker Compose file.

$ docker ps

CONTAINER ID   ...  STATUS              PORTS                                       NAMES
54972266d382   ...  Up About a minute   0.0.0.0:59978->5000/tcp                     example_worker_1
4f0bb8f22086   ...  Up About a minute   0.0.0.0:6000->6000/tcp, :::6000->6000/tcp   example_controller_1
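
Alternatively, docker port resolves the host binding for the Worker's container port directly (the container name below is the one generated for this example's Compose project; yours may differ):

$ docker port example_worker_1 5000
0.0.0.0:59978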

Using the Worker's host port, connect a q process and send some data at a few different timestamps to see the output written to the console.

$ q
q)h: hopen 59978
q)neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
q)neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
q)neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))
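
To watch several five-second windows fire without typing repeatedly, a small q timer can publish a batch every second. This is just a convenience sketch reusing the handle h opened above: \t 1000 starts a one-second timer, and \t 0 stops it once a few windows have printed.

q).z.ts: {neg[h](`upd; ([] timestamp: .z.p; val: 5?1f))}   / publish a batch every second
q)\t 1000
q)\t 0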

After seeing the output, stop the Docker Compose deployment so that the pipeline can be scaled up in the next step.

$ docker-compose stop

6) Scale up to multiple Workers

Now, to run the same example as before with more Workers, set the expected Worker count in the Docker Compose file.

  controller:
    ..
    environment:
      ..
      - KXI_SP_MIN_WORKERS=5
    ..

When set, start up the Docker Compose in the same way as before, but scale the Workers to five instances.

$ docker-compose up -d --scale worker=5

To see the status of the Workers, use the REST API on the Controller. Tools such as jq can be used to format the output for readability.

$ curl localhost:6000/workers | jq
[
  {
    "id": null,
    "name": "worker-9cb142671cc9",
    "address": "55c0b30757bc:5000",
    "partitions": null,
    "startTime": "2021-06-22T22:52:53.436Z"
  },
  {
    "id": null,
    "name": "worker-e5c9489a95f0",
    "address": "d12413705efc:5000",
    "partitions": null,
    "startTime": "2021-06-22T22:52:57.074Z"
  },

 ..
]
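
Beyond formatting, jq can also extract values directly; for example, confirming the number of registered Workers (a convenience one-liner, not part of the API):

$ curl -s localhost:6000/workers | jq length
5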

7) Send some more data

With the scaled pipeline up and running, use the same method as above (docker ps) to select one of the Workers to send data to.

Note

Since this example uses the callback reader, the choice of which Worker receives each message is made externally to the pipeline. If using a reader with a partitioning abstraction, such as Kafka, the Controller assigns partitions to the readers automatically.

$ q
q)h1:hopen 50762
q)h2:hopen 50765
q)h3:hopen 50769
q)h4:hopen 50950
q)h5:hopen 51230
q)
q)neg[h1](`upd; ([] timestamp: .z.p; part: 0i; val: 5?1f))
q)neg[h2](`upd; ([] timestamp: .z.p; part: 1i; val: 5?1f))
q)neg[h3](`upd; ([] timestamp: .z.p; part: 2i; val: 5?1f))
q)neg[h4](`upd; ([] timestamp: .z.p; part: 3i; val: 5?1f))
q)neg[h5](`upd; ([] timestamp: .z.p; part: 4i; val: 5?1f))

8) Inspect aggregated performance

Performance metrics for each individual Worker, as well as aggregated metrics for the scaled pipeline as a whole, can be obtained through the REST API on the Controller. The reported metrics are normalized to per-second values.

$ curl localhost:6000/metrics | jq
[
  {
    "name": "worker-9cb142671cc9",
    "ts": "2021-06-22T23:07:32.343Z",
    "eventRate": 6.00009,                 # events measured as count of each batch sent
    "bytesRate": 105.8016,                # bytes per second processed
    "latency": 0.2131                     # processing latency, in milliseconds
  },

  ..

  {
    "name": "_total",
    "ts": "2021-06-22T23:07:34.213Z",
    "eventRate": 6.00009,
    "bytesRate": 105.8016,
    "latency": 0.2131
  }
]
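
As with the Workers endpoint, jq can extract individual values; for example, the aggregated event rate from the _total entry (a convenience one-liner, not part of the API):

$ curl -s localhost:6000/metrics | jq '.[] | select(.name == "_total") | .eventRate'
6.00009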

9) Query state

With the scaled pipeline running and some data in one of the Workers, use the REST API or q IPC to get the state of that Worker.

$ curl localhost:50762/state/maxval     # Using REST
0.7148779
$ q
q)`::50762 (`.qsp.get; `maxval; ::)    / Or using q IPC
0.7148779
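
Since each Worker holds state only for the data it has received, an aggregate view can be assembled client-side. The sketch below queries every Worker over q IPC and takes the overall maximum, reusing this example's host ports (which will differ on your machine):

q)ports: 50762 50765 50769 50950 51230    / host ports from docker ps; yours will differ
q)max {h:hopen x; r:h (`.qsp.get; `maxval; ::); hclose h; r} each ports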

10) Next steps

From here, learn more about the declarative pipeline API through operators, examine other data sources and sinks in readers and writers, or find more information on running and configuring pipelines.