Configuration
Configure the Stream Processor for deployment in either Docker or Kubernetes
As a microservice, the Stream Processor can be deployed using Docker or Kubernetes. Configuration is dependent on the deployment environment.
Within Docker, Controller and Worker containers must be configured and managed manually, using Docker, Docker Compose, or a similar container-management system.
Within Kubernetes, deployment and installation are greatly simplified by a Helm chart that installs the Stream Processor Coordinator (a Kubernetes Operator) into the cluster. The Coordinator accepts streaming jobs through a REST API and, as an Operator, configures and launches all required Kubernetes workloads and resources.
Docker
Running with different configurations within Docker
User code for streaming pipelines can be either:
- embedded into the YAML configuration, or
- mounted in a directory, with the environment variable KXI_SP_SPEC_PATH set to the path of the .q or .py file that defines the pipeline (see the sketch below)
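As a minimal sketch of the mounted approach, run directly with docker run; the image tag, local directory, and file names are illustrative assumptions rather than product defaults, and other required settings are omitted for brevity:
docker run \
  -v "$(pwd)/pipelines:/opt/pipelines" \
  -e KXI_SP_SPEC_PATH=/opt/pipelines/spec.q \
  portal.dl.kx.com/kxi-sp-worker:0.8.0
  # Other required settings (for example a license) are omitted for brevity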
YAML configuration
Each field of the YAML configuration can be overridden by environment variables. (If all are overridden, the YAML configuration file itself is not needed.)
If present, the configuration must contain an id field as well as a settings field, itself containing a minWorkers field.
id: pipeline-id   # A legible way to identify the pipeline to Service Discovery
                  # (default: generated id with prefix "pipeline-")
                  # (env var override: KXI_SP_ID)
settings:
  minWorkers: n   # The minimum number of Worker processes
                  # that must be online to start the pipeline
                  # (default: 1)
                  # (env var override: KXI_SP_MIN_WORKERS)
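For example, the same configuration can be expressed purely through environment variable overrides; the values below simply mirror the YAML above:
# Equivalent environment variable overrides (values are illustrative)
export KXI_SP_ID=pipeline-id
export KXI_SP_MIN_WORKERS=1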
Environment variables
Controller
The Controller can be configured with the following optional environment variables:
Environment Variable | Default | Description |
---|---|---|
KXI_SP_ID | See description | legible identifier; defaults to pipeline-[0-9a-f]{10} |
KXI_SP_CONFIG | "" | path to YAML configuration file, if used |
KXI_SP_CHECKPOINT_FREQ | "5000" | frequency of checkpoints (ms), 0 disables |
KXI_SP_SILENCE_CHECKPOINTS | "false" | silence warnings when checkpointing disabled |
KXI_SP_DISABLE_METRICS | "true" | disable metrics reporting (can reduce incurred latency) |
KXI_SP_DISABLE_REST | "false" | disable REST interface |
KXI_SP_MIN_WORKERS | "1" | minimum number of Workers required to be online to start |
KXI_CONFIG_FILE | "" | path to the sidecar configuration (if used) |
KXI_ERROR_MODE | "0" | error trap mode |
KXI_SP_BETA_FEATURES | "false" | enables optional beta features in preview mode. Note that beta features are not for use in production and are subject to change in a future release |
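As an illustration only, a Controller could be started directly with docker run along the following lines; the network name, image tag, and variable values are assumptions rather than documented defaults:
# Create a shared network so Workers can reach the Controller (name is illustrative)
docker network create sp-net

# Start the Controller with a pipeline id and minimum Worker count
docker run -d --name sp-controller --network sp-net \
  -e KXI_SP_ID=pipeline-example \
  -e KXI_SP_MIN_WORKERS=1 \
  -e KDB_LICENSE_B64 \
  portal.dl.kx.com/kxi-sp-controller:0.8.0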
Worker
Workers can be configured with the following optional environment variables. The pipeline ID is inherited from the Controller and suffixed with the Worker's unique ID.
Environment Variable | Default | Description |
---|---|---|
KXI_SP_PARENT_HOST | Required | host:port for this Worker's Controller |
KXI_SP_SPEC | Required | path to the pipeline spec file (q or Python) |
KXI_SP_ORDINAL | "" | integer ordinal for the Worker; each Worker sharing a Controller must have a different ordinal |
KXI_SP_CHECKPOINT_FREQ | "5000" | frequency of checkpoints (ms), 0 disables |
KXI_SP_SILENCE_CHECKPOINTS | "false" | silence warnings when checkpointing disabled |
KXI_SP_REPORTING_FREQ | "5000" | frequency of heartbeats (ms) from Worker to Controller |
KXI_SP_DISABLE_METRICS | "true" | disable metrics reporting (can reduce incurred latency) |
KXI_SP_DISABLE_REST | "false" | disable REST interface |
KXI_SP_EVENT_JOURNAL (Beta) | "true" | enables event journaling, which is used by some readers for replay |
KXI_SP_JOURNAL_DIR (Beta) | "" | path to the directory where event journaling logs should be stored |
KXI_RT_LIB | "" | path to mounted kdb Insights Reliable Transport client |
KXI_RT_EVENT_FATAL | "false" | set to "true" to indicate that any data loss events from RT should cause the worker to error fatally |
KXI_CONFIG_FILE | "" | path to the sidecar configuration (if used) |
KXI_PROTECTED_EXECUTION | "true" | log names of failing operators in errors |
KXI_ERROR_MODE | "0" | error trap mode |
KXI_SP_CONFIG_PATH | "./config/sp" | path to custom mounted configuration maps and secrets specified in a deployment request (optional, only used in Kubernetes deployments) |
KXI_SP_BETA_FEATURES | "false" | enables optional beta features in preview mode. Note that beta features are not for use in production and are subject to change in a future release |
KXI_ALLOW_NONDETERMINISM | "false" | for pipelines with non-deterministic workloads, enabling non-determinism turns off deduplication and lets all data flow. See determinism for more details. |
KXI_SP_SET_BOUNDED | "" | set to "true" to treat all readers in the pipeline as bounded, even those that are normally unbounded. Useful when a pipeline that uses unbounded readers for batch ingest is to be finished manually. |
KXI_SP_PARENT_RETRIES | 60 | Number of times to retry connecting to the Controller. |
KXI_SP_PARENT_RETRY_WAIT | 00:00:03 | Time to wait between Controller connection attempts. |
KXI_SP_PARENT_OPEN_TIMEOUT | 00:00:00.500 | hopen timeout for each Controller connection attempt. If this time is exceeded a retry will be triggered. |
KXI_SP_RECONNECT_FREQ | 00:00:05 | Interval between worker attempts to reconnect to the controller after becoming disconnected. |
KXI_SP_RECONNECT_TIMEOUT | 00:05:00 | How long to wait after disconnecting from the controller before giving up and restarting the worker. After disconnecting, the worker will repeatedly attempt to reconnect to the controller. |
KXI_SP_OP_RETRIES | 60 | Number of times a pipeline operator will retry connecting to a process (if it needs to connect to one). Ignored for operators that have retries as a configuration option. |
KXI_SP_OP_RETRY_WAIT | 00:00:03 | Time to wait between process connection attempts within an operator. Ignored for operators that have retryWait as a configuration option. |
KXI_SP_OP_OPEN_TIMEOUT | 00:00:00.500 | hopen timeout for each process connection attempt within an operator. If this time is exceeded a retry will be triggered. |
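Continuing the sketch above, a Worker could be started against that Controller as follows; the Controller port, image tag, and mount paths are assumptions for illustration:
# Start a Worker pointing at the Controller on the shared network
docker run -d --name sp-worker --network sp-net \
  -e KXI_SP_PARENT_HOST=sp-controller:6000 \
  -e KXI_SP_SPEC=/opt/pipelines/spec.q \
  -e KDB_LICENSE_B64 \
  -v "$(pwd)/pipelines:/opt/pipelines" \
  portal.dl.kx.com/kxi-sp-worker:0.8.0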
Monitoring
Containers within a Stream Processor cluster can report pipeline-specific metrics (throughput and latency) and kdb+-specific metrics to a Monitoring endpoint such as Prometheus for event monitoring and alerting. Monitoring statistics are scraped by a sidecar container which surfaces the scraped statistics for HTTP pull.
If running a scaled pipeline, pipeline performance will be aggregated in the Controller, allowing a holistic view of the pipeline within Monitoring tools.
Monitoring multiple Workers
Since Monitoring requires a sidecar container, when scaling a pipeline to multiple Workers enable Monitoring only on the Controller. Aggregated Worker statistics will be reported by the Controller.
To add Monitoring to a Stream Processor Controller, add an accompanying kxi-sidecar container to scrape the Controller.
services:
  ..
  controller-sidecar:
    image: portal.dl.kx.com/kxi-sidecar:0.8.0
    environment:
      - KXI_LOG_CONFIG=/opt/config/qlog.json
      - KXI_CONFIG_FILE=/opt/config/controller-sidecar.json
      - KDB_LICENSE_B64
    volumes:
      - ./config:/opt/config
    command: -p 8080 # Standard is to expose metrics on port 8080
Create the configuration directory containing the sidecar configuration file controller-sidecar.json.
{
  "connection": ":controller:6000",
  "frequencySecs": 5,
  "metrics": {
    "enabled": "true",
    "frequency": 5,
    "handler": {
      "pc": true,
      "pg": true,
      "ph": true,
      "po": true,
      "pp": true,
      "ps": true,
      "ts": true,
      "wc": true,
      "wo": true,
      "ws": true
    }
  }
}
The above configuration file controls how frequently the sidecar scrapes the main container, through metrics.frequency. It tracks statistics on many of the q event handlers in the .z namespace.
Adding Prometheus to a Docker Compose
As an example of adding monitoring configuration, we demonstrate Prometheus using the kdb Insights Monitoring component.
Prometheus is available for download from Docker Hub and can be pulled with docker pull. Below is a sample Prometheus container that exposes the Prometheus browser UI on http://localhost:9000.
services:
  prometheus:
    image: prom/prometheus
    ports:
      - 9000:9090
    volumes:
      - ./config:/etc/prometheus
      - prometheus-data:/prometheus
    command: --config.file=/etc/prometheus/prometheus.yml
volumes:
  prometheus-data:
The volume prometheus-data:/prometheus is used to store the scraped data so that it is available after a restart. A configuration file for Prometheus must be mounted into the Docker container and specified in the run command.
This configuration file contains scrape targets, which are the sidecar containers we previously configured. In the example below we set the scrape interval and timeout values and specify multiple jobs under scrape_configs to scrape data from different sidecar containers in a cluster.
global:
  scrape_interval: 10s
  scrape_timeout: 5s
scrape_configs:
  - job_name: controller
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'controller-sidecar:8080' # Point to controller sidecar
  - job_name: worker
    metrics_path: /metrics
    static_configs:
      - targets:
          - 'worker-sidecar:8080' # Point to a worker sidecar, if one exists
Kubernetes
A Helm chart has been provided for installing the Stream Processor service into a Kubernetes cluster.
Prerequisites
To run within Kubernetes there are a number of prerequisites:
- helm should be installed
- kubectl should be installed
- a cloud vendor CLI should be installed and authorized against the appropriate Kubernetes cluster
kdb+ license
To start the Stream Processor service within Kubernetes, a license secret should be installed into the Kubernetes cluster.
kubectl create secret generic kxi-license --from-file=license=${QLIC}/k4.lic
Helm
Add the KX Helm chart repository to your helm install:
helm repo add --username <username> --password <password> \
kxi-repo https://nexus.dl.kx.com/repository/kx-insights-charts/
"kxi-repo" has been added to your repositories
Update your helm repository local chart cache:
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "kxi-repo" chart repository
Update Complete. ⎈Happy Helming!⎈
Add a secret to your cluster to allow access to the images:
kubectl create secret docker-registry kx-repo-access \
--docker-username=<username> \
--docker-password=<password> \
--docker-server=portal.dl.kx.com
Note
In the example above the secret kx-repo-access has been created. When deploying the charts, it is necessary to reference this secret in the imagePullSecrets within the configuration file.
Configuration file
A configuration file is required to run the Coordinator service. An example standalone configuration file is provided below. See all configuration options for more information.
cat spcoord.yaml
license:
  secretName: kxi-license
imagePullSecrets:
  - name: kx-repo-access
persistence:
  enabled: true
  # You will need to provide this argument if persistence is enabled.
  # Point this to the name of a created Storage Class for checkpoints.
  storageClassName: standard
  controllerCheckpointFreq: 10000
  workerCheckpointFreq: 5000
  # Storage allocated to each worker/controller
  storage: 20Gi
discovery:
  # If KXI Service Discovery is set up, set this to true
  enabled: false
metrics:
  # If KXI Monitoring is set up, set this to true
  enabled: false
logging:
  endpoints: # list of endpoints (can provide multiple)
    - fd://stdout
  formatMode: json
  routings:          # Mapping of logging to level
    ALL: DEBUG       # `ALL` is the default routing
    spcoord: TRACE   # Set coordinator logs to trace
    KUBE: TRACE      # Set kubernetes logs to trace
Installing the Coordinator
helm install sp kxi-repo/kxi-sp -f path/to/spcoord.yaml
One kxi-sp install per namespace
Only one Coordinator may be installed in a given Kubernetes namespace. To install another, it must be installed in a different namespace.
Accessing the Coordinator
To access the Coordinator for the following creation or teardown requests, first port-forward the Coordinator service locally.
kubectl port-forward sp-kxi-sp-0 5000:5000
Configuring and running a streaming job
Assuming a spec.q file exists locally with a pipeline defined, its configuration is provided through the REST API when submitting it to the Coordinator.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "kafka-in",
type : "spec",
base : "q",
config : { content: $spec },
settings : {
minWorkers: "1",
maxWorkers: "10",
workerThreads: "4",
workerImage: ""
},
kubeConfig: { "secrets" : ["aws-creds"]},
persistence : {
controller : { class:"standard", size:"25Gi", checkpointFreq: 5000 },
worker : { class:"standard", size:"100Gi", checkpointFreq: 1000 }
}
}' | jq -asR .)"
In the above REST API request, the following settings are available as part of the settings field.
setting | default | description |
---|---|---|
minWorkers | 1 | Minimum required Workers ready for the pipeline to start |
maxWorkers | 10 | Maximum number of Workers for the pipeline |
workerThreads | 0 | Number of secondary threads for Workers |
workerImage | | Custom Worker image to deploy (if building a custom Worker from the .qpk) |
Configuring Persistence for a streaming job
Persistence configuration can be customized for each deployment of a streaming job. It is configured
with the persistence
field which is composed of a map of two fields, controller
and worker
,
used respectively to configure persistence and checkpointing for the Controller and Worker. For each process
type, the Kubernetes storage class, storage size and checkpoint frequency can be configured.
setting | default | description |
---|---|---|
disabled | false | Flag indicating whether to disable persistence for the process type |
class | Kubernetes cluster default | Kubernetes storage class used to provision the persistent disk. Default value can be specified with the Helm chart (persistence.storageClass) |
size | 20Gi | Storage size allocated for the persistent volume. Default value can be specified with the Helm chart (persistence.storage) |
checkpointFreq | 5000 | Checkpointing frequency of the process type (in milliseconds). Default value can be specified with the Helm chart (persistence.controllerCheckpointFreq & persistence.workerCheckpointFreq) |
Persistence can be disabled per job by explicitly setting the disabled field to true for the Controller and/or Worker, as shown below:
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat $spec)" \
'{
name : "no-persistence",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
persistence : { controller: { disabled: true }, worker: { disabled: true }}
}' | jq -asR .)"
Note: If persistence is disabled by default in the Helm chart, no persistent disk will be deployed to back checkpoints unless all the necessary fields (class, size, and checkpoint frequency) are provided in the deployment request.
Increasing the size of an existing persistent volume
Persistence for the SP in Kubernetes is accomplished using PersistentVolumeClaims (PVCs), which can have their size expanded after creation. To resize a PVC, its StorageClass must be configured to allow volume expansion. If the StorageClass is correctly configured and the PVC is of a type that supports volume expansion, you can expand the persistent volume by editing the PVC to request a larger size, using a tool such as kubectl or k9s. For a more detailed explanation of the process of expanding PVCs, see the official Kubernetes documentation on Expanding Persistent Volume Claims.
Worker PVCs can be found using label selectors. To find all Worker PVCs for a specific pipeline, use the label selectors sp.kx.com/pipeline=<pipeline ID> and sp.kx.com/role=worker.
For example, to list the Worker PVCs for a pipeline with ID spcoord-my-pipeline, you could use the following kubectl command:
kubectl get pvc -l sp.kx.com/pipeline=spcoord-my-pipeline,sp.kx.com/role=worker
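Once the PVC has been identified, its requested size can be increased by patching the claim; the sketch below uses a placeholder PVC name and an illustrative new size, both to be substituted from your own output and requirements:
# Request a larger volume for an existing Worker PVC (name and size are placeholders)
kubectl patch pvc <pvc-name> \
  -p '{"spec":{"resources":{"requests":{"storage":"100Gi"}}}}'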
Configuring mounting of custom configuration for a streaming job
In a Kubernetes deployment of the Stream Processor, configuration files that Workers require to execute a streaming job (e.g. credentials) can be mounted with the kubeConfig field of a deployment request. These files must be packaged into a Kubernetes Secret or ConfigMap, in the same namespace as the Stream Processor's deployment, before the pipeline is created.
For more information about how to create a Secret or ConfigMap, refer to the links below; a sketch of creating them is shown after the links.
https://kubernetes.io/docs/concepts/configuration/configmap/
https://kubernetes.io/docs/concepts/configuration/secret/
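As a sketch, local files could be packaged as a Secret and a ConfigMap like this; the object names and file paths are illustrative:
# Package credentials as a Secret and settings as a ConfigMap (names and paths are illustrative)
kubectl create secret generic aws-creds --from-file=credentials=./credentials
kubectl create configmap pipeline-settings --from-file=settings.json=./settings.json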
The kubeConfig field is an optional map containing two fields, secrets and configMaps, each an array of Secret or ConfigMap names to be mounted into the Worker executing the streaming job, as in the sketch below.
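A sketch of a deployment request mounting both a Secret and a ConfigMap is shown below; the names aws-creds and pipeline-settings refer to the illustrative objects created above:
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "with-config",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1" },
kubeConfig : { secrets: ["aws-creds"], configMaps: ["pipeline-settings"] }
}' | jq -asR .)"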
Deploying a streaming job from a Python specification
Streaming pipeline specifications can be written in Python or q. By default, if no base parameter is provided in the deployment request, the submitted spec is assumed to be written in q. However, if the pipeline spec has been written using the SP Python API, the base deployment parameter can be set to "py", as shown in the example below:
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat test.py)" \
'{
name : "test",
type : "spec",
base : "py",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" }
}' | jq -asR .)"
Where test.py is the following:
from kxi import sp
sp.run(sp.read.from_callback('publish').to_console())
Deploying a streaming job with access to Machine Learning (ML) functionality
In addition to the use of the plugins defined in the API section here, additional ML functionality can be used by deploying the q-ml base image. The full set of available ML functions is defined here. Presently, the ML functionality can only be deployed using a streaming pipeline specification written in q. The following outlines a sample deployment of the q-ml base image:
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "test",
type : "spec",
base : "q-ml",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" }
}' | jq -asR .)"
Where spec.q is the following:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.minMaxScaler[::;.qsp.use ``bufferSize!(`;100)]
  .qsp.ml.sequentialKMeans[{select from x};.qsp.use ``k!(`;3)]
  .qsp.write.toConsole[]
Uninstalling the Stream Processor Coordinator
To remove the Stream Processor service from your Kubernetes cluster:
1) Teardown all running pipelines
Submit the teardown request to the Coordinator to bring down all pipelines.
curl -X POST localhost:5000/pipelines/teardown
Alternatively, pipelines can be individually removed with:
curl -X POST localhost:5000/pipeline/teardown/<id>
2) Uninstall the Helm chart
helm uninstall sp
Additionally, all Stream Processor workloads carry the following label to help identify them:
app.kubernetes.io/part-of=stream-processor
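For example, a quick way to list everything belonging to the Stream Processor in the current namespace is to select on that label:
kubectl get all -l app.kubernetes.io/part-of=stream-processor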
All configuration options
The following options are available when configuring the Coordinator service for deployment with Helm.
option | default | description |
---|---|---|
image.repository | portal.dl.kx.com | The URL of the image repository for the Coordinator image. |
image.component | kxi-sp-coordinator | The name of the Coordinator image. |
image.pullPolicy | IfNotPresent | The Kubernetes image pull policy for this image. |
ctlImage.repository | portal.dl.kx.com | The URL of the image repository for the default Controller image. |
ctlImage.component | kxi-sp-controller | The name of the Controller image. |
ctlImage.pullPolicy | IfNotPresent | The Kubernetes image pull policy for this image. |
workImage.repository | portal.dl.kx.com | The URL of the image repository for the default Worker image. |
workImage.component | kxi-sp-worker | The name of the Worker image. |
workImage.pullPolicy | IfNotPresent | The Kubernetes image pull policy for this image. |
mlImage.repository | portal.dl.kx.com | The URL of the image repository for the default machine learning Worker image. |
mlImage.component | kxi-ml | The name of the machine learning Worker image. |
mlImage.pullPolicy | IfNotPresent | The Kubernetes image pull policy for this image. |
pyImage.repository | portal.dl.kx.com | The URL of the image repository for the default Python Worker image. |
pyImage.component | kxi-sp-python | The name of the Python Worker image. |
pyImage.pullPolicy | IfNotPresent | The Kubernetes image pull policy for this image. |
imagePullSecrets | [] | Array of names of secrets with image pull permissions. |
env | {} | Additional environment variables to add to the Coordinator. |
debug | false | Enables interactivity for the Coordinator. |
port | 5000 | The port that the Coordinator binds to and serves its REST interface from. |
instanceParam | { "g": 1, "t": 1000 } | Command-line parameters to pass to the Coordinator. See command line parameters for details. |
defaultWorkerThreads | 0 | Default secondary threads for new pipeline submissions. |
betaFeatures | false | Enables optional beta features in a preview mode. Beta features are not intended for production use and are subject to change. |
auth.enabled | false | Indicates if authentication should be enabled for the Coordinator's REST interface. |
persistence.enabled | true | Whether persistent volumes are enabled on pipelines. Note: checkpointing for recovery requires this to be enabled. |
persistence.storageClassName | null | Pre-configured storage class name to be used for persistent volumes (if not specified, the Kubernetes cluster's default storage class is used). |
persistence.controllerCheckpointFreq | 5000 | Frequency of Controller checkpoints (in milliseconds). |
persistence.workerCheckpointFreq | 5000 | Frequency of Worker checkpoints (in milliseconds). |
persistence.storage | 20Gi | Persistent volume storage size. |
autoscaling.enabled | false | Indicates if the Coordinator should automatically scale based on load. |
autoscaling.minReplicas | 1 | The minimum number of Coordinator replicas that should be running. |
autoscaling.maxReplicas | 1 | The maximum number of Coordinator replicas that should be running. |
autoscaling.targetCPUUtilizationPercentage | 80 | The maximum amount of CPU a replica should consume before triggering a scale-up event. |
autoscaling.targetMemoryUtilizationPercentage | | The maximum amount of memory a replica should consume before triggering a scale-up event. |
replicaCount | 1 | If autoscaling is enabled, the baseline number of replicas that should be deployed. |
affinity | hard | One of hard, soft, hard-az or soft-az. Hard affinity requires all replicas to be on different nodes; soft prefers different nodes but does not require it. The az suffix indicates the node allocation must be across different availability zones. |
license.onDemand | false | Indicates if a kc.lic will be used. |
license.asFile | true | Indicates if the license will be provided as a file; otherwise it must be provided as an environment variable. |
license.secretName | | Name of the secret with base64 license text, either as a file if asFile is enabled or as an environment variable. |
discovery.enabled | true | Whether kdb Insights Service Discovery is enabled. Note: the service must be set up separately. |
metrics.enabled | true | Whether kdb Insights Monitoring is enabled. Note: the service must be set up separately. |
metrics.frequency | 5 | Polling frequency of monitoring metrics in seconds. |
metrics.handler.po | true | Capture metrics for the .z.po handler. |
metrics.handler.pc | true | Capture metrics for the .z.pc handler. |
metrics.handler.wo | true | Capture metrics for the .z.wo handler. |
metrics.handler.wc | true | Capture metrics for the .z.wc handler. |
metrics.handler.pg | true | Capture metrics for the .z.pg handler. |
metrics.handler.ps | true | Capture metrics for the .z.ps handler. |
metrics.handler.ws | true | Capture metrics for the .z.ws handler. |
metrics.handler.ph | true | Capture metrics for the .z.ph handler. |
metrics.handler.pp | true | Capture metrics for the .z.pp handler. |
metrics.handler.ts | true | Capture metrics for the .z.ts handler. |
logging.endpoints | [ "fd://stdout" ] | List of endpoints to pass to qlog. Logging will be written to each of the given endpoints. |
logging.formatMode | "json" | Desired logging format (e.g. "text", "json"). |
logging.routings | ALL: INFO, SPCOORD: TRACE | Log routing represented by key-value pairs, configuring the logging level for different facilities. |