Stream Processor APIs
This page provides information on how to interact with the Stream Processor.
The Stream Processor has two supported programming interfaces for writing streaming applications, one in q and one in Python.
To interact with the Stream Processor at runtime, refer to the OpenAPI REST interface.
- **Coordinator OpenAPI**: the primary interface for interacting with the Stream Processor. Use the Coordinator interface to deploy and tear down pipelines. This interface is available at the Kubernetes service name `<release>-kxi-sp:5000`.
- **Controller OpenAPI**: offers localized feedback on a single pipeline. This interface is only accessible in a microservice deployment, or using a port forward in a platform deployment.
- **Worker OpenAPI**: allows introspection of the runtime state of a given worker. This interface is only accessible in a microservice deployment, or using a port forward in a platform deployment.
Warning
Prior to 1.4.0, kdb Insights and kdb Insights Enterprise allowed Stream Processors to be created with many-to-one connections in configurations where this was not supported. This capability has been removed because it causes issues with determinism in pipelines and should be avoided. For scenarios where many-to-one connections are required, use merge nodes to combine data into a single connection.
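For example, two readers can be combined through a merge node ahead of a single writer. The sketch below is illustrative only: the callback names are hypothetical, and the exact `.qsp.merge` merge-function signature may vary between versions.

```q
/ Two independent input streams (hypothetical callback names)
left: .qsp.read.fromCallback[`leftCB]
right: .qsp.read.fromCallback[`rightCB]

.qsp.run
  left
  .qsp.merge[right; {[l;r] l,' r}]   / merge the two streams into a single connection
  .qsp.write.toConsole[]
```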
Deprecation Policy
Code that is marked as deprecated is subject to removal in the next major release. Deprecated APIs are maintained for all minor versions of a release, but print a warning indicating that they are deprecated. All deprecated APIs indicate an upgrade path in the relevant documentation. See the release notes for details on any deprecated or removed APIs.
Available APIs
Here are the categories of APIs:

| Category | Summary |
|---|---|
| Configuring operators | Pass optional arguments into APIs |
| General | Manage various pipeline behaviours |
| Lifecycle | Manage actions upon various pipeline events |
| Operators | Define desired data processing in a pipeline |
| Readers | Configure how data enters a pipeline |
| Decoders | Apply certain decoders to data |
| Encoders | Apply certain encoders to data |
| Transform | Apply pre-defined data transformation methods |
| Stats | Compute certain statistical measures |
| State | Get or set the state of a pipeline operator |
| String Utilities | Apply certain processing methods to strings |
| Windows | Divide data into overlapping or non-overlapping buckets |
| Writers | Configure how data is written out of a pipeline |
| User-Defined Functions | Fetch a custom-defined function to use in a pipeline |
| Machine Learning | Use classification, clustering, regression, feature creation, metrics, custom (registry) models, and general ML utility functions |
Operator syntax
Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.
APIs are executed from left to right (or top to bottom when newlines are added) and are designed to be composed for human readability. For example, in the following case, data would be read from Kafka, transformed through a JSON decoder, windowed and written to an Insights stream.
```q
.qsp.run
  .qsp.read.fromKafka[`trades]
  .qsp.decode.json[]
  .qsp.window.tumbling[00:00:05; `time]
  .qsp.write.toStream[]
```
Implicit last argument
Each `.qsp` operator returns an object representing a 'node' configuration, and takes a node configuration as its last argument. That last argument is left implicit in the API documentation: each operator therefore has a rank one higher than documented.
Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.
Implicit penultimate argument
Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.
If `.qsp.foo` is a Pipeline API operator that takes arguments `x` and `y`, and optionally `cco`, a dictionary of custom configuration options, its true signatures are

```q
.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]
```
To limit duplication in the documentation, neither the `cco` nor the `node` arguments are documented. The signature would be documented as

```q
.qsp.foo[x;y]
```

An important consequence is that

```q
.qsp.foo[x;y]
.qsp.foo[x;y;cco]
```

are both unary projections.
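The same variadic-projection behaviour can be mimicked in plain Python. The sketch below is purely illustrative (the `foo` function and the shape of the returned dictionary are hypothetical, not the kxi implementation): calling the operator with or without the options dictionary returns a one-argument function that awaits the implicit node.

```python
def foo(x, y, cco=None):
    """Return a unary 'projection' awaiting the implicit node argument."""
    def apply_to(node):
        # A real operator would attach configuration to the node; here we
        # simply record everything that was supplied.
        return {'node': node, 'args': (x, y), 'options': cco or {}}
    return apply_to

p1 = foo(1, 2)               # unary projection, no custom options
p2 = foo(1, 2, {'k': 'v'})   # unary projection with custom options
# Both p1 and p2 still take exactly one argument: the node configuration.
```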
Order of evaluation
Successive Pipeline API operators modify the node configuration and pass it along the chain for eventual evaluation, reversing the apparent evaluation order.
In

```q
prd {x where x>5} til 8
```

q evaluates first `til 8`, then the lambda, then `prd`, but in
```q
.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]
```

because actual evaluation is handled by `.qsp.run`, the table `refData` is read before its contents are written to the console.
Pipeline operators are designed to be chained together to create a single pipeline. Operators are joined using the pipe (`|`) syntax. Each operator has a number of required arguments that can be provided positionally; any optional arguments must be passed as keyword arguments.
```python
from kxi import sp
from datetime import timedelta

sp.run(sp.read.from_kafka('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())
```
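The `|` chaining works because each operator returns an object that overloads Python's `|` operator to accumulate stages, with execution deferred until `run` is called. A minimal, hypothetical sketch of this pattern (the `Node` class and `run` function below are illustrative, not the actual kxi implementation):

```python
from functools import reduce

class Node:
    """Minimal pipeline stage that composes with `|` and defers execution."""
    def __init__(self, *fns):
        self.fns = list(fns)             # ordered stage functions

    def __or__(self, other):
        # `a | b` yields a Node that runs a's stages, then b's
        return Node(*(self.fns + other.fns))

def run(node, data):
    """Apply the stages left to right, mirroring sp.run's evaluation order."""
    return reduce(lambda acc, fn: fn(acc), node.fns, data)

# Hypothetical stages standing in for readers/operators/writers
double = Node(lambda xs: [x * 2 for x in xs])
keep_big = Node(lambda xs: [x for x in xs if x > 4])

result = run(double | keep_big, [1, 2, 3])   # [2, 4, 6], then filter
```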