kxi.sp.op
Stream Processor operators.
An operator is a first-class building block in the stream processor API. Operators are strung together in a user's program, and provide the core functionality for the Stream Processor.
Note: "operator" terminology
In the KX Stream Processor, the term "operator" can refer to one of the basic operators, such as map or filter, but it can also refer to any component of a pipeline. In other words: all readers/writers/decoders/etc. are operators, but not all operators are readers/writers/decoders/etc.
Note: kxi.sp.op aliases in kxi.sp
All operators in this module are available through aliases in kxi.sp for convenience. For example, you can access kxi.sp.op.map as kxi.sp.map.
OperatorParams Objects
class OperatorParams(AutoNameEnum)
Specifies a parameter that will be provided to the function you provide to an operator.
operator
Provide the operator's dictionary, which may be required by other SP functions.
metadata
Provide the message's metadata, which may be required by other SP functions.
data
Provide the message's data.
Operator Objects
class Operator()
Stream Processor operator interface.
An operator is a first-class building block in the stream processor API. Operators can be strung together to form a kxi.sp.Pipeline instance, which can then be run.
Pipeline and operator objects can be joined together using the | operator. Operators can also be joined to lists/tuples of pipelines or operators.
Attributes:
id (str) - The unique ID of this operator.
as_pipeline
@property
def as_pipeline()
A new pipeline that only contains this operator.
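The joining behavior described for Operator can be pictured with a small plain-Python model. This is only a sketch of how the | operator might compose stages; ToyOperator and ToyPipeline are hypothetical names invented for illustration and are not the kxi.sp classes.

```python
# Toy model of joining operators with `|`.
# ToyOperator and ToyPipeline are hypothetical; they are not kxi.sp classes.


class ToyPipeline:
    def __init__(self, stages):
        self.stages = list(stages)

    @property
    def as_pipeline(self):
        return self

    def __or__(self, other):
        # Joining with an operator or another pipeline yields a new pipeline.
        return ToyPipeline(self.stages + other.as_pipeline.stages)

    def run(self, batch):
        # Pass the batch through every stage in order.
        for stage in self.stages:
            batch = stage.function(batch)
        return batch


class ToyOperator:
    def __init__(self, function):
        self.function = function

    @property
    def as_pipeline(self):
        # A new pipeline that contains only this operator.
        return ToyPipeline([self])

    def __or__(self, other):
        return self.as_pipeline | other


double = ToyOperator(lambda batch: [2 * x for x in batch])
total = ToyOperator(sum)
pipeline = double | total
result = pipeline.run([1, 2, 3])
print(result)  # 12
```

The model leaves out joining to lists/tuples and everything related to actually running a stream.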
accumulate
@Operator
def accumulate(function: OperatorFunction,
initial: Any = None,
output: OperatorFunction = None) -> Operator
Aggregates a stream into an accumulator.
The accumulator is updated and then emitted for each incoming batch. By default, kxi.sp.accumulate emits the accumulator itself, but the emitted value is first passed to the output function, which can transform it before it reaches the next operator. If the accumulator is a dictionary, it may be necessary to enlist the result in the output function, for example with pykx.q.enlist, so that the next operator receives a table.
Arguments:
function - An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator.
initial - The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.
output - An optional function to transform output before emitting it. It gets passed the value of the accumulator.
Returns:
An accumulate operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> def accumulator(md, data, acc):
...     return acc + sum(data)
>>> def output(data):
...     return data * 10
>>> sp.run(sp.read.from_callback('publish')
...     | sp.accumulate(accumulator, 100, output)
...     | sp.write.to_variable('out'))
>>> kx.q('publish', [1,2,3])
>>> kx.q('publish', [4,5,6])
>>> kx.q('out')
1060 1210
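To make the update-then-emit cycle concrete, here is a plain-Python model of the behavior described above. It is a sketch only: toy_accumulate is a hypothetical helper, it does not use kxi.sp, and metadata is passed as None because it is not modelled.

```python
def toy_accumulate(batches, function, initial, output=lambda acc: acc):
    """Model of accumulate: update the accumulator per batch, then emit output(acc).

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    acc = initial
    emitted = []
    for data in batches:
        acc = function(None, data, acc)  # metadata is not modelled here
        emitted.append(output(acc))
    return emitted


emitted = toy_accumulate(
    [[1, 2, 3], [4, 5, 6]],
    lambda md, data, acc: acc + sum(data),
    100,
    lambda acc: acc * 10,
)
print(emitted)  # [1060, 1210]
```

The two emitted values match the documented example: the accumulator moves from 100 to 106 and then to 121, and the output function multiplies each emitted value by 10.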
apply
@Operator
def apply(
function: OperatorFunction,
*,
on_finish: Optional[OperatorFunction] = None,
state: Any = None,
params: Optional[dict] = kx.q('()!()')) -> Operator
Apply a function to incoming batches in the stream.
Unlike other operators, apply is asynchronous. To return data from it, you must use sp.push to push data through the pipeline when it is ready. Apply is commonly used with the state API to implement custom windowing, or with the task API.
When apply is used to buffer data, the on_finish option should be used to flush the buffer when the pipeline finishes. It will be called once for every key in the state, including the default key, with the metadata dictionary passed to it containing only the key.
Arguments:
function - A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses.
on_finish - A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata.
state - The initial state.
params - The arguments to pass to function.
Returns:
An apply operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> from itertools import groupby
>>> def split_list(lst):
...     return [list(group) for key, group in groupby(lst, lambda x: x == 1) if not key]
>>> data = [1, 2, 3, 1, 4, 5, 6, 1, 7, 8, 9]
>>> sp.run(sp.read.from_callback('publish')
...     | sp.apply(lambda op, md, data: [sp.push(op, md, sublist)
...                                      for sublist in split_list(data)])
...     | sp.map(lambda x: [x])
...     | sp.write.to_variable('out'))
>>> kx.q('publish', data)
>>> kx.q('out')
2 3
4 5 6
7 8 9
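The buffering pattern with a flush on finish can be modelled in plain Python. This sketch does not use kxi.sp: ToyApply, buffer_pairs, and flush are hypothetical names, and metadata and keyed state are deliberately left out.

```python
class ToyApply:
    """Model of an asynchronous operator: nothing is emitted unless pushed.

    Hypothetical class for illustration; not part of kxi.sp.
    """

    def __init__(self, function, on_finish=None, state=None):
        self.function = function
        self.on_finish = on_finish
        self.state = state
        self.emitted = []

    def push(self, data):
        # Stands in for sp.push: hand data to the next operator.
        self.emitted.append(data)

    def receive(self, data):
        # The return value of the function is ignored, as apply is asynchronous.
        self.function(self, data)

    def finish(self):
        if self.on_finish is not None:
            self.on_finish(self)


def buffer_pairs(op, data):
    # Buffer incoming records and push them downstream two at a time.
    op.state = op.state + list(data)
    while len(op.state) >= 2:
        op.push(op.state[:2])
        op.state = op.state[2:]


def flush(op):
    # Emit whatever is still buffered when the pipeline finishes.
    if op.state:
        op.push(op.state)
        op.state = []


op = ToyApply(buffer_pairs, on_finish=flush, state=[])
op.receive([1, 2, 3])
op.receive([4, 5])
op.finish()
print(op.emitted)  # [[1, 2], [3, 4], [5]]
```

Without the flush function, the final record would stay in the buffer and never reach the next operator.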
filter
@Operator
def filter(
function: OperatorFunction,
dropEmptyBatches: bool = False,
allowPartials: bool = True,
state: Any = None,
params: Optional[dict] = kx.q('()!()')) -> Operator
Filter elements out of a batch, or a batch from a stream.
Arguments:
function - A predicate function which will be applied to the data that passes through the operator. If the result is a boolean list, then only records that it flags in the batch progress through the stream. If the result is a singular boolean value, then either all records in the batch, or none of them progress through the stream.
dropEmptyBatches - True to only emit non-empty batches.
allowPartials - True indicates this filter operation can handle partial batches of data. See below for more details on partial batches.
state - The initial state.
params - The arguments to pass to function.
Returns:
A filter operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> def example_filter(data):
...     return [i for i in 'sale' == data['transaction']]
>>> sp.run(sp.read.from_callback('publish')
...     | sp.op.filter(example_filter)
...     | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
...     'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
...     'price': [1, 2, 3, 3, 3, 4]
... })
>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale        1
sale        3
sale        3
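The two predicate result shapes described under Arguments can be illustrated with a plain-Python model. This is a sketch under assumptions: toy_filter is a hypothetical helper, not kxi.sp, and it assumes that a scalar False leaves an empty batch that is then subject to the dropEmptyBatches setting.

```python
def toy_filter(batch, predicate, drop_empty_batches=False):
    """Return the batch to emit, or None when nothing is emitted.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    result = predicate(batch)
    if isinstance(result, bool):
        # A single boolean keeps or rejects every record in the batch.
        kept = list(batch) if result else []
    else:
        # A boolean list keeps only the flagged records.
        kept = [record for record, keep in zip(batch, result) if keep]
    if drop_empty_batches and not kept:
        return None  # assumption: an empty batch is simply not emitted
    return kept


records = [("sale", 1), ("purchase", 2), ("sale", 3)]

per_record = toy_filter(records, lambda b: [r[0] == "sale" for r in b])
whole_batch = toy_filter(records, lambda b: len(b) > 5)
dropped = toy_filter(records, lambda b: len(b) > 5, drop_empty_batches=True)

print(per_record)   # [('sale', 1), ('sale', 3)]
print(whole_batch)  # []
print(dropped)      # None
```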
key_by
@Operator
def key_by(field: Union[str, int, Callable],
allowPartials: bool = True) -> Operator
(Beta Feature) Add a key to a stream to partition data.
Notes:
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Arguments:
field - The method of keying the stream. Pass a string to designate a symbol column on which to partition the data. Alternatively, an integer or callable can be supplied.
allowPartials - True indicates this key_by operation can handle partial batches of data. See below for more details on partial batches.
Returns:
A key_by operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
...     | sp.op.key_by('transaction')
...     | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
...     'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
...     'price': [1, 2, 3, 3, 3, 4]
... })
>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale        1
sale        3
sale        3
purchase    2
purchase    4
return      3
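The grouping visible in the output above can be approximated with a plain-Python model of partitioning by key. This is a sketch, not the kxi.sp implementation: toy_key_by is a hypothetical helper that covers only a column name or a callable, and records are represented as dictionaries.

```python
def toy_key_by(records, field):
    """Partition records by a column name or by a callable that computes the key.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    key_of = field if callable(field) else (lambda record: record[field])
    partitions = {}
    for record in records:
        # Dictionaries keep insertion order, so keys appear in first-seen order.
        partitions.setdefault(key_of(record), []).append(record)
    return partitions


transactions = ["sale", "purchase", "sale", "sale", "return", "purchase"]
prices = [1, 2, 3, 3, 3, 4]
records = [{"transaction": t, "price": p} for t, p in zip(transactions, prices)]

by_column = toy_key_by(records, "transaction")
by_callable = toy_key_by(records, lambda r: r["price"] >= 3)

print(list(by_column))                           # ['sale', 'purchase', 'return']
print([r["price"] for r in by_column["sale"]])   # [1, 3, 3]
```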
map
@Operator
def map(function: OperatorFunction, allowPartials: bool = True) -> Operator
Apply a function to data passing through the operator.
The function provided will be called on every message that passes through the pipeline stream. The map operation can modify the data by performing aggregations, type changes, transformations, etc.
Arguments:
function - The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function.
allowPartials - True indicates this map operation can handle partial batches of data. See below for more details on partial batches.
Returns:
A map operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> def times_ten(data):
...     # Scale column x by 10 and pass the batch on
...     data['x'] *= 10
...     return data
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(times_ten)
...     | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
...     'x': np.random.randn(5),
...     'y': np.random.randn(5)
... })
>>> kx.q('publish', data)
>>> kx.q('out')
x         y
---------------------
-4.91861  -0.03564342
2.311394  0.8991633
-1.670712 0.433386
22.39179  -0.8260305
9.908324  -0.3064973
merge
@Operator
def merge(
stream: Pipeline,
function: OperatorFunction,
flush: Optional[str] = None,
trigger: Union[str, Callable] = None,
concat: Optional[OperatorFunction] = None,
state: Any = None,
params: Optional[dict] = kx.q('()!()')) -> Operator
Merge two streams using a function to combine them.
Merge can be used to join and enrich streams with other streams, with static data, or to union two streams into a single stream. See also: union
Arguments:
stream - The separate pipeline that will be merged with.
function - A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one.
flush - Indicates which side of the merge operation to flush data from.
trigger - This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger.
concat - A function to update the buffer with incoming data.
state - The initial state.
params - The arguments to pass to function.
Returns:
A merge operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> df1 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'price': [150, 2800]})
>>> df1.set_index('symbol', inplace=True)
>>> df2 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'size': [100, 200]})
>>> sp.run(sp.read.from_callback('publish')
...     | sp.merge(sp.read.from_callback('publish2'),
...                lambda x, y: pd.merge(x.pd(), y.pd(), on='symbol', how='left'))
...     | sp.write.to_variable('out'))
>>> kx.q('publish', df2)
>>> kx.q('publish2', df1)
>>> kx.q('out')
symbol size price
-----------------
AAPL   100  150
GOOGL  200  2800
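The merge function in the example performs a left join on the symbol column. As a rough plain-Python sketch of what such a two-argument function does, the join can be written over lists of dictionaries. The helper left_join_on_symbol is hypothetical, and the sketch does not model buffering, triggers, or flushing.

```python
def left_join_on_symbol(left, right):
    """Combine two batches: keep every left row, enrich it with the matching right row.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    lookup = {row["symbol"]: row for row in right}
    # Rows without a match on the right side are passed through unchanged.
    return [{**row, **lookup.get(row["symbol"], {})} for row in left]


sizes = [{"symbol": "AAPL", "size": 100}, {"symbol": "GOOGL", "size": 200}]
prices = [{"symbol": "AAPL", "price": 150}, {"symbol": "GOOGL", "price": 2800}]

merged = left_join_on_symbol(sizes, prices)
for row in merged:
    print(row)
# {'symbol': 'AAPL', 'size': 100, 'price': 150}
# {'symbol': 'GOOGL', 'size': 200, 'price': 2800}
```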
parallel
@Operator
def parallel(
functions: List[OperatorFunction],
*,
merge: Optional[OperatorFunction] = None,
params: Optional[dict] = kx.q('()!()')
) -> Operator
(Beta Feature) Applies multiple functions in parallel to a stream.
This core operator runs multiple functions over the same data in parallel, which allows several aggregations to be computed from one stream.
Note: Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Note: State is not supported
Multiple states cannot be updated within the same global variable, so state is not supported within parallel execution.
Arguments:
functions - A function or functions applied to incoming data.
merge - A function run on the outputs of the functions passed in.
params - The arguments to pass to function.
Returns:
A parallel operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> def avg(data):
...     return sum(data)/len(data)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.parallel([sum,avg])
...     | sp.map(lambda x:[x]) # gets a list of batches, instead of appending all together
...     | sp.write.to_variable('out'))
>>> data = [1,2,3]
>>> kx.q('publish', data)
>>> kx.q('out')
6 2f
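A plain-Python model can show the shape of the result: one output per function, optionally combined by a merge function. This is a sketch that uses a thread pool from the standard library rather than kxi.sp. The helper toy_parallel is hypothetical, and it assumes the merge function receives the outputs as a single list.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_parallel(data, functions, merge=None):
    """Run every function over the same data and collect the outputs in order.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, data) for fn in functions]
        outputs = [future.result() for future in futures]
    # Assumption: merge, when given, takes the list of outputs.
    return merge(outputs) if merge is not None else outputs


def avg(data):
    return sum(data) / len(data)


outputs = toy_parallel([1, 2, 3], [sum, avg])
largest = toy_parallel([1, 2, 3], [sum, avg], merge=max)

print(outputs)  # [6, 2.0]
print(largest)  # 6
```

The functions share no state in this model, which mirrors the restriction noted for the operator.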
reduce
@Operator
def reduce(function: OperatorFunction,
initial: Any = None,
output: OperatorFunction = None) -> Operator
Aggregates partial windows.
A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the count_trigger option, a partial window will be emitted when the number of buffered records exceeds the count_trigger. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using this operator. When the window is complete, this operator will emit the result of reducing the partial windows.
The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.
For partial windows, such as those emitted by count_trigger or idle streams, the accumulator will be updated but not emitted.
When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.
By default, kxi.sp.reduce emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like pykx.q.enlist could be used to emit tables.
Arguments:
function - An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator.
initial - The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.
output - An optional function to transform output before emitting it. It gets passed the value of the accumulator.
Returns:
A reduce operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import random
>>> import datetime
>>> def accumulator(md, data, acc):
...     total_length = len(data)
...     min_time = min(data['time'])
...     sum_val = sum(data['val'])
...     return {
...         'start_time': min_time,
...         'avg_val': sum_val / total_length
...     }
>>> def output(accumulator_result):
...     return pd.DataFrame([accumulator_result])
>>> vals = [random.uniform(0, 1) for _ in range(10000)]
>>> current_datetime = datetime.datetime.now()
>>> time_values = [current_datetime + datetime.timedelta(seconds=i/10) for i in range(10000)]
>>> df = pd.DataFrame({'time': time_values, 'val': vals})
>>> empty_df = pd.DataFrame(columns=df.columns)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.window.tumbling(datetime.timedelta(seconds=1),
...                          time_assigner=lambda x: x['time'], count_trigger=5)
...     | sp.reduce(accumulator, empty_df, output)
...     | sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
avg_val   start_time
---------------------------------------
0.4525153 2023.10.06D20:37:26.985194000
0.5691399 2023.10.06D20:37:27.085194000
0.5124086 2023.10.06D20:37:28.085194000
0.6062494 2023.10.06D20:37:29.085194000
0.3942734 2023.10.06D20:37:30.085194000
0.4774162 2023.10.06D20:37:31.085194000
...
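The difference between partial and closing batches can be made concrete with a plain-Python model. This is a sketch, not the kxi.sp implementation: toy_reduce is a hypothetical helper, each event is a (window, data, closes_window) tuple invented for the model, and the metadata key 'window' is likewise an assumption.

```python
def toy_reduce(events, function, initial, output=lambda acc: acc):
    """Keep one accumulator per window; emit it only when the window closes.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    accumulators = {}
    emitted = []
    for window, data, closes_window in events:
        acc = accumulators.get(window, initial)
        acc = function({"window": window}, data, acc)  # 'window' key is an assumption
        if closes_window:
            # Final batch for this window: update, emit, and discard the accumulator.
            emitted.append(output(acc))
            accumulators.pop(window, None)
        else:
            # Partial window: update the accumulator but emit nothing.
            accumulators[window] = acc
    return emitted


events = [
    (0, [1, 2], False),   # partial window 0
    (0, [3], True),       # window 0 closes
    (1, [10], False),     # partial window 1
    (1, [20, 30], True),  # window 1 closes
]

emitted = toy_reduce(events, lambda md, data, acc: acc + sum(data), 0)
print(emitted)  # [6, 60]
```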
rolling
@Operator
def rolling(n: int, fn: OperatorFunction) -> Operator
(Beta Feature) Applies a moving-window function to a stream.
This operator is equivalent to sp.map, but for moving-window functions such as moving averages or those comparing a vector to a shifted version of itself, such as the difference function.
This operator does not emit a moving window; that can be done with .qsp.window.count. Rather, it maintains a buffer of the last n records, which is prepended to each incoming batch. The results of the function on these prepended elements are dropped, as their values would have already been emitted in an earlier batch.
Notes:
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Warnings:
Functions that aggregate data to a constant number of data points (for example, sum) will not work with the rolling operator, because it needs data points in the buffer to provide the correct output.
Arguments:
n - The size of the buffer.
fn - A function that takes a vector.
Returns:
A rolling operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> def moving_average(x, w):
...     return np.convolve(x, np.ones(w), 'valid') / w
>>> sp.run(sp.read.from_callback('publish')
...     | sp.rolling(2, lambda x: moving_average(x,2))
...     | sp.write.to_variable('out'))
>>> kx.q('publish', [1,2,3])
>>> kx.q('publish', [4,5,6])
>>> kx.q('publish', [7,8,9])
>>> kx.q('out')
1.5 2.5 3.5 4.5 5.5 6.5 7.5 8.5
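The buffer-and-drop mechanism described above can be sketched in plain Python. This model is an illustration only: toy_rolling and moving_average_2 are hypothetical helpers, not kxi.sp, and the model assumes the function returns exactly one result per input record, which is what makes dropping the results for the prepended records well defined.

```python
def toy_rolling(batches, n, fn):
    """Prepend the last n records to each batch, apply fn, drop the prepended results.

    Hypothetical helper for illustration; assumes n >= 1 and that fn returns
    one result per input record.
    """
    buffer = []
    emitted = []
    for batch in batches:
        combined = buffer + list(batch)
        results = fn(combined)
        # Results for the buffered records were already emitted with an earlier batch.
        emitted.append(results[len(buffer):])
        buffer = combined[-n:]
    return emitted


def moving_average_2(xs):
    # Two-point moving average; the first value has no predecessor and is kept as-is.
    return [xs[0]] + [(a + b) / 2 for a, b in zip(xs, xs[1:])]


emitted = toy_rolling([[1, 2, 3], [4, 5, 6]], 1, moving_average_2)
print(emitted)  # [[1, 1.5, 2.5], [3.5, 4.5, 5.5]]
```

The first value of the second batch is 3.5 rather than 4 because the buffered record 3 is available when the batch arrives.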
split
@Operator
def split() -> Operator
Split the current stream.
The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.
Split operators can be explicitly added, but are implicitly added if the same operator appears as a parent multiple times when resolving the streams given to sp.run.
Returns:
A split operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> def addOne(data):
...     data['x'] += 1
...     return data
>>> def addFive(data):
...     data['x'] += 5
...     return data
>>> def timesTen(data):
...     data['x'] *= 10
...     return data
>>> streamA = (sp.read.from_callback('publish')
...     | sp.map(addOne)
...     | sp.split())
>>> streamB = (streamA
...     | sp.map(addFive)
...     | sp.write.to_console())
>>> streamC = (streamA
...     | sp.map(timesTen)
...     | sp.write.to_variable('out'))
>>> sp.run(streamB, streamC)
>>> data = pd.DataFrame({
...     'x': np.random.randn(5),
...     'y': np.random.randn(5)
... })
>>> kx.q('publish', data)
>>> kx.q('out')
x        y
--------------------
7.807491 1.565018
8.19516  2.387283
15.90946 -0.03481929
12.44505 -0.5897372
2.520515 -0.06977756
sql
@Operator
def sql(query: CharString, schema: Optional[kx.Table] = None) -> Operator
Perform an SQL query on tabular data in the stream.
The query must:
- conform to ANSI SQL and be limited to the documented supported operations
- reference a special $1 alias in place of the table name
If data in the stream is not table data, an error will be raised. Providing a schema will allow the query to be precompiled, enabling faster processing on streaming data.
Note: Queries run in a local worker process
SQL queries are not distributed across workers, and are currently run on each worker's substream.
Arguments:
query - An SQL query, which will be performed on tabular data in the stream.
schema - An empty table with the same schema as the data.
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> df = pd.DataFrame({'x': range(10), 'y': range(10)})
>>> sp.run(sp.read.from_callback('publish')
...     | sp.sql('SELECT * FROM $1 ORDER BY x')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
x y
---
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
udf
def udf(
name: CharString,
package: CharString,
version: CharString = None,
params: Optional[dict] = kx.q('()!()')
) -> Operator
Retrieve a User-Defined Function (UDF) from the package directory defined by 'KX_PACKAGE_PATH'.
Arguments:
name - Name of the UDF that is to be retrieved.
package - Name of the package from which to retrieve a UDF.
version - Version of the package from which to retrieve the UDF.
params - Additional optional input information to the UDF, passed as the final argument to the function.
Returns:
A function to be used as a UDF within a merge, filter, or map operator.
Examples:
Retrieve a UDF providing all necessary information
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (in this example the UDF adds 10 to input)
>>> udf = sp.udf('map_udf', 'test', '1.0.0', {'param':10})
>>> # Use the UDF within SP operator
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x        x1
-----------------
10.49318 10.39275
10.57852 10.51709
10.08389 10.51598
10.19599 10.40666
10.37564 10.17808
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function
>>> udf = sp.udf('filter_udf', 'test')
>>> # Use the UDF within SP operator (this UDF filters data for x<0.5)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.filter(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x          x1
--------------------
0.07347808 0.7263142
0.3159526  0.9216436
0.3410485  0.1809536
union
@Operator
def union(stream: Pipeline,
flush: Optional[str] = None,
trigger: Union[str, Callable] = None) -> Operator
Unite two streams.
Like merge, but elements from both sides of the union are left as-is.
Arguments:
stream - The separate pipeline that will be unified with.
flush - Indicates which side of the merge operation to flush data from.
trigger - This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger.
Returns:
A union operator, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
...     | sp.union(sp.read.from_callback('publish2'))
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(0, 10))
>>> kx.q('publish2', range(10, 20))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19