Enriching streams
Join records from two streams, enriching a live stream with historical reference data
.qsp.merge is different from .qsp.union, which concatenates one stream to another while leaving the records unchanged.
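The difference can be seen with two ordinary q tables standing in for stream batches. This is a minimal sketch in plain q, not Stream Processor code. The tables a, b and names are hypothetical. Concatenation with , stands in for what .qsp.union does to the records, and lj stands in for a merge whose join function is a left join.

```q
// Hypothetical batches with the same schema, plus a keyed lookup table
a:([] id:`x`y; val:1 2f)
b:([] id:enlist `z; val:enlist 3f)
names:([id:`x`y`z] name:("alpha";"beta";"gamma"))

// Union-style: records from both batches are passed on unchanged
u:a,b

// Merge-style: each record in a is enriched with a column from the other input
m:a lj names

show u
show m
```

u has three rows and the original two columns, while m keeps the two rows of a and gains a name column.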
The join function can be any user-defined function. This example does a left join to fill in the name column in a stream of tables, using another table that has been wrapped as a stream.
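The join function used in the pipeline, {x lj `id xkey y}, keys the reference table on id and then left-joins it onto the batch. Every record in the batch is kept and gains the matching name. The following minimal plain-q sketch applies the same function to one batch, followed by the map step from the pipeline. The tables refTab and batch and the name joinFn are illustrative only, not part of the Stream Processor API.

```q
// Hypothetical reference data, unkeyed, as it would arrive from the ref stream
refTab:([] id:`aa`bb`cc; name:("apple";"banana";"avocado"))

// Hypothetical batch from the live stream
batch:([] id:`aa`bb`cc`aa; time:.z.p+0D00:00:01*til 4; val:10 20 30 40f)

// Same join function as in the pipeline: key y on id, then left join onto x
joinFn:{x lj `id xkey y}
enriched:joinFn[batch;refTab]

// Map step from the pipeline: multiply val by 5 where name starts with "a"
mapped:update val*5 from enriched where name like "a*"
show mapped
```

Because 100?`4 samples with replacement, refData can contain repeated ids. When the right-hand keyed table has a repeated key, lj takes the first matching row.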
// Create a reference table of random 4-character symbol IDs and random-length names
// This could also be a query to another kdb+ process
refData: ([] id: 100?`4; name: {rand[20]?" "} each til 100)
// Create a table of random numbers, using the IDs from refData
// This is used to simulate data in the live stream for this example
genData: {[ref;n]
  `time xasc ([] id: n?ref`id; time: .z.p + 00:00:01*til n; val: n?100f)
  } refData
// Create a pipeline from refData
ref: .qsp.read.fromExpr["refData"]
// Create a pipeline to merge the two streams
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.window.tumbling[00:00:05; `time]
  // x is the data from the current pipeline, and y is the data from ref.
  // This fills in the "name" column for all the records in x.
  .qsp.merge[ref; {x lj `id xkey y}]
  .qsp.map[{update val*5 from x where name like "a*"}]
  .qsp.write.toConsole[]
// This prints nine batches, each covering five seconds and containing five records.
// No later record arrives to mark the tenth window as complete, so that batch is
// buffered until the timer fires, which it does once per period (five seconds here).
// If this is run again soon afterwards, most of the new records will be discarded:
// the first call produced timestamps up to 49 seconds ahead of .z.p, so the new
// timestamps fall in windows that have already been emitted.
publish genData 50
// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]
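To see why nine batches are printed straight away, the records can be bucketed into five-second windows in plain q. This sketch assumes that each window starts at a multiple of the window size (xbar alignment) and that the first record falls exactly on a window boundary. Both are assumptions made for illustration, not documented behaviour of .qsp.window.tumbling. The names t0, d, w and emitted are hypothetical.

```q
// Assumption: the first record falls exactly on a 5-second window boundary
t0:0D00:00:05 xbar .z.p

// Hypothetical stand-in for genData 50: one record per second for 50 seconds
d:([] time:t0+0D00:00:01*til 50; val:50?100f)

// Assumed window assignment: round each timestamp down to its 5-second boundary
w:select n:count i by start:0D00:00:05 xbar time from d

// A window is only known to be complete once a later record arrives, so the
// last one is held back (in the pipeline, until the timer fires)
emitted:-1_w
show emitted
```

Under these assumptions there are ten windows of five records, and only nine can be emitted before the timer fires.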