Configuration
Configure the Stream Processor for deployment in either Docker or Kubernetes
As a microservice, the Stream Processor can be deployed using Docker or Kubernetes. Configuration is dependent on the deployment environment.
Within Docker, Controller and Worker containers must be configured and managed manually, either with Docker, Docker Compose, or a similar container management system.
Within Kubernetes, deployment and installation is greatly simplified through a Helm chart which installs the Stream Processor Coordinator (a Kubernetes Operator) into a cluster which accepts streaming jobs through a REST API. Being an Operator, the Coordinator will configure and launch all required Kubernetes workloads and resources.
Docker
Running with different configurations within Docker
User code for streaming pipelines can be either:

- embedded into the YAML configuration
- mounted in a directory, with the environment variable `KXI_SP_SPEC_PATH` set to the path of the `.q` or `.py` file that defines the pipeline
YAML configuration
Each field of the YAML configuration can be overridden by environment variables. (If all are overridden, the YAML configuration file itself is not needed.)

If present, the configuration must contain an `id` field as well as a `settings` field, itself containing a `minWorkers` field.
```yaml
id: pipeline-id       # A legible way to identify the pipeline to Service Discovery
                      # (default: code generated id with prefix "pipeline-")
                      # (env var override: KXI_SP_ID)
settings:
  minWorkers: n       # The minimum number of worker processes
                      # needed to be online to start the pipeline
                      # (default: 1)
                      # (env var override: KXI_SP_MIN_WORKERS)
```
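For example, the two settings above can be supplied entirely through their documented override variables, with no YAML file at all (the values here are illustrative):

```shell
# Override the YAML fields via environment variables (illustrative values)
export KXI_SP_ID=pipeline-example   # overrides the id field
export KXI_SP_MIN_WORKERS=2         # overrides settings.minWorkers
echo "$KXI_SP_ID"                   # prints: pipeline-example
```

These variables would then be passed into the Controller container by your container manager (e.g. the `environment` section of a Docker Compose service).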
Environment variables
Controller
The Controller can be configured with the following optional environment variables:
variable | description
---|---
`KXI_SP_ID` | legible identifier; defaults to `pipeline-[0-9a-f]{10}`
`KXI_SP_CONFIG` | path to the YAML configuration file, if used
`KXI_SP_CHECKPOINT_FREQ` | frequency of checkpoints (ms); 0 disables; default 5000
`KXI_SP_SILENCE_CHECKPOINTS` | silence warnings when Controller checkpointing is disabled
`KXI_SP_DISABLE_REST` | disable the REST interface
`KXI_SP_DISABLE_METRICS` | disable metrics reporting (can reduce incurred latency)
`KXI_SP_MIN_WORKERS` | minimum number of Workers required online to start; default 1
`KXI_CONFIG_FILE` | path to the sidecar configuration, if a sidecar is used
`KXI_ERROR_MODE` | error trap mode; default 2
`KXI_SP_BETA_FEATURES` | enables optional beta features in preview mode; default false. Beta features are not for use in production and are subject to change in a future release
Worker
The Workers can be configured with the following optional environment variables. The pipeline ID is inherited from the Controller, suffixed with the Worker's unique ID.
variable | description
---|---
`KXI_SP_PARENT_HOST` | host:port of this Worker's Controller (required)
`KXI_SP_SPEC` | path to the pipeline spec file (Python or q) (required)
`KXI_SP_CHECKPOINT_FREQ` | frequency of checkpoints (ms); 0 disables; default 5000
`KXI_SP_REPORTING_FREQ` | frequency of heartbeats (ms) from Worker to Controller; default 5000
`KXI_SP_DISABLE_METRICS` | disable metrics reporting (can reduce incurred latency)
`KXI_SP_DISABLE_REST` | disable the REST interface
`KXI_RT_LIB` | path to the mounted KX Insights Reliable Transport client
`KXI_CONFIG_FILE` | path to the sidecar configuration (if used)
`KXI_PROTECTED_EXECUTION` | log the name of the failing operator in errors; 'false' disables
`KXI_ERROR_MODE` | error trap mode; default 2
`KXI_SP_CONFIG_PATH` | path to custom mounted configuration maps and secrets specified in a deployment request (optional; only used in Kubernetes deployments)
`KXI_SP_BETA_FEATURES` | enables optional beta features in preview mode; default false. Beta features are not for use in production and are subject to change in a future release
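As a sketch of the manual Docker wiring described above, a Controller/Worker pair in Docker Compose might look like the following. The image names, tag, and Controller port are assumptions for illustration; substitute the images and ports from your own installation.

```yaml
# Illustrative only: image names/tags and the Controller port are assumptions
services:
  controller:
    image: registry.dl.kx.com/kxi-sp-controller:0.8.0   # assumed image name
    environment:
      - KXI_SP_ID=pipeline-example
      - KXI_SP_MIN_WORKERS=1
    command: -p 6000                         # assumed listening port

  worker:
    image: registry.dl.kx.com/kxi-sp-worker:0.8.0       # assumed image name
    environment:
      - KXI_SP_PARENT_HOST=controller:6000   # host:port of the Controller (required)
      - KXI_SP_SPEC=/opt/pipelines/spec.q    # pipeline spec mounted below (required)
    volumes:
      - ./pipelines:/opt/pipelines
```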
Monitoring
Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+-specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container, which surfaces them for HTTP pull.
If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.
Monitoring multiple Workers
Since Monitoring requires a sidecar container, when scaling a pipeline to multiple Workers, enable Monitoring only on the Controller. Aggregated Worker statistics will be reported by the Controller.
To add Monitoring to a Stream Processor Controller, add an accompanying `kxi-sidecar` container to scrape the Controller.
```yaml
services:
  ..
  controller-sidecar:
    image: registry.dl.kx.com/kxi-sidecar:0.8.0
    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080
```
Create the configuration directory containing the sidecar configuration file `controller-sidecar.json`:
```json
{
    "connection": ":controller:6000",
    "frequencySecs": 5,
    "metrics":
    {
        "enabled": "true",
        "frequency": 5,
        "handler": {
            "pc": true,
            "pg": true,
            "ph": true,
            "po": true,
            "pp": true,
            "ps": true,
            "ts": true,
            "wc": true,
            "wo": true,
            "ws": true
        }
    }
}
```
The above configuration file controls how frequently the sidecar scrapes the main container through `metrics.frequency`. Statistics are tracked for many of the q event handlers in the `.z` namespace.
Adding Prometheus to a Docker Compose
As an example of adding monitoring configuration, we demonstrate Prometheus using the KX Insights Monitoring component.

Prometheus is available for download from Docker Hub and can be pulled with `docker pull`. Below we show a sample Prometheus container that exposes the Prometheus browser UI on `http://localhost:9000`.
```yaml
services:
  prometheus:
    image: prom/prometheus
    ports:
      - 9000:9090
    volumes:
      - ./config:/etc/prometheus
      - prometheus-data:/prometheus
    command: --config.file=/etc/prometheus/prometheus.yml
volumes:
  prometheus-data:
```
The volume `prometheus-data:/prometheus` is used to store the scraped data so that it is available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command.

This configuration file contains scrape targets, which are the container sidecars we previously configured. In the example below we set the scrape interval and timeout values and specify multiple `scrape_configs` jobs to scrape data from different sidecar containers in a cluster.
```yaml
global:
  scrape_interval: 10s
  scrape_timeout: 5s
scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker-sidecar, if it exists
```
Service Discovery
Containers within a Stream Processor cluster can advertise themselves to different service discovery registries (e.g. Eureka) through the KX Insights Discovery Proxy.
Service Discovery with multiple Workers
Since Service Discovery requires a sidecar container, when scaling a pipeline to multiple Workers, enable Service Discovery only on the Controller.
Follow the same steps as the Monitoring sidecar setup, and add a `discovery` configuration field in the sidecar configuration file (`KXI_CONFIG_FILE`). The `metrics` field can be removed if Monitoring is not also used.
```json
..
"discovery":
{
    "registry": ":proxy:5000",
    "heartbeatSecs": 30,
    "leaseExpirySecs": 90
}
..
```
The `registry` field points to the address of the KXI Discovery Proxy, while `heartbeatSecs` and `leaseExpirySecs` respectively specify the heartbeat frequency and the time after which an instance is evicted based on heartbeat failure. With the values above, an instance that misses heartbeats for 90 seconds (three 30-second intervals) is evicted from the registry.
After configuring container sidecars with discovery, a discovery proxy and registry should be added to your Docker Compose deployment.
How to configure a discovery proxy and registry
Kubernetes
A Helm chart has been provided for installing the Stream Processor service into a Kubernetes cluster.
Prerequisites
To run within Kubernetes there are a number of prerequisites:

- `helm` should be installed
- `kubectl` should be installed
- Cloud vendor CLI installed and authorized to the appropriate Kubernetes cluster
kdb+ license
To start the Stream Processor service within Kubernetes, a license secret should be installed into the Kubernetes cluster.
First, print the base64 encoded license:
```shell
base64 -w 0 $QLIC/kc.lic
```
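The `-w 0` flag disables line wrapping, so the encoded license is emitted as a single line suitable for pasting into the secret. A quick sanity check of the round trip, using a throwaway file rather than a real license:

```shell
# Encode and decode a throwaway file to confirm the round trip is lossless
printf 'not-a-real-license' > /tmp/kc-check.lic
encoded=$(base64 -w 0 /tmp/kc-check.lic)
decoded=$(printf '%s' "$encoded" | base64 -d)
echo "$decoded"   # prints: not-a-real-license
```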
Then create and apply a secret containing the encoded license text:
```shell
cat kx-license-secret.yaml
```

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>
```

```shell
kubectl apply -f kx-license-secret.yaml
```
Helm
Add the KX Helm chart repository to your helm install:
```shell
helm repo add --username <username> --password <password> \
    kxi-repo https://nexus.dl.kx.com/repository/kx-insights-charts/
```

```
"kxi-repo" has been added to your repositories
```
Update your helm repository local chart cache:
```shell
helm repo update
```

```
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "kxi-repo" chart repository
Update Complete. ⎈Happy Helming!⎈
```
Add a secret to your cluster to allow access to the images:
```shell
kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com
```
Note
In the case above, the secret `kx-repo-access` has been created. When deploying the charts, it will be necessary to reference this secret in the `imagePullSecrets` field within the configuration file.
Configuration file
A configuration file is required to run the Coordinator service. An example standalone configuration file is provided below; see All configuration options for the full set of options.
```shell
cat spcoord.yaml
```

```yaml
license:
  secretName: kx-license-info
imagePullSecrets:
  - name: kx-repo-access
persistence:
  enabled: true
  # You will need to provide this argument if persistence is enabled.
  # Point this to the name of a created Storage Class for checkpoints.
  storageClassName: standard
  controllerCheckpointFreq: 10000
  workerCheckpointFreq: 5000
  # Storage allocated to each worker/controller
  storage: 20Gi
discovery:
  # If KXI Service Discovery is set up, set this to true
  enabled: false
metrics:
  # If KXI Monitoring is set up, set this to true
  enabled: false
logging:
  endpoints: # List of endpoints (can provide multiple)
    - fd://stdout
  formatMode: json
  routings: # Mapping of logging facility to level
    ALL: DEBUG # `ALL` is the default routing
    spcoord: TRACE # Set Coordinator logs to trace
    KUBE: TRACE # Set Kubernetes logs to trace
```
Installing the Coordinator
```shell
helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml
```

One `kxi-sp` install per namespace

Only one Coordinator may be installed in a given Kubernetes namespace. If you wish to install another, it must be done in a different namespace.
Accessing the Coordinator
To access the Coordinator for the following creation or teardown requests, first port-forward the Coordinator service locally.
```shell
kubectl port-forward sp-kxi-sp-0 5000
```
Configuring and running a streaming job
Assuming a `spec.q` file exists locally with a pipeline defined, its configuration is provided through the REST API when submitting it to the Coordinator.
```shell
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "kafka-in",
        type       : "spec",
        base       : "q",
        config     : { content: $spec },
        settings   : {
            minWorkers: "1",
            maxWorkers: "10",
            workerThreads: "4",
            workerImage: ""
        },
        kubeConfig  : { "secrets" : ["aws-creds"] },
        persistence : {
            controller : { class: "standard", size: "25Gi",  checkpointFreq: 5000 },
            worker     : { class: "standard", size: "100Gi", checkpointFreq: 1000 }
        }
    }' | jq -asR .)"
```
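The nested jq calls can be confusing: the inner `jq -n --arg spec ...` safely embeds the spec file's text inside a JSON object, and the outer `jq -asR .` re-encodes that object as a single JSON string for the POST body. A minimal sketch of the inner step, using an inline spec instead of a file:

```shell
# Build the request object with an inline spec (illustrative content)
spec='.qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toConsole[]'
body=$(jq -n --arg spec "$spec" \
    '{name: "kafka-in", type: "spec", base: "q", config: {content: $spec}}')
echo "$body" | jq -r '.name'             # prints: kafka-in
echo "$body" | jq -r '.config.content'   # prints the spec text unchanged
```

Using `--arg` rather than string interpolation means any quotes or newlines in the spec file are escaped correctly.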
In the above REST API call, the following settings are available within the `settings` field.

setting | default | description
---|---|---
`minWorkers` | 1 | Minimum number of Workers that must be ready for the pipeline to start
`maxWorkers` | 10 | Maximum number of Workers for the pipeline
`workerThreads` | 1 | Number of secondary threads for Workers
`workerImage` | | Custom Worker image to deploy (if building a custom Worker from the `.qpk`)
Configuring Persistence for a streaming job
Persistence configuration can be customized for each deployment of a streaming job. It is configured with the `persistence` field, a map of two fields, `controller` and `worker`, used respectively to configure persistence and checkpointing for the Controller and Worker. For each process type, the Kubernetes storage class, storage size, and checkpoint frequency can be configured.
setting | default | description
---|---|---
`disabled` | false | Flag indicating whether to disable persistence for the process type
`class` | Kubernetes cluster default | Kubernetes storage class used to provision the persistent disk. Default value can be specified with the Helm chart (`persistence.storageClass`)
`size` | 20Gi | Storage size allocated for the persistent volume. Default value can be specified with the Helm chart (`persistence.storage`)
`checkpointFreq` | 5000 | Checkpointing frequency of the process type (in milliseconds). Default value can be specified with the Helm chart (`persistence.controllerCheckpointFreq` & `persistence.workerCheckpointFreq`)
Persistence can be disabled per job by explicitly setting the `disabled` field to true for the Controller and/or Worker, as shown below:
```shell
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat $spec)" \
    '{
        name        : "no-persistence",
        type        : "spec",
        config      : { content: $spec },
        settings    : { minWorkers: "1", maxWorkers: "10" },
        persistence : { controller: { disabled: true }, worker: { disabled: true }}
    }' | jq -asR .)"
```
Note: if persistence is disabled by default in the Helm chart, no persistent disk will be deployed to back checkpoints unless all the necessary fields (class, size, and checkpoint frequency) are provided in the deployment request.
Configuring mounting of custom configuration for a streaming job
In a Kubernetes deployment of the Stream Processor, configuration files required by Workers to execute a streaming job (e.g. credentials) can be mounted with the `kubeConfig` field of a deployment request. These files must be embedded into a Kubernetes Secret or ConfigMap, created in the same namespace as the Stream Processor's deployment, before the pipeline is created.
For more information about how to create a Secret or ConfigMap, refer to the links below:

- https://kubernetes.io/docs/concepts/configuration/configmap/
- https://kubernetes.io/docs/concepts/configuration/secret/
The `kubeConfig` field is an optional map containing two fields, `secrets` and `configMaps`, each an array of the names of Secrets or ConfigMaps to be mounted in the Worker executing the streaming job.
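For example, a deployment request could include a `kubeConfig` field like the following. The Secret name `aws-creds` matches the earlier example; `pipeline-conf` is a hypothetical ConfigMap name for illustration:

```json
"kubeConfig": {
    "secrets": ["aws-creds"],
    "configMaps": ["pipeline-conf"]
}
```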
Deploying a streaming job from a Python specification
Streaming pipeline specifications can be written in Python or q. By default, if no `base` parameter is provided in the deployment request, the submitted spec is assumed to be written in q. If a pipeline spec has been written using the SP Python API, set the `base` deployment parameter to `"py"` as shown in the example below:
```shell
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat test.py)" \
    '{
        name     : "test",
        type     : "spec",
        base     : "py",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"
```
Where `test.py` is the following:

```python
from kxi import sp

sp.run(sp.read.from_callback('publish').to_console())
```
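For comparison, an equivalent spec written in q (matching the `.qsp` style used later in this document) would be:

```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[]
```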
Deploying a streaming job with access to Machine Learning (ML) functionality
In addition to the plugins defined in the API section here, additional ML functionality can be used through deployment of a `q-ml` base image. The full set of available ML functions is defined here. Presently, ML functionality can only be deployed using a streaming pipeline specification written in q. The following outlines a sample deployment of a `q-ml` base image:
```shell
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name     : "test",
        type     : "spec",
        base     : "q-ml",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" }
    }' | jq -asR .)"
```
Where `spec.q` is the following:

```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.minMaxScaler[::;.qsp.use ``bufferSize!(`;100)]
  .qsp.ml.sequentialKMeans[{select from x};.qsp.use ``k!(`;3)]
  .qsp.write.toConsole[]
```
Uninstalling the Stream Processor Coordinator
To remove the Stream Processor service from your Kubernetes cluster:
1) Teardown all running pipelines
Submit the teardown request to the Coordinator to bring down all pipelines.
```shell
curl -X POST localhost:5000/pipelines/teardown
```
Alternatively, pipelines can be individually removed with:
```shell
curl -X POST localhost:5000/pipeline/teardown/<id>
```
Or by using `kubectl` directly on the pipeline name:

```shell
kubectl delete rs,pod,svc,cm,servicemonitor,pvc,statefulset -l pipeline=$name
```
2) Uninstall the Helm chart
```shell
helm uninstall sp
```
Additionally, all Stream Processor workloads carry the label `app.kubernetes.io/part-of=stream-processor` to help identify them; for example, `kubectl get pods -l app.kubernetes.io/part-of=stream-processor` lists all Stream Processor pods.
All configuration options
The following options are available when configuring the Coordinator service for deployment with Helm.
option | default | description
---|---|---
`imagePullSecrets` | | Array of names of secrets with image pull permissions
`defaultWorkerThreads` | 0 | Default number of secondary threads for new pipeline submissions
`betaFeatures` | false | Enables optional beta features in a preview mode. Beta features are not intended to be used in production and are subject to change
`discovery.enabled` | true | Whether KX Insights Service Discovery is enabled. Note: the service must be set up separately
`license.user` | | License user name (if not using a license secret)
`license.email` | | License email (if not using a license secret)
`license.secretName` | | Name of a secret with base64 license text (if not using user/email)
`metrics.enabled` | true | Whether KX Insights Monitoring is enabled. Note: the service must be set up separately
`metrics.frequency` | 5 | Polling frequency of monitoring metrics in seconds
`metrics.handler.po` | true | Capture metrics for the `.z.po` handler
`metrics.handler.pc` | true | Capture metrics for the `.z.pc` handler
`metrics.handler.wo` | true | Capture metrics for the `.z.wo` handler
`metrics.handler.wc` | true | Capture metrics for the `.z.wc` handler
`metrics.handler.pg` | true | Capture metrics for the `.z.pg` handler
`metrics.handler.ps` | true | Capture metrics for the `.z.ps` handler
`metrics.handler.ws` | true | Capture metrics for the `.z.ws` handler
`metrics.handler.ph` | true | Capture metrics for the `.z.ph` handler
`metrics.handler.pp` | true | Capture metrics for the `.z.pp` handler
`metrics.handler.ts` | true | Capture metrics for the `.z.ts` handler
`persistence.enabled` | true | Whether persistent volumes are enabled on pipelines. Note: checkpointing for recovery requires this to be enabled
`persistence.storageClassName` | null | Pre-configured storage class name to be used for persistent volumes (if not specified, the Kubernetes cluster's default storage class is used)
`persistence.coordinatorCheckpointFreq` | 5000 | Frequency of Coordinator checkpoints (ms)
`persistence.controllerCheckpointFreq` | 5000 | Frequency of Controller checkpoints (ms)
`persistence.workerCheckpointFreq` | 5000 | Frequency of Worker checkpoints (ms)
`persistence.storage` | 20Gi | Persistent volume storage size
`keycloak.enabled` | true | Whether Keycloak should be used for authentication of Coordinator REST APIs
`logging.endpoints` | `[ "fd://stdout" ]` | List of endpoints to pass to qlog; logging is written to each of the given endpoints
`logging.formatMode` | `"json"` | Desired logging format (e.g. `"text"`, `"json"`)
`logging.routings` | `ALL: INFO, SPCOORD: TRACE` | Log routing represented by key-value pairs, configuring the logging level for different facilities