Windows

.qsp.window.sliding

Aggregates the stream into potentially overlapping windows based on event time.

Signature:

    .qsp.window.sliding[period; duration; timeAssigner]

        - `period`       - how often windows should fire (e.g., 00:00:05)
        - `duration`     - how much data should be contained in each window (e.g., 00:00:10)
        - `timeAssigner` - function returning timestamps for each event

The period is the time between the start of one window and the start of the next, and the duration is how long each window runs. If the period and duration are the same, the result is tumbling windows. If the duration is longer than the period, the result is overlapping windows. If the period is longer than the duration, the result is hopping windows, which are not currently supported.
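The relationship between period and duration can be illustrated in plain q. This is a minimal sketch, not the operator's implementation: `windowStarts` is a hypothetical helper, and it assumes window start times are aligned to multiples of the period, which the text above does not state.

```q
// Sketch only: assumes window starts are aligned to multiples of the period.
// windowStarts is a hypothetical helper, not part of the .qsp API.
windowStarts:{[period; duration; ts]
    latest: period xbar ts;                 // latest window start at or before ts
    n: ceiling duration % period;           // how many windows can overlap at once
    starts: latest - period * til n;        // candidate start times, newest first
    starts where ts < starts + duration     // keep windows whose span still covers ts
    }

ts: 2024.01.01D00:00:07
windowStarts[0D00:00:05; 0D00:00:10; ts]    // duration > period: two overlapping windows
windowStarts[0D00:00:05; 0D00:00:05; ts]    // duration = period: one tumbling window
```

With a 5 second period and a 10 second duration, each record falls into two windows; with equal period and duration it falls into exactly one.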

timeAssigner is a function that takes two arguments, a message's metadata and data, and returns a list of timestamps with a value for every record in data.
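As an illustration, a time assigner can be written with named arguments instead of the implicit `x` and `y`. The name `assignTime`, the sample table, and the `time` column are assumptions made for this sketch.

```q
// Hypothetical assigner: ignores the metadata and reads the time column of the batch
assignTime:{[md; data] data`time}

// Sample batch with one timestamp per record
batch: ([] time: 2024.01.01D00:00:00 + 0D00:00:01 * til 3; val: 1 2 3f)

// Called with empty metadata, it returns one timestamp for every record
assignTime[()!(); batch]
count[batch] = count assignTime[()!(); batch]    // 1b
```

This is equivalent to the shorter form `{y`time}` used in the examples on this page.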

Windows are based completely on the event time, not the processing time.

Throughput will be higher if batches are tables, rather than a list of tuples. Throughput will also be higher if all tables have the same schema. By default, uniform schemas are required. To allow tables with different schemas, use the option .qsp.use enlist[`mixedSchemas]!enlist 1b. Pipelines will be slower when this option is used, even if all batches have the same schema.
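A producer that wants to stay on the uniform-schema path could compare batch schemas before publishing. The following is only a sketch; `sameSchema` and the sample tables are hypothetical and not part of the .qsp API.

```q
// Hypothetical check: two tables share a schema if their meta output matches
sameSchema:{[a; b] (meta a) ~ meta b}

t1: ([] time: 2#2024.01.01D00:00:00; val: 1 2f)
t2: ([] time: 2#2024.01.01D00:00:05; val: 3 4f)
t3: ([] time: 2#2024.01.01D00:00:10; sym: `a`b)

sameSchema[t1; t2]    // 1b: same column names and types
sameSchema[t1; t3]    // 0b: different columns
```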

A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window will be discarded. To include late data, the acceptable lateness can be set with the lateness option. A new timestamp will then trigger a window only when that timestamp exceeds the sum of the start time, the window duration, and the lateness.

    .qsp.use enlist[`lateness]!enlist 00:00:02
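The trigger rule with lateness is simple arithmetic, sketched here in plain q. `closeTime` is a hypothetical helper used only to show the calculation; it is not how the operator is implemented.

```q
// Sketch of the rule above: a window is triggered once a timestamp
// exceeds start + duration + lateness. closeTime is a hypothetical helper.
closeTime:{[start; duration; lateness] start + duration + lateness}

start: 2024.01.01D00:00:00
threshold: closeTime[start; 0D00:00:10; 0D00:00:02]    // 2024.01.01D00:00:12

2024.01.01D00:00:11 > threshold    // 0b: window stays open, late records are still accepted
2024.01.01D00:00:13 > threshold    // 1b: window is triggered
```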

Windows are also triggered by a timer, with a frequency matching the window's period. When the timer fires, if there have been changes since the last window was emitted, and if the time since the last window was emitted exceeds the period, all data buffered for the current window will be emitted. Data emitted by the timer remains in the buffer until the window is closed by future events. This means that individual events can be emitted multiple times as the window is updated.
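Because the same window can be emitted more than once, a consumer may want to keep only the most recent result for each window. One possible approach, sketched in plain q, is to upsert into a table keyed on the window start time. The `results` table, the `store` function, and the `avgVal` column are hypothetical names chosen for this example.

```q
// Hypothetical downstream store, keyed by window start time
results: ([window: `timestamp$()] avgVal: `float$())

// Upsert overwrites the row for a window that was already emitted
store:{[w; v] `results upsert (w; v)}

w: 2024.01.01D00:00:00
store[w; 0.4]         // partial window emitted by the timer
store[w; 0.55]        // same window emitted again with more data

count results         // 1: still a single row for the window
results[w]`avgVal     // 0.55: the later emission replaced the earlier one
```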

Example: A window firing every 5 seconds on each 10 seconds of data

   .qsp.run
       .qsp.read.fromCallback[`publish]
       .qsp.window.sliding[00:00:05; 00:00:10; {y`time}]
       .qsp.write.toConsole[]

   // Five minutes of historical data from an hour ago
   // This will emit 60 windows when the timer fires
   publish ([] time: (.z.p - 01:00) + 00:00:01 * til 300; data: 300?1f)

   // Data from one minute ago
   // This will be emitted as a single window when the timer fires
   publish ([] time: .z.p - 00:01; data: 10?1f);

   // The current window stays open until a timestamp is seen past the end of the window.
   // The timer will emit the entire in-progress window if there are any changes,
   // potentially emitting records multiple times
   time: .z.p;
   // This emits a window with the data points 0 1 2
   publish ([] time: time; data: 0 1 2);
   // Wait five seconds, then run this, and a window will be emitted with the data points 0 1 2 3 4 5
   publish ([] time: time; data: 3 4 5);
   // Wait five seconds, then run this, and a window will be emitted with the data points 0 1 2 3 4 5 6 7 8
   publish ([] time: time; data: 6 7 8);
   // Wait five seconds, then run this, and a new window containing 9 10 11 will be emitted
   publish ([] time: time + 00:01; data: 9 10 11);

Example: Tagging each window's average with the `window` field from the message metadata

    .qsp.run
        .qsp.read.fromCallback[`publish]
        .qsp.window.sliding[00:00:02; 00:00:05; {y`timestamp}]
        .qsp.map[{[md; data]
            update window: md`window from select avg val from data
            }; .qsp.use``params!(::; `metadata`data)]
        .qsp.write.toConsole[]

    publish ([] timestamp: .z.p + 1000?00:00:10; val: 1000?1f)

Example: A pipeline that can accept heterogeneous schemas, allowing 2 seconds of lateness

   .qsp.run
       .qsp.read.fromCallback[`publish]
       .qsp.window.sliding[00:00:05; 00:00:10; {y`time}; .qsp.use `mixedSchemas`lateness!(1b; 00:00:02)]
       .qsp.write.toConsole[]

.qsp.window.timer

Aggregates the stream by processing time, with much less overhead than other windowing operations. Any data in the buffer will be emitted each period. As event time is ignored, data will not be dropped for being late.

Signature:

    .qsp.window.timer[period]

        - `period`       - how often windows should fire (e.g., 00:00:05)

Because these windows ignore event time, this will work on streams that do not have event times. Because the timer does not fire at exactly regular intervals, window durations may vary slightly, unlike sliding and tumbling windows, whose durations are fixed. Batches with mixed schemas are supported.

Example: A window firing every 5 seconds

  .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.window.timer[00:00:05]
      .qsp.write.toConsole[]

  // As the timestamps are not read, these records will all be emitted together
  publish ([] time: .z.p + 00:00:00 00:00:30 00:00:02; data: 3?1f);

  // Neither a consistent schema nor a timestamp is required
  publish ([] data: `a`b`c);

Example: A window will be emitted every 5 seconds, or when the buffered data exceeds 10,000 records

  .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.window.timer[00:00:05; .qsp.use enlist[`batchSize]!enlist 10000]
      .qsp.map[{count x}]
      .qsp.write.toConsole[]

  // This will cause three batches of 10,000 records to be emitted immediately,
  // with the remaining 4,000 records emitted when the timer fires
  publish ([] data: 34000?1f);
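The batch sizes described in the comment above can be checked with plain q arithmetic. This sketch only shows how 34,000 records divide into chunks of 10,000; it does not call the operator.

```q
// Split 34,000 record indices into chunks of at most 10,000 and count each chunk
count each 10000 cut til 34000    // 10000 10000 10000 4000
```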

.qsp.window.tumbling

Aggregates the stream into non-overlapping windows based on event time.

Signature:

    .qsp.window.tumbling[period; timeAssigner]

        - `period`       - how often windows should fire (e.g., 00:00:05)
        - `timeAssigner` - function returning timestamps for each event

timeAssigner is a function that takes two arguments, a message's metadata and data, and returns a list of timestamps with a value for every record in data.

Windows are based completely on the event time, not the processing time.
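Assigning records to non-overlapping windows by event time can be sketched in plain q with `xbar`. This assumes that windows are aligned to multiples of the period, which is an assumption of the sketch rather than something this page states.

```q
// Sketch only: assumes tumbling windows are aligned to multiples of the period
ts: 2024.01.01D00:00:00 + 0D00:00:01 * til 6    // six records, one second apart

0D00:00:02 xbar ts                      // start of the 2 second window for each record
count each group 0D00:00:02 xbar ts     // two records fall into each of three windows
```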

Throughput will be higher if batches are tables, rather than a list of tuples. Throughput will also be higher if all tables have the same schema. By default, uniform schemas are required. To allow tables with different schemas, use the option .qsp.use enlist[`mixedSchemas]!enlist 1b. Pipelines will be slower when this option is used, even if all batches have the same schema.

A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window will be discarded. To include late data, the acceptable lateness can be set with the lateness option. A new timestamp will then trigger a window only when that timestamp exceeds the sum of the start time, the window duration, and the lateness.

    .qsp.use enlist[`lateness]!enlist 00:00:02

Windows are also triggered by a timer, with a frequency matching the window's period. When the timer fires, if there have been changes since the last window was emitted, and if the time since the last window was emitted exceeds the period, all data buffered for the current window will be emitted. Data emitted by the timer remains in the buffer until the window is closed by future events. This means that individual events can be emitted multiple times as the window is updated.

Example: This pipeline creates 2-second tumbling windows

    // The `timeAssigner` reads each record's event time from the `timestamp` column of the data.

    .qsp.run
        .qsp.read.fromCallback[`publish]
        .qsp.window.tumbling[00:00:02; {y`timestamp}]
        .qsp.map[{ select avg val from x }]
        .qsp.write.toConsole[]

    publish ([] timestamp: .z.p + 1000?00:00:10; val: 1000?1f)

Example: Tagging each window's average with the `window` field from the message metadata

    .qsp.run
        .qsp.read.fromCallback[`publish]
        .qsp.window.tumbling[00:00:02; {y`timestamp}]
        .qsp.map[{[md; data]
            update window: md`window from select avg val from data
            }; .qsp.use``params!(::; `metadata`data)]
        .qsp.write.toConsole[]

    publish ([] timestamp: .z.p + 1000?00:00:10; val: 1000?1f)

Example: A pipeline that can accept heterogeneous schemas, allowing 2 seconds of lateness

   .qsp.run
       .qsp.read.fromCallback[`publish]
       .qsp.window.tumbling[00:00:05; {y`time}; .qsp.use `mixedSchemas`lateness!(1b; 00:00:02)]
       .qsp.write.toConsole[]