
Windowing on event time

Aggregate a stream of random numbers into overlapping windows, based on event time rather than processing time

Motivation

This example calculates the median of a column over a sliding window, for instance as an input to further analytics. Incoming data is grouped into 3-second windows, each starting one second after the previous one, so consecutive windows overlap by two seconds. The median of each window is written to the console when that window is emitted.
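The overlap can be illustrated without the Stream Processor. The following is a minimal plain-Python sketch, not the library's implementation: `window_starts` and `sliding_medians` are hypothetical helper names, and aligning window starts to multiples of the period from a chosen `origin` is an assumption made for illustration. It shows that, with a one-second period and a three-second duration, every record falls into three windows.

```python
from datetime import datetime, timedelta
from statistics import median


def window_starts(ts, period, duration, origin):
    """Return the start times of every sliding window that contains ts.

    Each window covers [start, start + duration). Starts are assumed to fall
    on multiples of `period` measured from `origin`.
    """
    # Index of the latest window start at or before ts.
    k = (ts - origin) // period
    starts = []
    # Walk backwards while the window still reaches past ts.
    while origin + k * period + duration > ts:
        starts.append(origin + k * period)
        k -= 1
    return sorted(starts)


def sliding_medians(records, period, duration, origin):
    """Group (timestamp, value) pairs into sliding windows and take medians."""
    windows = {}
    for ts, value in records:
        for start in window_starts(ts, period, duration, origin):
            windows.setdefault(start, []).append(value)
    return {start: median(values) for start, values in sorted(windows.items())}


if __name__ == "__main__":
    origin = datetime(2022, 3, 8)
    sec = timedelta(seconds=1)
    records = [(origin + (i + 0.5) * sec, float(i + 1)) for i in range(4)]
    for start, med in sliding_medians(records, sec, 3 * sec, origin).items():
        print(start, med)
```

This sketch buffers every window in memory and only reports medians at the end; a streaming operator would instead emit and discard each window as event time advances.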

When a record is received whose timestamp exceeds the end of the current window, that window is emitted. The lateness option adds a grace period: in this case, a window is not emitted until a record timestamped 500 ms past the end of that window has been seen. If data comes from multiple upstream processes, this gives records from delayed processes, or messages routed separately from the others, time to arrive at the Worker before the window fires.
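The trigger rule described above can be sketched in a few lines of plain Python. This is an illustration only, not how the Stream Processor is implemented: `LatenessTrigger` is a hypothetical name, and treating a record timestamped exactly at the window end plus the lateness as sufficient to fire the window is an assumption.

```python
from datetime import datetime, timedelta


class LatenessTrigger:
    """Track the latest event time seen and decide when a window may fire."""

    def __init__(self, lateness):
        self.lateness = lateness
        self.max_seen = None  # latest event timestamp observed so far

    def observe(self, ts):
        # Out-of-order records never move the high-water mark backwards.
        if self.max_seen is None or ts > self.max_seen:
            self.max_seen = ts

    def should_emit(self, window_end):
        # Assumption: fire once a record at or beyond end + lateness is seen.
        if self.max_seen is None:
            return False
        return self.max_seen >= window_end + self.lateness


if __name__ == "__main__":
    t0 = datetime(2022, 3, 8)
    end = t0 + timedelta(seconds=3)
    trigger = LatenessTrigger(timedelta(milliseconds=500))
    for offset_ms in (3200, 2900, 3500):
        trigger.observe(t0 + timedelta(milliseconds=offset_ms))
        print(offset_ms, trigger.should_emit(end))
```

In this sketch, the record at 2,900 ms arrives after the one at 3,200 ms but still belongs to the window ending at 3,000 ms; the grace period is what keeps that window open long enough to include it.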

// Because there is a one-day gap in the data, corresponding to tens of thousands of
// empty windows, the `skipEmptyWindows` option is used to prevent them from being
// written to the console.
stream:
  .qsp.read.fromCallback[`publish]
  .qsp.window.sliding[00:00:01; 00:00:03; `time; .qsp.use `skipEmptyWindows`lateness!(1b; 0D00:00:00.5)]
  .qsp.map[{med x`data}]
  .qsp.write.toConsole["Window Median "];

.qsp.onStart {
    // Create five minutes of historical data for yesterday,
    // with one record per millisecond
    n: 300000;
    yesterday: .z.p - 1D;
    data: ([] time: yesterday + 00:00:00.001 * til n; data: n?1f);

    // Run the pipeline on the historical data, emitting 300 windows
    publish data;

    // The same pipeline can then switch to live data.
    // This emits 10,000 records timestamped with the current time.
    sendMsg: {publish ([] time: .z.p; data: 10000?1f);};

    // Send a message every 100 ms
    .tm.add[`sendMsg; (sendMsg; ()); 100; 0];

    // After 5 seconds, stop sending messages and clean up the pipeline
    .tm.add1shot[`stop; ({.tm.del `sendMsg; .qsp.teardown[]}; ()); 0D00:00:05];
    }

.qsp.run stream;

The same pipeline in Python:

import sched
import time
from datetime import datetime, timedelta
from statistics import median

import numpy as np
import pandas as pd

import pykx as kx
from kxi import sp

stream = (sp.read.from_callback('publish')
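    # The 500 ms lateness matches the grace period described above.
    | sp.window.sliding(timedelta(seconds=1), timedelta(seconds=3),
                        time_column='time',
                        lateness=timedelta(milliseconds=500),
                        skip_empty_windows=True)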
    # As it is slow to iterate over q vectors from Python,
    # this is converted to a Numpy array.
    | sp.map(lambda x: median(x['data'].np()))
    | sp.write.to_console('Window Median '))


# The same pipeline can then switch to live data.
# This emits 10,000 records timestamped with the current time.
def emit(s, remaining):
    if remaining == 0:
        sp.teardown()
    else:
        s.enter(.1, 1, emit, (s, remaining - 1))
        n = 10_000
        data = pd.DataFrame({
            'time': [datetime.now()] * n,
            'data': np.random.randn(n),
        })
        kx.q('publish', data)


def on_start():
    # Create five minutes of historical data for yesterday,
    # with one record per millisecond
    n = 300_000
    yesterday = datetime.now() - timedelta(days=1)
    data = pd.DataFrame({
        'time': yesterday + np.array(range(0, n)) * timedelta(milliseconds=1),
        'data': np.random.randn(n),
    })

    # Run the pipeline on the historical data, emitting 300 windows
    kx.q('publish', data)

    # Send a message every 100 ms for five seconds.
    s = sched.scheduler(time.time, time.sleep)
    s.enter(.1, 1, emit, (s, 50))
    s.run()


sp.lifecycle.on_start(on_start)
sp.run(stream)

Summary

This pipeline calculates, once per second, the median of the previous three seconds of data. It runs first on an initial ingest of historical data, then on real-time data. When successful, it prints hundreds of lines such as the following.

Window Median 2022.03.08D23:17:29.970213100 | 0.5131609
Window Median 2022.03.08D23:17:29.970477800 | 0.5076699
Window Median 2022.03.08D23:17:29.970683800 | 0.4977496
Window Median 2022.03.08D23:17:29.970933600 | 0.495833
Window Median 2022.03.08D23:17:29.971081500 | 0.5007027

Next steps