Stream Processor + kdb+tick (callback)
Wrap a callback reader within an `onStart` lifecycle hook to connect to an arbitrary tickerplant.
// Read any data passed to the local callback function `publish` - this function
// will be created in the global namespace when running the stream
stream: .qsp.read.fromCallback[`publish]
  // Create a sliding window that fires every 5 seconds in event time over the
  // last 10 seconds of data. ".z.d+x`time" derives the event-time timestamp
  // of each record so that records are bucketed into the correct
  // windows.
  //
  // This window is also snapped, so it fires at 00:00:05, 00:00:10, 00:00:15,
  // etc., regardless of when the pipeline is started.
  .qsp.window.sliding[00:00:05; 00:00:10; .qsp.use`timeAssigner`snap!({.z.d+x`time}; 1b)]
  // Run a stateful map calculating a custom analytic over the incoming data.
  .qsp.map[
    {[o;m;x]
      // State stored by earlier invocations, keyed by sym
      s: .qsp.get[o;m];
      // Cumulative size and size*price per sym within this batch
      d: select size:sums size, sp:sums size * price by sym from x;
      l: first each/: select last each size, last each sp by sym from d;
      vw: update ts: m`window, sym:x`sym from
        select vw:sp % size from ungroup key[d]!(0^s key d) + value d;
      // Update the stored state for the syms seen in this batch
      .qsp.set[o;m;@[s;key l;:;value l]]; vw
      };
    .qsp.use``state!(::; ([sym:0#`]size:0#0;sp:0#0f))
    ]
  // Select the maximum value of the analytic for each symbol in each window
  .qsp.map[{ select max vw by sym, ts from x }]
  // Write output events to the console for local debugging
  .qsp.write.toConsole[]
// When the pipeline is started by the Controller, the `onStart` hook is fired.
// At this point, a tickerplant, or any other pub/sub interface, could be subscribed
// to, using the local function `publish` as the callback.
.qsp.onStart {
  // Connect to the tickerplant, retrying for up to 30 seconds
  s: .z.p; while[(null h:@[hopen;`:tp:5000;0N])&.z.p<s+00:00:30;0];
  // During TP log replay, updates arrive as lists rather than tables
  upd::enlist[`trade]!enlist{publish flip cols[trade]!x};
  // Subscribe to the relevant table for this pipeline and replay the TP log
  {(set). x;-11!y}. h"(.u.sub[`trade;`]; .u`i`L)";
  // Define Tick callback for live (non-log replay) updates
  upd::enlist[`trade]!enlist publish; }
.qsp.onCheckpoint { -1 "Checkpoint callback"; }
.qsp.onRecover { -1 "Recover callback"; }
// Purge in-memory state at eod
.u.end: { .qsp.checkpoint[]; .qsp.set[`apply;()!();()]; }
// Start the pipeline
.qsp.run stream
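
The two data-shaping steps used above, converting a log-replay message into a table and deriving event time from the `time` column, can be tried on their own in a plain q session. The following is only a sketch: the `trade` schema, the sample values, and the fixed date used in place of `.z.d` are assumptions made for illustration, not part of the pipeline.

```q
// Hypothetical trade schema (assumed; the real one comes from the tickerplant)
trade:([] time:`timespan$(); sym:`symbol$(); price:`float$(); size:`long$())

// A log-replay style message: one list per column rather than a table
x:(0D09:30:00 0D09:30:01; `AAPL`MSFT; 100.5 200.25; 10 20)

// Pair the column names with the column lists, then flip the dictionary into a table
t:flip cols[trade]!x

// Event time as the timeAssigner derives it: date + timespan gives a timestamp.
// A fixed date stands in for .z.d so the result is reproducible.
ts:2024.01.01+t`time
```

Here `t` is a two-row table with the same columns as `trade`, which is the shape `publish` receives for live updates, and `ts` holds one timestamp per record.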