# Operators

Stream Processor operators.

An operator is a first-class building block in the stream processor API. Operators are strung together in a user's program, and provide the core functionality for the Stream Processor.

"operator" terminology

In the KX Stream Processor, the term "operator" can refer to either one of the basic operators like map, or filter, but can also refer to any component of a pipeline. In other words: all readers/writers/decoders/etc. are operators, but not all operators are readers/writers/decoders/etc.

**`kxi.sp.op` aliases in `kxi.sp`**

All operators in this module are available through aliases in `kxi.sp` for convenience. For example, you can access `kxi.sp.op.map` as `kxi.sp.map`.

## kxi.sp.op.OperatorParams

Bases: AutoNameEnum

Specifies a parameter that will be passed to the function you provide to an operator.

### kxi.sp.op.OperatorParams.operator = auto() class-attribute

Provide the operator's dictionary, which may be required by other SP functions.

### kxi.sp.op.OperatorParams.metadata = auto() class-attribute

Provide the message's metadata, which may be required by other SP functions.

### kxi.sp.op.OperatorParams.data = auto() class-attribute

Provide the message's data.

## kxi.sp.op.Operator

Stream Processor operator interface.

An operator is a first-class building block in the stream processor API. Operators can be strung together to form a kxi.sp.Pipeline instance, which can then be run.

Pipeline and operator objects can be joined together using the `|` operator. Operators can also be joined to lists/tuples of pipelines or operators.
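The `|` composition can be pictured with a minimal toy class. This is an illustration of the chaining semantics only, not the kxi classes:

```python
# Toy sketch of how `|` chains operators: each operator wraps a function,
# and `a | b` produces a new operator that feeds a's output into b.
# This illustrates the composition semantics only; it is not the kxi API.
class ToyOp:
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        return ToyOp(lambda x: other.fn(self.fn(x)))

pipe = ToyOp(lambda x: x + 1) | ToyOp(lambda x: x * 2)
print(pipe.fn(3))  # → 8
```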

See also: `kxi.sp.Pipeline`

Attributes:

| Name | Type | Description |
| ---- | ---- | ----------- |
| `id` | `str` | The unique ID of this operator. |

### kxi.sp.op.Operator.as_pipeline property

A new pipeline that only contains this operator.

## kxi.sp.op.accumulate

Aggregates a stream into an accumulator.

The accumulator is updated and then emitted for each incoming batch. By default, `kxi.sp.accumulate` emits the accumulator itself, but this value can be transformed with the output function before it is passed to the next operator. If the accumulator is a dictionary, an output function like `pykx.q.enlist` could be used so the next operator receives a table.
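Conceptually, the behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of accumulate semantics (not the kxi API):
# the accumulator is updated by `function` and emitted after every batch.
def run_accumulate(batches, function, initial, output=None):
    acc = initial
    emitted = []
    for batch in batches:
        acc = function(None, batch, acc)  # (metadata, data, accumulator)
        emitted.append(output(acc) if output is not None else acc)
    return emitted

# A running sum over three batches emits after each batch, not only at the end.
print(run_accumulate([[1, 2], [3], [4, 5]], lambda md, d, acc: acc + sum(d), 0))
# → [3, 6, 15]
```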

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `function` | `OperatorFunction` | An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. | required |
| `initial` | `Any` | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with `pykx.K`. | `None` |
| `output` | `OperatorFunction` | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | `None` |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | An accumulate operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.apply

Apply a function to incoming batches in the stream.

Unlike other operators, `apply` is asynchronous. To return data from it, you must use `sp.push` to push data through the pipeline when it is ready. `apply` is commonly used with the state API to implement custom windowing, or with the task API.

When `apply` is used to buffer data, the `on_finish` option should be used to flush the buffer when the pipeline finishes. It is called once for every key in the state, including the default key, with the metadata dictionary passed to it containing only the key.
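The buffering pattern described above can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of using apply to buffer data (not the kxi API):
# batches are held until a threshold is reached, then pushed downstream;
# the on_finish flush emits whatever remains when the pipeline ends.
def run_buffering_apply(batches, threshold):
    buffer, pushed = [], []
    for batch in batches:
        buffer.extend(batch)
        if len(buffer) >= threshold:
            pushed.append(buffer)  # sp.push would emit this downstream
            buffer = []
    if buffer:                     # the on_finish flush
        pushed.append(buffer)
    return pushed

print(run_buffering_apply([[1], [2, 3], [4]], threshold=3))
# → [[1, 2, 3], [4]]
```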

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `function` | `OperatorFunction` | A function that will be called with three arguments: operator, metadata, and data. The operator and metadata arguments can be used to get and set the state of the operator, among other uses. | required |
| `on_finish` | `OperatorFunction` | A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. | `None` |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | An apply operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.filter

Filter elements out of a batch, or a batch from a stream.
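The two predicate shapes, a boolean per record or a single boolean for the whole batch, can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of filter semantics (not the kxi API).
def run_filter(batch, predicate):
    result = predicate(batch)
    if isinstance(result, bool):      # single boolean: keep or drop the whole batch
        return batch if result else []
    # boolean list: keep only the flagged records
    return [x for x, keep in zip(batch, result) if keep]

print(run_filter([1, 5, 3, 8], lambda b: [x > 3 for x in b]))  # → [5, 8]
print(run_filter([1, 5, 3, 8], lambda b: len(b) > 10))         # → []
```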

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `function` | `OperatorFunction` | A predicate function applied to the data passing through the operator. If the result is a boolean list, only the flagged records in the batch progress through the stream. If the result is a single boolean value, either all records in the batch progress through the stream, or none do. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A filter operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.map

Apply a function to data passing through the operator.

The function provided will be called on every message that passes through the pipeline stream. The map operation can modify the data by performing aggregations, type changes, transformations, etc.
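The per-batch behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of map semantics (not the kxi API): the function is
# applied to each batch of data, and its return value replaces the batch.
def run_map(batches, fn):
    return [fn(batch) for batch in batches]

# e.g. a transformation converting prices in pence to pounds
print(run_map([[100, 250], [75]], lambda b: [x / 100 for x in b]))
# → [[1.0, 2.5], [0.75]]
```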

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `function` | `OperatorFunction` | The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A map operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.merge

Merge two streams using a function to combine them.

Merge can be used to join and enrich one stream with another, to join a stream with static data, or to combine two streams into one. See also: `union`
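A common use is enriching a stream with static reference data, which can be sketched in plain Python. This illustrates the role of the merge function only; it is not the kxi API, and the field names are illustrative:

```python
# Plain-Python sketch of a merge function enriching a stream with static data
# (not the kxi API; the field names are illustrative).
ref = {'a': 'alpha', 'b': 'beta'}  # static reference data

def enrich(batch, reference):
    # combine the two inputs into one output, as a merge function would
    return [dict(row, name=reference[row['sym']]) for row in batch]

print(enrich([{'sym': 'a', 'px': 1.0}, {'sym': 'b', 'px': 2.0}], ref))
# → [{'sym': 'a', 'px': 1.0, 'name': 'alpha'}, {'sym': 'b', 'px': 2.0, 'name': 'beta'}]
```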

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `stream` | `Pipeline` | The separate pipeline that will be merged with. | required |
| `function` | `OperatorFunction` | A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A merge operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.parallel

Applies multiple functions in parallel to a stream.

This core operator allows multiple aggregations to be run over the same data in parallel.

**State is not supported**

Multiple states cannot be updated within the same global variable, so the state API is not supported within parallel execution.
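The fan-out-then-merge behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of parallel semantics (not the kxi API): each function
# sees the same batch, and an optional merge combines their outputs.
def run_parallel(batch, functions, merge=None):
    outputs = [fn(batch) for fn in functions]
    return merge(outputs) if merge is not None else outputs

# Three aggregations over the same data, merged into a tuple.
print(run_parallel([1, 2, 3, 4], [sum, max, min], merge=tuple))
# → (10, 4, 1)
```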

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `functions` | `List[OperatorFunction]` | A function or functions applied to incoming data. | required |
| `merge` | `Optional[OperatorFunction]` | A function run on the outputs of the functions passed in. | `None` |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A parallel operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.reduce

Aggregates partial windows.

A window may include more records than can fit in memory, so it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the `count_trigger` option, a partial window is emitted when the number of buffered records exceeds `count_trigger`. Partial windows are also emitted when a stream goes idle. These partial windows can be aggregated using this operator; when the window is complete, this operator emits the result of reducing the partial windows.

The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.

For partial windows, such as those emitted by `count_trigger` or idle streams, the accumulator is updated but not emitted.

When a window is closed, such as when the high-water mark passes the end of the window, the accumulator is updated for that final batch and then emitted.

By default, `kxi.sp.reduce` emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like `pykx.q.enlist` could be used to emit tables.
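The update-per-partial, emit-on-close behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of reduce semantics (not the kxi API): partial windows
# update the accumulator; only when the window closes is the result emitted.
def run_reduce(partial_windows, function, initial, output=None):
    acc = initial
    for batch in partial_windows:        # partials update, but do not emit
        acc = function(None, batch, acc)
    return output(acc) if output is not None else acc  # emitted on window close

# Counting records across three partial windows of one logical window.
print(run_reduce([[1, 2], [3, 4, 5], [6]], lambda md, d, acc: acc + len(d), 0))
# → 6
```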

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `function` | `OperatorFunction` | An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. | required |
| `initial` | `Any` | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with `pykx.K`. | `None` |
| `output` | `OperatorFunction` | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | `None` |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A reduce operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.rolling

Applies a moving-window function to a stream.

This operator is equivalent to `sp.map`, but for moving-window functions such as moving averages, or functions comparing a vector to a shifted version of itself, such as a difference function.

This operator does not emit a moving window; that can be done with `kxi.sp.window.count`. Rather, it maintains a buffer of the last n records, which is prepended to each incoming batch. The results of the function on these prepended elements are dropped, as their values will already have been emitted in an earlier batch.

**Warning**

Functions that aggregate data to a constant number of data points (for example, `sum`) will not work with the rolling operator, as it relies on the buffered data points to produce the correct output.
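The buffer-prepend-and-drop behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of rolling semantics (not the kxi API): a buffer of the
# last n records is prepended to each batch, and the results for those
# prepended records are dropped, since they were emitted with an earlier batch.
def run_rolling(batches, n, fn):
    buffer, out = [], []
    for batch in batches:
        extended = buffer + batch
        result = fn(extended)
        out.append(result[len(buffer):])  # drop results for the prepended records
        buffer = extended[-n:]
    return out

# A q-style "deltas" that stays continuous across batch boundaries, n=1.
deltas = lambda v: [v[i] - v[i - 1] if i else v[i] for i in range(len(v))]
print(run_rolling([[1, 2, 3], [4, 6]], 1, deltas))
# → [[1, 1, 1], [1, 2]]
```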

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `n` | `int` | The size of the buffer. | required |
| `fn` | `OperatorFunction` | A function that takes a vector. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A rolling operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.split

Split the current stream.

The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.

Split operators can be added explicitly, but are added implicitly if the same operator appears as a parent multiple times when resolving the streams given to `sp.run`.
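The fan-out behaviour can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of split semantics (not the kxi API): every child
# stream receives the same batch.
def run_split(batch, children):
    return [child(list(batch)) for child in children]

# One stream feeding two separate analytics.
print(run_split([1, 2, 3], [sum, lambda b: max(b) - min(b)]))
# → [6, 2]
```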

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A split operator, which can be joined to other operators or pipelines. |

## kxi.sp.op.sql

Perform an SQL query on tabular data in the stream.

If data in the stream is not table data, an error will be raised. Providing a schema allows the query to be precompiled, enabling faster processing of streaming data.

**Queries run in a local worker process**

SQL queries are not distributed across workers, and are currently run on each worker's substream.
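The idea of running an SQL query over one tabular batch can be illustrated with the standard-library `sqlite3` module. This is an illustration only, not the kxi SQL engine:

```python
# Plain-Python illustration (sqlite3, not the kxi SQL engine) of running an
# SQL query over one tabular batch of the stream.
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('create table batch (sym text, px real)')
conn.executemany('insert into batch values (?, ?)',
                 [('a', 1.0), ('a', 3.0), ('b', 2.0)])
print(conn.execute('select sym, avg(px) from batch group by sym order by sym').fetchall())
# → [('a', 2.0), ('b', 2.0)]
```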

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `query` | `CharString` | An SQL query, which will be performed on tabular data in the stream. | required |
| `schema` | `Optional[kx.Table]` | An empty table with the same schema as the data. | `None` |

## kxi.sp.op.udf

Retrieve a user-defined function (UDF) from a package directory defined by the `KX_PACKAGE_PATH` environment variable.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `name` | `CharString` | Name of the UDF to retrieve. | required |
| `package` | `CharString` | Name of the package from which to retrieve the UDF. | required |
| `version` | `CharString` | Version of the package from which to retrieve the UDF. | `None` |
| `params` | `Optional[dict]` | Additional optional input to the UDF, passed as the final argument to the function. | `kx.q('()!()')` |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A function to be used as a UDF within a `merge`, `filter`, or `map` operator. |

Examples:

Retrieve a UDF, providing all necessary information:

```python
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set the path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (in this example the UDF adds 10 to its input)
>>> udf = sp.udf('map_udf', 'test', '1.0.0', {'param': 10})
>>> # Use the UDF within an SP operator
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x        x1
-----------------
10.49318 10.39275
10.57852 10.51709
10.08389 10.51598
10.19599 10.40666
10.37564 10.17808
pykx.Identity(pykx.q('::'))
```

Retrieve a UDF using the latest package version and default parameters:

```python
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set the path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (this UDF filters the data for x < 0.5)
>>> udf = sp.udf('filter_udf', 'test')
>>> # Use the UDF within an SP operator
>>> sp.run(sp.read.from_callback('publish')
...     | sp.filter(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x          x1
--------------------
0.07347808 0.7263142
0.3159526  0.9216436
0.3410485  0.1809536
pykx.Identity(pykx.q('::'))
```


## kxi.sp.op.union

Unite two streams.

Like `merge`, but elements from both sides of the union are left as-is.
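The contrast with merge can be sketched in plain Python. This is an illustration of the semantics only, not the kxi API:

```python
# Plain-Python sketch of union semantics (not the kxi API): unlike merge,
# batches from both sides pass through unchanged; the ordering shown here
# is illustrative, as real arrival order depends on the streams.
left = [[1, 2], [5]]
right = [[3, 4]]
unioned = left + right
print(unioned)  # → [[1, 2], [5], [3, 4]]
```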

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `stream` | `Pipeline` | The separate pipeline that will be unified with. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| `Operator` | A union operator, which can be joined to other operators or pipelines. |