Stream configuration

Streams move and sequence data and messages between components within kdb Insights. kdb Insights includes Reliable Transport (RT) as the primary stream bus. Custom streams can also be used, but they must comply with the RT interface.

Configuration

Within an assembly configuration, the bus section consists of a dictionary of stream entries. The names internal and external are suggested for the streams used, respectively, for communication within the assembly and for communication with the outside world (for example, other assemblies). An assembly may contain further entries for user-defined purposes.

When running kdb Insights microservices, the bus field is required and lists the nodes to connect to.

bus:
  internal:
    protocol: rt
    topic: internal

Each bus entry provides:

| name | type | required | description |
| --- | --- | --- | --- |
| protocol | string | Yes | The protocol of the messaging system, one of rt or custom. See protocols below for details. |
| topic | string | No | The name of this stream's topic. For RT streams, this is the assembly name joined to the topic name with a dash. |
| nodes | string[] | No | Connection strings giving the host and port of each node to connect to when subscribing to the bus. |

Protocols

A bus configuration can use one of two possible protocols, rt or custom.

rt Protocol

For streams using RT, you must deploy the RT microservice. In a microservice deployment of RT, the combination of the topic and nodes must resolve to the hostname of the RT deployment.

custom Protocol

The custom protocol lets you supply your own stream protocol. To connect to a custom stream, you must include a custom RT library that implements the RT interface as detailed below. Load the implementation into every DAP in the assembly and into the Storage Manager by setting the KXI_RT_LIB environment variable to the location of the implementation on disk.

Interface:

At a minimum, the RT interface must implement .rt.pub and .rt.sub with the following signatures.

// @overview
// Returns a publisher function for publishing data into the stream. The function
// accepts a single `topic` parameter as provided by the `bus` configuration.
// The returned function must accept one argument, the message payload to publish.
//
// @param topic {string} The topic configured in the `bus` configuration.
// @return {fn(any)}
.rt.pub: {[topic]
  // Your implementation here ...
  }

// @overview
// Subscribes to a topic from an offset index with a callback function. The offset index
// is used to replay messages after a recovery event. Messages sent to the subscriber
// must be sequential and arrive in the same order when a replay is performed. The callback
// function must be passed two arguments: a tuple of the table name and data, and the
// current offset index.
//
// @param topic {string} The topic configured in the `bus` configuration.
// @param start {long}   The index in the message queue to subscribe from. Initial value will be `0`.
// @param cb    {fn((symbol;table);long)} Message callback function.
// @return {null}
.rt.sub:{[topic;start;cb]
  // Your implementation here ...
  }
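
The contract described by these two signatures can be illustrated with a minimal in-memory sketch. It is not a real transport: it assumes a single q process and a single shared stream, so the topic is validated but otherwise ignored, and nothing is persisted. The .mem and .demo names are hypothetical and exist only for this illustration.

```q
// Hypothetical in-memory store; not part of kdb Insights
.mem.msgs:();   // every published payload, in publish order
.mem.cbs:();    // callbacks of current subscribers

.rt.pub:{[topic]
  if[not 10h=type topic; '"topic must be a string"];
  // q lambdas do not capture locals, so the publisher only touches globals
  {[payload]
    idx:count .mem.msgs;                             // offset of this message
    .mem.msgs,:enlist payload;                       // append to the log
    {[p;i;cb] cb[p;i]}[payload;idx] each .mem.cbs;   // fan out to subscribers
    }
  }

.rt.sub:{[topic;start;cb]
  if[not 10h=type topic; '"topic must be a string"];
  // replay stored messages from the requested offset, in order
  i:start;
  while[i<count .mem.msgs; cb[.mem.msgs[i]; i]; i+:1];
  // then register the callback for live messages
  .mem.cbs,:enlist cb;
  }

// Usage: publish one message, subscribe from offset 0, publish another
.demo.seen:();
push:.rt.pub "demo";
push (`trade; ([] sym:`a`b; px:1 2f));
.rt.sub["demo"; 0; {[msg;idx] .demo.seen,:idx}];
push (`trade; ([] sym:enlist `c; px:enlist 3f));
.demo.seen   // 0 1
```

The subscriber sees offset 0 through the replay and offset 1 through the live path, which is the ordering guarantee the interface asks for. A real implementation would also need durable storage and per-topic separation.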

Tickerplant Interface

An example custom RT client is included below for connecting to a kdb+ tickerplant using the RT interface.

// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"}; // will be overridden

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
  .rt.push:{[nph;payload]
    if[type x:last payload; x:value flip x];
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;];
  .rt.push }

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"];
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"];

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}];

.rt.sub:{[topic;startIdx;uf]
  if[not 10h=type topic;'"topic must be a string"];

  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;

  //initialise our message counter
  .rt.idx:0;

  // === tick.q will call back to these ===
  upd::{[uf;t;x]
    if[not type x; x:flip(cols .rt.schema t)!x]; // for log replay
    if[t in .rt.NO_TIME_SYM; x:`time`sym _x];
    uf[(t;x);.rt.idx];
    .rt.idx+:1; }[uf];

  .u.end:{.rt.idx:.rt.date2startIdx x+1};

  //replay log file and continue the live subscription
  if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
  .rt.schema:(!/)flip res 0; // used to convert arrays to tables during log replay

  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0];
     .rt.recoverMultiDay[res[1];startIdx]]; }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11;

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};

.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L;
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file and revert to regular definition when you hit start index
  upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
  //read every log file in full except the last, which is read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
  };

//100 billion updates per day - 1e11
//30210610*1e11
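
The offset scheme used by this client can be worked through by hand. The sketch below repeats the two definitions from the example and shows how a date and a per-day message count combine into one long offset, and how the two parts are recovered again. The date and the count of 42 are arbitrary values chosen for illustration.

```q
// Definitions repeated from the tickerplant client above
.rt.MAX_LOG_SZ:"j"$1e11;
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};

// First offset of 2 January 2021: the date digits followed by eleven zeros
base:.rt.date2startIdx 2021.01.02;
base                          // 2021010200000000000

// Offset of the 43rd message of that day (per-day counts start at 0)
idx:base+42;

// Recover the two parts of the offset
idx div .rt.MAX_LOG_SZ        // 20210102
idx mod .rt.MAX_LOG_SZ        // 42
```

Because the date occupies the high-order digits, offsets from a later day always sort after those of an earlier day. This is what lets .rt.recoverMultiDay select log files by comparing each file's date digits with startIdx div .rt.MAX_LOG_SZ.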