Skip to content

Configuration

Configure the Stream Processor for deployment in either Docker or Kubernetes

As a microservice, the Stream Processor can be deployed using Docker or Kubernetes. Configuration is dependent on the deployment environment.

Within Docker, Controller and Worker containers must be configured and managed manually, either with Docker, Docker Compose, or a similar container management system.

Within Kubernetes, deployment and installation is greatly simplified through a Helm chart which installs the Stream Processor Coordinator (a Kubernetes Operator) into a cluster which accepts streaming jobs through a REST API. Being an Operator, the Coordinator will configure and launch all required Kubernetes workloads and resources.

Docker

Running with different configurations within Docker

User code for streaming pipelines can be either

  • embedded into the YAML configuration
  • mounted in a directory, setting environment variable KXI_SP_SPEC_PATH as the path to the .q or .py file that defines the pipeline

YAML configuration

Each field of the YAML configuration can be overridden by environment variables. (If all are overridden, the YAML configuration file itself is not needed.)

If present, the configuration must contain an id field as well as a settings field, itself containing a minWorkers field.

id: pipeline-id  # A legible way to identify the pipeline to Service Discovery
                 # (default: code generated id with prefix "pipeline-")
                 # (env var override: KXI_SP_ID)
settings:
  minWorkers: n  # The minumum number of worker processes
                 # needed to be online to start the pipeline
                 # (default: 1)
                 # (env var override: KXI_SP_MIN_WORKERS)

Environment variables

Controller

The Controller can be configured with the following optional environment variables:

Environment Variable Default Description
KXI_SP_ID See description legible identifier; defaults to pipeline-[0-9a-f]{10}
KXI_SP_CONFIG "" path to YAML configuration file, if used
KXI_SP_CHECKPOINT_FREQ "5000" frequency of checkpoints (ms), 0 disables
KXI_SP_SILENCE_CHECKPOINTS "false" silence warnings when checkpointing disabled
KXI_SP_DISABLE_METRICS "true" disable metrics reporting (can reduce incurred latency)
KXI_SP_DISABLE_REST "false" disable REST interface
KXI_SP_MIN_WORKERS "1" minimum number of Workers required to be online to start
KXI_CONFIG_FILE "" path to the sidecar configuration (if used)
KXI_ERROR_MODE "0" error trap mode
KXI_SP_BETA_FEATURES "false" enables optional beta features in preview mode. Note that beta features are not for use in production and are subject to change in a future release

Worker

The Workers can be configured with the following optional environment variables. The pipeline ID will be inherited from the Controller, and will be post-fixed with the Worker unique ID.

environment variable default description
KXI_SP_PARENT_HOST Required host:port for this Worker's Controller
KXI_SP_SPEC Required path to the the pipeline spec file (python or q)
KXI_SP_ORDINAL "" integer ordinal for the Worker, each Worker sharing a Controller must have a different ordinal
KXI_SP_CHECKPOINT_FREQ "5000" frequency of checkpoints (ms), 0 disables
KXI_SP_SILENCE_CHECKPOINTS "false" silence warnings when checkpointing disabled
KXI_SP_REPORTING_FREQ "5000" frequency of heartbeats (ms) from Worker to Controller
KXI_SP_DISABLE_METRICS "true" disable metrics reporting (can reduce incurred latency)
KXI_SP_DISABLE_REST "false" disable REST interface
KXI_SP_EVENT_JOURNAL (Beta) "true" enables event journaling, which is used by some readers for replay
KXI_SP_JOURNAL_DIR (Beta) "" path to the directory where event journaling logs should be stored
KXI_RT_LIB "" path to mounted KX Insights Reliable Transport client
KXI_RT_EVENT_FATAL "false" set to "true" to indicate that any data loss events from RT should cause the worker to error fatally
KXI_CONFIG_FILE "" path to the sidecar configuration (if used)
KXI_PROTECTED_EXECUTION "true" log names of failing operators in errors
KXI_ERROR_MODE "0" error trap mode
KXI_SP_CONFIG_PATH "./config/sp" path to custom mounted configuration maps and secrets specified in a deployment request (optional, only used in Kubernetes deployments)
KXI_SP_BETA_FEATURES "false" enables optional beta features in preview mode. Note that beta features are not for use in production and are subject to change in a future release
KXI_ALLOW_NONDETERMINISM "false" for pipeline workloads that have non-deterministic workloads, enabling non-determinism will turn off deduplication and let any data flow. See determinism for more details.

Monitoring

Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+-specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container which surfaces the scraped statistics for HTTP pull.

If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.

Monitoring multiple Workers

Since Monitoring requires a sidecar container, if scaling a pipeline to multiple Workers, let only the Controller enable Monitoring. Aggregated Worker statistics will be reported by the Controller.

To add Monitoring to a Stream Processor Controller, add an accompanying kxi-sidecar container to scrape the Controller.

services:
  ..

  controller-sidecar:
    image: registry.dl.kx.com/kxi-sidecar:0.8.0
    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080

Create the configuration directory containing the sidecar controller-sidecar.json configuration file.

{
    "connection": ":controller:6000",
    "frequencySecs": 5,
    "metrics":
    {
        "enabled":"true",
        "frequency": 5,
        "handler": {
            "pc": true,
            "pg": true,
            "ph": true,
            "po": true,
            "pp": true,
            "ps": true,
            "ts": true,
            "wc": true,
            "wo": true,
            "ws": true
          }
    }
}

The above configuration file can be used to control the frequency of the sidecar scraping the main container through metrics.frequency. It tracks statistics on many of the q event handlers in the .z namespace.

Adding Prometheus to a Docker Compose

As an example of adding monitoring configuration, we demonstrate Prometheus using the KX Insights Monitoring component.

Prometheus is available for download from Docker Hub and can be pulled with docker pull. Below we show a sample Prometheus container that exposes the Prometheus browser UI on http://localhost:9000.

services:
  prometheus:
      image: prom/prometheus
      ports:
        - 9000:9090
      volumes:
        - ./config:/etc/prometheus
        - prometheus-data:/prometheus
      command: --config.file=/etc/prometheus/prometheus.yml

volumes:
     prometheus-data:

The volume prometheus-data:/prometheus is used to store the scraped data so that they are available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command.

This configuration file contains scrape targets, which are the container sidecars we previously configured. In the example below we set the scrape interval and timeout values and specify multiple scrape_config jobs to scrape data from different sidecar containers in a cluster.

global:
  scrape_interval: 10s
  scrape_timeout: 5s

scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker-sidecar, if exists

Service Discovery

Containers within a Stream Processor cluster can advertise themselves to different service discovery registries (e.g. Eureka) through the KX Insights Discovery Proxy.

Service Discovery with multiple Workers

Since Service Discovery requires a sidecar container, if scaling a pipeline to multiple Workers, let only the Controller enable Service Discovery.

Follow the same steps as the Monitoring sidecar setup, and add a Discovery configuration field in the sidecar configuration file (KXI_CONFIG_FILE). The metrics field can be removed if not using Monitoring as well.

    ..
    "discovery":
    {
        "registry": ":proxy:5000",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
    ..

The registry field points to the address of the KXI Discovery Proxy, while heartbeatSecs and leaseExpirySecs respectively specify the heartbeat frequency and the time required for an instance to be evicted based on heartbeat failure.

After configuring container sidecars with discovery, a discovery proxy and registry should be added to your Docker Compose deployment.

How to configure a discovery proxy and registry

Kubernetes

A Helm chart has been provided for installing the Stream Processor service into a Kubernetes cluster.

Prerequisites

To run within Kubernetes there are a number of pre-requisites:

Kdb+ License

To start the Stream Processor service within Kubernetes, a license secret should be installed into the Kubernetes cluster.

First, print the base64 encoded license:

base64 -w 0 $QLIC/kc.lic

Then create and apply a secret containing the encoded license text:

cat kx-license-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>
kubectl apply -f kx-license-secret.yaml

Helm

Add the KX Helm chart repository to your helm install:

helm repo add --username <username> --password <password> \
    kxi-repo https://nexus.dl.kx.com/repository/kx-insights-charts/
"kxi-repo" has been added to your repositories

Update your helm repository local chart cache:

helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "kxi-repo" chart repository
Update Complete. ⎈Happy Helming!⎈

Add a secret to your cluster to allow access to the images:

kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com

Note

In the case above the secret kx-repo-access has been created. When deploying the charts it will be necessary to reference this secret in the imagePullSecrets within the configuration file.

Configuration file

A configuration file is required to run the Coordinator service. An example standalone configuration file is provided below. See all configuration options for more information and options.

cat spcoord.yaml
license:
  secretName: kx-license-info

imagePullSecrets:
  - name: kx-repo-access

persistence:
  enabled: true
  # You will need to provide this argument if persistence is enabled.
  # Point this to the name of a created Storage Class for checkpoints.
  storageClassName: standard
  controllerCheckpointFreq: 10000
  workerCheckpointFreq: 5000
  # Storage allocated to each worker/controller
  storage: 20Gi

discovery:
  # If KXI Service Discovery is set up, set this to true
  enabled: false

metrics:
  # If KXI Monitoring is set up, set this to true
  enabled: false

logging:
  endpoints:         # list of endpoints (can provide multiple)
    - fd://stdout
  formatMode: json
  routings:          # Mapping of logging to level
    ALL: DEBUG        # `ALL` is the default routing
    spcoord: TRACE      # Set coordinator logs to trace
    KUBE: TRACE         # Set kubernetes logs to trace

Installing the Coordinator

helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yml

One kxi-sp install per namespace

Only 1 Coordinator may be installed in a given Kubernetes namespace. If you wish to install another, it must be done in another namespace.

Accessing the Coordinator

To access the Coordinator for the following creation or teardown requests, first port-forward the Coordinator service locally.

kubectl port-forward sp-kxi-sp-0

Configuring and running a streaming job

Assuming a spec.q file exists locally with a pipeline defined, it's configuration is provided through the REST API when submitting it to the Coordinator.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "kafka-in",
        type     : "spec",
        base     : "q",
        config   : { content: $spec },
        settings : {
            minWorkers: "1",
            maxWorkers: "10",
            workerThreads: "4",
            workerImage: ""
        },
        kubeConfig: { "secrets" : ["aws-creds"]},
        persistence : {
          controller : { class:"standard", size:"25Gi", checkpointFreq: 5000 },
          worker     : { class:"standard", size:"100Gi", checkpointFreq: 1000 }
        }
    }' | jq -asR .)"

In the above REST API, the following settings are available as part of the settings field.

setting default description
minWorkers 1 Minimum required Workers ready for the pipeline to start
maxWorkers 10 Maximum required Workers ready for the pipeline to start
workerThreads 1 Number of secondary threads for Workers
workerImage Custom Worker image to deploy (if building a custom Worker from the .qpk)

Configuring Persistence for a streaming job

Persistence configuration can be customized for each deployment of a streaming job. It is configured with the persistence field which is composed of a map of two fields, controller and worker, used respectively to configure persistence and checkpointing for the Controller and Worker. For each process type, the Kubernetes storage class, storage size and checkpoint frequency can be configured.

setting default description
disabled false Flag indicating whether to disable persistence for the process type
class Kubernetes cluster default Kubernetes storage class used to provision persistent disk. Default value can be specified with Helm chart (persistence.storageClass)
size 20Gi Storage size allocated for the persistent volume. Default value can be specified with Helm chart (persistence.storage)
checkpointFreq 5000 Checkpointing frequency of process type (in milliseconds). Default value can be specified with Helm chart (persistence.controllerCheckpointFreq & persistence.workerCheckpointFreq)

Persistence can be disabled per job by explicitly setting the disabled field to true for the Controller and or Worker, as shown below:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat $spec)" \
    '{
        name        : "no-persistence",
        type        : "spec",
        config      : { content: $spec },
        settings    : { minWorkers: "1", maxWorkers: "10" },
        persistence : { controller: { disabled: true }, worker: { disabled: true }}
    }' | jq -asR .)"

Note: If persistence is disabled by default by the helm charts, no persistent disk will be deployed to back checkpoints unless all the necessary fields are provided in the deployment request (class, size and checkpoint frequency).

Increasing the size of an existing persistent volume

Persistence for the SP in Kubernetes is accomplished using PersistentVolumeClaims (PVCs), which can have their size expanded after creation. To resize a PVC, the PVC's StorageClass must be set to allow volume expansion. If the StorageClass is correctly configured, and the PVC is of a type that supports volume expansion, you can expand the size of the persistent volume by editing the PVC to have a larger size. This can be done using a tool such as kubectl or k9s. For a more detailed explanation of the process of expanding PVCs, see the official Kubernetes documentation on Expanding Persistent Volume Claims.

Worker pipelines can be found using label selectors. The label selectors you'll need to use to find all Worker PVCs for a specific pipeline are sp.kx.com/pipeline=<pipeline ID> and sp.kx.com/role=worker.

For example, to list the Worker PVCs for a pipeline with ID spcoord-my-pipeline, you could use the following kubectl command:

kubectl get pvc -l sp.kx.com/pipeline=spcoord-my-pipeline,sp.kx.com/role=worker

Configuring mounting of custom configuration for a streaming job

In a Kubernetes deployment of the Stream Processor, configuration files that are required for Workers to execute a streaming job (e.g. credentials) can be mounted with the kubeConfig field of a deployment request. These files must be embedded into a Kubernetes Secret or ConfigMap prior to creation of the pipeline in the same namespace as the Stream Processor's deployment.

For more information about how to create a secret or configuration, please refer to the links below:

https://kubernetes.io/docs/concepts/configuration/configmap/

https://kubernetes.io/docs/concepts/configuration/secret/

The kubeConfig field is an optional map that contains two fields secrets and configMaps which both contain an array of secret and configMaps names that are to be mounted in the Worker executing the streaming job.

Deploying a streaming job from a Python specification

Streaming pipeline specifications, can be written in Python or q. By default if no base parameter is provided in the deployment request, it is assumed that the submitted spec is written in q. However if a user has written a pipeline spec using the SP python API, the user can set the base deployment parameter to "py" as shown in the example below:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat test.py)" \
    '{
        name     : "test",
        type     : "spec",
        base     : "py",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"

Where test.py is the following:

from kxi import sp

sp.run(sp.read.from_callback('publish').to_console())

Deploying a streaming job with access to Machine Learning (ML) functionality

In addition to the use of the plugins defined in the API section here, additional ML functionality can be used through deployment of an q-ml base image. The full set of available ML functions is defined here. Presently the ML functionality can only be deployed using a streaming pipeline specification written in q, the following outlines a sample deployment of a q-ml base image:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "test",
        type     : "spec",
        base     : "q-ml",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"

Where spec.q is the following:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.minMaxScaler[::;.qsp.use ``bufferSize!(`;100)]
  .qsp.ml.sequentialKMeans[{select from x};.qsp.use ``k!(`;3)]
  .qsp.write.toConsole[]

Uninstalling the Stream Processor Coordinator

To remove the Stream Processor service from your Kubernetes cluster:

1) Teardown all running pipelines

Submit the teardown request to the Coordinator to bring down all pipelines.

curl -X POST localhost:5000/pipelines/teardown

Alternatively, pipelines can be individually removed with:

curl -X POST localhost:5000/pipeline/teardown/<id>

Or by using kubectl directly on the pipeline name:

kubectl delete rs,pod,svc,cm,servicemonitor,pvc,statefulset -l pipeline=$name

2) Uninstall the Helm chart

helm uninstall sp

Additionally, all Stream Processor workloads will have the following labels to help identify them:

app.kubernetes.io/part-of=stream-processor

All configuration options

The following options are available when configuring the Coordinator service when deploying with Helm.

option default description
image.repository registry.dl.kx.com The URL of the image repository for the coordinator image.
image.component kxi-sp-coordinator The name of the coordinator image.
image.pullPolicy IfNotPresent The Kubernetes image pull policy for this image.
ctlImage.repository registry.dl.kx.com The URL of the image repository for the default controller image.
ctlImage.component kxi-sp-controller The name of the controller image.
ctlImage.pullPolicy IfNotPresent The Kubernetes image pull policy for this image.
workImage.repository registry.dl.kx.com The URL of the image repository for the default worker image.
workImage.component kxi-sp-worker The name of the worker image.
workImage.pullPolicy IfNotPresent The Kubernetes image pull policy for this image.
mlImage.repository registry.dl.kx.com The URL of the image repository for the default machine learning worker image.
mlImage.component kxi-ml The name of the machine learning worker image.
mlImage.pullPolicy IfNotPresent The Kubernetes image pull policy for this image.
pyImage.repository registry.dl.kx.com The URL of the image repository for the default Python worker image.
pyImage.component kxi-sp-python The name of the Python worker image.
pyImage.pullPolicy IfNotPresent The Kubernetes image pull policy for this image.
imagePullSecrets [] Arrays of name of secrets with image pull permissions.
env {} Additional environment variables to add to the coordinator.
debug false Enables interactivity for the coordinator.
port 5000 The port that the coordinator will bind to and serve its REST interface from.
instanceParam { "g": 1, "t": 1000 } Command line parameters to pass to the coordinator. See command line parameters for details.
defaultWorkerThreads 0 Default secondary threads for new pipeline submissions.
betaFeatures false Enables optional beta features in a preview mode. Beta features are not intended to be used in production and are subject to change.
auth.enabled false Indicates if authentication should be enabled for the coordinator's REST interface.
persistence.enabled true Whether persistent volumes are enabled on pipelines. Note: checkpointing for recovery requires this be enabled
persistence.storageClassName null Pre-configured storage class name to be used for persistent volumes (if not specified will use the Kubernetes cluster's default storage class)
persistence.controllerCheckpointFreq 5000 Frequency of Controller checkpoints
persistence.workerCheckpointFreq 5000 Frequency of Worker checkpoints
persistence.storage 20Gi Persistent volume storage size
autoscaling.enabled false Indicates if the coordinator should automatically scale based on load.
autoscaling.minReplicas 1 The minimum number of coordinator replicas that should be running.
autoscaling.maxReplicas 1 The maximum number of coordinator replicas that should be running.
autoscaling.targetCPUUtilizationPercentage 80 The maximum amount of CPU a replica should consume before triggering a scale up event.
autoscaling.targetMemoryUtilizationPercentage The maximum amount of memory a replica should consume before triggering a scale up event.
replicaCount 1 If autoscaling is enabled, this is the baseline number of replicas that should be deployed.
affinity hard One of hard, soft, hard-az or soft-az. Hard affinity requires all replicas to be on different nodes. Soft prefers different nodes but does not require it. The az suffix indicates the node allocation must be across different availability zones.
license.onDemand false Indicates if a kc.lic will be used.
license.asFile true Indicates if the license will be provided as a file, otherwise it must be provided as an environment variable.
license.secretName Name of secret with base64 license text either as a file if asFile is enabled or as an environment variable.
discovery.enabled true Whether KX Insights Service Discovery is enabled. Note: Service must be set up separately.
metrics.enabled true Whether KX Insights Monitoring is enabled. Note: Service must be set up separately.
metrics.frequency 5 Polling frequency of monitoring metrics in seconds
metrics.handler.po true Capture metrics for the .z.po handler
metrics.handler.pc true Capture metrics for the .z.pc handler
metrics.handler.wo true Capture metrics for the .z.wo handler
metrics.handler.wc true Capture metrics for the .z.wc handler
metrics.handler.pg true Capture metrics for the .z.pg handler
metrics.handler.ps true Capture metrics for the .z.ps handler
metrics.handler.ws true Capture metrics for the .z.ws handler
metrics.handler.ph true Capture metrics for the .z.ph handler
metrics.handler.pp true Capture metrics for the .z.pp handler
metrics.handler.ts true Capture metrics for the .z.ts handler
logging.endpoints [ "fd://stdout" ] List of endpoints to pass to qlog. Logging will be written to each of the given endpoints
logging.formatMode "json" Desired logging format. (e.g. "test", "json")
logging.routings ALL: INFO, SPCOORD: TRACE Log routing represented by key value pairs, configuring the logging level of for different facilities (e.g )