Stream configuration
Streams move and sequence data and messages between components within kdb Insights. kdb Insights includes Reliable Transport (RT) as the primary stream bus. Custom streams can also be used, but they must comply with the RT interface.
Configuration
Within an assembly configuration, the `bus` section consists of a dictionary of stream entries. The names `internal` and `external` are suggested for a stream used for communication within the assembly and for communication with the outside world (perhaps other assemblies), but assemblies may contain further entries for user-defined purposes.
In kdb Insights Microservices mode, the `bus` field is required and lists the nodes to connect to.

bus:
  internal:
    protocol: rt
    topic: internal
In kdb Insights Enterprise, the `bus` field is optional if the `spec.elements.sequencer` key is provided. If provided, the `bus` key must be nested under a `spec` key.
spec:
  bus:
    internal:
      protocol: rt
      topic: assembly-internal
Each bus entry provides:

| name | type | required | description |
|---|---|---|---|
| protocol | string | Yes | The protocol of the messaging system, one of `rt` or `custom`. See protocols below for details. |
| topic | string | No | The name of this stream's topic. For RT streams, the stream name is the assembly name and the topic name joined with a dash. |
| nodes | string[] | No | Connection strings giving the host and port of the nodes to connect to when subscribing to the bus. |
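
For illustration only, an entry that supplies all three fields might look like the following sketch, here a custom stream backed by a tickerplant; the topic name, hostname, and port are placeholders.

bus:
  external:
    protocol: custom
    topic: external
    nodes:
      - "tp-hostname:5010"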
Protocols
A bus configuration can use one of two possible protocols: `rt` or `custom`.
`rt` Protocol
For streams using RT, you can either use an enterprise configuration or deploy the RT microservice. In a microservice deployment of RT, the combination of the `topic` and `nodes` must resolve to the hostname of the RT deployment.
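
As a sketch only, a microservice configuration might point the stream at an RT deployment as shown below; the hostnames and port are placeholders for your own RT nodes.

bus:
  internal:
    protocol: rt
    topic: internal
    nodes:
      - "rt-node-0:5002"
      - "rt-node-1:5002"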
`custom` Protocol
The custom protocol can be used to provide a custom stream protocol. To connect to a custom stream, you must include a custom RT library that implements the RT interface as detailed below. The implementation of this interface must be loaded into all included DAPs and the Storage Manager by setting `KXI_RT_LIB` as an environment variable pointing to the disk location of the implementation.
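
For example, in a shell or container environment the variable could be set as follows; the path is a placeholder for wherever your implementation is installed.

# hypothetical location of the custom RT implementation
export KXI_RT_LIB=/opt/kx/custom/rt_custom.q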
Interface:
At a minimum, the RT interface must implement `.rt.pub` and `.rt.sub` with the following signatures.
// @overview
// Returns a publisher function for publishing data into the stream. The function
// accepts a single `topic` parameter as provided by the `bus` configuration.
// The returned function must accept one argument, the message payload to publish.
//
// @param topic {string} The topic configured in the `bus` configuration.
// @return {fn(any)}
.rt.pub:{[topic]
  // Your implementation here ...
  }
// @overview
// Subscribes to a topic from an offset index with a callback function. The offset index
// is used to replay messages after a recovery event. Messages delivered to the subscriber
// must be sequential and in the same order as originally published, including during a
// replay. The callback function must be passed two arguments: a tuple of the table name
// and data, and the current offset index.
//
// @param topic {string} The topic configured in the `bus` configuration.
// @param start {long} The index in the message queue to subscribe from. Initial value will be `0`.
// @param cb {fn((symbol;table);long)} Message callback function.
// @return {null}
.rt.sub:{[topic;start;cb]
  // Your implementation here ...
  }
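
To make the contract concrete, the following is a minimal in-memory sketch of the interface, not a production implementation: messages are held in an in-process list per topic, the offset is simply the list index, and there is no durability across processes. All names other than `.rt.pub` and `.rt.sub` are illustrative.

// topic -> list of published payloads, and topic -> list of subscriber callbacks
.rt.cache:(0#`)!();
.rt.subs:(0#`)!();

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  t:`$topic;
  if[not t in key .rt.cache; .rt.cache[t]:()];
  // the returned function appends the payload and notifies any registered subscribers
  {[t;payload]
    .rt.cache[t],:enlist payload;
    idx:-1+count .rt.cache t;
    cbs:$[t in key .rt.subs; .rt.subs t; ()];
    // call each subscriber with the payload and its offset
    {[payload;idx;cb] cb[payload;idx]}[payload;idx] each cbs;
    }[t;] }

.rt.sub:{[topic;start;cb]
  if[not 10h=type topic;'"topic must be a string"];
  t:`$topic;
  existing:$[t in key .rt.cache; .rt.cache t; ()];
  // replay existing messages from the requested offset, then register for live messages
  msgs:start _ existing;
  cb'[msgs; start+til count msgs];
  .rt.subs[t]:$[t in key .rt.subs; .rt.subs[t],enlist cb; enlist cb]; }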
Tickerplant Interface
An example custom RT client is included below for connecting to a kdb+ tickerplant using the RT interface.
// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")
// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"}; // will be overridden
.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
  .rt.push:{[nph;payload]
    // if the payload data is a table, convert it to a list of column values
    if[type x:last payload; x:value flip x];
    // internal tables are padded with null time and sym columns
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;];
  .rt.push }
// === rt update and subscribe ===
if[`upd in key `.; '"do not define upd: rt+tick will implement this"];
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"];
if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}];
.rt.sub:{[topic;startIdx;uf]
  if[not 10h=type topic;'"topic must be a string"];
  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;
  //initialise our message counter
  .rt.idx:0;
  // === tick.q will call back to these ===
  upd::{[uf;t;x]
    if[not type x; x:flip(cols .rt.schema t)!x]; // for log replay
    if[t in .rt.NO_TIME_SYM; x:`time`sym _x];
    uf[(t;x);.rt.idx];
    .rt.idx+:1; }[uf];
  .u.end:{.rt.idx:.rt.date2startIdx x+1};
  //replay log file and continue the live subscription
  if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning
  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
  .rt.schema:(!/)flip res 0; // used to convert arrays to tables during log replay
  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0];
    .rt.recoverMultiDay[res[1];startIdx]]; }
//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11;
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};
.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L;
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file and revert to the regular definition once the start index is reached
  upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
  //read all of each log file except the last, which is read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
  };
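
The snippet below sketches how this client could be exercised from a q session once the file above has been loaded; the host, port, table, and data are placeholders, and it assumes a tickerplant is running at the address given in `KXI_RT_NODES`. In a DAP or Storage Manager the file would instead be loaded via `KXI_RT_LIB`.

// illustrative only: point KXI_RT_NODES at a running tickerplant (placeholder host:port)
`KXI_RT_NODES setenv "localhost:5010";

// subscribe with a callback; a null start index means follow live data only
cb:{[payload;idx] -1 "received ",string[first payload]," at offset ",string idx;};
.rt.sub["trade";0N;cb];

// publish a record; .rt.pub returns the push function for the topic
pub:.rt.pub["trade"];
pub (`trade; ([] time:enlist .z.n; sym:enlist `ABC; price:enlist 100f));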
kdb Insights Enterprise
In kdb Insights Enterprise, all streams use Reliable Transport to move data. In this mode, streams are configured under the `spec.elements.sequencer` key of the assembly.
User interface configuration
This guide discusses configuration using YAML files. If you are using kdb Insights Enterprise, you can configure your system using the Insights user interface.