Enriching streams

Use .qsp.merge to join records from two streams, for example to enrich a live stream with historical reference data.

.qsp.merge is different from .qsp.union, which concatenates one stream to another while leaving the records unchanged.

The join function can be any user-defined function of two arguments: the data from the current pipeline and the data from the stream being merged in.
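
A join function can be tried on plain tables before it is placed in a pipeline. The following is a minimal sketch in plain q with no Stream Processor calls. The tables `live`, `lookup` and `more`, and the function names `leftJoin` and `innerJoin`, are hypothetical and used only for illustration.

```q
// Hypothetical sample data, for illustration only
live: ([] id: `a`b`c; val: 1 2 3f)
lookup: ([] id: `a`b; name: ("alice"; "bob"))
more: ([] id: enlist `d; val: enlist 4f)

// A join function takes the live batch as x and the reference data as y
leftJoin: {x lj `id xkey y}
innerJoin: {x ij `id xkey y}

// Left join: keeps all three rows and adds a name column (empty for `c)
leftJoin[live; lookup]

// Inner join: keeps only the rows whose id appears in lookup
innerJoin[live; lookup]

// For contrast, plain concatenation leaves the records unchanged
live, more
```

Swapping `lj` for `ij` changes which records survive the join, so the choice of join function decides whether unmatched live records are passed on or dropped.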

This example does a left join to fill in the name column in a stream of tables, using another table which has been wrapped as a stream.

// Create a table of random strings
// This could also be a query to another kdb+ process
refData: ([] id: 100?`4; name: {rand[20]?" "} each til 100)

// Create a table of random numbers, using the IDs from refData
// This is used to simulate data in the live stream for this example
genData: {[ref;n]
  `time xasc ([] id: n?ref`id; time: .z.p + 00:00:01*til n; val: n?100f)
  } refData

// Create a pipeline from refData
ref: .qsp.read.fromExpr["refData"]

// Create a pipeline to merge the two streams
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.window.tumbling[00:00:05; `time]
  // x is the data from the current pipeline, and y is the data from ref.
  // This fills in the "name" column for all the records in x.
  .qsp.merge[ref; {x lj `id xkey y}]
  .qsp.map[{update val*5 from x where name like "a*"}]
  .qsp.write.toConsole[]

// This will print nine batches, each covering five seconds and containing five records.
// With no records to mark the window as complete, the tenth batch will be buffered
// until the timer fires, which it does once per period (five seconds in this case).
// If this is run again, most of the records will be discarded,
// as the timestamps will refer to windows already emitted.
publish genData 50

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]
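
The batch counts described in the comments above depend on how the record timestamps fall into five-second intervals. The sketch below groups records by interval in plain q using xbar. It illustrates bucketing by event time only and is not how .qsp.window.tumbling is implemented. The fixed start time and the names `start`, `t`, `buckets` and `shifted` are assumptions made for this example, as is the alignment of intervals to multiples of five seconds.

```q
// Hypothetical fixed start time, chosen to sit on a five-second boundary
start: 2024.01.01D00:00:00.000000000

// Fifty records, one second apart, as in genData 50
t: ([] time: start + 0D00:00:01 * til 50; val: 50?100f)

// Bucket each record into the five-second interval that contains it
buckets: select n: count i by window: 0D00:00:05 xbar time from t

// Shift every timestamp by two seconds: the first and last buckets become partial
shifted: select n: count i by window: 0D00:00:05 xbar time from update time + 0D00:00:02 from t
```

With an aligned start, the fifty records fall into ten full intervals. With the shifted start, they spread over eleven intervals, and the first and last hold fewer records than the rest.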