Windowing (event time)

This example uses .qsp.window.sliding to aggregate a stream of random numbers into overlapping windows, based on event time rather than processing time.

When a record is received whose timestamp exceeds the end of the current window, that window will be emitted. The lateness option allows a grace period, so in this case a window will not be emitted until a record 500ms past the end of that window has been seen. If data is coming from multiple upstream processes, this allows some time for delayed processes, or messages routed separately to others, to arrive to the Worker before the window fires.

The incoming data will be grouped into three second windows, each covering a timespan one second later than the previous window. The median value of each window will be written to the console when each window is emitted.

// To correctly resolve the "publish" name,
// sendMsg must be defined in the global context
\d .

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.window.sliding[00:00:01; 00:00:03; {y `time}; .qsp.use ``lateness!(::; 0D00:00:00.5)]
    .qsp.map[{med x `data}]
    .qsp.write.toConsole["Window Median "];

// Create five minutes of historical data for yesterday,
// with one record per millisecond
n: 300000;
yesterday: .z.p - 1D;
data: ([] time: yesterday + 00:00:00.001 * til n; data: n?1f);

// Run the pipeline on the historical data, emitting 300 windows
publish data

// The same pipeline can then switch to live data.
// This emits 10,000 records timestamped with the current time.
sendMsg: {publish ([] time: .z.p; data: 10000?1f);}

// Send a message every 100 ms
.tm.add[`sendMsg; (`sendMsg; ()); 100; 0];

// After 5 seconds, stop sending messages and clean up the pipeline
.tm.add1shot[`stop; ({.tm.del `sendMsg; .qsp.teardown[]}; ()); 0D00:00:05];

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]