kdb+ tick (callback)

This example demonstrates pairing a callback reader with an `onStart` lifecycle hook so that a pipeline can connect and subscribe to an arbitrary tickerplant.

// Read any data passed to the local callback function `publish` - this function
// will be created in the global namespace when running the stream
stream: .qsp.read.fromCallback[`publish]

    // Create a sliding window firing every 5 seconds in event time on the
    // last 10 seconds of data. ".z.d+y`time" assigns the event-time timestamp
    // associated with each record in order to assign buckets to the correct
    // windows.
    // This window is also snapped, so will fire at `00:00:05, 00:00:10, 00:00:15,
    // etc` regardless of when the pipeline is started.
    .qsp.window.sliding[00:00:05; 00:00:10; {.z.d+y`time}; .qsp.use``snap!11b]

    // Run a stateful map calculating a custom analytic over the incoming data.
    // The function receives three arguments: `o` and `m` are handed straight to
    // .qsp.get / .qsp.set to read and write this operator's state (`m` also
    // carries the `window` value used for `ts`), and `x` is the windowed table.
    .qsp.map[{[o;m;x]
        // Current state; the initial value supplied below is an empty table
        // keyed on sym with `size` and `sp` columns
        s:  .qsp.get[o;m];

        // Per-sym running sums of size and size*price within this batch
        d:  select size:sums size, sp:sums size * price by sym from x;
        // Final running totals of this batch per sym, unwrapped to atoms
        l:  first each/: select last each size, last each sp by sym from d;
        // Add the stored totals (0 for syms not yet in state) to the running sums,
        // then compute sp % size per row, tagged with the window and sym.
        // NOTE(review): `sym:x`sym` takes syms in input-row order while the
        // ungrouped rows follow the by-sym grouping - confirm these line up
        // when a batch is not already ordered by sym.
        vw: update ts: m`window, sym:x`sym from select vw:sp % size from ungroup key[d]!(0^s key d) + value d;

        // Overwrite the state rows for the syms seen in this batch and emit vw
        .qsp.set[o;m;@[s;key l;:;value l]]; vw
        }; .qsp.use``state!(::; ([sym:0#`]size:0#0;sp:0#0f))]

    // Select the maximum value for each symbol of the last analytic for each window
    .qsp.map[{ select max vw by sym, ts from x }]

    // Write output events to the console for local debugging
    .qsp.write.toConsole[];

// When the pipeline is started by the Controller, the `onStart` hook is fired.
// At this point, a tickerplant, or any other pub/sub interface, could be subscribed
// to, using the local function `publish` as the callback.
.qsp.onStart {
    // Link up to TP and subscribe for updates. `hopen` is retried in a busy loop
    // until it succeeds or 30 seconds have elapsed; a failed attempt yields 0N.
    s: .z.p; while[(null h:@[hopen;`:tp:5000;0N])&.z.p<s+00:00:30;0];
    // Fail with an explicit message instead of an opaque error on the null handle
    if[null h; '"unable to connect to tickerplant :tp:5000 within 30 seconds"];
    // Handle TP log replay being lists instead of tables
    upd::enlist[`trade]!enlist{publish flip cols[trade]!x};
    // Subscribe to the relevant table for this pipeline: the first element of the
    // reply is passed to `set`, the second (.u`i`L) to -11! for log replay
    {(set). x;-11!y}. h"(.u.sub[`trade;`]; .u`i`L)";
    // Define Tick callback for live (non-log replay) updates
    upd::enlist[`trade]!enlist publish;
    };

// Register lifecycle callbacks that print a marker line to stdout
.qsp.onCheckpoint[{ -1 "Checkpoint callback"; }];
.qsp.onRecover[{ -1 "Recover callback"; }];

// Purge in-memory state at eod: take a checkpoint, then reset the state
// stored under `apply. The single argument is unused.
.u.end: {[d]
    .qsp.checkpoint[];
    .qsp.set[`apply;()!();()];
    }

// Start the pipeline defined in `stream`
.qsp.run[stream];