

Pipelines can be configured using a combination of environment variables and YAML configuration.

User code for streaming pipelines can either be embedded in the YAML configuration or supplied by mounting a directory containing the user code and setting the path to the pipeline q file with the KXI_SP_SPEC_PATH environment variable.


See Running for information about running with different configurations within Docker.

YAML configuration

Each field of the YAML configuration can be overridden by environment variables. If all fields of the YAML configuration are overridden this way, the YAML configuration file itself is not needed.

If present, the configuration must contain an id field as well as a settings field, itself containing a minWorkers field.

id: pipeline-id  # A human-readable way of identifying the pipeline within service discovery
                 # (default: code generated id with prefix "pipeline-")
                 # (env var override: KXI_SP_ID)
settings:
  minWorkers: n  # The minimum number of worker processes needed to be online to start the pipeline
                 # (default: 1)
                 # (env var override: KXI_SP_MIN_WORKERS)
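
As a minimal sketch of how the file and the environment overrides fit together, the shell snippet below writes a configuration file and then overrides one of its fields. The config/pipeline.yaml path and the values 2 and 3 are assumptions chosen for illustration; only the field names and environment variable names come from this page.

```shell
# Write a minimal pipeline configuration (path and values are illustrative)
mkdir -p config
cat > config/pipeline.yaml <<'EOF'
id: pipeline-id
settings:
  minWorkers: 2
EOF

# Point the Controller at the file
export KXI_SP_CONFIG="$PWD/config/pipeline.yaml"

# Environment variables override the matching YAML fields,
# so this value takes precedence over settings.minWorkers in the file
export KXI_SP_MIN_WORKERS=3
```

With both variables exported, the process that starts the Controller would see the file path and the overridden worker count.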

Environment variables


The Controller can be configured with the following optional environment variables:

KXI_SP_ID                      # Human readable identifier (default: [generated] pipeline-[0-9a-f]{10})
KXI_SP_CONFIG                  # Path to YAML configuration file
                               #   Only required if a config file is used
KXI_SP_CHECKPOINT_FREQ         # Set the frequency of checkpointing - set to 0 to disable
                               #   In milliseconds (default 1000)
KXI_SP_CHECKPOINT_CLEANUP_FREQ # Set the frequency old checkpoints should be removed
                               #   In milliseconds (default 5000)
KXI_SP_SILENCE_CHECKPOINTS     # If set, silence warnings when Controller checkpointing has not been enabled
KXI_SP_DISABLE_REST            # If set, disable REST interface
KXI_SP_DISABLE_METRICS         # If set, disable metrics reporting - this can reduce incurred latency
KXI_SP_MIN_WORKERS             # Minimum number of Workers required to be online to start (default: 1)

KXI_CONFIG_FILE                # If a sidecar is used, path to the sidecar configuration
KXI_ERROR_MODE                 # Error trap mode (default: 2)
                               #   See for more details:


The Workers can be configured with the following environment variables, which are optional unless marked Required. The pipeline ID is inherited from the Controller and suffixed with the Worker's unique ID.

KXI_SP_PARENT_HOST         # Required. Host:Port for this Worker's Controller
KXI_SP_SPEC                # Required. Path to the pipeline spec file
KXI_SP_CHECKPOINT_FREQ     # Set the frequency of checkpointing - set to 0 to disable
                           #   In milliseconds (default 5000)
KXI_SP_REPORTING_FREQ      # Set the frequency of heartbeats from Worker to Controller
                           #   In milliseconds (default 5000)
KXI_SP_DISABLE_METRICS     # If set, disable metrics reporting, this can reduce incurred latency
KXI_SP_DISABLE_REST        # If set, disable REST interface

KXI_RT_LIB                 # Path to mounted KX Insights Reliable Transport client (if used)
KXI_CONFIG_FILE            # If a sidecar is used, path to the sidecar configuration
KXI_PROTECTED_EXECUTION    # Logs name of failing operator in errors, set to 'false' to disable
KXI_ERROR_MODE             # Error trap mode (default: 2)
                           #   See for more details
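
One convenient way to pass these variables to a Worker container is an env file, which can be given to Docker with docker run --env-file. The sketch below is only an illustration: the Controller address controller:6000 and the spec path /opt/kx/app/spec.q are hypothetical values, and setting the checkpoint frequency to 0 disables checkpointing as described above.

```shell
# Build an env file for a Worker (host, port, and spec path are illustrative)
cat > worker.env <<'EOF'
KXI_SP_PARENT_HOST=controller:6000
KXI_SP_SPEC=/opt/kx/app/spec.q
KXI_SP_CHECKPOINT_FREQ=0
KXI_SP_REPORTING_FREQ=5000
EOF
```

Keeping the two required variables in the same file as the optional ones makes it harder to start a Worker without them.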


Monitoring

Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+ specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container, which surfaces them for HTTP pull.

If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.


Since Monitoring requires a sidecar container, if scaling a pipeline to multiple Workers, it is recommended that only the Controller enable Monitoring. Aggregated Worker statistics will be reported by the Controller.

To add Monitoring to a Stream Processor Controller, add an accompanying kxi-sidecar container to scrape the Controller.


    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080

In addition, create the configuration directory (./config in the example above) containing the sidecar configuration file, controller-sidecar.json.

{
    "connection": ":controller:6000",
    "frequencySecs": 5,
    "metrics": {
        "frequency": 5,
        "handler": {
            "pc": true,
            "pg": true,
            "ph": true,
            "po": true,
            "pp": true,
            "ps": true,
            "ts": true,
            "wc": true,
            "wo": true,
            "ws": true
        }
    }
}

The above configuration file controls how often the sidecar scrapes the main container through metrics.frequency, and tracks statistics on many of the kdb+ event handlers in the .z namespace (see

Adding Prometheus to a Docker Compose

As an example of adding monitoring configuration, we demonstrate Prometheus using the KX Insights Monitoring component.

Prometheus is publicly available for download from Docker Hub ( and can be pulled using docker pull. Below we show a sample Prometheus container that exposes the Prometheus browser UI on http://localhost:9000.

      image: prom/prometheus
      ports:
        - 9000:9090
      volumes:
        - ./config:/etc/prometheus
        - prometheus-data:/prometheus
      command: --config.file=/etc/prometheus/prometheus.yml


The volume prometheus-data:/prometheus stores the scraped data so that it remains available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command. This configuration file contains scrape targets, which are the container sidecars configured previously. The example below sets the scrape interval and timeout values and specifies multiple scrape jobs to collect data from different sidecar containers in a cluster.

global:
  scrape_interval: 10s
  scrape_timeout: 5s

scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker-sidecar, if exists
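
Once Prometheus is running, one way to confirm that the sidecars are being reached is to query the Prometheus HTTP API, which lists every scrape target and its health under /api/v1/targets. The helper below is a small sketch: prom_targets is a hypothetical name, and the default address assumes the 9000:9090 port mapping of the sample container above.

```shell
# Hypothetical helper: fetch the list of scrape targets and their health
# from a Prometheus server. Defaults to the sample container's host port.
prom_targets() {
    base_url="${1:-http://localhost:9000}"
    curl -s "${base_url}/api/v1/targets"
}

# Usage (requires a running Prometheus):
#   prom_targets                          # sample container on localhost:9000
#   prom_targets http://prometheus:9090   # another address, passed explicitly
```

Each target in the JSON response carries a health field, so a target whose sidecar cannot be scraped is easy to spot.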

Service Discovery

Containers within a Stream Processor cluster can advertise themselves to different service discovery registries (e.g. Eureka) through the KX Insights Discovery Proxy.


Since Service Discovery requires a sidecar container, if scaling a pipeline to multiple Workers, it is recommended that only the Controller enable Service Discovery.

Follow the same steps as the Monitoring sidecar setup, and add a Discovery configuration field in the sidecar configuration file ($KXI_CONFIG_FILE). The metrics field can be removed if not using Monitoring as well.

        "registry": ":proxy:5000",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90

The registry field points to the address of the KX Insights Discovery Proxy, while heartbeatSecs and leaseExpirySecs respectively specify the heartbeat frequency and the time required for an instance to be evicted based on heartbeat failure.

After configuring container sidecars with discovery, a discovery proxy and registry should be added to your Docker Compose deployment. Instructions on how to configure a discovery proxy and registry can be found here.

Reliable Transport client


The Reliable Transport client must be configured if using the data source or .qsp.write.toRT data sink.

The Reliable Transport client library can be used to abstract different technologies used for the transport of messages between different KX Insights micro-services by using a common API. These client libraries can be mounted into a Stream Processor at build time (using the kxi-sp-worker.qpk) or can be mounted with Docker and pointed to with the KXI_RT_LIB environment variable.

If using the kxi-sp-worker qpk to build your own customization of the KX Insights Stream Processor, you can specify a new pre-built RT client library as a dependency of your image. In the example qp.json shown below, we specify an example project with a tickerplant rt_tick RT qpk and a kxi-sp-worker qpk as dependencies to a custom project.

$ ls
rt_tick.qpk kxi-sp-worker.qpk

The RT implementation, packaged as a qpk, and the kxi-sp-worker.qpk can be combined into a custom alternative to the kxi-sp-worker image by creating a qpacker project whose qp.json file lists both dependencies.

{
   "my-sp-worker": {
       "depends" : [ "rt_tick", "kxi-sp-worker" ],
       "entry"   : [ "custom.q" ]
   }
}

Example RT client library for tickerplants

As mentioned previously, the above example could be mounted in a Docker container through configuration or built into a Docker image after building the following file into a qpk and adding it as a project dependency along with the kxi-sp-worker.qpk. The q code below represents an RT abstraction for the publicly available kdb+ tick TP implementation at

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"}; // will be overridden

.rt.pub:{[topic]
    if[not 10h=type topic;'"topic must be a string"];
    h:neg hopen $[":"=first topic;`$topic;`$getenv`$"RT_PUB_",upper topic];
    .rt.push:{[nph;payload] nph `.u.upd,payload;}[h;];
    };

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"];
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"];

.rt.upd:{[payload;idx] '"need to implement .rt.upd"};

.rt.sub:{[topic;startIdx]
    if[not 10h=type topic;'"topic must be a string"];

    //connect to the tickerplant
    h:hopen $[":"=first topic;`$topic;`$getenv`$"RT_SUB_",upper topic];

    //initialise our message counter
    .rt.idx:0;

    // === tick.q will call back to these ===
    upd::{[t;x] if[not type x; x:flip (cols t)!x]; .rt.upd[(t;x);.rt.idx]; .rt.idx+:1;};
    .u.end:{.rt.idx:.rt.date2startIdx x+1};

    //replay log file and continue the live subscription
    if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

    //subscribe to all tables; also fetch the log count, log file handle and current date
    res:h "(.u.sub[`;`]; .u `i`L; .u.d)";

    //define the tables
    {x[;0] set' x[;1]} res 0;

    //if start index is less than current index, then recover
    if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; .rt.recoverMultiDay[res[1];startIdx]];
    };

// 100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11;

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};
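
To make the index scheme concrete: each day owns a block of consecutive message indices, and the block starts at the date written as a number (yyyymmdd) multiplied by the per-day maximum. The shell sketch below mirrors that arithmetic outside q, assuming the maximum is 100 billion as the comment above states. The function names are illustrative only and are not part of the RT library.

```shell
MAX_LOG_SZ=100000000000   # 100 billion, assumed value of .rt.MAX_LOG_SZ

# Mirror of .rt.date2startIdx: strip the dots from the date, then scale
date2startIdx() {
    day=$(printf '%s' "$1" | tr -d '.')
    echo $(( day * MAX_LOG_SZ ))
}

# Inverse step used during recovery: which day does an index fall in?
idx2day() {
    echo $(( $1 / MAX_LOG_SZ ))
}

date2startIdx 2021.01.01      # prints 2021010100000000000
idx2day 2021010100000000042   # prints 20210101
```

Because the day number occupies the high-order digits, indices from a later day always compare greater than indices from an earlier one, and the results still fit in a 64-bit integer.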

.rt.recoverMultiDay:{[iL;startIdx]
    //iL - index and Log (as can be fed into -11!)
    i:first iL; L:last iL;
    //get all files in the same folder as the tp log file
    files:key dir:first pf:` vs last L;
    //get the name of the logfile itself
    fileName:last pf;
    //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
    files:files where files like (-10_ string fileName),"*";
    //from those files, get those with dates in the range we are interested in
    files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
    //set up upd to skip the first part of the file and revert to regular definition when you hit start index
    upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
    //read all of all the log files except the last, where you read up to 'i'
    files:0W,/:files; files[(count files)-1;0]:i;
    //reset .rt.idx for each new day and replay the log file
    {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
    };
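
The file-selection step of the recovery logic can be illustrated outside q. The sketch below takes a start index and a list of log file names of the form sym2021.01.01, and keeps the files whose date suffix is on or after the day encoded in the index. It assumes a per-day maximum of 100 billion records, and select_logs is a hypothetical name used only for this illustration.

```shell
MAX_LOG_SZ=100000000000   # 100 billion, assumed per-day index block size

# Print the log files dated on or after the day encoded in the start index.
# Usage: select_logs START_IDX FILE...
select_logs() {
    start_idx="$1"; shift
    start_day=$(( start_idx / MAX_LOG_SZ ))
    for f in "$@"; do
        # the last 10 characters of the name are the date, e.g. 2021.01.02
        suffix=$(printf '%s' "$f" | tail -c 10 | tr -d '.')
        if [ "$suffix" -ge "$start_day" ]; then
            printf '%s\n' "$f"
        fi
    done
}

# Index 5 of 2021.01.02: the log for 2021.01.01 is skipped
select_logs 2021010200000000005 sym2021.01.01 sym2021.01.02 sym2021.01.03
# prints:
#   sym2021.01.02
#   sym2021.01.03
```

The q code then replays the selected files in date order, skipping messages until the running index reaches the requested start index.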