Operators
Stream Processor operators.
An operator is a first-class building block in the stream processor API. Operators are strung together in a user's program, and provide the core functionality for the Stream Processor.
"operator" terminology
In the KX Stream Processor, the term "operator" can refer to either one of the basic
operators like map
, or filter
, but can also refer to any component of a pipeline. In
other words: all readers/writers/decoders/etc. are operators, but not all operators are
readers/writers/decoders/etc.
kxi.sp.op
aliases in kxi.sp
All operators in this module are available through aliases in kxi.sp for convenience. For example, you can access kxi.sp.op.map as kxi.sp.map.
kxi.sp.op.AccumulateOperator (Operator)
kxi.sp.op.ApplyOperator (Operator)
kxi.sp.op.FilterOperator (Operator)
kxi.sp.op.MapOperator (Operator)
kxi.sp.op.MergeOperator (Operator)
kxi.sp.op.ReduceOperator (Operator)
kxi.sp.op.SQLOperator (Operator)
kxi.sp.op.SplitOperator (Operator)
kxi.sp.op.UnionOperator (Operator)
kxi.sp.op.OperatorParams (AutoNameEnum)
Specifies a parameter that will be passed to the function you provide to an operator.
kxi.sp.op.OperatorParams.data
Provide the message's data.
kxi.sp.op.OperatorParams.metadata
Provide the message's metadata, which may be required by other SP functions.
kxi.sp.op.OperatorParams.operator
Provide the operator's dictionary, which may be required by other SP functions.
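The snippet below is a minimal sketch of how these values might be used to request extra arguments for an operator's function; the params keyword shown here is an assumption and may differ from the actual operator signatures.

```python
from kxi import sp

# Hypothetical sketch: ask map to pass both the message metadata and the data
# to the callback. The `params` keyword name is an assumption, not confirmed API.
def handle(metadata, data):
    print(metadata)     # e.g. inspect keys or offsets carried in the metadata
    return data         # pass the payload through unchanged

op = sp.map(handle, params=[sp.op.OperatorParams.metadata,
                            sp.op.OperatorParams.data])
```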
kxi.sp.op.Operator
Stream Processor operator interface.
An operator is a first-class building block in the stream processor API. Operators can be strung together to form a kxi.sp.Pipeline instance, which can then be run.
Pipeline and operator objects can be joined together using the | operator. Operators can also be joined to lists/tuples of pipelines or operators.
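For example, a minimal sketch of joining operators into a runnable pipeline; the reader and writer names (sp.read.from_callback, sp.write.to_console) are assumed from the wider kxi.sp API:

```python
from kxi import sp

# Operators joined with `|` form a Pipeline, which sp.run can execute.
pipeline = (
    sp.read.from_callback('publish')   # assumed reader
    | sp.map(lambda data: data)        # an Operator in the chain
    | sp.write.to_console()            # assumed writer
)
sp.run(pipeline)
```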
See Also:
kxi.sp.Pipeline
Attributes:
Name | Type | Description |
---|---|---|
id | str | The unique ID of this operator. |
kxi.sp.op.Operator.as_pipeline
property
readonly
A new pipeline that only contains this operator.
kxi.sp.op.Operator.__new__
special
staticmethod
Create and return a new object. See help(type) for accurate signature.
kxi.sp.op.Operator.__init__
special
kxi.sp.op.Operator.__or__
special
kxi.sp.op.Operator.__ror__
special
kxi.sp.op.Operator.__hash__
special
kxi.sp.op.Operator.__repr__
special
kxi.sp.op.accumulate
Aggregates a stream into an accumulator.
Aggregates a stream into an accumulator, which is updated and then emitted for each incoming batch. By default, kxi.sp.accumulate emits the accumulator itself, but the emitted value can be transformed with the output function before it is passed to the next operator. If the accumulator is a dictionary, an output function such as {enlist x} could be used so that the next operator receives a table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. | required |
initial | Any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with | None |
output | Union[Callable, str] | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | None |
Returns:
Type | Description |
---|---|
Operator | An accumulate operator, which can be joined to other operators or pipelines. |
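As an illustration, a minimal sketch that keeps a running count of records seen on the stream; the reader and writer names are assumed from the wider kxi.sp API:

```python
from kxi import sp

def running_count(md, data, acc):
    # md: message metadata, data: incoming batch, acc: current accumulator
    return int(acc) + len(data)

sp.run(
    sp.read.from_callback('publish')
    | sp.accumulate(running_count, initial=0)   # emits the updated count per batch
    | sp.write.to_console()
)
```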
kxi.sp.op.apply
Apply a function to incoming batches in the stream.
Unlike other operators, apply is asynchronous. To return data from it, you must use sp.push to push data through the pipeline when it is ready. Apply is commonly used with the state API to implement custom windowing, or with the task API.
When apply is used to buffer data, the on_finish option should be used to flush the buffer when the pipeline finishes. It will be called once for every key in the state, including the default key, with the metadata dictionary passed to it containing only the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses. | required |
on_finish | Union[Callable, str] | A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. | None |
Returns:
Type | Description |
---|---|
Operator | An apply operator, which can be joined to other operators or pipelines. |
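A minimal sketch of buffering with apply follows; for brevity it keeps the buffer in a plain Python list rather than the state API mentioned above, it assumes sp.push takes the operator, metadata, and the data to emit, and the reader/writer names are assumptions:

```python
from kxi import sp
import pykx as kx

_buffer = []  # simple module-level buffer used here instead of the state API

def buffer(op, md, data):
    _buffer.append(data)
    if len(_buffer) >= 10:                    # flush once 10 batches have arrived
        sp.push(op, md, kx.q.raze(_buffer))   # apply emits only via sp.push
        _buffer.clear()

def flush(op, md):
    # on_finish: emit whatever is still buffered when the pipeline finishes
    if _buffer:
        sp.push(op, md, kx.q.raze(_buffer))
        _buffer.clear()

pipe = (sp.read.from_callback('publish')
        | sp.apply(buffer, on_finish=flush)
        | sp.write.to_console())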
kxi.sp.op.filter
Filter elements out of a batch, or a batch from a stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | A predicate function which will be applied to the data that passes through the operator. If the result is a boolean list, only the records it flags in the batch progress through the stream. If the result is a single boolean value, then either all records in the batch progress through the stream, or none of them do. | required |
Returns:
Type | Description |
---|---|
Operator | A filter operator, which can be joined to other operators or pipelines. |
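For example, a sketch that keeps only records with a positive price column, written as a q predicate; it assumes the batches are tables containing a price column, and the reader/writer names are assumptions:

```python
from kxi import sp

# Per-record filter written as q code: it returns a boolean list, so only the
# flagged records progress. A Python callable returning booleans works the same way.
sp.run(
    sp.read.from_callback('publish')
    | sp.filter('{0 < x`price}')
    | sp.write.to_console()
)
```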
kxi.sp.op.map
Apply a function to data passing through the operator.
The function provided will be called on every message that passes through the pipeline stream. The map operation can modify the data by performing aggregations, type changes, transformations, etc.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function. | required |
Returns:
Type | Description |
---|---|
Operator | A map operator, which can be joined to other operators or pipelines. |
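For example, a sketch showing both ways of supplying the function, as a Python callable and as a string of q code; the reader and writer names are assumptions:

```python
from kxi import sp
import pykx as kx

# Equivalent transformations: a Python callable that applies a q lambda, and
# the same logic written directly as a q code string.
python_map = sp.map(lambda data: kx.q('{update price: 1.1 * price from x}', data))
q_map = sp.map('{update price: 1.1 * price from x}')

sp.run(sp.read.from_callback('publish') | q_map | sp.write.to_console())
```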
kxi.sp.op.merge
Merge two streams using a function to combine them.
Merge can be used to join and enrich streams with other streams or with static data, or to union two streams into a single stream. See also: union.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | Pipeline | The separate pipeline that will be merged with. | required |
function | Union[Callable, str] | A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one. | required |
Returns:
Type | Description |
---|---|
Operator | A merge operator, which can be joined to other operators or pipelines. |
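For example, a sketch that enriches a trade stream with keyed reference data using a left join; the reader names are assumptions, and lj is the q left-join keyword:

```python
from kxi import sp

trades      = sp.read.from_callback('trades')
instruments = sp.read.from_callback('instruments') | sp.map('{1!x}')  # key the reference data

pipe = (trades
        | sp.merge(instruments, '{x lj y}')   # left-join each trade batch to the reference table
        | sp.write.to_console())
```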
kxi.sp.op.reduce
Aggregates partial windows.
A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the count_trigger option, a partial window will be emitted when the number of buffered records exceeds the count_trigger. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using this operator. When the window is complete, this operator will emit the result of reducing the partial windows.
The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.
For partial windows, such as those emitted by count_trigger or idle streams, the accumulator will be updated but not emitted.
When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.
By default, kxi.sp.reduce emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like pykx.q.enlist could be used to emit tables.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Union[Callable, str] | An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. | required |
initial | Any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with | None |
output | Union[Callable, str] | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | None |
Returns:
Type | Description |
---|---|
Operator | A reduce operator, which can be joined to other operators or pipelines. |
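For example, a sketch that counts records per window; the window operator's name and parameter order are assumptions from the wider kxi.sp API, as are the reader and writer:

```python
from datetime import timedelta
from kxi import sp

def add_count(md, data, acc):
    # update this window's accumulator with the size of the incoming batch
    return int(acc) + len(data)

sp.run(
    sp.read.from_callback('publish')
    | sp.window.tumbling(timedelta(seconds=5), 'time')  # assumed window signature
    | sp.reduce(add_count, initial=0)
    | sp.write.to_console()
)
```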
kxi.sp.op.split
Split the current stream.
The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.
Split operators can be explicitly added, but are implicitly added if the same operator appears as a parent multiple times when resolving the streams given to sp.run.
Returns:
Type | Description |
---|---|
Operator | A split operator, which can be joined to other operators or pipelines. |
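For example, a sketch that splits one source into two branches; the reader/writer names are assumptions, and it assumes sp.run accepts multiple pipelines, as suggested by "the streams given to sp.run" above:

```python
from kxi import sp

source = sp.read.from_callback('publish') | sp.split()

expensive = source | sp.filter('{100 < x`price}') | sp.write.to_console()
raw       = source | sp.write.to_variable('raw')

sp.run(expensive, raw)   # both branches share the same upstream reader
```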
kxi.sp.op.sql
Perform an SQL query on tabular data in the stream.
The query must:
- conform to ANSI SQL (queries are limited to the documented supported operations)
- reference the special $1 alias in place of the table name
If data in the stream is not table data, an error will be raised. Providing a schema allows the query to be precompiled, enabling faster processing of streaming data.
Queries run in a local worker process
SQL queries are not distributed across workers, and are currently run on each worker's substream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, pykx.wrappers.CharVector] | An SQL query, which will be performed on tabular data in the stream. | required |
schema | Optional[pykx.wrappers.Table] | An empty table with the same schema as the data. | None |
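For example, a sketch of an aggregating query over the stream, with an empty table supplied as the schema so the query can be precompiled; the reader and writer names are assumptions:

```python
from kxi import sp
import pykx as kx

# An empty table matching the shape of the streaming data
schema = kx.q('([] sym:`$(); price:`float$(); size:`long$())')

sp.run(
    sp.read.from_callback('publish')
    | sp.sql('SELECT sym, AVG(price) AS avg_price FROM $1 GROUP BY sym', schema=schema)
    | sp.write.to_console()
)
```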
kxi.sp.op.union
Unite two streams.
Like merge, but elements from both sides of the union are left as-is.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | Pipeline | The separate pipeline that will be unified with. | required |
Returns:
Type | Description |
---|---|
Operator | A union operator, which can be joined to other operators or pipelines. |
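For example, a sketch uniting two readers into a single downstream stream; the reader and writer names are assumptions:

```python
from kxi import sp

stream_a = sp.read.from_callback('updates_a')
stream_b = sp.read.from_callback('updates_b')

# Records from both readers flow through unchanged to the writer
sp.run(stream_a | sp.union(stream_b) | sp.write.to_console())
```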