Stream Processor APIs

This page provides information on how to interact with the Stream Processor.

The Stream Processor has two supported programming interfaces for writing streaming applications, one in q and one in Python.

To interact with the Stream Processor at runtime, refer to the OpenAPI REST interface.

  • Coordinator OpenAPI This is the primary interface for interacting with the Stream Processor. Use the Coordinator interface to deploy and tear down pipelines. This interface is available at the Kubernetes service name <release>-kxi-sp:5000.
  • Controller OpenAPI The controller interface offers localized feedback on a single pipeline. This interface is only accessible in a microservice deployment or using a port forward in a platform deployment.
  • Worker OpenAPI The worker interface allows for introspection of the runtime state of a given worker. This interface is only accessible in a microservice deployment or using a port forward in a platform deployment.

Warning

Prior to version 1.4.0 of kdb Insights and kdb Insights Enterprise, the Stream Processor allowed pipelines to be created with many-to-one connections in configurations where this was not supported. This capability has been removed because it compromises determinism in pipelines and should be avoided. Where many-to-one connections are required, use merge nodes to combine data into a single connection.

Deprecation Policy

Code that is marked as deprecated is subject to removal in the next major release. Deprecated APIs are maintained for all minor versions of a release but print a warning indicating that they are deprecated. All deprecated APIs indicate an upgrade path in the relevant documentation. See the release notes for details on any deprecated or removed APIs.

Available APIs

The APIs fall into the following categories:

| Category | Summary |
| --- | --- |
| Configuring operators | Pass optional arguments into APIs |
| General | Manage various pipeline behaviours |
| Lifecycle | Manage actions upon various pipeline events |
| Operators | Define desired data processing in the pipeline |
| Readers | Configure how data enters the pipeline |
| Decoders | Apply certain decoders to data |
| Encoders | Apply certain encoders to data |
| Transform | Apply pre-defined data transformation methods |
| Stats | Compute certain statistical measures |
| State | Get or set the state of a pipeline operator |
| String Utilities | Apply certain processing methods to strings |
| Windows | Divide data into overlapping or non-overlapping buckets |
| Writers | Configure how data is written out of the pipeline |
| User-Defined Functions | Fetch a custom-defined function to use in a pipeline |
| Machine Learning | Use classification, clustering, regression, feature creation, metrics, custom (Registry) models, and general ML utility functions |

Operator syntax

Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.

APIs are executed from left to right (or top to bottom when newlines are added) and are designed to be composed for human readability. For example, in the following case, data would be read from Kafka, transformed through a JSON decoder, windowed and written to an Insights stream.

.qsp.run
    .qsp.read.fromKafka[`trades]
    .qsp.decode.json[]
    .qsp.window.tumbling[00:00:05; `time]
    .qsp.write.toStream[]

Implicit last argument

Each .qsp operator returns an object representing a ‘node’ configuration; and takes a node configuration as its last argument. That last argument is left implicit in the API documentation: each operator therefore has a rank higher than documented.

Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.
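As a toy illustration of this mechanism in plain Python (not the kxi API — the operator name and node representation below are invented for illustration), an operator whose true rank is one higher than documented can be called with only its documented arguments, yielding a projection that still awaits the node configuration:

```python
from functools import partial

# Toy model only: `foo` stands in for a .qsp operator; the trailing
# `node` argument is the implicit node configuration.
def foo(x, y, node):
    # Append this operator's settings to the node-configuration chain.
    return node + [("foo", x, y)]

# Supplying only the documented arguments produces a unary projection
# still awaiting the node configuration.
op = partial(foo, 1, 2)
print(op([]))  # [('foo', 1, 2)]
```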

Implicit penultimate argument

Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.

If .qsp.foo is a Pipeline API operator that takes arguments x and y, and optionally cco, a dictionary of custom configuration options, its true signatures are

.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]

To limit duplication in the documentation, neither the cco nor the node argument is documented. The signature would be documented as

.qsp.foo[x;y]

An important consequence is that

.qsp.foo[x;y]
.qsp.foo[x;y;cco]

are both unary projections.
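The same toy model in plain Python (again, not the kxi API) shows why both forms are unary: with or without the custom-configuration dictionary, the call still awaits exactly one further argument, the node configuration:

```python
from functools import partial

# Toy model only: the node is always the last argument, with an optional
# custom-configuration dictionary (cco) just before it.
def foo(x, y, *rest):
    *maybe_cco, node = rest
    cco = maybe_cco[0] if maybe_cco else {}
    return node + [("foo", x, y, cco)]

p1 = partial(foo, 1, 2)                 # models .qsp.foo[x;y]
p2 = partial(foo, 1, 2, {"opt": True})  # models .qsp.foo[x;y;cco]

# Both projections await exactly one argument: the node configuration.
print(p1([]))  # [('foo', 1, 2, {})]
print(p2([]))  # [('foo', 1, 2, {'opt': True})]
```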

Order of evaluation

Successive Pipeline API operators modify the node configuration and pass it along the chain for eventual evaluation, reversing the apparent evaluation order.

In

prd
{x where x>5}
til 8

q evaluates first til 8, then the lambda, then prd, but in

.qsp.run
.qsp.read.fromExpr["refData"]
.qsp.write.toConsole[]

because actual evaluation is handled by .qsp.run, the table refData is read before its contents are written to the console.
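A minimal sketch of this evaluation model in plain Python (not the kxi implementation — the function names and chain representation are invented for illustration): each operator prepends itself to the chain built so far, so although q evaluates the expression right to left, run executes the finished chain front to back:

```python
# Toy model only: each operator prepends itself to the node-configuration
# chain it receives.
def from_expr(name, node=()):
    return (("read", name),) + tuple(node)

def to_console(node=()):
    return (("write", "console"),) + tuple(node)

def run(chain):
    # Execute the chain front to back: the reader runs first.
    return [kind for kind, *_ in chain]

# q evaluates right to left: to_console() is built first, then
# from_expr wraps it, placing the reader at the head of the chain.
pipeline = from_expr("refData", to_console())
print(run(pipeline))  # ['read', 'write']
```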

In the Python interface, pipeline operators are likewise chained together to create a single pipeline. Operators are joined using the pipe (|) syntax. Each operator has a number of required arguments that can be provided positionally; any optional arguments must be passed as named arguments.

from kxi import sp
from datetime import timedelta

sp.run(sp.read.from_kafka('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())
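As a hedged sketch of how such pipe syntax can work in general (plain Python operator overloading, not the kxi internals — the Op class and operator names are invented for illustration), a wrapper class can define __or__ so that a | b concatenates the two operator chains, which run then executes in written order:

```python
# Toy model only: Op, read_from_kafka, etc. are illustrative stand-ins.
class Op:
    def __init__(self, *steps):
        self.steps = list(steps)

    def __or__(self, other):
        # `a | b` concatenates the two chains, preserving written order.
        return Op(*self.steps, *other.steps)

def read_from_kafka(topic):
    return Op(("read", topic))

def write_to_stream():
    return Op(("write", "stream"))

def run(pipeline):
    return [name for name, *_ in pipeline.steps]

p = read_from_kafka("trades") | write_to_stream()
print(run(p))  # ['read', 'write']
```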