Operators
Stream Processor operators.
An operator is a first-class building block in the Stream Processor API. Operators are chained together in a user's program and provide the core functionality of the Stream Processor.
"operator" terminology
In the KX Stream Processor, the term "operator" can refer either to one of the operators in this module or to any component of a pipeline. For instance, a pipeline can be described as a series of chained operators, even though some of those operators are readers or writers.
Put another way: all readers/writers/decoders/etc. are operators, but not all operators are readers/writers/decoders/etc.
kxi.sp.op
aliases in kxi.sp
All operators in this module are available through aliases in kxi.sp for convenience. For example, you can access kxi.sp.op.map as kxi.sp.map.
kxi.sp.op.accumulate
Aggregates a stream into an accumulator.
Aggregates a stream into an accumulator, which is updated and then emitted for each incoming batch. By default, kxi.sp.accumulate emits the accumulator itself. If an output function is given, the accumulator is passed to it, and its result is emitted to the next operator instead.
If the accumulator is a dictionary, it may be necessary to enlist the result in the output function so that the next operator receives a table. An output function like {enlist x} could be used to emit tables.
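The per-batch behavior described above can be sketched as a plain-Python model. This is not the kxi.sp API. `simulate_accumulate` and `add_prices` are hypothetical names, and batches are lists of row dictionaries standing in for tables. The `(metadata, data, accumulator)` argument order follows the parameter description below and should be treated as an assumption.

```python
from typing import Any, Callable, Iterable, List, Optional


def simulate_accumulate(
    batches: Iterable[list],
    function: Callable[[dict, list, Any], Any],
    initial: Any,
    output: Optional[Callable[[Any], Any]] = None,
) -> List[Any]:
    """Hypothetical model of accumulate: update once per batch, emit every time."""
    acc = initial
    emitted: List[Any] = []
    for batch in batches:
        metadata: dict = {}  # placeholder; a real pipeline supplies metadata
        acc = function(metadata, batch, acc)
        # The accumulator is emitted as-is unless an output function transforms it.
        emitted.append(output(acc) if output is not None else acc)
    return emitted


def add_prices(md: dict, data: list, acc: dict) -> dict:
    # Running total of prices, kept in a dictionary accumulator.
    return {"total": acc["total"] + sum(row["price"] for row in data)}


batches = [[{"price": 1.5}, {"price": 2.5}], [{"price": 4.0}]]
print(simulate_accumulate(batches, add_prices, {"total": 0.0}))
# → [{'total': 4.0}, {'total': 8.0}]
print(simulate_accumulate(batches, add_prices, {"total": 0.0}, output=lambda d: [d]))
# → [[{'total': 4.0}], [{'total': 8.0}]]
```

Wrapping the dictionary in a one-element list is only a plain-Python analogue of enlisting a q dictionary into a one-row table.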
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. | required |
initial | Any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with | None |
output | Union[Callable, str] | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | None |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.apply
Apply a function to incoming batches in the stream.
Unlike other operators, apply is asynchronous. To return data from it, you must use sp.push to push data through the pipeline when it is ready. Apply is commonly used with the state API to implement custom windowing, or with the task API.
When apply is used to buffer data, the on_finish option should be used to flush the buffer when the pipeline finishes. It is called once for every key in the state, including the default key, and the metadata dictionary passed to it contains only that key.
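The buffering pattern above can be sketched as a plain-Python model. Everything here is a hypothetical stand-in rather than the kxi.sp API:

- `self` plays the role of the operator argument.
- The `state` dictionary stands in for the state API.
- `push` stands in for sp.push.
- `None` represents the default key.

Each key's buffer is pushed in chunks of `batch_size`. `finish` then calls `on_finish` once per key to flush whatever remains.

```python
from typing import Any, Dict, List, Tuple


class BufferingApplyModel:
    """Hypothetical model of an asynchronous apply operator that buffers data."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        # One buffer per key; None plays the role of the default key.
        self.state: Dict[Any, List[Any]] = {None: []}
        self.pushed: List[Tuple[dict, List[Any]]] = []

    def push(self, metadata: dict, data: List[Any]) -> None:
        # Stand-in for sp.push: record what would be sent downstream.
        self.pushed.append((metadata, data))

    def function(self, metadata: dict, data: List[Any]) -> None:
        key = metadata.get("key")
        buffer = self.state.setdefault(key, [])
        buffer.extend(data)
        # Nothing is returned; full chunks are pushed as soon as they are ready.
        while len(buffer) >= self.batch_size:
            self.push({"key": key}, buffer[: self.batch_size])
            del buffer[: self.batch_size]

    def on_finish(self, metadata: dict) -> None:
        # Metadata contains only the key: flush whatever is left for it.
        key = metadata["key"]
        if self.state[key]:
            self.push({"key": key}, list(self.state[key]))
            self.state[key].clear()

    def finish(self) -> None:
        # on_finish runs once for every key, including the default key.
        for key in list(self.state):
            self.on_finish({"key": key})


op = BufferingApplyModel(batch_size=3)
op.function({"key": "a"}, [1, 2])
op.function({"key": "a"}, [3, 4])
op.function({}, [10])
op.finish()
print(op.pushed)
# → [({'key': 'a'}, [1, 2, 3]), ({'key': None}, [10]), ({'key': 'a'}, [4])]
```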
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses. | required |
on_finish | Union[Callable, str] | A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. | None |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.filter
Filter elements out of a batch, or a batch from a stream.
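The `function` parameter of filter is a predicate whose result is either a boolean list or a single boolean. The following plain-Python model sketches how each kind of result is applied to one batch. `apply_filter` is a hypothetical helper, not part of kxi.sp. Where the model returns an empty list, no records from the batch progress.

```python
from typing import Callable, List, Sequence, Union


def apply_filter(
    batch: Sequence, predicate: Callable[[Sequence], Union[bool, List[bool]]]
) -> list:
    """Hypothetical model of how a filter predicate's result is applied to one batch."""
    result = predicate(batch)
    if isinstance(result, bool):
        # A single boolean lets the whole batch through, or none of it.
        return list(batch) if result else []
    if len(result) != len(batch):
        raise ValueError("a boolean list needs one flag per record")
    # A boolean list keeps only the records it flags.
    return [record for record, keep in zip(batch, result) if keep]


prices = [5, 12, 7, 20]
print(apply_filter(prices, lambda b: [p > 6 for p in b]))  # → [12, 7, 20]
print(apply_filter(prices, lambda b: len(b) > 10))  # → []
print(apply_filter(prices, lambda b: len(b) > 0))  # → [5, 12, 7, 20]
```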
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | A predicate function which will be applied to the data that passes through the operator. If the result is a boolean list, then only the records in the batch that it flags progress through the stream. If the result is a single boolean value, then either all records in the batch or none of them progress through the stream. | required |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.map
Apply a function to data passing through the operator.
The function is called once with an entire batch of data, rather than once for each item or row in the batch. This makes it possible to use vector operations, which are generally more efficient and are easy to perform on pykx.K objects. If needed, the function can still iterate over the data in the batch manually.
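The map function is called with a whole batch rather than with one record at a time. The plain-Python sketch below makes that visible by recording how many records each call receives. `apply_map` and `to_cents` are hypothetical. In a real pipeline, the list comprehension would typically be a single vectorized operation on a pykx.K object.

```python
from typing import Callable, List

Batch = List[float]


def apply_map(batches: List[Batch], function: Callable[[Batch], list]) -> List[list]:
    # The function is called once per batch, never once per record.
    return [function(batch) for batch in batches]


calls: List[int] = []


def to_cents(batch: Batch) -> List[int]:
    calls.append(len(batch))  # how many records this single call received
    # With pykx this could be one vector operation over the whole batch;
    # a comprehension stands in for it here.
    return [round(price * 100) for price in batch]


print(apply_map([[1.25, 2.5], [0.1]], to_cents))  # → [[125, 250], [10]]
print(calls)  # → [2, 1]
```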
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function. | required |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.merge
Merge two streams using a function to combine them.
Merge can be used to join and enrich streams with other streams or with static data, or to union two streams into a single stream. See also: union.
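A merge function receives the data from each stream and returns one combined value. The sketch below shows two hypothetical merge functions written in plain Python, with lists of row dictionaries standing in for tables. `enrich` joins reference data onto each row, and `concat` unions the two inputs. The sketch does not model how the Stream Processor buffers and pairs data from the two streams before calling the function.

```python
from typing import Dict, List


def enrich(left: List[dict], right: List[dict]) -> List[dict]:
    """Hypothetical merge function: add a name column from reference data."""
    names: Dict[str, str] = {row["sym"]: row["name"] for row in right}
    # Unmatched rows are kept with name=None, like a left join.
    return [{**row, "name": names.get(row["sym"])} for row in left]


def concat(left: List[dict], right: List[dict]) -> List[dict]:
    """Hypothetical merge function that unions the two inputs."""
    return left + right


trades = [{"sym": "AAPL", "px": 190.0}, {"sym": "XYZ", "px": 1.0}]
reference = [{"sym": "AAPL", "name": "Apple"}]
print(enrich(trades, reference))
# → [{'sym': 'AAPL', 'px': 190.0, 'name': 'Apple'}, {'sym': 'XYZ', 'px': 1.0, 'name': None}]
print(len(concat(trades, reference)))  # → 3
```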
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | Pipeline | The separate pipeline that will be merged with. | required |
function | Union[Callable, str] | A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one. | required |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.reduce
Aggregates partial windows.
A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the count_trigger option, a partial window will be emitted when the number of buffered records exceeds the count_trigger value. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using this operator. When the window is complete, this operator will emit the result of reducing the partial windows.
The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator. For partial batches, such as those emitted by count_trigger or idle streams, the accumulator will be updated but not emitted. When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.
By default, kxi.sp.reduce emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like pykx.q.enlist could be used to emit tables.
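The per-window bookkeeping described above can be sketched as a plain-Python model. This is an illustration under stated assumptions, not the kxi.sp implementation. `simulate_reduce` is hypothetical, and each incoming batch is tagged with its window id and a flag marking the batch that closes the window. The `(metadata, data, accumulator)` argument order follows the parameter description and is an assumption.

```python
import copy
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

# (window id, records, is this the batch that closes the window?)
WindowBatch = Tuple[str, list, bool]


def simulate_reduce(
    batches: Iterable[WindowBatch],
    function: Callable[[dict, list, Any], Any],
    initial: Any,
    output: Optional[Callable[[Any], Any]] = None,
) -> List[Any]:
    accumulators: Dict[str, Any] = {}  # one accumulator per window
    emitted: List[Any] = []
    for window, records, closes_window in batches:
        # Each new window starts from its own copy of the initial value.
        if window not in accumulators:
            accumulators[window] = copy.deepcopy(initial)
        acc = function({"window": window}, records, accumulators[window])
        if closes_window:
            # Final batch: update, then emit and discard the window's state.
            del accumulators[window]
            emitted.append(output(acc) if output is not None else acc)
        else:
            # Partial batch (count_trigger or idle stream): update only.
            accumulators[window] = acc
    return emitted


def sum_count(md: dict, records: list, acc: dict) -> dict:
    return {"sum": acc["sum"] + sum(records), "count": acc["count"] + len(records)}


stream = [
    ("w1", [1, 2], False),  # partial batch for window w1
    ("w2", [10], False),  # partial batch for a second window
    ("w1", [3], True),  # final batch: w1 closes and is emitted
    ("w2", [20, 30], True),
]
print(simulate_reduce(stream, sum_count, {"sum": 0, "count": 0}))
# → [{'sum': 6, 'count': 3}, {'sum': 60, 'count': 3}]
print(simulate_reduce(stream, sum_count, {"sum": 0, "count": 0},
                      output=lambda a: a["sum"] / a["count"]))
# → [2.0, 20.0]
```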
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. | required |
initial | Any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with | None |
output | Union[Callable, str] | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | None |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.split
Split the current stream.
The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.
Split operators can be added explicitly. They are also added implicitly if the same operator appears as a parent multiple times when resolving the streams given to sp.run.
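Conceptually, a split hands the same batch to every downstream branch. The following plain-Python sketch illustrates that fan-out. `split_stream` is a hypothetical helper, and each branch is an ordinary function standing in for a separate pipeline.

```python
from typing import Any, Callable, List, Sequence


def split_stream(batch: Sequence, branches: List[Callable[[Sequence], Any]]) -> List[Any]:
    # Every branch receives the same batch and runs its own processing.
    return [branch(batch) for branch in branches]


batch = (3, 1, 2)  # a tuple, so no branch can mutate what the others see
print(split_stream(batch, [sum, max, sorted]))  # → [6, 3, [1, 2, 3]]
```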
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |
kxi.sp.op.sql
Perform an SQL query on tabular data in the stream.
The query must:
- conform to ANSI SQL and be limited to the documented supported operations
- reference a special $1 alias in place of the table name
If data in the stream is not table data, an error will be signaled. Providing a schema allows the query to be precompiled, enabling faster processing on streaming data.
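The role of the `$1` alias can be illustrated by running a query against a single batch. The sketch below uses Python's built-in sqlite3 module purely as a stand-in engine. It is not the Stream Processor's SQL implementation, and its supported operations differ. `run_sql_on_batch` is a hypothetical helper that substitutes a temporary table name for `$1`.

```python
import sqlite3
from typing import List, Sequence, Tuple


def run_sql_on_batch(query: str, rows: Sequence[tuple], columns: List[str]) -> List[Tuple]:
    """Hypothetical stand-in: run a `$1`-style query on one batch using SQLite."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute(f"CREATE TABLE stream_batch ({', '.join(columns)})")
        placeholders = ", ".join("?" for _ in columns)
        con.executemany(f"INSERT INTO stream_batch VALUES ({placeholders})", rows)
        # `$1` refers to the table flowing through the stream; substitute it here.
        return con.execute(query.replace("$1", "stream_batch")).fetchall()
    finally:
        con.close()


query = "SELECT sym, AVG(price) FROM $1 GROUP BY sym ORDER BY sym"
rows = [("AAPL", 10.0), ("MSFT", 20.0), ("AAPL", 30.0)]
print(run_sql_on_batch(query, rows, ["sym", "price"]))
# → [('AAPL', 20.0), ('MSFT', 20.0)]
```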
Queries run in a local worker process
SQL queries are not distributed across workers, and are currently run on each worker's substream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, pykx.wrappers.CharVector] | An SQL query, which will be performed on tabular data in the stream. | required |
schema | Optional[pykx.wrappers.Table] | An optional empty table representing the schema of the data. | None |
kxi.sp.op.union
Unite two streams.
Like merge, but elements from both sides of the union are left as-is.
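In contrast to merge, a union applies no combining function. The plain-Python model below, with a hypothetical `union_streams` helper, forwards each batch from either side unchanged. Emitting batches in arrival order is an assumption of the model.

```python
from typing import Iterable, List, Tuple


def union_streams(events: Iterable[Tuple[str, list]]) -> List[list]:
    # Batches from either side are forwarded unchanged, in arrival order.
    return [batch for _side, batch in events]


arrivals = [("left", [1, 2]), ("right", ["a"]), ("left", [3])]
print(union_streams(arrivals))  # → [[1, 2], ['a'], [3]]
```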
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | Pipeline | The separate pipeline that will be unified with. | required |
Returns:
Type | Description |
---|---|
Pipeline | A pipeline comprised of the |