Windows
This page presents methods for segmenting pipeline data into overlapping or non-overlapping windows based on time, position, or custom conditions.
Count Window
Cuts the stream into windows of equal record counts.
.qsp.window.count[size]
.qsp.window.count[size;frequency]
.qsp.window.count[size;frequency; .qsp.use (!) . flip enlist(
(`acceptDictionaries; acceptDictionaries))]
Parameters:
name | type | description | default |
---|---|---|---|
size | long | The exact number of records to include in each window. | Required |
frequency | long | The number of records between the starts of consecutive windows. If this is less than size, the windows will overlap. | The window size |
options:
name | type | description | default |
---|---|---|---|
acceptDictionaries | boolean | If batches will never be dictionaries, this can be 0b to increase performance. | 1b |
For all common arguments, refer to configuring operators
This operator buffers incoming records until the number of buffered records reaches or exceeds size, at which point the buffer is split into windows of length size.
Any leftover records will remain in the buffer. Sliding windows can be emitted by using the frequency parameter. Regardless of how many overlapping windows a record will appear in, only one copy of each record is kept in memory.
Any partial windows will be emitted on teardown.
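The buffering behavior described above can be modelled in a few lines of plain Python. This is only a sketch of the semantics, not the operator's implementation: the class name `CountWindowSketch` and its methods are hypothetical, and it assumes `1 <= frequency <= size`.

```python
class CountWindowSketch:
    """Plain-Python model of count windowing; not the kxi.sp implementation."""

    def __init__(self, size, frequency=None):
        # Assumption: 1 <= frequency <= size, so windows tumble or overlap.
        self.size = size
        self.frequency = size if frequency is None else frequency
        if not 1 <= self.frequency <= self.size:
            raise ValueError("this sketch only covers 1 <= frequency <= size")
        self.buffer = []

    def publish(self, records):
        """Buffer a batch and return every full window that can now be cut."""
        self.buffer.extend(records)
        windows = []
        while len(self.buffer) >= self.size:
            windows.append(self.buffer[:self.size])
            # Advance by `frequency`; the overlap stays buffered, stored once.
            del self.buffer[:self.frequency]
        return windows

    def teardown(self):
        """Emit whatever is left as a single partial window."""
        partial, self.buffer = self.buffer, []
        return [partial] if partial else []


tumbling = CountWindowSketch(size=10)
first = tumbling.publish(list(range(25)))   # two windows, five records left over
second = tumbling.publish(list(range(5)))   # buffer reaches ten: one more window

sliding = CountWindowSketch(size=3, frequency=2)
overlapping = sliding.publish(list(range(7)))
```

With `size=3` and `frequency=2`, consecutive windows share one record, which mirrors the overlap visible in the Python example output further down.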
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.count[10]
.qsp.write.toConsole[]
//This will emit two windows of ten records each, with five remaining in the buffer
publish ([] x: til 25)
//This will bring the number of buffered records up to ten, triggering another window
publish ([] x: til 5)
sp.window.count(size, frequency)
Parameters:
name | type | description | default |
---|---|---|---|
size | long | The exact number of records to include in each window. | Required |
frequency | long | The number of records between the starts of consecutive windows. If this is less than size, the windows will overlap. | The window size |
accept_dictionaries | boolean | If batches will never be dictionaries, this can be False to increase performance. | True |
Returns:
A count operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random
>>> df = pd.DataFrame({
'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
'data': [random.uniform(0, 1) for _ in range(30)]
})
>>> sp.run(sp.read.from_callback('publish')
| sp.window.count(size=3, frequency=2)
| sp.write.to_console())
>>> kx.q('publish', df)
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:30.315908000 0.1778936
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:31.315908000 0.3942574
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:32.315908000 0.1997905
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:32.315908000 0.1997905
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:33.315908000 0.3745958
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:34.315908000 0.1060404
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:34.315908000 0.1060404
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:35.315908000 0.9795624
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:36.315908000 0.8657293
...
| time data
-----------------------------| ----------------------------------------
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:56.315908000 0.9722341
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:57.315908000 0.09027353
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:58.315908000 0.3506035
Global Window
Cuts the stream into windows of varying record counts.
.qsp.window.global[trigger]
.qsp.window.global[trigger; .qsp.use (!) . flip (
(`mixedSchemas ; mixedSchemas);
(`acceptDictionaries; acceptDictionaries);
(`state ; state))]
Parameters:
name | type | description | default |
---|---|---|---|
trigger | function | A function that splits the stream (see below). | Required |
options:
name | type | description | default |
---|---|---|---|
mixedSchemas | boolean | mixedSchemas must be true if batches are tables with different schemas. It may also significantly increase performance when batches are tuples or lists of different types. Note the caveat below about how this changes the buffer parameter. | 0b |
acceptDictionaries | boolean | If batches will never be dictionaries, this can be 0b to increase performance. | 1b |
state | any | The initial state. | :: |
For all common arguments, refer to configuring operators
This operator splits a stream into windows by buffering incoming batches, and splitting the buffer on the indices returned by the trigger function.
The trigger function is passed the following parameters:
- the operator's id
- the buffered records
- an offset of where the current batch starts
- the current batch's metadata
- the current batch's data
As batches are ingested, the trigger function will be applied to each batch, and data will be buffered. However, the buffering behavior will depend on the output of the trigger function:
- If the trigger function returns an empty list or generic null, the incoming batch will be buffered and nothing will be emitted.
- If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.
Last data batch
The last list will remain in the buffer. This last list can be emitted by returning the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to the indices.
The buffered records cannot be modified from the trigger function.
Batches with mixed schemas are only supported when using the mixedSchemas option.
Caveat when using mixedSchemas
When this is set, the buffer passed to the trigger will be a list of batches, rather than a single table. The indices returned by the trigger function will still work as though the buffer were a single list.
On teardown, any records left in the buffer will be emitted as a single batch.
This pipeline emits a window whenever the high water mark is exceeded.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.global[{[id; buffer; offset; md; data]
state: .qsp.get[id; md];
// Split whenever a new high water mark is seen
runningMax: or\[state , data `val];
// Find where the running maximum increases
indices: offset + 1 + where 0 <> 1 _ deltas runningMax;
.qsp.set[id; md] last runningMax;
: $[state < max data `val;
indices;
()]}; .qsp.use ``state!0 15]
.qsp.write.toConsole[]
The pipeline can then be passed data:
// This will be buffered, as the initial high water mark is not exceeded
publish ([] val: 0 1 5);
// The 16 exceeds the high water mark, causing everything before it to be emitted
publish ([] val: 14 15 16);
// As each of these are the highest value seen thus far, two windows are emitted
publish ([] val: 17 18);
// With the high water mark now at 18, these will all be buffered
publish ([] val: 16 17 16);
This pipeline splits the stream on a specific value.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.global[{[id; buffer; offset; md; data]
offset + where data[`status] = `new
}]
.qsp.write.toConsole[]
The pipeline can then be passed data.
Each window, after the first, will start with the status new.
neg[h](`publish; ([] val: 88 89 21 98 0n 24; status: `running`running`running`running`new`running))
neg[h](`publish; ([] val: 34 28 0n 90 0n 47; status: `running`running`new`running`new`running))
sp.window.global_window(trigger_function)
Naming Convention
Defining or using a module attribute named global would be a syntax error, so this function is named global_window as a workaround.
Parameters:
name | type | description | default |
---|---|---|---|
trigger | function | A function that splits the stream (see below). | Required |
mixed_schemas | boolean | True to support batches being tables with different schemas. | False |
accept_dictionaries | boolean | If batches will never be dictionaries, this can be False to increase performance. | 1b |
state | any | The initial state. | :: |
Returns: A global window, which can be joined to other operators or pipelines.
The trigger function is passed the following parameters:
- the operator's id
- the buffered records
- an offset of where the current batch starts
- the current batch's metadata
- the current batch's data
As batches are ingested, the trigger function will be applied to the batch, and data will be buffered. However, the buffering behavior will depend on the output of the trigger function:
- If the trigger function returns an empty list or generic null, the incoming batch will be buffered and nothing will be emitted.
- If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.
Last data batch
The last list will remain in the buffer. This last list can be emitted by returning the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to the indices.
The buffered records cannot be modified from the trigger function.
Batches with mixed schemas are only supported when using the mixed_schemas option.
Caveat when using mixed_schemas
When this is set, the buffer passed to the trigger will be a list of batches, rather than a single table. The indices returned by the trigger function will still work as though the buffer were a single list.
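The splitting rule described above can be sketched in plain Python. The names `split_buffer`, `GlobalWindowSketch` and `on_new` are hypothetical, and the sketch makes two assumptions that the text does not spell out: the buffer handed to the trigger already includes the current batch, and an index of 0 never produces an empty window.

```python
def split_buffer(buffer, indices):
    """Split `buffer` at `indices`; return (emitted windows, remaining buffer)."""
    # Assumption: index 0 and out-of-range indices are ignored, so no empty
    # window is ever emitted.
    cuts = sorted({i for i in indices if 0 < i <= len(buffer)})
    windows, start = [], 0
    for cut in cuts:
        windows.append(buffer[start:cut])
        start = cut
    # Everything from the last index onwards stays buffered.
    return windows, buffer[start:]


class GlobalWindowSketch:
    def __init__(self, trigger):
        self.trigger = trigger
        self.buffer = []

    def publish(self, batch):
        offset = len(self.buffer)          # where the current batch starts
        self.buffer = self.buffer + batch  # assumption: trigger sees old + new
        # The operator id and metadata are placeholders in this sketch.
        indices = self.trigger(None, self.buffer, offset, {}, batch) or []
        windows, self.buffer = split_buffer(self.buffer, indices)
        return windows


def on_new(op_id, buffer, offset, md, data):
    # Batch-relative positions become buffer positions by adding the offset.
    return [offset + i for i, rec in enumerate(data) if rec["status"] == "new"]


def rows(vals, statuses):
    return [{"val": v, "status": s} for v, s in zip(vals, statuses)]


op = GlobalWindowSketch(on_new)
w1 = op.publish(rows([12], ["running"]))
w2 = op.publish(rows([14, 16], ["running", "running"]))
w3 = op.publish(rows([18, 20, 22], ["running", "running", "new"]))
w4 = op.publish(rows([24, 26, 44, 55, 66, 77],
                     ["running", "running", "new", "running", "new", "running"]))
```

Returning `len(buffer)` as the last index, as in `split_buffer(buffer, [len(buffer)])`, emits the final segment as well and leaves the buffer empty.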
Examples:
>>> from kxi import sp
>>> from datetime import timedelta
>>> import pandas as pd
>>> import pykx as kx
>>> def trig(id, buf, offset, md, data):
        return [i + offset for i in range(len(data)) if data['status'][i] == 'new']
>>> sp.run(sp.read.from_callback('publish')
| sp.window.global_window(trig)
| sp.write.to_console())
>>> df1 = pd.DataFrame({'val': [12],'status': ['running']})
>>> df2 = pd.DataFrame({'val': [14,16],'status': ['running', 'running']})
>>> df3 = pd.DataFrame({'val': [18,20,22],'status': ['running', 'running', 'new']})
>>> df4 = pd.DataFrame({'val': [24,26,44, 55, 66, 77],
'status': ['running', 'running', 'new', 'running', 'new', 'running']})
>>> kx.q('publish', df1) # Buffers one record
>>> kx.q('publish', df2) # Buffers two more records
>>> kx.q('publish', df3) # Emits a window, buffering the record with the status 'new'
| val status
-----------------------------| -----------
2023.10.17D17:38:28.883736096| 12 running
2023.10.17D17:38:28.883736096| 14 running
2023.10.17D17:38:28.883736096| 16 running
2023.10.17D17:38:28.883736096| 18 running
2023.10.17D17:38:28.883736096| 20 running
>>> kx.q('publish', df4)
| val status
-----------------------------| -----------
2023.10.17D17:38:51.211669106| 22 new
2023.10.17D17:38:51.211669106| 24 running
2023.10.17D17:38:51.211669106| 26 running
| val status
-----------------------------| -----------
2023.10.17D17:38:51.211736552| 44 new
2023.10.17D17:38:51.211736552| 55 running
Sliding Window
Cuts the stream into potentially overlapping windows based on data time.
.qsp.window.sliding[period; duration; timeColumn]
.qsp.window.sliding[period; duration; timeColumn; .qsp.use (!) . flip (
(`lateness ; lateness);
(`timeAssigner ; timeAssigner);
(`passthrough ; passthrough);
(`sort ; sort);
(`countTrigger ; countTrigger);
(`skipEmptyWindows ; skipEmptyWindows);
(`acceptDictionaries; acceptDictionaries))]
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | How frequently windows should fire. | Required |
duration | timespan, time, second or minute | The length of each window. | Required |
timeColumn | symbol | The column containing the timestamps to window on. | Required, unless timeAssigner is specified |
options:
name | type | description | default |
---|---|---|---|
lateness | timespan, time, second or minute | Time delay before emitting a window to allow late events to arrive. | 0D |
timeAssigner | function | A time-assigner function. | None |
passthrough | boolean | Emit late events as a separate batch, rather than dropping them. | 0b |
sort | boolean | Sort the window in ascending time order. | 0b |
countTrigger | long | The number of buffered records at which the buffer will be flushed automatically. | 0W |
skipEmptyWindows | boolean | Only emit non-empty windows. This can increase performance on sparse historical data. | 0b |
acceptDictionaries | boolean | If batches will never be dictionaries, this can be 0b to increase performance. | 1b |
For all common arguments, refer to configuring operators
This operator will split the stream into windows based on the records' timestamps. Windows are based solely on the event time, not the processing time.
Period is the time between starting a new window, and duration is how long a window runs for.
- If the period and duration are the same, the result is tumbling windows.
- If the duration is longer than the period, the result will be overlapping windows.
- If the period is longer than the duration, the result is hopping windows. Hopping windows are currently not supported.
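To make the relationship between period and duration concrete, the sketch below lists which windows a given event time falls into. It is an illustration only: the function name `window_starts` is hypothetical, times are plain integers (seconds), and it assumes windows are half-open intervals `[start, start + duration)` whose starts are aligned to multiples of the period, which this page does not state explicitly.

```python
def window_starts(t, period, duration):
    """Return the start times of every window that contains event time `t`.

    Assumes windows start at multiples of `period` and cover the half-open
    interval [start, start + duration).
    """
    start = (t // period) * period   # latest window start that is <= t
    starts = []
    while start > t - duration:      # window still covers t
        starts.append(start)
        start -= period
    return sorted(starts)


tumbling = window_starts(12, period=5, duration=5)     # one window
overlapping = window_starts(12, period=5, duration=10)  # two windows
on_boundary = window_starts(10, period=5, duration=10)
```

When the duration equals the period, each record belongs to exactly one window. When the duration is twice the period, each record belongs to two.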
Note that to avoid running out of memory when too much late data is buffered, or when the current window has too many records, the countTrigger option can be used to emit buffered records for the current window and flush late data when the number of buffered events reaches a given threshold. Buffer counts are calculated per-key, and only the records for the key exceeding the limit will be emitted.
A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window will be discarded.
Any partial windows will be emitted on teardown.
This creates a pipeline that fires a window every 5 seconds, containing the previous 10 seconds of data:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.sliding[00:00:05; 00:00:10; `time]
.qsp.write.toConsole[]
This sends data to the pipeline:
// Five minutes of historical data from an hour ago
// This will emit 60 windows when the timer fires
publish ([] time: (.z.p - 01:00) + 00:00:01 * til 300; data: 300?1f)
// Data from one minute ago
// This will be emitted as a single window when the timer fires
publish ([] time: .z.p - 00:01; data: 10?1f)
// Current window stays open till a timestamp is seen past the end of the window
// The timer will emit the entire in-progress window if there are any changes,
// potentially emitting records multiple times
time: .z.p
// This emits a window with the data points 0 1 2
publish ([] time: time; data: 0 1 2)
// Wait five seconds, then this emits a window with data points 0 1 2 3 4 5
publish ([] time: time; data: 3 4 5)
// Wait five seconds, then this emits a window with data points 0 1 2 3 4 5 6 7 8
publish ([] time: time; data: 6 7 8)
// Wait five seconds, then this emits a new window containing 9 10 11
publish ([] time: time + 00:01; data: 9 10 11)
sp.window.sliding(period, duration, time_column)
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | The frequency at which windows should fire. | Required |
duration | timespan, time, second or minute | The length of each window. | Required |
time_column | symbol | Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument. | Required, unless time_assigner is specified |
lateness | timespan, time, second or minute | The time delay before emitting a window to allow late events to arrive. | 0D |
passthrough | boolean | Whether to send late events through the pipeline with the next batch rather than dropping them. | 0b |
sort | boolean | Whether to sort the window in ascending time order. | 0b |
count_trigger | long | The number of buffered records at which the buffer will be flushed automatically. | 0W |
time_assigner | function | A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument. | None |
skip_empty_windows | boolean | True to only emit non-empty windows. This can increase performance on sparse historical data. | 0b |
accept_dictionaries | boolean | If batches will never be dictionaries, this can be False to increase performance. | 1b |
Returns:
A sliding window, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random
>>> df = pd.DataFrame({
'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
'data': [random.uniform(0, 1) for _ in range(30)]
})
>>> sp.run(sp.read.from_callback('publish')
| sp.window.sliding(period=datetime.timedelta(seconds=1),
duration=datetime.timedelta(seconds=30), time_column="time")
| sp.write.to_console())
>>> kx.q('publish', df)
...
| time data
-----------------------------| ---------------------------------------
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:07.330559000 0.4034322
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:08.330559000 0.4260979
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:09.330559000 0.570041
| time data
-----------------------------| ---------------------------------------
2023.10.11D13:54:06.933550232| 2023.10.11D13:44:08.330559000 0.4260979
2023.10.11D13:54:06.933550232| 2023.10.11D13:44:09.330559000 0.570041
| time data
-----------------------------| --------------------------------------
2023.10.11D13:54:06.933634069| 2023.10.11D13:44:09.330559000 0.570041
>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
| sp.window.sliding(period=datetime.timedelta(seconds=5),
duration=datetime.timedelta(seconds=10), time_column="time")
| sp.write.to_console())
>>> kx.q('publish', df)
...
| time data
-----------------------------| ----------------------------------------
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:35.040720000 0.3428878
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:36.040720000 0.592038
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:37.040720000 0.7430601
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:38.040720000 0.6335474
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:39.040720000 0.08378837
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:40.040720000 0.9767207
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:41.040720000 0.2440352
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:42.040720000 0.3866914
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:43.040720000 0.9789462
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:44.040720000 0.5799388
| time data
-----------------------------| ---------------------------------------
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:40.040720000 0.9767207
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:41.040720000 0.2440352
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:42.040720000 0.3866914
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:43.040720000 0.9789462
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:44.040720000 0.5799388
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:45.040720000 0.9282572
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:46.040720000 0.8006013
| time data
-----------------------------| ---------------------------------------
2023.10.11D14:27:57.230957348| 2023.10.11D13:58:45.040720000 0.9282572
2023.10.11D14:27:57.230957348| 2023.10.11D13:58:46.040720000 0.8006013
Timer Window
Cuts the stream into non-overlapping windows solely based on system processing time.
.qsp.window.timer[period]
.qsp.window.timer[period; .qsp.use (!) . flip (
(`countTrigger ; countTrigger);
(`skipEmptyWindows; skipEmptyWindows);
(`acceptDictionaries; acceptDictionaries))];
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | How frequently windows should fire. | Required |
options:
name | type | description | default |
---|---|---|---|
countTrigger | long | The number of buffered records at which the buffer will be flushed automatically. | 0W |
skipEmptyWindows | boolean | Only emit non-empty windows. This can increase performance when windowing sparse streams. | 0b |
acceptDictionaries | boolean | If batches will never be dictionaries, this can be 0b to increase performance. | 1b |
For all common arguments, refer to configuring operators
Timer windows aggregate the stream by processing time, with much less overhead than event-time based windowing operators. Any data in the buffer will be emitted each period. As records are not timestamped, there is no notion of late data. Because these windows ignore event time, this will work on streams that do not have event times.
If the timer fires and the buffer is empty, an empty batch with the schema of the previous window will be emitted. However, if no batches have been received yet, then nothing will be emitted, as the schema is unknown.
The start time for each window is in the emitted batches' metadata. Due to variance in when the timer fires, window durations may not be exactly equal, unlike sliding and tumbling windows, whose durations are fixed.
Note that to avoid running out of memory when too much data is buffered, the countTrigger option can be used to emit the buffered records when the number of buffered events reaches a given threshold. Buffer counts are calculated per-key, and only the records for the key exceeding the limit will be emitted.
On teardown, any buffered records will be emitted.
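The timer behavior described above can be modelled without a real clock by calling a method whenever the timer would fire. This is a plain-Python sketch, not the operator itself: `TimerWindowSketch`, `publish` and `on_timer` are hypothetical names, keys are ignored, and the sketch assumes the count trigger flushes once the buffered count reaches the threshold.

```python
class TimerWindowSketch:
    """Processing-time windows: flush on every timer tick or at a count limit."""

    def __init__(self, count_trigger=float("inf"), skip_empty_windows=False):
        self.count_trigger = count_trigger
        self.skip_empty_windows = skip_empty_windows
        self.buffer = []
        self.seen_batch = False  # the schema is unknown until the first batch

    def publish(self, batch):
        """Buffer a batch; return a window only if the count trigger fires."""
        self.buffer.extend(batch)
        self.seen_batch = True
        # Assumption: flush once the count *reaches* the threshold.
        if len(self.buffer) >= self.count_trigger:
            return self._flush()
        return None

    def on_timer(self):
        """Called once per period; returns the window, or None if suppressed."""
        if not self.buffer and (self.skip_empty_windows or not self.seen_batch):
            return None
        return self._flush()

    def _flush(self):
        window, self.buffer = self.buffer, []
        return window


op = TimerWindowSketch(count_trigger=3)
before_any_data = op.on_timer()                      # nothing: schema unknown
results = [op.publish([1]), op.publish([2]), op.publish([3])]
empty_tick = op.on_timer()                           # empty window is emitted
op.publish([4])
op.publish([5])
timed = op.on_timer()                                # timer flushes 4 and 5
```

Event timestamps never appear in the sketch, which is why there is no notion of late data for this operator.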
Emit a window every 5 seconds, or when the buffered data exceeds 10,000 records:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.timer[00:00:05; .qsp.use enlist[`countTrigger]!enlist 10000]
.qsp.map[count]
.qsp.write.toConsole[]
// As the timestamps are not read, these records will all be emitted together,
// within five seconds of being received.
publish ([] time: .z.p + 00:00:00 00:00:30 00:00:02; data: 3?1f);
// As the number of records exceeds the countTrigger,
// this will cause the buffer to be emitted immediately.
// As the windowing is based on processing time, timestamps are not required.
publish ([] data: 10001?1f);
sp.window.timer(period, count_trigger)
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | The frequency at which windows should fire. | Required |
count_trigger | long | The number of buffered records at which the buffer will be flushed automatically. | 0W |
skip_empty_windows | boolean | True to only emit non-empty windows. | 0b |
accept_dictionaries | boolean | If batches will never be dictionaries, this can be False to increase performance. | 1b |
Returns:
A timer window, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> from datetime import timedelta
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.window.timer(period=timedelta(seconds=5), count_trigger=3)
| sp.write.to_console())
>>> # Execute 3 in a row
>>> kx.q('publish', pd.DataFrame({'x': [1]}))
>>> kx.q('publish', pd.DataFrame({'x': [2]}))
>>> kx.q('publish', pd.DataFrame({'x': [3]}))
| x
-----------------------------| -
2023.10.11D13:23:18.819022773| 1
2023.10.11D13:23:18.819022773| 2
2023.10.11D13:23:18.819022773| 3
| x
| -
...
>>> # execute 2 in a row
>>> kx.q('publish', pd.DataFrame({'x': [4]}))
>>> kx.q('publish', pd.DataFrame({'x': [5]}))
| x
-----------------------------| -
2023.10.11D13:24:30.116093087| 4
2023.10.11D13:24:30.116093087| 5
| x
| -
...
Tumbling Window
Cuts the stream into non-overlapping windows based on data time.
.qsp.window.tumbling[period; timeColumn]
.qsp.window.tumbling[period; timeColumn; .qsp.use (!) . flip (
(`lateness ; lateness);
(`timeAssigner ; timeAssigner);
(`passthrough ; passthrough);
(`sort ; sort);
(`countTrigger ; countTrigger);
(`skipEmptyWindows; skipEmptyWindows);
(`acceptDictionaries; acceptDictionaries))]
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | The length of each window. | Required |
timeColumn | symbol | The column containing the timestamps to window on. | Required, unless timeAssigner is specified |
options:
name | type | description | default |
---|---|---|---|
lateness | timespan, time, second or minute | Time delay before emitting a window to allow late events to arrive. | 0D |
timeAssigner | function | A time-assigner function. | None |
passthrough | boolean | Emit late events as a separate batch, rather than dropping them. | 0b |
sort | boolean | Sort the window in ascending time order. | 0b |
countTrigger | long | The number of buffered records at which the buffer will be flushed automatically. Note, this does not specify the size of the partial windows emitted, only the threshold at which the entire buffer will be flushed. | 0W |
skipEmptyWindows | boolean | Only emit non-empty windows. This can increase performance on sparse historical data. | 0b |
For all common arguments, refer to configuring operators
This operator will split the stream into windows based on the records' timestamps. Windows are based solely on the event time, not the processing time.
A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window will be discarded.
Note that to avoid running out of memory when too much late data is buffered, or when the current window has too many records, the countTrigger option can be used to emit buffered records for the current window and flush late data when the number of buffered events reaches a given threshold. Buffer counts are calculated per-key, and only the records for the key exceeding the limit will be emitted.
On teardown, any buffered records will be emitted.
Create 10-second tumbling windows:
// The event time is read from the `time` column
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:10; `time]
.qsp.map[{ select avg val from x }]
.qsp.write.toConsole[]
publish ([] time: .z.p + 00:00:10 + til 100; val: 100?1f)
sp.window.tumbling(period, time_column)
Parameters:
name | type | description | default |
---|---|---|---|
period | timespan, time, second or minute | The frequency at which windows should fire. | Required |
time_column | symbol | Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument. | Required, unless time_assigner is specified |
lateness | timespan, time, second or minute | The time delay before emitting a window to allow late events to arrive. | 0D |
passthrough | boolean | True to send late events through the pipeline with the next batch rather than dropping them. | 0b |
sort | boolean | True to sort the window in ascending time order. | 0b |
count_trigger | long | The number of buffered records at which the buffer will be flushed automatically. Note, this does not specify the size of the partial windows emitted, only the threshold at which the entire buffer will be flushed. | 0W |
time_assigner | function | A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument. | None |
skip_empty_windows | boolean | True to only emit non-empty windows. This can increase performance on sparse historical data. | 0b |
accept_dictionaries | boolean | If batches will never be dictionaries, this can be False to increase performance. | 1b |
Returns:
A tumbling operator, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random
>>> df = pd.DataFrame({
'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
'data': [random.uniform(0, 1) for _ in range(30)]
})
>>> sp.run(sp.read.from_callback('publish')
| sp.window.tumbling(period=datetime.timedelta(seconds=4), time_column="time")
| sp.write.to_console())
>>> kx.q('publish', df)
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552460624| 2023.10.11D16:27:55.250466000 0.4693634
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:56.250466000 0.4031693
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:57.250466000 0.1346816
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:58.250466000 0.4040443
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:59.250466000 0.2443313
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:00.250466000 0.1977503
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:01.250466000 0.3117616
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:02.250466000 0.5984409
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:03.250466000 0.518701
...
| time data
-----------------------------| ---------------------------------------
2023.10.11D16:28:33.497442545| 2023.10.11D16:28:24.250466000 0.7036805
Time-assigner functions
If preprocessing is required to extract the times from each record or from a batch's metadata, a time-assigner function can be used. The time assigner takes an incoming batch as an argument, and returns a list of timestamps or timespans with a value for every record in that batch. Alternatively, it can return a timestamp or timespan atom, to be used for all records in the batch.
This example parses the time from a string, and applies a timezone offset, without changing the value of the underlying column.
.qsp.window.tumbling[period;
.qsp.use ``timeAssigner!(::; {-05:00:00 + "P"$x`start})]
To reference metadata from the time assigner, the params option can be used. The symbols data, metadata, and operator can be used to pass in the corresponding value for a message's data, metadata, and the underlying operator.
.qsp.window.tumbling[period;
.qsp.use `timeAssigner`params!({[x;y;z] y `start}; `data`metadata`operator)]
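The two q time assigners above have straightforward Python counterparts. The sketch below is illustrative only: it models a batch as a list of dictionaries and metadata as a plain dictionary, which is an assumption about the calling convention rather than the documented behavior of `time_assigner`, and the function names are hypothetical.

```python
from datetime import datetime, timedelta

# Timezone adjustment applied to every parsed time (mirrors -05:00:00 above).
UTC_OFFSET = timedelta(hours=-5)


def assign_times(batch):
    """Return one timestamp per record, parsed from the string field 'start'.

    The records themselves are left untouched.
    """
    return [datetime.fromisoformat(rec["start"]) + UTC_OFFSET for rec in batch]


def assign_batch_time(data, metadata, operator):
    """Return a single timestamp, taken from metadata, for the whole batch."""
    return metadata["start"]


batch = [{"start": "2023-10-11T16:00:00"}, {"start": "2023-10-11T16:00:05"}]
per_record = assign_times(batch)
per_batch = assign_batch_time(batch, {"start": datetime(2023, 10, 11, 16)}, None)
```

The first form returns a list with one value per record. The second returns a single value that applies to every record in the batch.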
Performance
Throughput is higher if batches are tables, rather than a list of tuples.
Including late data
To include late data, the acceptable lateness can be set with the lateness option.
A new timestamp will then trigger a window only when that timestamp exceeds the sum of the start time, the window duration, and the lateness.
This only applies to tumbling and sliding windows.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:10; `time;
.qsp.use enlist[`lateness]!enlist 00:00:02]
.qsp.write.toConsole[]
// The 1 is read one second into the second window, so it gets included in the first window.
// Once the 12 is read though, it closes out the earlier window.
// As such, the 9 will be discarded.
publish each enlist each ([] time: 0p + 00:00:01 * 0 4 3 8 10 1 12 9 20 18 25)
// If the same pattern comes in as a single batch however, all records are included
publish ([] time: 1p + 00:00:01 * 0 4 3 8 11 1 12 9 20 18 25)
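The difference between publishing records one at a time and publishing them as a single batch can be reproduced with a small plain-Python model. This is a sketch under stated assumptions, not the operator's code: `LatenessSketch` is a hypothetical name, times are integer seconds, a window closes once the newest timestamp is at or past its end plus the lateness, and a batch's records are all buffered before that batch is allowed to close any window.

```python
class LatenessSketch:
    """Tumbling windows over integer timestamps with an allowed lateness."""

    def __init__(self, period, lateness=0):
        self.period = period
        self.lateness = lateness
        self.open = {}            # window start -> buffered timestamps
        self.closed_until = None  # windows starting before this are closed
        self.watermark = None     # newest timestamp seen so far

    def publish(self, batch):
        """Return (emitted windows, discarded records) for one batch."""
        dropped = []
        for t in batch:
            start = (t // self.period) * self.period
            if self.closed_until is not None and start < self.closed_until:
                dropped.append(t)   # its window has already been emitted
            else:
                self.open.setdefault(start, []).append(t)
        if batch and (self.watermark is None or max(batch) > self.watermark):
            self.watermark = max(batch)
        if self.watermark is None:
            return [], dropped
        # Windows that end at or before (watermark - lateness) can close.
        boundary = ((self.watermark - self.lateness) // self.period) * self.period
        emitted = [(s, self.open.pop(s)) for s in sorted(self.open) if s < boundary]
        if self.closed_until is None or boundary > self.closed_until:
            self.closed_until = boundary
        return emitted, dropped


times = [0, 4, 3, 8, 10, 1, 12, 9, 20, 18, 25]

one_by_one = LatenessSketch(period=10, lateness=2)
emitted, dropped = [], []
for t in times:
    windows, late = one_by_one.publish([t])
    emitted += windows
    dropped += late

single_batch = LatenessSketch(period=10, lateness=2)
batch_emitted, batch_dropped = single_batch.publish(times)
```

Published one record at a time, the 1 still lands in the first window, the 12 closes that window, and the 9 is discarded. Published as one batch, the 9 is buffered before anything closes, so no record is dropped.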
Late data passthrough
By default, windows are emitted when the most recent timestamp is greater than or equal to the current window's end time plus the allowed lateness. Thus, with 10 seconds of lateness allowed, a window ending at 14:00:00 will be emitted once a record is seen timestamped at or after 14:00:10.
With passthrough enabled, windows are emitted as soon as they end, and no data will be discarded due to lateness. Any late data will be buffered until the next window is triggered, then emitted in a single batch, even when it spans more than one duration. This batch will be separate from the one that triggered the window, and will have the isLatePassthrough flag set in the metadata.
This only applies to tumbling and sliding windows.
time: `timestamp$`date$.z.p;
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:10; `timestamp; .qsp.use enlist[`passthrough]!enlist 1b]
.qsp.write.toConsole[]
// The first window starts at time + 60 seconds
publish ([] timestamp: time + 00:00:01 * 60 68 70; val: 1 2 3)
// Late data is buffered
publish ([] timestamp: time + 00:00:01 * 0 15 30; val: 5 4 6)
// When the next window is emitted, the late data is emitted first, in a single batch
publish ([] timestamp: time + 00:00:01 * 83 84; val: 7 8)
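The ordering of passthrough output in the example above can be traced with a small plain-Python model. It is a sketch only: `PassthroughSketch` is a hypothetical name, timestamps are integer seconds relative to `time`, and the output dictionaries are an illustrative stand-in for batches and their metadata.

```python
class PassthroughSketch:
    """Tumbling windows that hold late records and pass them through later."""

    def __init__(self, period):
        self.period = period
        self.open = {}            # window start -> buffered timestamps
        self.late = []            # late records waiting for the next window
        self.closed_until = None  # windows starting before this have fired
        self.watermark = None     # newest timestamp seen so far

    def publish(self, batch):
        for t in batch:
            start = (t // self.period) * self.period
            if self.closed_until is not None and start < self.closed_until:
                self.late.append(t)   # kept, not discarded
            else:
                self.open.setdefault(start, []).append(t)
        if batch and (self.watermark is None or max(batch) > self.watermark):
            self.watermark = max(batch)
        if self.watermark is None:
            return []
        # With passthrough, a window fires as soon as a timestamp passes its end.
        boundary = (self.watermark // self.period) * self.period
        ended = [s for s in sorted(self.open) if s < boundary]
        outputs = []
        if ended and self.late:
            # All buffered late data goes out first, as one separate batch.
            outputs.append({"isLatePassthrough": True, "records": self.late})
            self.late = []
        for s in ended:
            outputs.append({"window": s, "records": self.open.pop(s)})
        if self.closed_until is None or boundary > self.closed_until:
            self.closed_until = boundary
        return outputs


op = PassthroughSketch(period=10)
first = op.publish([60, 68, 70])   # 70 ends the window starting at 60
second = op.publish([0, 15, 30])   # all late: buffered, nothing emitted
third = op.publish([83, 84])       # ends window 70: late batch, then window
```

The late batch spans three different windows (0, 10 and 30) yet is emitted as one batch, which is the "even when it spans more than one duration" case.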
To sort batches
To sort the emitted records ascending by timestamp, use the sort option.
This does not apply to .qsp.window.timer, as it has no time-assigner.
This pipeline sorts records by timestamp to calculate the deltas. State is used so the delta can be calculated across window boundaries.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:15; `timestamp; .qsp.use ``sort!11b]
.qsp.map[{[op; md; data]
previous: .qsp.get[op; md];
.qsp.set[op; md; last data `val];
1 _ deltas previous , data `val
};
.qsp.use ``state!(::; 0)]
.qsp.write.toConsole[]
shuffled: {neg[count x]?x} til 35;
publish ([] timestamp: .z.p + 00:00:01 * shuffled; val: shuffled)
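The same delta calculation can be sketched in plain Python, with a closure standing in for operator state. The names `make_delta_step` and `step` are hypothetical, and each window is modelled as a list of `(timestamp, value)` pairs.

```python
def make_delta_step(initial=0):
    """Build a function that computes deltas, carrying state across windows."""
    state = {"previous": initial}

    def step(window):
        """`window` is a list of (timestamp, value) pairs in arrival order."""
        ordered = sorted(window, key=lambda rec: rec[0])  # what `sort` provides
        values = [value for _, value in ordered]
        # Prepend the last value of the previous window, then difference.
        chained = [state["previous"]] + values
        deltas = [b - a for a, b in zip(chained, chained[1:])]
        if values:
            state["previous"] = values[-1]
        return deltas

    return step


step = make_delta_step()
first = step([(2, 2), (0, 0), (1, 1)])   # arrives shuffled
second = step([(4, 4), (3, 3)])          # first delta uses the stored 2
```

Without sorting, the deltas would depend on arrival order. Without the stored value, the first delta of each window would be computed against nothing.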
Window start times
Window start times are included in the metadata for windowed batches as window.
This only applies to tumbling and sliding windows.
This pipeline displays the average value for each window, along with when that window started:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.sliding[00:00:02; 00:00:05; `timestamp]
.qsp.map[
{[md; data] select avg val, windowStart: md`window from data};
.qsp.use ``params!(::; `metadata`data)]
.qsp.write.toConsole[]
publish ([] timestamp: .z.p + 00:00:00.1 * til 200; val: 200?1f)
Temporal timeouts for windows with idle streams
For tumbling and sliding windows, buffered records are normally emitted when a new timestamp is seen past the end of the current window. If a stream goes idle, the window so far will be emitted, triggered by a timer with a frequency matching the window's period.
When the timer fires, if there are buffered records and if the time since the last window was emitted exceeds the period, all data buffered for the current window will be emitted. As such, multiple batches can be emitted for the same window. Batches emitted by the timer will have isPartial set to true in their metadata.
Triggering windows on buffer size
A window will automatically be triggered when the number of buffered records exceeds a given threshold. This also triggers a cleanup of any late data. This value can be set using the countTrigger option.
Note that this can lead to multiple windows being emitted with the same start time. Batches emitted by the count trigger will have isPartial set to true in their metadata.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.window.tumbling[00:00:05; `timestamp;
.qsp.use (enlist `countTrigger)!enlist 10000]
.qsp.map[{count x}]
.qsp.write.toConsole[]
// As this batch exceeds the count trigger threshold, all buffered records will immediately
// be emitted, and flushed from the buffer.
publish ([] timestamp: .z.p + 00:00:00.00001 * til 35000; val: 35000?1f)
// This adds 8000 late records to the buffer
publish ([] timestamp: (.z.p - 1D) + 8000#00:00:01; val: 8000?1f)
// Adding another 2500 records triggers a cleanup of late data,
// after which the buffer size is back under the threshold, and no records will be emitted.
publish ([] timestamp: .z.p + 2500#00:00:01; val: 2500?1f)
Reducing partial windows
When a countTrigger or idle-stream timeout causes a partial window to be emitted, those windows can be aggregated using the reduce operator.
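The idea of folding partial windows into one result can be illustrated in plain Python. This is not the reduce operator's API: `reduce_partials` is a hypothetical helper, each batch is modelled as a `(metadata, values)` pair using the `window` and `isPartial` metadata keys mentioned on this page, and the sketch assumes that partial batches for a window do not repeat records and that the window's last batch arrives without `isPartial` set.

```python
def reduce_partials(batches):
    """Fold (metadata, values) batches into one (window, count, mean) per window."""
    running = {}   # window start -> [count, total]
    results = []
    for md, values in batches:
        acc = running.setdefault(md["window"], [0, 0.0])
        acc[0] += len(values)
        acc[1] += sum(values)
        # Assumption: a batch without isPartial completes its window.
        if not md.get("isPartial", False):
            count, total = running.pop(md["window"])
            mean = total / count if count else None
            results.append((md["window"], count, mean))
    return results


batches = [
    ({"window": 0, "isPartial": True}, [1.0, 2.0]),
    ({"window": 0, "isPartial": True}, [3.0]),
    ({"window": 0}, [6.0]),
    ({"window": 10}, [4.0, 8.0]),
]
averages = reduce_partials(batches)
```

Only a running count and total are kept per window, so the partial batches themselves never need to be held in memory.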