
Stream Processor + kdb+tick (callback)

Use a callback reader together with an onStart lifecycle hook to subscribe to an arbitrary tickerplant

// Read any data passed to the local callback function `publish` - this function
// will be created in the global namespace when running the stream
stream: .qsp.read.fromCallback[`publish]

  // Create a sliding window that fires every 5 seconds in event time over the
  // last 10 seconds of data. The time assigner {.z.d+x`time} adds today's date
  // to each record's `time` column to produce the event-time timestamp used to
  // place records in the correct windows.
  //
  // The window is also snapped, so it fires at 00:00:05, 00:00:10, 00:00:15,
  // etc., regardless of when the pipeline is started.
  .qsp.window.sliding[00:00:05; 00:00:10; .qsp.use`timeAssigner`snap!({.z.d+x`time}; 1b)]

  // Run a stateful map calculating a custom analytic over the incoming data.
  .qsp.map[
    {[o;m;x]
      s:  .qsp.get[o;m];

      d:  select size:sums size, sp:sums size * price by sym from x;
      l:  first each/: select last each size, last each sp by sym from d;
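      // Keep `sym` from the ungrouped rows: ungroup orders rows by sym, which
      // need not match the row order of x, so x`sym could mislabel values
      vw: update ts: m`window from
        select sym, vw:sp % size from ungroup key[d]!(0^s key d) + value d;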

      .qsp.set[o;m;@[s;key l;:;value l]]; vw
    };
    .qsp.use``state!(::; ([sym:0#`]size:0#0;sp:0#0f))
  ]

  // For each window, select the maximum value of the analytic per symbol
  .qsp.map[{ select max vw by sym, ts from x }]

  // Write output events to the console for local debugging
  .qsp.write.toConsole[]


// When the pipeline is started by the Controller, the `onStart` hook is fired.
// At this point the pipeline can subscribe to a tickerplant, or any other
// pub/sub interface, using the local function `publish` as the callback.
.qsp.onStart {
  // Connect to the tickerplant (TP), retrying for up to 30 seconds
  s: .z.p; while[(null h:@[hopen;`:tp:5000;0N])&.z.p<s+00:00:30;0];
  // TP log replay delivers lists of columns rather than tables, so convert
  // each replayed message to a table before publishing
  upd::enlist[`trade]!enlist{publish flip cols[trade]!x};
  // Subscribe to the relevant table for this pipeline
  {(set). x;-11!y}. h"(.u.sub[`trade;`]; .u`i`L)";
  // Define Tick callback for live (non-log replay) updates
  upd::enlist[`trade]!enlist publish; }

.qsp.onCheckpoint { -1 "Checkpoint callback"; }
.qsp.onRecover { -1 "Recover callback"; }

// At end of day (EOD), checkpoint and then purge in-memory state
.u.end: { .qsp.checkpoint[]; .qsp.set[`apply;()!();()]; }

// Start the pipeline
.qsp.run stream
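
The running analytic in the stateful map above can be tried in plain q, without a pipeline. The following is a minimal sketch under stated assumptions: the batch `x` and the prior state `s` are made-up values, and `s` stands in for what `.qsp.get` would return. It shows how per-symbol cumulative totals from earlier batches are added to the running sums of the current batch before dividing.

```q
// Hypothetical batch of trades; column names follow the pipeline above
x: ([] sym:`a`a`b; price:10 20 30f; size:1 3 2)

// Assumed prior state (stand-in for .qsp.get): cumulative size and
// size*price per sym, same schema as the pipeline's initial state
s: ([sym:enlist `a] size:enlist 4; sp:enlist 40f)

// Running sums within this batch, grouped by sym (nested list columns)
d: select size:sums size, sp:sums size*price by sym from x

// Add prior totals (0 for syms not yet in state) to each running sum,
// flatten back to one row per trade, then divide to get the running value
vw: select sym, vw:sp%size from ungroup key[d]!(0^s key d)+value d

// vw has one row per input trade: 10 and 13.75 for `a, 30 for `b
show vw
```

Symbol `b` has no entry in `s`, so the lookup returns nulls and `0^` fills them with zero; its value is therefore based on the current batch alone.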

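
The conversion used by the replay handler in the `onStart` hook above can also be checked in isolation. This sketch assumes a hypothetical `trade` schema (in the pipeline the schema comes from `.u.sub`) and replaces `publish` with a stub that only reports the row count, so it runs without the Stream Processor.

```q
// Hypothetical empty schema, standing in for the one returned by .u.sub
trade: ([] time:`timespan$(); sym:`symbol$(); price:`float$(); size:`long$())

// Stub for the stream's callback so the sketch runs without a pipeline
publish: {[t] -1 "received ", string[count t], " rows"; }

// A replayed log message carries the data as a list of column vectors
msg: (2#.z.n; `a`b; 10 20f; 1 2)

// Same conversion as the replay handler: name the columns, then flip
// the resulting dictionary into a table before handing it to publish
upd: enlist[`trade]!enlist {publish flip cols[trade]!x}
(upd `trade) msg
```

Once replay has finished, live updates already arrive as tables, which is why the hook then redefines `upd` to pass them straight to `publish`.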