Windowing on process time

Aggregate a stream of randomly generated strings into windows based on processing time rather than event time.

Motivation

Windowing on processing time means data is grouped and emitted according to the local clock of the Worker process, rather than any field in the message.

A window is emitted every period, containing all the data buffered since the previous window. Each time a window is emitted, the number of strings it contains and their total length are written to the console.

A window is also triggered automatically when the number of buffered records exceeds a given threshold. This threshold can be changed from its default value using the countTrigger option (count_trigger in Python).
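
The two triggers described above can be sketched in plain Python. This is a minimal illustration, not the Stream Processor implementation: the `TimerWindow` class and its `push` and `advance` methods are hypothetical names, time is passed in explicitly so the behavior is deterministic, and the sketch assumes that a count-triggered flush does not reset the timer and that the timer emits a window even when the buffer is empty.

```python
from typing import Callable, List


class TimerWindow:
    """Toy processing-time window (hypothetical, for illustration only)."""

    def __init__(self, period: float, count_trigger: int,
                 emit: Callable[[List[str]], None]) -> None:
        self.period = period
        self.count_trigger = count_trigger
        self.emit = emit
        self.buffer: List[str] = []
        self.next_tick = period  # clock time of the next timer flush

    def _flush(self) -> None:
        # Emit everything buffered so far and start a new window.
        self.emit(self.buffer)
        self.buffer = []

    def advance(self, now: float) -> None:
        # Fire every timer tick that is due at or before `now`.
        while now >= self.next_tick:
            self._flush()
            self.next_tick += self.period

    def push(self, now: float, batch: List[str]) -> None:
        self.advance(now)
        self.buffer.extend(batch)
        # Count trigger: flush as soon as the buffer exceeds the threshold.
        if len(self.buffer) > self.count_trigger:
            self._flush()


windows = []
w = TimerWindow(period=1.0, count_trigger=5,
                emit=lambda buf: windows.append((len(buf), sum(map(len, buf)))))

w.push(0.1, ["ab", "cde"])            # 2 records buffered
w.push(0.5, ["f", "gh", "ijk", "l"])  # 6 records > 5, so the count trigger fires
w.push(1.2, ["mn"])                   # the tick at 1.0 fires first, on an empty buffer
w.advance(2.0)                        # the tick at 2.0 emits the last record

print(windows)  # [(6, 12), (0, 0), (1, 2)]
```

The second window is empty because the timer tick arrives just after the count trigger has already drained the buffer.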

stream:
  .qsp.read.fromCallback[`publish]
  .qsp.window.timer[00:00:01; .qsp.use enlist[`countTrigger]!enlist 10000]
  // When a window is triggered by the batch size being reached,
  // it will have over 10k records.
  // The following window, emitted by the timer,
  // will then contain fewer records than normal.
  .qsp.map[{enlist (count x; sum count each x)}]
  .qsp.write.toConsole["Window Sum "];

.qsp.onStart {
    // Emit between 0-1999 records, where each is a string 0-9 characters long
    sendMsg: {publish (rand[2000]?10)?\:.Q.a;};

    // Send a message every 100 ms
    .tm.add[`sendMsg; (sendMsg; ()); 100; 0];

    // After 5 seconds, stop sending messages and clean up the pipeline
    .tm.add1shot[`stop; ({.tm.del `sendMsg; .qsp.teardown[]}; ()); 0D00:00:05];
    };

.qsp.run stream

The same pipeline in Python:

import random
import sched
import string
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

import pykx as kx
from kxi import sp

# When a window is triggered by the batch size being reached, 
# it will have over 10k records.
# The following window, emitted by the timer,
# will then contain fewer records than normal.
stream = (sp.read.from_callback('publish')
    | sp.window.timer(timedelta(seconds=1), count_trigger=10_000)
    | sp.map(lambda x: [(len(x), sum(map(len, x)))])
    | sp.write.to_console('Window Sum '))

def random_word():
    # The type must be made explicit as strings are converted to kx.SymbolAtom
    # by default, and the `len` function doesn't work on symbols.
    return kx.CharVector(
        # randint is inclusive at both ends, so this yields 0-9 characters
        ''.join(random.choices(string.ascii_lowercase, k=random.randint(0, 9))))


# Emit between 0-1999 records, where each is a string 0-9 characters long
def emit(s, remaining):
    if remaining == 0:
        sp.teardown()
    else:
        s.enter(.1, 1, emit, (s, remaining - 1))
        kx.q('publish', [random_word() for _ in range(random.randint(0, 1999))])


def on_start():
    # Send a message every 100 ms for five seconds
    s = sched.scheduler(time.time, time.sleep)
    s.enter(.1, 1, emit, (s, 50))
    s.run()


sp.lifecycle.on_start(on_start)
sp.run(stream)
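
To see what the map step produces for a single window, the same aggregation can be applied to an ordinary Python list. This is only a sketch: in the pipeline the window arrives as a PyKX object rather than a list of `str`, and the name `summarize` is introduced here purely for illustration.

```python
def summarize(window):
    # Same expression as the lambda in the map step:
    # one (record count, total characters) pair per window.
    return [(len(window), sum(map(len, window)))]


window = ["abc", "", "hello"]
result = summarize(window)
print(result)  # [(3, 8)]
```

An empty window gives a count of zero and a total of zero, since `sum` over an empty sequence is 0.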

.qsp.window.timer

Summary

This pipeline calculates the record count and total string length of all buffered messages, flushing the buffer every second and whenever the buffer exceeds 10,000 records. When successful, it prints several records to the console, similar to the following. The count of zero appears because the timer triggered a window right after the buffer had been flushed by the count trigger.

Window Sum 2022.03.08D23:20:27.886988800 | 1608 7336
Window Sum 2022.03.08D23:20:28.890176600 | 9410 42559
Window Sum 2022.03.08D23:20:29.788736400 | 10785 48758
Window Sum 2022.03.08D23:20:29.895211800 | 0 ()
Window Sum 2022.03.08D23:20:30.686138200 | 10724 47812
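
As a rough sanity check on this output, the average string length per window can be compared with the expected value. The sketch below assumes string lengths are drawn uniformly from 0 to 9, as the comment on the generator states, which gives a mean of 4.5 characters per record.

```python
# (record count, total characters) for the non-empty windows in the sample output
windows = [(1608, 7336), (9410, 42559), (10785, 48758), (10724, 47812)]

# Lengths drawn uniformly from 0..9 have a mean of 4.5
expected_mean = sum(range(10)) / 10

averages = [total / count for count, total in windows]
for (count, total), avg in zip(windows, averages):
    print(f"{count:>6} records, {total:>6} chars, {avg:.2f} chars per record")
```

Each window averages between about 4.46 and 4.56 characters per record, close to the expected 4.5.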

Next steps