Enriching streams

This example uses .qsp.merge to join records from two streams, enriching a live stream with historical reference data. This differs from .qsp.union, which concatenates one stream to another and leaves the records unchanged.

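The join function can be any user-defined function. In this example it receives the data from the current pipeline as its first argument and the data from the reference stream as its second.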

Here, a left join fills in the "name" column in a stream of tables, using a reference table that has been wrapped as a stream.
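To make the choice of join function concrete, the sketch below applies two candidate functions to small tables in plain q, outside any pipeline. The names live, refTbl, leftJoin and innerJoin and the sample values are hypothetical, introduced only for this illustration. Both functions take the same two arguments as the function passed to .qsp.merge in this example.

```q
// Hypothetical sample data, standing in for one live batch and the reference table
live: ([] id: `a`b`c; val: 1 2 3f);
refTbl: ([] id: `a`b; name: ("alice"; "bob"));

// x is the live batch, y is the reference data.
// lj and ij need a keyed right-hand table, hence the xkey.
leftJoin: {x lj `id xkey y};      // keeps every row of x
innerJoin: {x ij `id xkey y};     // keeps only rows of x whose id appears in y

show count leftJoin[live; refTbl];    // 3
show count innerJoin[live; refTbl];   // 2
```

A left join suits enrichment because live records with no matching reference row still pass downstream. An inner join would silently drop them.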

// Create a reference table of random four-character IDs and random-length names
// This could also be a query to another kdb+ process
refData: ([] id: 100?`4; name: {rand[20]?" "} each til 100);

// Create a table of random numbers, using the IDs from refData
// This is used to simulate data in the live stream for this example
genData: {[ref;n] `time xasc ([] id: n?ref`id; time: .z.p + 00:00:01*til n; val: n?100f)} refData;

// Create a pipeline from refData
ref: .qsp.read.fromExpr["refData"];

// Create a pipeline to merge the two streams
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.window.tumbling[00:00:05; {y`time}]
    // x is the data from the current pipeline, and y is the data from ref.
    // This fills in the "name" column for all the records in x.
    .qsp.merge[ref; {x lj `id xkey y}]
    // Multiply val by 5 for the records whose name starts with "a"
    .qsp.map[{update val*5 from x where name like "a*"}]
    .qsp.write.toConsole[];

// This will print nine batches, each covering five seconds and containing five records.
// With no later records to mark its window as complete, the tenth batch will be buffered
// until the timer fires, which it does once per period (five seconds in this case).
// If this is run again, most of the records will be discarded,
// as their timestamps will refer to windows that have already been emitted.
publish genData 50

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]