Windowing (processing time)

This example uses .qsp.window.timer to aggregate a stream of randomly generated strings into windows based on their processing time, rather than event time. This means that data will be grouped and emitted based on the local clock of the Worker process, rather than any data field in the message. A window is emitted every period containing all buffered data, and the total length of all the strings in each window is written to the console when each window is emitted.

The batchSize option is used to set an upper bound on the number of records that are emitted in one batch. If the number of records buffered exceeds this limit, it will trigger one or more windows to be emitted immediately.

// To correctly resolve the "publish" name,
// sendMsg must be defined in the global context
\d .

.qsp.run
    .qsp.read.fromCallback[`publish; .qsp.use``name!``stream]
    .qsp.window.timer[00:00:01; .qsp.use enlist[`batchSize]!enlist 10000]
    // When a window is triggered by the batch size being reached, it will have exactly 10k records.
    // The following window, emitted by the timer, will then contain fewer records than normal
    .qsp.map[{enlist (count x; sum count each x)}]
    .qsp.write.toConsole["Window Sum "];

// Emit between 0-1999 records, where each is a string 0-9 characters long
sendMsg: {publish (rand[2000]?10)?\:.Q.a;}

// Send a message every 100 ms
.tm.add[`sendMsg; (`sendMsg; ()); 100; 0];

// After 5 seconds, stop sending messages and clean up the pipeline
.tm.add1shot[`stop; ({.tm.del `sendMsg; .qsp.teardown[]}; ()); 0D00:00:05];