
Operators

Stream Processor operators.

An operator is a first-class building block in the stream processor API. Operators are strung together in a user's program, and provide the core functionality for the Stream Processor.

"operator" terminology

In the KX Stream Processor, the term "operator" can refer to either one of the operators in this module, or any component of a pipeline. For instance it could be said that a pipeline is a series of chained operators, even though some of those operators are readers or writers.

Put another way: all readers/writers/decoders/etc. are operators, but not all operators are readers/writers/decoders/etc.

kxi.sp.op aliases in kxi.sp

All operators in this module are available through aliases in kxi.sp for convenience. For example, you can access kxi.sp.op.map as kxi.sp.map.
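
For example, a minimal sketch of building and running a pipeline through the aliases. The reader and writer shown (sp.read.from_callback, sp.write.to_console) and the pipe chaining come from the wider kxi.sp API and are assumptions here; only the operators themselves are documented in this module.

```python
# Minimal sketch: sp.map is the alias for sp.op.map. The reader, writer, and
# pipe chaining below are assumptions from the wider kxi.sp API.
from kxi import sp
import pykx as kx

sp.run(
    sp.read.from_callback('publish')              # assumed reader
    | sp.map(lambda data: kx.q('{2*x}', data))    # alias for sp.op.map; doubles numeric batches
    | sp.write.to_console()                       # assumed writer
)
```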

kxi.sp.op.accumulate

Aggregates a stream into an accumulator.

Aggregates a stream into an accumulator, which is updated and then emitted for each incoming batch. The emitted value is passed to the output function, which can modify it before it is passed to the next operator.

By default, kxi.sp.accumulate emits the accumulator itself, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function such as {enlist x} can be used so the next operator receives a table.

Parameters:

function (Union[Callable, str], required)
    An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator.

initial (Any, default None)
    The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.

output (Union[Callable, str], default None)
    An optional function to transform output before emitting it. It gets passed the value of the accumulator.

Returns:

Pipeline
    A pipeline comprised of the accumulate operator, which can be joined to other pipelines.
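
A minimal sketch of accumulate, keeping a running record count across the whole stream. The reader and writer names are assumptions from the wider kxi.sp API; only the accumulate signature is taken from this page.

```python
from kxi import sp

def running_count(md, data, acc):
    # add this batch's record count to the accumulator carried across batches
    return int(acc) + len(data)

sp.run(
    sp.read.from_callback('publish')           # assumed reader
    | sp.accumulate(running_count, initial=0)  # emits the updated count for every batch
    | sp.write.to_console()                    # assumed writer
)
```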

kxi.sp.op.apply

Apply a function to incoming batches in the stream.

Unlike other operators, apply is asynchronous. To return data from it, you must use sp.push to push data through the pipeline when it is ready. Apply is commonly used with the state API to implement custom windowing, or with the task API.

When apply is used to buffer data, the on_finish option should be used to flush the buffer when the pipeline finishes. It is called once for every key in the state, including the default key, and is passed a metadata dictionary containing only that key.

Parameters:

function (Union[Callable, str], required)
    A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses.

on_finish (Union[Callable, str], default None)
    A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata.

Returns:

Pipeline
    A pipeline comprised of the apply operator, which can be joined to other pipelines.
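
A hedged sketch of apply, re-emitting each batch in smaller chunks. It assumes sp.push takes the operator, metadata, and data (mirroring the q API), and the reader/writer names are assumptions from the wider kxi.sp API.

```python
from kxi import sp

def rechunk(op, md, data):
    # apply emits nothing by itself, so each chunk must be pushed explicitly;
    # sp.push(operator, metadata, data) is assumed to mirror the q API
    for i in range(0, len(data), 1000):
        sp.push(op, md, data[i:i + 1000])

sp.run(
    sp.read.from_callback('publish')   # assumed reader
    | sp.apply(rechunk)
    | sp.write.to_console()            # assumed writer
)
```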

kxi.sp.op.filter

Filter elements out of a batch, or a batch from a stream.

Parameters:

function (Union[Callable, str], required)
    A predicate function applied to the data passing through the operator. If the result is a boolean list, only the flagged records in the batch progress through the stream. If the result is a single boolean, either all records in the batch progress through the stream or none of them do.

Returns:

Pipeline
    A pipeline comprised of the filter operator, which can be joined to other pipelines.
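
A minimal sketch of filter using a boolean-list predicate. The trades reader, console writer, and price column are assumptions; only the predicate behaviour is taken from this page.

```python
from kxi import sp
import pykx as kx

def price_over_100(data):
    # boolean mask over the batch: only the flagged rows continue downstream
    return kx.q('{100 < x`price}', data)

sp.run(
    sp.read.from_callback('trades')    # assumed reader with a hypothetical price column
    | sp.filter(price_over_100)
    | sp.write.to_console()            # assumed writer
)
```

A predicate that returns a single boolean, such as sp.filter('{100 < count x}'), instead keeps or drops each batch as a whole.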

kxi.sp.op.map

Apply a function to data passing through the operator.

The function is called with an entire batch of data, as a pykx.K object, rather than with each individual item or row in the batch. This makes vector operations, which are efficient and easy to perform on pykx.K objects, the natural way to transform the data. If needed, the function can instead iterate over the batch manually.

Parameters:

function (Union[Callable, str], required)
    The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function.

Returns:

Pipeline
    A pipeline comprised of the map operator, which can be joined to other pipelines.
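
A minimal sketch of map applying a vectorised transformation to each batch. The quotes reader, console writer, and column names are assumptions.

```python
from kxi import sp
import pykx as kx

def add_spread(data):
    # operate on the whole batch at once rather than row by row
    return kx.q('{update spread: ask - bid from x}', data)

sp.run(
    sp.read.from_callback('quotes')    # assumed reader with hypothetical ask/bid columns
    | sp.map(add_spread)
    | sp.write.to_console()            # assumed writer
)
```

The same logic can be supplied as a string of q code: sp.map('{update spread: ask - bid from x}').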

kxi.sp.op.merge

Merge two streams using a function to combine them.

Merge can be used to join and enrich a stream with another stream or with static data, or to combine two streams into a single stream. See also: union

Parameters:

stream (Pipeline, required)
    The other pipeline to merge with this one.

function (Union[Callable, str], required)
    A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one.

Returns:

Pipeline
    A pipeline comprised of the merge operator, which can be joined to other pipelines.
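
A minimal sketch of merge enriching a trade stream with a quote stream via a left join. The reader/writer names and the column names are assumptions.

```python
from kxi import sp
import pykx as kx

quotes = sp.read.from_callback('quotes')        # assumed reader for the second stream

def enrich(trade_data, quote_data):
    # key the quotes by sym (hypothetical column) and left-join onto the trades
    return kx.q.lj(trade_data, kx.q.xkey('sym', quote_data))

sp.run(
    sp.read.from_callback('trades')             # assumed reader
    | sp.merge(quotes, enrich)
    | sp.write.to_console()                     # assumed writer
)
```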

kxi.sp.op.reduce

Aggregates partial windows.

A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the count_trigger option, a partial window will be emitted when the number of buffered records exceeds the count_trigger. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using this operator. When the window is complete, this operator will emit the result of reducing the partial windows.

The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.

For partial batches, such as those emitted by count_trigger or idle streams, the accumulator will be updated but not emitted.

When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.

By default, kxi.sp.reduce emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like pykx.q.enlist could be used to emit tables.

Parameters:

function (Union[Callable, str], required)
    An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator.

initial (Any, default None)
    The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.

output (Union[Callable, str], default None)
    An optional function to transform output before emitting it. It gets passed the value of the accumulator.

Returns:

Pipeline
    A pipeline comprised of the reduce operator, which can be joined to other pipelines.
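
A hedged sketch of reduce maintaining per-window statistics with a dictionary accumulator, enlisted on output so a table is emitted. A window operator from kxi.sp.window would normally sit upstream of reduce; it is indicated only as a comment here, and the reader/writer names and the price column are assumptions.

```python
from kxi import sp
import pykx as kx

def window_stats(md, data, acc):
    # update this window's accumulator with the batch's row count and total price
    return {
        'count': int(acc['count']) + len(data),
        'total': float(acc['total']) + float(kx.q.sum(data['price'])),
    }

sp.run(
    sp.read.from_callback('trades')              # assumed reader
    # a window operator (kxi.sp.window) would normally be chained in here
    | sp.reduce(window_stats,
                initial={'count': 0, 'total': 0.0},
                output=kx.q.enlist)              # enlist the dictionary to emit a table
    | sp.write.to_console()                      # assumed writer
)
```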

kxi.sp.op.split

Split the current stream.

The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.

Split operators can be added explicitly, but one is added implicitly if the same operator appears as a parent multiple times when resolving the streams given to sp.run.

Returns:

Pipeline
    A pipeline comprised of the split operator, which can be joined to other pipelines.
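
A minimal sketch of an explicit split feeding two branches. The reader, the writers, and passing several streams to sp.run are assumptions based on the wider kxi.sp API.

```python
from kxi import sp
import pykx as kx

source = sp.read.from_callback('sensors') | sp.split()        # assumed reader; explicit split

# each reuse of `source` becomes its own branch of the split
alerts  = source | sp.filter(lambda d: kx.q('{50 < x}', d)) | sp.write.to_console()   # assumed writer
archive = source | sp.write.to_variable('archive')             # assumed writer

sp.run(alerts, archive)   # assumed: run accepts each resulting stream
```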

kxi.sp.op.sql

Perform an SQL query on tabular data in the stream.

The query must:

If data in the stream is not table data, an error will be signaled. Providing a schema allows the query to be precompiled, enabling faster processing of streaming data.

Queries run in a local worker process

SQL queries are not distributed across workers, and are currently run on each worker's substream.

Parameters:

query (Union[str, bytes, pykx.wrappers.CharVector], required)
    An SQL query, which will be performed on tabular data in the stream.

schema (Optional[pykx.wrappers.Table], default None)
    An optional empty table representing the schema of the data.
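
A hedged sketch of sql with a schema supplied for precompilation. The reader/writer names, the column names, and in particular the table name referenced inside the query are assumptions about how the operator exposes the streaming data.

```python
from kxi import sp
import pykx as kx

# empty table describing the shape of the streaming data, so the query can be precompiled
schema = kx.q('([] sym:`$(); price:`float$(); size:`long$())')

sp.run(
    sp.read.from_callback('trades')    # assumed reader
    | sp.sql('select sym, avg(price) AS avg_price from trades group by sym',  # table name assumed
             schema=schema)
    | sp.write.to_console()            # assumed writer
)
```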

kxi.sp.op.union

Unite two streams.

Like merge, but elements from both sides of the union are left as-is.

Parameters:

stream (Pipeline, required)
    The other pipeline to union with this one.

Returns:

Pipeline
    A pipeline comprised of the union operator, which can be joined to other pipelines.
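
A minimal sketch of union interleaving two feeds unchanged. The reader and writer names are assumptions from the wider kxi.sp API.

```python
from kxi import sp

feed_a = sp.read.from_callback('feedA')    # assumed reader
feed_b = sp.read.from_callback('feedB')    # assumed reader

sp.run(
    feed_a
    | sp.union(feed_b)          # batches from either feed pass through as-is
    | sp.write.to_console()     # assumed writer
)
```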