Operators
This page outlines the available methods for managing data flow through your pipeline and performing various data processing tasks.
Accumulate
Aggregates a stream into an accumulator.
.qsp.accumulate[fn; initial]
.qsp.accumulate[fn; initial; output]
Parameters:
name | type | description | default |
---|---|---|---|
fn | function | An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. | Required |
initial | any | The initial state of the accumulator. | Required |
output | function | A function to transform the output before emitting it. It gets passed the value of the accumulator. | :: |
For all common arguments, refer to configuring operators
Aggregates a stream into an accumulator, which is updated and then emitted for each
incoming batch. The value being emitted is passed to the output
function,
which can modify it before passing it to the next operator. If the accumulator is a
dictionary, it may be necessary to enlist the result in the output function so the next
operator receives a table.
If no output function is specified, the accumulator will be emitted. If the accumulator
is a dictionary, an output function like {enlist x}
can be used to emit tables.
This pipeline calculates the running averages for each column.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.accumulate[{[md; data; acc]
acc[`total] +: count data;
acc[`sum] +: sum data;
acc
};
`total`sum!(0; ());
// The enlist is needed to turn the dictionary into a table
{enlist x[`sum] % x`total}]
.qsp.write.toConsole[]
publish each 10 cut ([] x: 100?1f; y: 100?1f)
This pipeline emits the number of times each status has been seen.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.accumulate[{[md; data; acc] acc + count each group data `status};
`a`b`c!0 0 0;
// The enlist is needed to turn the dictionary into a table
{enlist x}]
.qsp.write.toConsole[]
publish each 10 cut ([] status: 100?`a`b`c)
sp.accumulate(function, initial)
sp.accumulate(function, initial, output)
Aggregates a stream into an accumulator, which is updated and then emitted for each
incoming batch. The value being emitted is passed to the output
function,
which can modify it before passing it to the next operator. If the accumulator is a
dictionary, it may be necessary to enlist the result in the output function so the next
operator receives a table.
By default, kxi.sp.accumulate emits the accumulator, but this value can be transformed
with the output function. If the accumulator is a dictionary, an output function like
pykx.q.enlist could be used to emit tables.
Parameters:
name | type | description | default |
---|---|---|---|
function | function | An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. | Required |
initial | any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K . | Required |
output | function | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | :: |
Returns:
An accumulate
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> def accumulator(md, data, acc):
        return acc + sum(data)
>>> def output(data):
        return data * 10
>>> sp.run(sp.read.from_callback('publish')
| sp.accumulate(accumulator, 100, output)
| sp.write.to_variable('out'))
>>> kx.q('publish',[1,2,3])
>>> kx.q('publish',[4,5,6])
>>> kx.q('out')
1060 1210
Apply
Applies a function to incoming batches in the stream.
.qsp.apply[fn]
.qsp.apply[fn; .qsp.use (!) . flip (
(`onFinish; onFinish);
(`state ; state))]
Parameters:
name | type | description | default |
---|---|---|---|
fn | function | An operator applied to incoming data (see below). | Required |
options:
name | type | description | default |
---|---|---|---|
onFinish | function | A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. | None |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to fn. | `operator`metadata`data |
For all common arguments, refer to configuring operators
fn is a ternary function, which is applied to incoming batches in the stream.
The arguments of fn are the operator, the metadata, and the data.
Since apply is most often used with state, the operator and metadata
arguments are implicitly added to the user-defined function.
Unlike other operators, apply is asynchronous, and data returned by it does not
immediately flow through the rest of the pipeline. Instead, the operator must use
.qsp.push to push data through the pipeline when ready.
This is often useful when combined with the state API for implementing custom window operations, running asynchronous external tasks (see task registration), and similar behaviors.
When apply
is used to buffer data, the onFinish
option should be used to flush the
buffer when the pipeline finishes. It will be called once for every key in the state
including the default key (empty symbol), with the metadata dictionary passed to it
containing only the key.
Buffer events in memory before running an analytic:
.qsp.run
.qsp.read.fromCallback[`publish]
// Buffer the stream to 10000 events before pushing downstream
.qsp.apply[
{[op; md; data]
$[10000 <= count state: .qsp.get[op; md] , data;
// If we've hit 10000 events, clear the state and push the buffer
[.qsp.set[op; md; ()]; .qsp.push[op; md; state]];
// Otherwise, update the buffer
.qsp.set[op; md; state]
]
};
.qsp.use (!) . flip (
// Set the default state to empty
(`state; ());
(`onFinish; {[op; md] .qsp.push[op; md] .qsp.get[op; md]}))
]
// Run an analytic on the buffered stream
.qsp.map[{ 10*x }]
// Convert to the (tableName; tableData) format expected by RT
.qsp.map[{ (`customAnalytic; value flip x) }]
.qsp.write.toVariable[`output]
publish each 50 cut ([] data: 20000?1f)
Register an asynchronous task for true async:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.apply[
{[op; md; data]
// Register a task which represents the unfinished async kurl GET request
tid: .qsp.registerTask[op];
.kurl.async (
data `url;
"GET";
``callback!(
::;
{[op;md;data;tid;r]
.qsp.finishTask[op;tid]; // Mark the task as finished
data[`response]: r 1; // Add GET request response to data
.qsp.push[op;md;data]; // Push enriched data to the next operator
}[op;md;data;tid]
)
);
}
]
.qsp.write.toConsole[];
publish each ([] url: ("https://www.google.ca"; "https://www.example.com"))
sp.apply(function)
sp.apply(function, on_finish, state, params)
Unlike other operators, apply is asynchronous. To return data from it, you must use
sp.push
to push data through the pipeline when it is ready. Apply
is commonly used with the state API to implement custom windowing, or with the task API.
When apply
is used to buffer data, the on_finish
option should be used to flush the
buffer when the pipeline finishes. It will be called once for every key in the state
including the default key, with the metadata dictionary passed to it containing only the key.
Parameters:
name | type | description | default |
---|---|---|---|
function | function | A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses. | Required |
options:
name | type | description | default |
---|---|---|---|
on_finish | function | A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. | None |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to function. | `operator`metadata`data |
Returns:
An apply
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> from itertools import groupby
>>> def split_list(lst):
        return [list(group) for key, group in groupby(lst, lambda x: x == 1) if not key]
>>> data = [1, 2, 3, 1, 4, 5, 6, 1, 7, 8, 9]
>>> sp.run(sp.read.from_callback('publish')
| sp.apply(lambda op, md, data: [sp.push(op, md, sublist)
for sublist in split_list(data)])
| sp.map(lambda x: [x])
| sp.write.to_variable('out'))
>>> kx.q('publish', data)
>>> kx.q('out')
2 3
4 5 6
7 8 9
Filter
Filters elements out of a batch, or a batch from a stream.
.qsp.filter[fn]
.qsp.filter[fn; .qsp.use (!) . flip (
(`dropEmptyBatches; dropEmptyBatches);
(`allowPartials ; allowPartials);
(`state ; state))]
Parameters:
name | type | description | default |
---|---|---|---|
fn | function | A unary function returning a boolean atom or vector, that either filters elements out of a batch, or filter out a batch in its entirety from a stream. | Required |
options:
name | type | description | default |
---|---|---|---|
dropEmptyBatches | boolean | 1b to only emit non-empty batches. | 0b |
allowPartials | boolean | 1b indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to fn | `data |
For all common arguments, refer to configuring operators
The fn
function will be called on each batch in the stream.
If the result is a boolean:
- vector, only records that it flags in the batch progress through the stream
- atom, all records in the batch progress or are discarded according to the flag
Partial data batches
Some source nodes can push partial sets of data through a pipeline to reduce overall
batch sizes. For example, a file reader might break the file down into smaller chunks and
push each chunk through the pipeline. If the current operator requires an entire
batch of data (ex. an entire file) then set allowPartials
to 0b
to force the
batch to be buffered to completion before running this operator. If the operator
does receive partial data, it is presumed to also emit partial data.
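Both options can be set together with .qsp.use. The following is a minimal sketch (the publish callback and the qty column are illustrative assumptions, not part of the examples on this page):
.qsp.run
  .qsp.read.fromCallback[`publish]
  // Sketch: keep only rows with a positive qty, drop batches the filter empties,
  // and buffer partial batches to completion before filtering
  .qsp.filter[{0 < x`qty};
    .qsp.use ``dropEmptyBatches`allowPartials!(::; 1b; 0b)]
  .qsp.write.toConsole[]
publish ([] qty: -2 5 0 7)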
Filter for a single table:
.qsp.run
.qsp.read.fromStream[":tp:5000"]
// Values come in as (tableName; tableValue), so select the desired
// table name
.qsp.filter[{ `trade ~ first x }]
// After the filter, there are only tuples corresponding to trades
// in the stream, so the `tableName` field can be discarded
.qsp.map[{ x 1 }]
.qsp.write.toConsole[]
sp.filter(function)
sp.filter(function, dropEmptyBatches, allowPartials, state, params)
Parameters:
name | type | description | default |
---|---|---|---|
function | function | A predicate function which will be applied to the data that passes through the operator. If the result is a boolean list, then only records that it flags in the batch progress through the stream. If the result is a singular boolean value, then either all records in the batch progress through the stream, or none of them do. | Required |
options:
name | type | description | default |
---|---|---|---|
dropEmptyBatches | boolean | True to only emit non-empty batches. | 0b |
allowPartials | boolean | True indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to function. | `data |
Returns:
A filter
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> def example_filter(data):
        return [i for i in 'sale' == data['transaction']]
>>> sp.run(sp.read.from_callback('publish')
| sp.op.filter(example_filter)
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
'price': [1, 2, 3, 3, 3, 4]
})
>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale 1
sale 3
sale 3
Key By
(Beta Feature) Splits a stream on a value in the stream.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES
to
true
. See Beta Feature Usage Terms.
.qsp.keyBy[field]
.qsp.keyBy[field; .qsp.use (!) . flip enlist (
(`allowPartials; allowPartials))]
Parameters:
name | type | description | default |
---|---|---|---|
field | function or symbol or number | The field parameter allows you to extract a field from the input data to use as a key. If a symbol or number is provided, the field is used to index into the data in the stream and return the corresponding field. If a function is provided, the function will return the key for a given message. | Required |
options:
name | type | description | default |
---|---|---|---|
allowPartials | boolean | 1b indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
For all common arguments, refer to configuring operators
Partial data batches
Some source nodes can push partial sets of data through a pipeline to reduce overall
batch sizes. For example, a file reader might break the file down into smaller chunks and
push each chunk through the pipeline. If the current operator requires an entire
batch of data (ex. an entire file) then set allowPartials
to 0b
to force the
batch to be buffered to completion before running this operator. If the operator
does receive partial data, it is presumed to also emit partial data.
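A function can also be used to derive the key. The following is a minimal sketch (the sensor callback, reading column, and threshold are assumptions), keying each message by whether it contains a reading over the threshold:
.qsp.run
  .qsp.read.fromCallback[`sensor]
  // Sketch: key each message as `alert or `normal based on its readings;
  // downstream stateful operators then keep separate state per key
  .qsp.keyBy[{$[any 100 < x`reading; `alert; `normal]}]
  .qsp.write.toConsole[]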
Key on pass or fail:
.qsp.run
.qsp.read.fromCallback[`score]
.qsp.keyBy[`pass]
.qsp.accumulate[{z+count y};0]
.qsp.map[{enlist `pass`count!(y`key;x)}; .qsp.use``params!(`;`data`metadata)]
.qsp.write.toVariable[`output]
score ([] date: .z.d; id: 10?`3; pass: 10?0b);
output
pass count
----------
1 4
0 6
sp.key_by(field)
sp.key_by(field, allowPartials)
Parameters:
name | type | description | default |
---|---|---|---|
field | function or symbol or number | The method of keying the stream. In the first instance we can pass a string to designate a symbol column on which to partition the data. Otherwise, an integer or callable can be supplied. | Required |
options:
name | type | description | default |
---|---|---|---|
allowPartials | boolean | True indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
Returns:
A key_by
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.op.key_by('transaction')
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
'price': [1, 2, 3, 3, 3, 4]
})
>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale 1
sale 3
sale 3
purchase 2
purchase 4
return 3
Map
Applies a function to data passing through the operator.
.qsp.map[fn]
.qsp.map[fn; .qsp.use (!) . flip (
(`allowPartials; allowPartials);
(`state ; state))]
Parameters:
name | type | description | default |
---|---|---|---|
fn | function | A unary function that is applied to the data and returns the result. | Required |
options:
name | type | description | default |
---|---|---|---|
allowPartials | boolean | 1b indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
For all common arguments, refer to configuring operators
Partial data batches
Some source nodes can push partial sets of data through a pipeline to reduce overall
batch sizes. For example, a file reader might break the file down into smaller chunks and
push each chunk through the pipeline. If the current operator requires an entire
batch of data (ex. an entire file) then set allowPartials
to 0b
to force the
batch to be buffered to completion before running this operator. If the operator
does receive partial data, it is presumed to also emit partial data.
A basic map:
.qsp.run
.qsp.read.fromCallback[`trade]
.qsp.map[{ update price * size from x }]
.qsp.write.toConsole[]
trade ([] price: 10?200f; size: 10?1000)
A stateful map:
.qsp.run
.qsp.read.fromCallback[`trade]
// *Note* - using `state` implicitly adds the `operator` and `metadata`
// arguments required for .qsp.* state APIs.
.qsp.map[
{[op;md;data]
// Retrieve the previous state from the last batch
previous: .qsp.get[op;md];
// Calculate size * price for each symbol
v: select sp:price * size by sym from data;
// Set the state to include the current batch calculation
.qsp.set[op; md; previous , v];
// Send the difference between the current and last batch to
// any forward operators in the pipeline (here, console writer)
select from v - previous where sym in key[v]`sym
};
.qsp.use``state!(::; ([sym:0#`] sp:0#0f))
]
.qsp.write.toConsole[]
trade each 1 cut ([]
sym: `ABC`ABC`XYZ`XYZ`ABC`XYZ;
price: 200 203 53 52 190 55;
size: 100 100 200 400 100 150)
Retrieving metadata within a map:
.qsp.run
.qsp.read.fromCallback[`publish]
// Group events into logical groups every 5 seconds of event time
// based on the 'timestamp' column in the data to indicate which
// window an event should belong
.qsp.window.tumbling[00:00:05; `time]
.qsp.map[
{[md;x]
// Add the start of the window to the batched event data
update start: md`window from x
};
.qsp.use``params!(::; 1#`metadata)
]
.qsp.write.toConsole[]
publish ([] time: 0p + 00:00:01 * 0 3 5 7 9 11; data: til 6)
sp.map(function)
sp.map(function, allowPartials)
The function provided will be called on every message that passes through the pipeline stream. The map operation can modify the data by performing aggregations, type changes, transformations, etc.
Parameters:
name | type | description | default |
---|---|---|---|
function | function | The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function. | Required |
options:
name | type | description | default |
---|---|---|---|
allowPartials | boolean | True indicates this filter operation can handle partial batches of data. See below for more details on partial batches. | 1b |
Returns:
A map
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> def times_ten(data):
        data['x'] *= 10
        return data
>>> sp.run(sp.read.from_callback('publish')
| sp.map(times_ten)
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'x': np.random.randn(5),
'y': np.random.randn(5)
})
>>> kx.q('publish', data)
>>> kx.q('out')
x y
---------------------
-4.91861 -0.03564342
2.311394 0.8991633
-1.670712 0.433386
22.39179 -0.8260305
9.908324 -0.3064973
Merge
Merges two data streams using a function to combine them.
.qsp.merge[stream; function]
.qsp.merge[stream; function; .qsp.use (!) . flip (
(`flush ; flush);
(`trigger; trigger);
(`concat ; concat);
(`state ; state))]
Parameters:
name | type | description | default |
---|---|---|---|
stream | pipeline | A separate pipeline to merge. | Required |
function | function | A function of the two streams to combine both into one. | Required |
options:
name | type | description | default |
---|---|---|---|
flush | symbol | Indicates which side of the merge operation to flush data from. | left |
trigger | function or symbol | This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. | both |
concat | function | A function to update the buffer with incoming data. | Append incoming data |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to fn. | `left`right |
For all common arguments, refer to configuring operators
This API merges stream into the current stream using the function parameter.
Data loss
When merging data from multiple readers, it is very easy to write a pipeline that will exhibit data loss. Read through this page before using the merge operator.
Merge can be used to join and enrich streams with other streams, with static data, or
to union two streams into a single stream. The stream to which .qsp.merge
is appended
is the left stream, and the stream included as a parameter is the right stream.
The merge function is passed the buffered values for the two streams, to merge them
into an output batch. It must return the pair (metadata; data)
.
The flush
option indicates which side of the merge operation to flush data from.
Data is flushed after a merge has been performed, meaning that once the merge is
complete, the indicated stream is cleared for the next merge operation. Data that is
not flushed in the join will be buffered indefinitely.
This value can be one of the following symbols:
- left - Only flush data from the left stream
- right - Only flush data from the right stream
- both - Flush both streams
- none - Don't flush any data
The trigger
option determines when the merge function should be run. It
can be one of the following symbols, or a function that evaluates to a
boolean. If a function is provided it is passed the buffered state of the
left and right streams as arguments, each as pairs of (metadata; data)
.
- immediate - Emit on any input
- both - Emit when both streams have buffered data
- left - Emit when the left stream has buffered data
- right - Emit when the right stream has buffered data
Initial message ordering
With the exception of the default both
, using any of the above options may
require prior knowledge of which stream will receive data first. In cases
where the first message satisfies the trigger condition, i.e. the left side
receiving the first message when using left
, the merge function will be
passed an empty list representing the opposite buffer. The same behaviour applies
to right
and occurs regardless when using immediate
.
Be aware that this behaviour can also be encountered beyond the initial message
when using both
as the flush option above.
right: .qsp.read.fromCallback[`rcb];
.qsp.run
.qsp.read.fromCallback[`lcb]
.qsp.merge[right;{show "left (x): ",.Q.s1[x]," right (y): ",.Q.s1[y]};.qsp.use ``trigger!(::;`left)];
lcb ([] a:1 2; b:3 4)
"left (x): +`a`b!(1 2;3 4) right (y): ()"
The concat
option is a function applied to an incoming stream to concatenate
data from the same stream. It is passed two arguments: the previously cached state, and
the incoming message, both as (metadata; data)
tuples.
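For example, a concat function that keeps only the most recent message for a stream, rather than appending, could look like the sketch below (the keepLatest name is an assumption); both arguments and the returned buffer are (metadata; data) tuples:
// Sketch: discard the previously cached buffer and keep only the latest message
keepLatest: {[cached; incoming] incoming}
// passed to merge as, for example, .qsp.use ``concat!(::; keepLatest)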
The params list can be any subset of left, right, leftMeta, rightMeta, metadata, and operator, to specify what is passed to the merge function.
This example joins the last quote for each trade. Note that because nothing tracks when quotes come in relative to the trade stream, the following pipeline is nondeterministic.
// Create a data flow for quote updates
quotes: .qsp.read.fromCallback[`updQuote]
// A stateful map to hold the last seen quote for each symbol
.qsp.map[
{[o;m;z]
// Update the state with the last value for each symbol from the batch
// The '.qsp.set' returns the data set, forwarding it on to downstream
// operators.
.qsp.set[o;m] .qsp.get[o;m] upsert select price by sym from z
};
.qsp.use``state!(::; ()) ]
// Create a data stream for trade updates
quoteAsOfTrade: .qsp.read.fromCallback[`updTrade]
// Left join the last quote for each sym onto each new trade in the stream
// Since the updates from the quoteStream are keyed, the buffer will be updated
// with the latest data, and doesn't need to be re-keyed before joining with `lj`.
.qsp.merge[quotes; lj]
.qsp.write.toConsole[]
.qsp.run quoteAsOfTrade
updQuote ([sym: `ABC`XYZ] time: 2001.01.01D12:00:00; price: 45 72)
updTrade ([] sym: `ABC`XYZ; size: 1000 5000)
| sym size price
-----------------------------| --------------
2022.03.03D19:40:06.743993600| ABC 1000 45
2022.03.03D19:40:06.743993600| XYZ 5000 72
Note that when using keyed streams, .qsp.merge
keeps a separate state for each key.
Thus, in the following example where a reference file (implicitly keyed with the file name)
is joined to incoming batches, the right stream must be unkeyed with an apply operator.
Otherwise it will be buffered under the key store.csv
while the
left stream is buffered under the default key, and neither buffer would have incoming messages
on both streams to trigger the merge.
`stores.csv 0: csv 0: ([storeID: 0 1] country: `CA`US)
stores: .qsp.read.fromFile[`stores.csv; .qsp.use ``chunking!00b]
.qsp.decode.csv[([storeID: `long$()] country: `$())]
// Reset the metadata to be unkeyed
.qsp.apply[{[op; md; data] .qsp.push[op; enlist[`]!enlist[::]; data]}]
.qsp.run .qsp.read.fromCallback[`newSale]
.qsp.merge[stores; lj]
.qsp.write.toVariable[`out]
newSale ([] storeID: enlist 0; SKU: 31415; price: 199.99)
newSale ([] storeID: enlist 1; SKU: 27182; price: 34.99)
out
storeID SKU price country
----------------------------
0 31415 199.99 CA
1 27182 34.99 US
sp.merge(stream, function)
sp.merge(stream, function, flush, trigger, concat, state, params)
Merge can be used to join and enrich streams with other streams, with static data, or to union
two streams into a single stream. See also: union
Parameters:
name | type | description | default |
---|---|---|---|
stream | pipeline | The separate pipeline that will be merged with. | Required |
function | function | A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one. | Required |
options:
name | type | description | default |
---|---|---|---|
flush | symbol | Indicates which side of the merge operation to flush data from. | left |
trigger | function or symbol | This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. | both |
concat | function | A function to update the buffer with incoming data. | Append incoming data |
state | any | The initial state. | :: |
params | symbol or symbol[] | The arguments to pass to function. | `left`right |
Returns:
A merge
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> df1 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'price': [150, 2800]})
>>> df1.set_index('symbol', inplace=True)
>>> df2 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'size': [100, 200]})
>>> sp.run(sp.read.from_callback('publish')
| sp.merge(sp.read.from_callback('publish2'),
lambda x, y: pd.merge(x.pd(), y.pd(), on='symbol',how='left'))
| sp.write.to_variable('out'))
>>> kx.q('publish', df2)
>>> kx.q('publish2', df1)
>>> kx.q('out')
symbol size price
-----------------
AAPL 100 150
GOOGL 200 2800
Parallel
(Beta Feature) Applies multiple functions in parallel to a stream.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES
to true
. See Beta Feature Usage Terms.
.qsp.parallel[fns]
.qsp.parallel[fns; .qsp.use (!) . flip enlist (
    (`merge; merge))]
Parameters:
name | type | description | default |
---|---|---|---|
fns | function, dictionary, string or function[] | An operator applied to incoming data. | Required |
options:
name | type | description | default |
---|---|---|---|
merge | function | A function run on the outputs of the functions passed in. | None |
params | symbol or symbol[] | The arguments to pass to each fn. | `data |
For all common arguments, refer to configuring operators
This is a core operator that runs multiple functions in parallel, allowing multiple aggregations to be run over the same data.
State is not supported
Multiple states cannot be updated within the same global variable; therefore, state is not supported within parallel execution.
This pipeline applies functions on a stream.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.parallel[(avg;max)]
.qsp.map[enlist]
.qsp.write.toVariable[`output];
publish 1 2 3;
publish 4 5 6;
output
2f 3
5f 6
This pipeline applies a dictionary of functions to a stream and merges the input with the corresponding outputs.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.parallel[`identity`usd`eur!(::; {0.777423 * x`price}; {0.764065 * x`price});
.qsp.use ``merge!(::; {x[`identity] ,' flip `identity _ x})]
.qsp.write.toVariable[`output];
publish ([] price: 2.7 5.07);
publish ([] price: 1.35 9.15);
{.01*`int$x*100} output
price usd eur
---------------
2.7 2.1 2.06
5.07 3.94 3.87
1.35 1.05 1.03
9.15 7.11 6.99
This pipeline applies functions provided as strings to a stream.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.parallel[("{0.777423 * x`price}";"{0.764065 * x`price}")]
.qsp.write.toVariable[`output];
publish ([] price: 2.7 5.07);
publish ([] price: 1.35 9.15);
{.01*`int$x*100} output
sp.parallel(functions)
sp.parallel(functions, merge, params)
This is a core operator that runs multiple functions in parallel, allowing multiple aggregations to be run over the same data.
State is not supported
Multiple states cannot be updated within the same global variable; therefore, state is not supported within parallel execution.
Parameters:
name | type | description | default |
---|---|---|---|
functions | function, dictionary, string or function[] | A function or functions applied to incoming data. | Required |
options:
name | type | description | default |
---|---|---|---|
merge | function | A function run on the outputs of the functions passed in. | None |
params | symbol or symbol[] | The arguments to pass to function. | `data |
Returns:
A parallel
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> def avg(data):
        return sum(data)/len(data)
>>> sp.run(sp.read.from_callback('publish')
| sp.parallel([sum,avg])
| sp.map(lambda x:[x]) # gets a list of batches, instead of appending all together
| sp.write.to_variable('out'))
>>> data = [1,2,3]
>>> kx.q('publish', data)
>>> kx.q('out')
6 2f
Reduce
Aggregates partial windows emitted by window operators.
.qsp.reduce[fn; initial]
.qsp.reduce[fn; initial; output]
Parameters:
name | type | description | default |
---|---|---|---|
fn | function | An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. | Required |
initial | any | The initial state of the accumulator. | Required |
output | function | A function to transform the output before emitting it. It gets passed the value of the accumulator. | :: |
For all common arguments, refer to configuring operators
A window may include more records than can fit in memory. As such, it may be necessary
to reduce the buffered records into a smaller, aggregated value at regular intervals.
If the window operator uses the countTrigger
option, a partial window will be emitted
when the number of buffered records exceeds the countTrigger
. Partial windows will
also be emitted when a stream goes idle. These partial windows can be aggregated using
.qsp.reduce
. When the window is complete, .qsp.reduce
will emit the result of
reducing the partial windows.
The reduce operator runs a function on each incoming batch to update the accumulator
for that window. Each window has a separate accumulator.
For partial windows, such as those emitted by countTrigger
or idle streams,
the accumulator will be updated but not emitted.
When a window is closed, such as when the high-water mark passes the end of the window,
the accumulator will be updated for that final batch, and then it will be emitted.
If no output function is specified, the accumulator will be emitted. If the accumulator
is a dictionary, an output function like {enlist x}
can be used to emit tables.
Any partial windows will be emitted on teardown.
This pipeline calculates the average of the val
column. There are 1000 records per
window, but the reduction is run whenever the buffer size reaches 100 records.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:01; `time; .qsp.use ``countTrigger!0 100]
.qsp.reduce[{[md; data; acc]
acc[`total] +: count data;
acc[`sum] +: sum data `val;
acc[`window]: md `window;
acc
};
`total`sum`window!(0; 0f; ::);
{enlist `startTime`average!(x`window; x[`sum] % x`total)}]
.qsp.write.toVariable[`out]
publish each 10 cut ([] time: .z.p + 00:00:00.001 * til 10000; val: 10000?1f)
out
startTime average
---------------------------------------
2023.10.17D18:29:03.000000000 0.4979897
2023.10.17D18:29:04.000000000 0.4955796
2023.10.17D18:29:05.000000000 0.4848963
2023.10.17D18:29:06.000000000 0.5141578
2023.10.17D18:29:07.000000000 0.4984107
2023.10.17D18:29:08.000000000 0.5013768
2023.10.17D18:29:09.000000000 0.4986774
2023.10.17D18:29:10.000000000 0.5160887
2023.10.17D18:29:11.000000000 0.5066924
2023.10.17D18:29:12.000000000 0.4941379
...
//Set a new high-water mark to close out earlier windows
publish ([] time: .z.p + 00:00:10; val: 1?1f)
startTime average
---------------------------------------
2023.10.17D18:29:03.000000000 0.4979897
2023.10.17D18:29:04.000000000 0.4955796
2023.10.17D18:29:05.000000000 0.4848963
2023.10.17D18:29:06.000000000 0.5141578
2023.10.17D18:29:07.000000000 0.4984107
2023.10.17D18:29:08.000000000 0.5013768
...
sp.reduce(function, initial)
sp.reduce(function, initial, output)
A window may include more records than can fit in memory. As such, it may be necessary
to reduce the buffered records into a smaller, aggregated value at regular intervals.
If the window operator uses the count_trigger
option, a partial window will be emitted
when the number of buffered records exceeds the count_trigger
. Partial windows will
also be emitted when a stream goes idle. These partial windows can be aggregated using this
operator. When the window is complete, this operator will emit the result of reducing the
partial windows.
The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.
For partial windows, such as those emitted by count_trigger
or idle streams,
the accumulator will be updated but not emitted.
When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.
By default, kxi.sp.reduce
emits the accumulator, but this value can be transformed
with the output
function. If the accumulator is a dictionary, an output function like
pykx.q.enlist
could be used to emit tables.
Parameters:
name | type | description | default |
---|---|---|---|
function | function | An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. | Required |
initial | any | The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K . | Required |
output | function | An optional function to transform output before emitting it. It gets passed the value of the accumulator. | :: |
Returns:
A reduce
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import random
>>> import datetime
>>> def accumulator(md, data, acc):
        total_length = len(data)
        min_time = min(data['time'])
        sum_val = sum(data['val'])
        return {
            'start_time': min_time,
            'avg_val': sum_val / total_length
        }
>>> def output(accumulator_result):
        return pd.DataFrame([accumulator_result])
>>> vals = [random.uniform(0, 1) for _ in range(10000)]
>>> current_datetime = datetime.datetime.now()
>>> time_values = [current_datetime + datetime.timedelta(seconds=i/10) for i in range(10000)]
>>> df = pd.DataFrame({'time': time_values, 'val': vals})
>>> empty_df = pd.DataFrame(columns=df.columns)
>>> sp.run(sp.read.from_callback('publish')
| sp.window.tumbling(datetime.timedelta(seconds=1),
time_assigner=lambda x: x['time'], count_trigger=5)
| sp.reduce(accumulator, empty_df, output)
| sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
avg_val start_time
---------------------------------------
0.4525153 2023.10.06D20:37:26.985194000
0.5691399 2023.10.06D20:37:27.085194000
0.5124086 2023.10.06D20:37:28.085194000
0.6062494 2023.10.06D20:37:29.085194000
0.3942734 2023.10.06D20:37:30.085194000
0.4774162 2023.10.06D20:37:31.085194000
...
Rolling
(Beta Feature) Applies a moving-window function to a stream.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES
to true
. See Beta Feature Usage Terms.
.qsp.rolling[n; fn]
name | q type | description | default |
---|---|---|---|
n | long | The size of the buffer | Required |
fn | function | A function that takes a vector | Required |
For all common arguments, refer to configuring operators
This operator is equivalent to .qsp.map, but for moving window functions such as moving averages or those comparing a vector to a shifted version of itself, such as the difference function.
This operator does not emit a moving window; that can be done with .qsp.window.count.
Rather, it maintains a buffer of the last n
records, which is prepended to each
incoming batch. The results of the function on these prepended elements are dropped,
as their values would have already been emitted in an earlier batch.
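For instance, a minimal sketch (not one of the examples below) using a moving sum over four values with a buffer of three, so that sums span batch boundaries:
.qsp.run
  .qsp.read.fromCallback[`publish]
  // A buffer of the 3 prior values lets each 4-element moving sum span batch boundaries
  .qsp.rolling[3; msum[4]]
  .qsp.write.toConsole[];
publish 1 2 3 4; // emits 1 3 6 10
publish 5 6; // should emit 14 18, with results for the prepended 2 3 4 dropped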
Functions that aggregate data points to one value cannot be used
Functions that aggregate data to a constant number of data points (for example, sum) will not work with the rolling operator, because the operator needs the buffered data points to produce the correct output.
Functions that displace values cannot be used
Functions like {-2 _ (x = prev x) and (x = prev prev x)}
and
{2 _ (x = next x) and (x = next next x)}
cannot be evaluated
with any rolling window size. In order to make this type of
function work you have to create a function that does not displace
needed input.
If a user wanted to know when there are multiple consecutive values across batches, these
types of functions could be used with the rolling operator.
However, the function {-2 _ (x = prev x) and (x = prev prev x)} with a
rolling window of 2 displaces values that are needed to produce the correct
result. The example below shows the rolling operator applied to this function. Notice
the enlist `b batch, as this is where the needed value is displaced.
Incorrect example:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[2; {-2 _ (x = prev x) and (x = prev prev x)}]
.qsp.write.toVariable[`output];
publish `a`a`a`b;
// happening in rolling:
// fn: {-2 _ (x = prev x) and (x = prev prev x)}
// {-2 _ (0110b and 0010b)}
// {-2 _ 0010b}
publish enlist `b;
// in buffer -> `a`b`b (due to rolling window length of 2)
// happening in rolling:
// fn: {-2 _ (x = prev x) and (x = prev prev x)}
// {-2 _ (001b and 000b)}
// {-2 _ 000b}
publish `b`a`a`b;
// in buffer -> `b`b`b`a`a`b
// happening in rolling:
// fn: {-2 _ (x = prev x) and (x = prev prev x)}
// {-2 _ (011010b and 001000b)}
// {-2 _ 001000b}
output
0000010b
Compare this to the result when passing in the same data in one single batch.
.qsp.teardown[];
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[2; {-2 _ (x = prev x) and (x = prev prev x)}]
.qsp.write.toVariable[`output];
publish `a`a`a`b`b`b`a`a`b;
// fn: {-2 _ (x = prev x) and (x = prev prev x)}
// {-2 _ (011011010b and 001001000b)}
// {-2 _ 001001000b}
output
0010010b
Correct example:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[2; {2 _ (x = prev x) and (x = prev prev x)}]
.qsp.write.toVariable[`output];
publish each (`a`a`a`b; enlist `b; `b`a`a`b);
output
1001000b
.qsp.teardown[];
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[2; {2 _ (x = prev x) and (x = prev prev x)}]
.qsp.write.toVariable[`output];
publish `a`a`a`b`b`b`a`a`b;
output
1001000b
This pipeline applies a moving average. Since the average is based on a given record and the four before it, a buffer size of four is used.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[4; mavg[5]]
.qsp.write.toVariable[`output];
publish 1 2 10 1 2 2 1 2;
output
1 1.5 4.333333 3.5 3.2 3.4 3.2 1.6
The output will gradually approach 0 as the number of non-zero entries in the buffer decreases.
publish 0 0 0 0 0;
output
1.4 1 0.6 0.4 0
This pipeline calculates f(x[t]) = x[t] - x[t-1], starting at t=1
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.rolling[1; {1 _ deltas x}]
.qsp.write.toVariable[`output];
publish 2 4 6 8 11;
output
2 2 2 3
publish 15 20 26;
output
4 5 6
sp.rolling(n, fn)
This operator is equivalent to sp.map
, but for moving window functions such as
moving averages or those comparing a vector to a shifted version of itself,
such as the difference function.
This operator does not emit a moving window; that can be done with .qsp.window.count.
Rather, it maintains a buffer of the last n
records, which is prepended to each
incoming batch. The results of the function on these prepended elements are dropped,
as their values would have already been emitted in an earlier batch.
aggregating not supported
Functions that aggregate data to a constant number of data points (for example, sum) will not work with the rolling operator, because the operator needs the buffered data points to produce the correct output.
Parameters:
name | q type | description | default |
---|---|---|---|
n | long | The size of the buffer. | Required |
fn | function | A function that takes a vector. | Required |
Returns:
A rolling
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> def moving_average(x, w):
        return np.convolve(x, np.ones(w), 'valid') / w
>>> sp.run(sp.read.from_callback('publish')
| sp.rolling(2, lambda x: moving_average(x,2))
| sp.write.to_variable('out'))
>>> kx.q('publish', [1,2,3])
>>> kx.q('publish', [4,5,6])
>>> kx.q('publish', [7,8,9])
>>> kx.q('out')
1.5 2.5 3.5 4.5 5.5 6.5 7.5 8.5
Split
Splits the current stream.
.qsp.split[]
For all common arguments, refer to configuring operators
The split
operator allows a single stream to be split into arbitrarily many
separate streams for running separate analytics or processing.
Split operators can either be explicitly added, or are implicit if the
same operator appears as a parent multiple times when resolving all
streams given to .qsp.run
into a single DAG before running.
Explicit split operator:
streamA: .qsp.read.fromCallback[`publish]
.qsp.map[{x + 1}]
.qsp.split[]
streamB: streamA .qsp.map[{10 * x}] .qsp.write.toConsole[]
streamC: streamA .qsp.map[{x - 1}] .qsp.write.toVariable[`output]
.qsp.run (streamB; streamC)
publish ([] data: til 10)
Implicit split operator:
streamA: .qsp.read.fromCallback[`publish]
.qsp.map[{x + 1}]
streamB: streamA .qsp.map[{10 * x}] .qsp.write.toConsole[]
streamC: streamA .qsp.map[{x % 2}] .qsp.write.toVariable[`output]
.qsp.run (streamB; streamC)
publish ([] data: til 10)
sp.split()
The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.
Split operators can be explicitly added, but are implicitly added if the same operator
appears as a parent multiple times when resolving the streams given
to sp.run.
Returns:
A split
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> def addOne(data):
        data['x'] += 1
        return data
>>> def addFive(data):
        data['x'] += 5
        return data
>>> def timesTen(data):
        data['x'] *= 10
        return data
>>> streamA=( sp.read.from_callback('publish')
| sp.map(addOne)
| sp.split())
>>> streamB=( streamA
| sp.map(addFive)
| sp.write.to_console())
>>> streamC=( streamA
| sp.map(timesTen)
| sp.write.to_variable('out'))
>>> sp.run(streamB, streamC)
>>> data = pd.DataFrame({
'x': np.random.randn(5),
'y': np.random.randn(5)
})
>>> kx.q('publish', data)
>>> kx.q('out')
x y
--------------------
7.807491 1.565018
8.19516 2.387283
15.90946 -0.03481929
12.44505 -0.5897372
2.520515 -0.06977756
SQL
Performs an SQL query over data in a stream.
.qsp.sql[query;schema]
.qsp.sql[query;schema;.qsp.use (!) . flip enlist(`schemaType;schemaType)]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | An SQL query to be performed over table data in the stream. | Required |
schema | table, dictionary or :: | The schema of the incoming data. Either an empty table representing the schema of the data table, a dictionary of column names and their type character, or null to infer the schema. | :: |
options:
name | type | description | default |
---|---|---|---|
schemaType | symbol | How to interpret the provided schema object. By default the schema is treated as the desired literal output. Alternatively, this can be set to schema, in which case a special table of ([] name: `$(); datatype: `short$()) can be provided describing the desired output. | literal |
For all common arguments, refer to configuring operators
Queries must:
- conform to ANSI SQL and be limited to the documented supported operations
- reference a special $1 alias in place of the table name
If data in the stream is not table data, an error will be signaled. Choosing to pass a schema allows the query to be precompiled, enabling faster processing of streaming data.
Queries run in a local worker process
SQL queries are not distributed across workers, and are currently run on each worker's substream.
Select average price by date:
// Generate some random data with dates and prices
n: 100
t: ([] date: n?.z.d-til 3; price: n?1000f)
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.sql["select date, avg(price) from $1 group by date"; 0#t]
.qsp.write.toConsole[]
publish t
| date price
-----------------------------| -------------------
2021.07.26D18:28:29.377228300| 2021.07.24 509.2982
2021.07.26D18:28:29.377228300| 2021.07.25 455.0621
2021.07.26D18:28:29.377228300| 2021.07.26 507.9869
Select using q date parameter:
// Generate some random data with dates and prices
n: 100
t: ([] date: n?.z.d-til 3; price: n?1000f)
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.sql["select avg(price) from $1 where date = $2"; 0#t
.qsp.use``args!(::;enlist .z.d)]
.qsp.write.toConsole[]
publish t
| price
-----------------------------| --------
2021.07.26D18:31:20.854529500| 489.4781
Select the average price over 2-second windows:
schema: ([] time:`timestamp$(); price: `float$());
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:02; `time]
.qsp.sql["select min(time) as start_time, max(time) as end_time, avg(price) from $1"; schema]
.qsp.write.toConsole[]
publish `time xasc ([] time: 100?.z.p+0D00:00:01*til 20; price: 100?1000f)
| start_time end_time price
-----------------------------| --------------------------------------------------------------------
2021.07.27D10:27:38.432116100| 2021.07.27D10:27:38.409337200 2021.07.27D10:27:39.409337200 407.0665
| start_time end_time price
-----------------------------| --------------------------------------------------------------------
2021.07.27D10:27:38.446046200| 2021.07.27D10:27:40.409337200 2021.07.27D10:27:41.409337200 528.9845
| start_time end_time price
-----------------------------| ------------------------------------------------------------------
2021.07.27D10:27:38.462265300| 2021.07.27D10:27:42.409337200 2021.07.27D10:27:43.409337200 387.83
..
Select using schemaType parameter:
schema: ([] name:`date`price; datatype:-12 -9h);
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.sql["select date, avg(price) from $1 group by date"; schema; .qsp.use``schemaType!(::;`schema)]
.qsp.write.toConsole[]
publish ([] date: n?.z.d-til 3; price: n?1000f)
| date price
-----------------------------| -------------------
2024.01.18D16:18:22.143816200| 2024.01.16 496.5402
2024.01.18D16:18:22.143816200| 2024.01.17 436.0239
2024.01.18D16:18:22.143816200| 2024.01.18 540.1907
sp.sql(query, schema)
The query must:
- conform to ANSI SQL and be limited to the documented supported operations
- reference a special $1 alias in place of the table name
If data in the stream is not table data, an error will be raised. Providing a schema will allow the query to be precompiled enabling faster processing on streaming data.
Queries run in a local worker process
SQL queries are not distributed across workers, and are currently run on each worker's substream.
Parameters:
name | type | description | default |
---|---|---|---|
query | string | An SQL query, which will be performed on tabular data in the stream. | Required |
schema | table, dictionary or :: | An empty table with the same schema as the data. | :: |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> df = pd.DataFrame({'x': range(10), 'y': range(10)})
>>> sp.run(sp.read.from_callback('publish')
| sp.sql('SELECT * FROM $1 ORDER BY x')
| sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
x y
---
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
Union
Interleaves two streams.
.qsp.union[stream]
.qsp.union[stream; .qsp.use (!) . flip (
(`flush ; flush);
(`trigger; trigger))]
Parameters:
name | type | description | default |
---|---|---|---|
stream | pipeline | The stream to unite with the current stream. | Required |
options:
name | type | description | default |
---|---|---|---|
flush | symbol | Indicates which side of the merge operation to flush data from. | both |
trigger | function or symbol | This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. | immediate |
For all common arguments, refer to configuring operators
This API unites stream
and the current stream into a single stream.
This is similar to a join, with the difference that elements from both sides of the union are left as-is, resulting in a single stream.
To enable .qsp.union
Unions are non-deterministic, as it is unknown in which order the sources will be read
from. Set KXI_ALLOW_NONDETERMINISM to "true" to enable this operator.
Simple union of two streams:
streamA: .qsp.read.fromCallback[`callbackA] .qsp.map[{ enlist(`a;x) }]
streamB: .qsp.read.fromCallback[`callbackB] .qsp.map[{ enlist(`b;x) }]
streamC: streamA .qsp.union[streamB] .qsp.write.toConsole[]
.qsp.run streamC
callbackA 1; callbackB 1; callbackA 1
2021.06.28D21:40:07.939960300 | `a 1
2021.06.28D21:40:08.204401700 | `b 1
2021.06.28D21:40:08.910553600 | `a 1
sp.union(stream)
sp.union(stream, flush, trigger)
Like merge, but elements from both sides of the union are left as-is.
Parameters:
name | type | description | default |
---|---|---|---|
stream | pipeline | The separate pipeline that will be unified with. | Required |
options:
name | type | description | default |
---|---|---|---|
flush | symbol | Indicates which side of the merge operation to flush data from. | both |
trigger | function or symbol | This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. | immediate |
Returns:
A union
operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.union(sp.read.from_callback('publish2'))
| sp.write.to_variable('out'))
>>> kx.q('publish', range(0, 10))
>>> kx.q('publish2', range(10, 20))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19