Configuration

Configuring and submitting a streaming pipeline differs significantly depending on whether it is done in Kubernetes or in Docker.

Within Docker, Controller and Worker containers must be configured and managed manually, using Docker, Docker Compose, or a similar container management system.

Within Kubernetes, deployment and installation are greatly simplified through a Helm chart that installs the Stream Processor Coordinator (a Kubernetes Operator) into a cluster; the Coordinator accepts streaming jobs through a REST API. Being an Operator, the Coordinator configures and launches all required Kubernetes workloads and resources.

Docker

Pipelines within Docker can be configured using a combination of environment variables and YAML configuration.

User code for streaming pipelines can either be embedded into the YAML configuration, or provided by mounting a directory containing the user code and setting the KXI_SP_SPEC_PATH environment variable to the path of the pipeline q file.
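For example, when mounting user code with Docker Compose, the Worker service might mount the directory and point KXI_SP_SPEC_PATH at the pipeline file as follows (a minimal sketch; the paths are illustrative):

services:
  worker:
    ..
    environment:
      - KXI_SP_SPEC_PATH=/opt/pipelines/spec.q
    volumes:
      - ./pipelines:/opt/pipelines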

See Running for information about running with different configurations within Docker.

YAML configuration

Each field of the YAML configuration can be overridden by environment variables. If all fields of the YAML configuration are overridden this way, the YAML configuration file itself is not needed.

If present, the configuration must contain an id field as well as a settings field, itself containing a minWorkers field.

id: pipeline-id  # A human-readable way of identifying the pipeline within service discovery
                 # (default: code generated id with prefix "pipeline-")
                 # (env var override: KXI_SP_ID)
settings:
  minWorkers: n  # The minimum number of worker processes needed to be online to start the pipeline
                 # (default: 1)
                 # (env var override: KXI_SP_MIN_WORKERS)

Environment variables

Controller

The Controller can be configured with the following optional environment variables:

KXI_SP_ID                      # Human readable identifier (default: [generated] pipeline-[0-9a-f]{10})
KXI_SP_CONFIG                  # Path to YAML configuration file
                               #   Only required if a config file is used
KXI_SP_CHECKPOINT_FREQ         # Set the frequency of checkpointing - set to 0 to disable
                               #   In milliseconds (default 5000)
KXI_SP_SILENCE_CHECKPOINTS     # If set, silence warnings when Controller checkpointing has not been enabled
KXI_SP_DISABLE_REST            # If set, disable REST interface
KXI_SP_DISABLE_METRICS         # If set, disable metrics reporting - this can reduce incurred latency
KXI_SP_MIN_WORKERS             # Minimum number of Workers required to be online to start (default: 1)

KXI_CONFIG_FILE                # If a sidecar is used, path to the sidecar configuration
KXI_ERROR_MODE                 # Error trap mode (default: 2)
                               #   See https://code.kx.com/q/basics/syscmds/#e-error-trap-clients for more details

Worker

The Workers can be configured with the following optional environment variables. The pipeline ID is inherited from the Controller and suffixed with the Worker's unique ID.

KXI_SP_PARENT_HOST         # Required. Host:Port for this Worker's Controller
KXI_SP_SPEC                # Required. Path to the pipeline spec file
KXI_SP_CHECKPOINT_FREQ     # Set the frequency of checkpointing - set to 0 to disable
                           #   In milliseconds (default 5000)
KXI_SP_REPORTING_FREQ      # Set the frequency of heartbeats from Worker to Controller
                           #   In milliseconds (default 5000)
KXI_SP_DISABLE_METRICS     # If set, disable metrics reporting - this can reduce incurred latency
KXI_SP_DISABLE_REST        # If set, disable REST interface

KXI_RT_LIB                 # Path to mounted KX Insights Reliable Transport client (if used)
KXI_CONFIG_FILE            # If a sidecar is used, path to the sidecar configuration
KXI_PROTECTED_EXECUTION    # Logs the name of the failing operator in errors; set to 'false' to disable
KXI_ERROR_MODE             # Error trap mode (default: 2)
                           #   See https://code.kx.com/q/basics/syscmds/#e-error-trap-clients for more details
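As a sketch, a Docker Compose deployment using these variables might pair a Controller with a single Worker as follows. The image names and tags are assumptions (based on the sidecar image shown later); adjust them to match your installation.

services:
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:0.8.0  # assumed image name and tag
    environment:
      - KXI_SP_ID=pipeline-example
      - KXI_SP_MIN_WORKERS=1
      - KDB_LICENSE_B64
    command: -p 6000

  worker:
    image: registry.dl.kx.com/kxi-sp-worker:0.8.0      # assumed image name and tag
    environment:
      - KXI_SP_PARENT_HOST=controller:6000
      - KXI_SP_SPEC=/opt/pipelines/spec.q
      - KDB_LICENSE_B64
    volumes:
      - ./pipelines:/opt/pipelines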

Monitoring

Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+-specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container, which surfaces them for HTTP pull.

If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.

Note

Since Monitoring requires a sidecar container, if scaling a pipeline to multiple Workers, it is recommended that only the Controller enable Monitoring. Aggregated Worker statistics will be reported by the Controller.

To add Monitoring to a Stream Processor Controller, add an accompanying kxi-sidecar container to scrape the Controller.

services:
  ..

  controller-sidecar:
    image: registry.dl.kx.com/kxi-sidecar:0.8.0
    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080

In addition, the configuration directory mounted above (./config) should be created, containing the sidecar configuration file.

{
    "connection": ":controller:6000",
    "frequencySecs": 5,
    "metrics":
    {
        "enabled":"true",
        "frequency": 5,
        "handler": {
            "pc": true,
            "pg": true,
            "ph": true,
            "po": true,
            "pp": true,
            "ps": true,
            "ts": true,
            "wc": true,
            "wo": true,
            "ws": true
          }
    }
}

The above configuration file controls how frequently the sidecar scrapes the main container (metrics.frequency) and which kdb+ event handlers in the .z namespace are tracked (see https://code.kx.com/q/ref/dotz/).

Adding Prometheus to a Docker Compose deployment

As an example of adding monitoring configuration, we demonstrate using Prometheus with the KX Insights Monitoring component.

Prometheus is publicly available for download from Docker Hub (https://hub.docker.com/r/prom/prometheus/) and can be pulled using docker pull. Below we show a sample Prometheus container that exposes the Prometheus browser UI on http://localhost:9000.

services:
  prometheus:
    image: prom/prometheus
    ports:
      - 9000:9090
    volumes:
      - ./config:/etc/prometheus
      - prometheus-data:/prometheus
    command: --config.file=/etc/prometheus/prometheus.yml

volumes:
  prometheus-data:

The volume prometheus-data:/prometheus is used to store the scraped data so that it is available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command. This configuration file contains scrape targets, which are the container sidecars configured previously. An example is shown below in which we set the scrape interval and timeout values and specify multiple jobs under scrape_configs to scrape data from different sidecar containers in a cluster.

global:
  scrape_interval: 10s
  scrape_timeout: 5s

scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker-sidecar, if exists

Service Discovery

Containers within a Stream Processor cluster can advertise themselves to different service discovery registries (e.g. Eureka) through the KX Insights Discovery Proxy.

Note

Since Service Discovery requires a sidecar container, if scaling a pipeline to multiple Workers, it is recommended that only the Controller enable Service Discovery.

Follow the same steps as the Monitoring sidecar setup, and add a discovery configuration field to the sidecar configuration file ($KXI_CONFIG_FILE). The metrics field can be removed if Monitoring is not also being used.

    ..
    "discovery":
    {
        "registry": ":proxy:5000",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
    ..

The registry field points to the address of the KX Insights Discovery Proxy, while heartbeatSecs and leaseExpirySecs respectively specify the heartbeat frequency and the time after which an instance is evicted following heartbeat failure.
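Putting this together with the Monitoring settings shown earlier, a sidecar configuration file enabling both metrics and discovery would look like the following (values taken from the examples above):

{
    "connection": ":controller:6000",
    "frequencySecs": 5,
    "metrics":
    {
        "enabled": "true",
        "frequency": 5,
        "handler": {
            "pc": true, "pg": true, "ph": true, "po": true, "pp": true,
            "ps": true, "ts": true, "wc": true, "wo": true, "ws": true
        }
    },
    "discovery":
    {
        "registry": ":proxy:5000",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
}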

After configuring container sidecars with discovery, a discovery proxy and registry should be added to your Docker Compose deployment. Instructions on how to configure a discovery proxy and registry can be found here.

Kubernetes

A Helm chart has been provided for installing the Stream Processor service into a Kubernetes cluster.

Prerequisites

To run within Kubernetes there are a number of prerequisites:

kdb+ license

To start the Stream Processor service within Kubernetes, a license secret should be installed into the Kubernetes cluster.

First, print the base64 encoded license:

$ base64 -w 0 $QLIC/kc.lic

Then create and apply a secret containing the encoded license text:

$ cat kx-license-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>

$ kubectl apply -f kx-license-secret.yaml

Helm

Add the KX Helm chart repository to your helm install:

$ helm repo add --username <username> --password <password> \
    kxi-repo https://nexus.dl.kx.com/repository/kx-insights-charts/
"kxi-repo" has been added to your repositories

Update your helm repo local chart cache:

$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "kxi-repo" chart repository
Update Complete. ⎈Happy Helming!⎈

Add a secret to your cluster to allow access to the images:

$ kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com

Note

In the case above the secret kx-repo-access has been created. When deploying the charts it will be necessary to reference this secret in the imagePullSecrets within the configuration file.

Configuration file

A configuration file is required to run the Coordinator service. An example standalone configuration file is provided below. See all configuration options for more information.

$ cat spcoord.yaml
license:
  secretName: kx-license-info

imagePullSecrets:
  - name: kx-repo-access

persistence:
  enabled: true
  # You will need to provide this argument if persistence is enabled.
  # Point this to the name of a created Storage Class for checkpoints.
  storageClassName: standard
  controllerCheckpointFreq: 10000
  workerCheckpointFreq: 5000
  # Storage allocated to each worker/controller
  storage: 20Gi

discovery:
  # If KXI Service Discovery is set up, set this to true
  enabled: false

metrics:
  # If KXI Monitoring is set up, set this to true
  enabled: false

Installing the Coordinator

$ helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml

Accessing the Coordinator

To access the Coordinator for the following creation or teardown requests, first port-forward the Coordinator service locally.

$ kubectl port-forward sp-kxi-sp-0 5000

Configuring and running a streaming job

Assuming a spec.q file exists locally with a pipeline defined, its configuration is provided through the REST API when submitting it to the Coordinator.

$ curl -X POST localhost:5000/pipeline/create -d "{\"name\":\"kafka-in\",\"type\":\"spec\",\"config\":{\"content\":$(jq -asR . < spec.q)},\"settings\": {\"minWorkers\":1, \"maxWorkers\":10, \"workerThreads\": 4, \"workerImage\": \"\"}}"

In the above REST call, the following settings are available as part of the settings field.

setting        default   description
minWorkers     1         Minimum number of Workers that must be ready for the pipeline to start
maxWorkers     10        Maximum number of Workers the pipeline may scale to
workerThreads  1         Number of secondary threads for Workers
workerImage              Custom Worker image to deploy (if building a custom Worker from the .qpk)
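For readability, the request body passed to curl above expands to JSON of the following shape, with the contents of spec.q inlined as a JSON string by jq:

{
  "name": "kafka-in",
  "type": "spec",
  "config": { "content": "<contents of spec.q as a JSON string>" },
  "settings": {
    "minWorkers": 1,
    "maxWorkers": 10,
    "workerThreads": 4,
    "workerImage": ""
  }
}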

Uninstalling the Stream Processor Coordinator

To remove the Stream Processor service from your Kubernetes cluster:

1) Tear down all running pipelines

Submit the teardown request to the Coordinator to bring down all pipelines.

$ curl -X POST localhost:5000/pipelines/teardown

Alternatively, pipelines can be individually removed with:

$ curl -X POST localhost:5000/pipeline/teardown/<id>

Or by using kubectl directly on the pipeline name:

$ kubectl delete rs,pod,svc,cm,servicemonitor,pvc,statefulset -l pipeline=$name

2) Uninstall the Helm chart

$ helm uninstall sp

Additionally, all Stream Processor workloads will have the following label to help identify them:

app.kubernetes.io/part-of=stream-processor
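For example, to list all Stream Processor workloads in the cluster using this label:

$ kubectl get all -l app.kubernetes.io/part-of=stream-processor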

All configuration options

The following options are available for configuring the Coordinator service when deploying with Helm.

config                                 default   description
imagePullSecrets                                 Array of names of secrets with image pull permissions
defaultWorkerThreads                   0         Default number of secondary threads for new pipeline submissions
discovery.enabled                      true      Whether KX Insights Service Discovery is enabled. Note: the service must be set up separately.
license.user                                     License user name (if not using a license secret)
license.email                                    License email (if not using a license secret)
license.secretName                               Name of a secret containing the base64 license text (if not using user/email)
metrics.enabled                        true      Whether KX Insights Monitoring is enabled. Note: the service must be set up separately.
metrics.frequency                      5         Polling frequency of monitoring metrics in seconds
metrics.handler.po                     true      Capture metrics for the .z.po handler
metrics.handler.pc                     true      Capture metrics for the .z.pc handler
metrics.handler.wo                     true      Capture metrics for the .z.wo handler
metrics.handler.wc                     true      Capture metrics for the .z.wc handler
metrics.handler.pg                     true      Capture metrics for the .z.pg handler
metrics.handler.ps                     true      Capture metrics for the .z.ps handler
metrics.handler.ws                     true      Capture metrics for the .z.ws handler
metrics.handler.ph                     true      Capture metrics for the .z.ph handler
metrics.handler.pp                     true      Capture metrics for the .z.pp handler
metrics.handler.ts                     true      Capture metrics for the .z.ts handler
persistence.enabled                    false     Whether persistent volumes are enabled on pipelines. Note: checkpointing for recovery requires this to be enabled
persistence.storageClassName                     Pre-configured storage class name to be used for persistent volumes
persistence.controllerCheckpointFreq   10000     Frequency of Controller checkpoints in milliseconds
persistence.workerCheckpointFreq       10000     Frequency of Worker checkpoints in milliseconds
persistence.storage                    20Gi      Persistent volume storage size
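Any of these options can also be overridden on the Helm command line at install time rather than in the configuration file; for example, to enable Monitoring:

$ helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml --set metrics.enabled=true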

Reliable Transport client

Note

The Reliable Transport client must be configured if using the .qsp.read.fromRT data source or .qsp.write.toRT data sink.

The Reliable Transport (RT) client library abstracts the different technologies used to transport messages between KX Insights microservices behind a common API. A client library can be built into a Stream Processor Worker image at build time (using the kxi-sp-worker.qpk) or mounted with Docker and pointed to with the KXI_RT_LIB environment variable.
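As a sketch, mounting a client library into a Worker with Docker Compose and pointing to it with KXI_RT_LIB might look like this (the library file name and mount path are illustrative):

services:
  worker:
    ..
    environment:
      - KXI_RT_LIB=/opt/rt/rt_tick.q
    volumes:
      - ./rt:/opt/rt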

If using the kxi-sp-worker qpk to build your own customization of the KX Insights Stream Processor, you can specify a pre-built RT client library as a dependency of your image. In the example below, a project directory contains a tickerplant RT qpk (rt_tick) and the kxi-sp-worker qpk, which are declared as dependencies of a custom project in qp.json.

$ ls
rt_tick.qpk kxi-sp-worker.qpk

The RT implementation qpk and the kxi-sp-worker.qpk can be combined into a custom kxi-sp-worker image alternative by creating a qpacker project with a qp.json file listing both as dependencies.

{
    "my-sp-worker": {
        "depends": [ "rt_tick", "kxi-sp-worker" ],
        "entry": [ "custom.q" ]
    }
}

Example RT client library for tickerplants

As mentioned previously, an RT client library can either be mounted into a Docker container through configuration, or built into a Docker image by packaging the following file as a qpk and adding it as a project dependency alongside the kxi-sp-worker.qpk. The q code below implements an RT abstraction for the publicly available kdb+ tick tickerplant implementation at https://github.com/KxSystems/kdb-tick.

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"}; // will be overridden

.rt.pub:{[topic]
    if[not 10h=type topic;'"topic must be a string"];
    h:neg hopen $[":"=first topic;`$topic;`$getenv`$"RT_PUB_",upper topic];
    .rt.push:{[nph;payload] nph `.u.upd,payload;}[h;];
    };

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"];
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"];

.rt.upd:{[payload;idx] '"need to implement .rt.upd"};

.rt.sub:{[topic;startIdx]
    if[not 10h=type topic;'"topic must be a string"];

    //connect to the tickerplant
    h:hopen $[":"=first topic;`$topic;`$getenv`$"RT_SUB_",upper topic];

    //initialise our message counter
    .rt.idx:0;

    // === tick.q will call back to these ===
    upd::{[t;x] if[not type x; x:flip (cols t)!x]; .rt.upd[(t;x);.rt.idx]; .rt.idx+:1;};
    .u.end:{.rt.idx:.rt.date2startIdx x+1};

    //replay log file and continue the live subscription
    if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

    //subscribe
    res:h "(.u.sub[`;`]; .u `i`L; .u.d)";

    //define the tables
    {x[;0] set' x[;1]} res 0;

    //if start index is less than current index, then recover
    if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; .rt.recoverMultiDay[res[1];startIdx]];
    };

// 100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11;

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};

.rt.recoverMultiDay:{[iL;startIdx]
    //iL - index and Log (as can be fed into -11!)
    i:first iL; L:last iL;
    //get all files in the same folder as the tp log file
    files:key dir:first pf:` vs last L;
    //get the name of the logfile itself
    fileName:last pf;
    //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
    files:files where files like (-10_ string fileName),"*";
    //from those files, get those with dates in the range we are interested in
    files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
    //set up upd to skip the first part of the file and revert to regular definition when you hit start index
    upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
    //read all of all the log files except the last, where you read up to 'i'
    files:0W,/:files; files[(count files)-1;0]:i;
    //reset .rt.idx for each new day and replay the log file
    {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
    };
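With this library loaded, a subscriber overrides .rt.upd and calls .rt.sub, while a publisher obtains a push function from .rt.pub. A brief illustrative sketch follows; the tickerplant address and table data are placeholders, not part of the library.

// subscriber: apply each update to the named global table, then subscribe
// from the start of the log (startIdx 0) and follow the live feed
.rt.upd:{[payload;idx] upsert[payload 0; payload 1]};
.rt.sub[":tp:5010"; 0];

// publisher: obtain a push function for the same topic, then push
// (table name; data) pairs, which are forwarded to .u.upd on the tickerplant
.rt.pub[":tp:5010"];
.rt.push[(`trade; data)];  // data: rows matching the trade table schema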