Skip to content

Windowing on process time

Aggregate a stream of randomly generated strings into windows based on their processing time, rather than event time

This means data will be grouped and emitted based on the local clock of the Worker process, rather than any data field in the message.

A window is emitted every period, containing all buffered data, and the total length of all the strings in each window is written to the console when each window is emitted.

A window will automatically be triggered when the number of buffered records exceeds a given threshold. This threshold can be modified from its default value using the countTrigger option.

// To correctly resolve the "publish" name,
// sendMsg must be defined in the global context
\d .

.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use``name!``stream]
  .qsp.window.timer[00:00:01; .qsp.use enlist[`countTrigger]!enlist 10000]
  // When a window is triggered by the batch size being reached,
  // it will have exactly 10k records.
  // The following window, emitted by the timer,
  // will then contain fewer records than normal
  .qsp.map[{enlist (count x; sum count each x)}]
  .qsp.write.toConsole["Window Sum "]

// Emit between 0-1999 records, where each is a string 0-9 characters long
sendMsg: {publish (rand[2000]?10)?\:.Q.a;}

// Send a message every 100 ms
.tm.add[`sendMsg; (`sendMsg; ()); 100; 0]

// After 5 seconds, stop sending messages and clean up the pipeline
.tm.add1shot[`stop; ({.tm.del `sendMsg; .qsp.teardown[]}; ()); 0D00:00:05]

.qsp.window.timer