Storage Manager interface

Storage Manager (SM) interacts with two entities:

  • A KX Insights Data Access (DA) service or its equivalent, e.g. a set of RDB or HDB processes
  • An enterprise messaging service or its equivalent, e.g. a custom tickerplant

Interface with Data Access processes

SM communicates with either the KX Insights Data Access service, or a custom equivalent responsible for serving data from the database. For simplicity, we refer to this entity as DA in the interface definition.

SM exposes APIs for registration and for querying the writedown status, supporting both q IPC and REST (over HTTP) interfaces.

Registration

To receive the reload signal, a client DA registers with SM by invoking the register API with three parameters:

mount      name of a mount from the assembly configuration 
sync       whether reload signals should be sent synchronously (q IPC only)
callback   function name (IPC) or URL (REST)

Set sync to

  • false for stream mounts
  • true for local IDB or HDB mounts; this prevents a DA process (DAP) from losing its on-disk database in the middle of a query. (Note that a long-running query then blocks SM processing.)

Over q IPC, the client makes a synchronous call:

.sm.api.register[mount;sync;callback]

Where

  • mount is a symbol
  • sync is a boolean
  • callback is the name (symbol) of a unary function, taking a dictionary argument, that is invoked on each reload signal

The response is a dictionary of the last reload signal for the given mount.
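
As an illustration of the q IPC registration described above, here is a minimal sketch of the client side. The .da namespace, the mount name rdb and the SM address are assumptions made for this example; only .sm.api.register and its three parameters come from the interface definition.

```q
// Hypothetical client-side state: the most recent reload signal received
.da.lastSignal:()!()

// Callback named at registration; SM invokes it with the reload-signal dictionary
.da.onReload:{[sig] .da.lastSignal:sig;}

// Register for a stream mount: sync is false, and the callback is passed by name.
// h is whatever sends a synchronous request to SM, e.g. an open handle.
.da.register:{[h;mnt] h(`.sm.api.register;mnt;0b;`.da.onReload)}

// Usage (address and mount name are placeholders):
//   h:hopen `:smhost:10001
//   .da.lastSignal:.da.register[h;`rdb]
```

Storing the registration response in the same variable that the callback updates means the client always holds the latest known signal for its mount, whether it arrived at registration or later.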

Over REST, the client makes a POST request to the

http://host:port/register

endpoint with a Registration JSON object in which

  • mount is a string
  • callback is the URL of the REST endpoint to be invoked on each reload signal

The response is a ReloadSignal JSON object of the last reload signal for the given mount.
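
A q client could build the Registration object with .j.j and post it with .Q.hp, as in the sketch below. The mount name, both URLs and the .da.regBody helper are placeholders invented for this example, and the MIME type application/json is an assumption about what the endpoint expects.

```q
// Build the Registration JSON body from a mount name and a callback URL (both strings)
.da.regBody:{[mnt;cb] .j.j `mount`callback!(mnt;cb)}

// Placeholder values, for illustration only
body:.da.regBody["rdb";"http://dahost:8080/reload"]

// POST the body to SM and parse the ReloadSignal JSON object in the reply
// (host and port are placeholders, so the call is left commented out):
//   resp:.j.k .Q.hp["http://smhost:10001/register";"application/json";body]
```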

Querying the writedown status

An unregistered client can use the status API to get the status of all mounts.

The API returns

  • mount: name of the mount
  • params: current status of the mount, i.e. values sent in the last reload signal for the mount

Over q IPC, the client makes a synchronous call:

.sm.api.getStatus[]

The response is a table with columns mount (symbol) and params (dictionary).

Over REST, the client makes a GET request to the endpoint

http://host:port/status

The response is an array of MountStatus JSON objects each containing mount (string) and params (dictionary object).
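
Given the status table returned over q IPC, a client might look up one mount as sketched below. The helper name .da.mountParams and the mount name in the usage comment are hypothetical.

```q
// Return the params dictionary for one mount from a status table
// (columns mount and params), or an empty dictionary if the mount is absent
.da.mountParams:{[st;mnt]
  p:exec params from st where mount=mnt;
  $[count p; first p; ()!()]}

// Usage (h is an open handle to SM; the mount name is a placeholder):
//   st:h ".sm.api.getStatus[]"
//   .da.mountParams[st;`hdb]
```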

Reload signal

The reload signal is sent to registered DA clients that need to reload their mounted database. For stream-based mounts (RDB-like DA clients) this means purging old table records from memory.

The signal is sent via the callback specified to the registration API. This signal is a dictionary whose content varies according to the type of the mount specified to the registration API.

Common entries for all mount types:

  • ts: time when the migration (e.g. EOI or EOD) processing was started by SM (timestamp)
  • minTS: inclusive purview start time (for stream mount: 1 ns + maxTS of the next mount) (timestamp)

For non-stream-based mounts (IDB/HDB):

  • maxTS: inclusive purview end time (timestamp)

For stream-based mounts (RDB):

  • startTS, endTS: parameters of the corresponding _prtnEnd signal (timestamp)
  • pos: stream position of the corresponding _prtnEnd signal (long)

The reload signal is also sent via RT as a table _reload (so that it can be processed during replay). The rows in this table correspond to all affected mounts for a given tier migration event, and the schema is

mount    name of mount (symbol)
params   the dictionary argument of the q IPC signal for the corresponding mount
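
To make the shape of these signals concrete, the sketch below builds one signal per mount type and the corresponding table. All values and the mount names rdb and hdb are illustrative placeholders, and the example assumes the HDB mount is the one next to the stream mount. The variable is called reload because _reload is not a valid q identifier.

```q
// Illustrative values only
eod:2021.01.02D00:00:00.000000000

// Reload signal for a non-stream (IDB/HDB) mount: ts, minTS, maxTS
hdbSig:`ts`minTS`maxTS!(eod+0D00:00:05; 2021.01.01D00:00:00.000000000; eod-1)

// Reload signal for a stream (RDB) mount: ts, minTS, startTS, endTS, pos.
// minTS is 1 ns after the maxTS of the next mount.
rdbSig:`ts`minTS`startTS`endTS`pos!(eod+0D00:00:05; 1+hdbSig`maxTS; eod-0D00:10:00; eod; 123456)

// The same signals as rows of the table sent via RT
reload:([] mount:`rdb`hdb; params:(rdbSig;hdbSig))
```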

Stream partition end signal

This signal is used by just one type of DAP: the one whose mount is next to the ‘stream mount’ (IDB-like DAPs if the assembly is set up with an IDB mount, HDB-like DAPs otherwise). It lets the DAP ‘draw a line in the sand’ (e.g. by starting a new bucket for new incoming records) for the records to be purged from memory on the subsequent corresponding reload signal from SM.

The signal consists of a one-row table (for compatibility with custom tickerplants), sent via RT.

The table is named _prtnEnd (prtn for stream partition; the underscore avoids name collisions with customers’ tables) and has the schema:

startTS  inclusive start timestamp of the stream partition
         (SM time when it published the previous signal)
endTS    exclusive end timestamp of the stream partition (SM current time)
opts     dictionary of additional options (for internal SM use)

Comparing timestamps

Since the timestamps in this signal are based on the current time assigned by SM, they should only be used for correlation with the corresponding reload signal (and, possibly, to check that time ranges of subsequent stream partition end signals are adjacent) – not for comparison with the timestamps used for partitioning (prtnCol).
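
One possible way to implement the ‘bucket’ idea mentioned above is sketched here. The trade table, the .da namespace and the bucket column are assumptions made for the example. The sketch uses startTS and endTS only to correlate a reload signal with an earlier stream partition end signal, and never compares them with record timestamps.

```q
// Hypothetical in-memory state of an RDB-like client
trade:([] time:`timestamp$(); sym:`symbol$(); price:`float$(); bucket:`long$())
.da.bucket:0   // bucket that new records are tagged with
.da.closed:([] startTS:`timestamp$(); endTS:`timestamp$(); bucket:`long$())

// Tag incoming records with the current bucket
.da.onData:{[rows] `trade insert update bucket:.da.bucket from rows;}

// On _prtnEnd: close the current bucket, remember the signal's time range,
// and start a new bucket for subsequent records
.da.onPrtnEnd:{[sig]
  .da.closed,:enlist `startTS`endTS`bucket!(sig`startTS;sig`endTS;.da.bucket);
  .da.bucket+:1;}

// On reload: find the bucket closed by the matching _prtnEnd signal and purge it
.da.onReload:{[sig]
  s:sig`startTS; e:sig`endTS;
  b:exec bucket from .da.closed where startTS=s, endTS=e;
  delete from `trade where bucket in b;
  delete from `.da.closed where bucket in b;}
```

Here .da.onPrtnEnd would be called with the single row of the _prtnEnd table (e.g. first of the table), and .da.onReload is the callback named at registration.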

Interface with a messaging service

For simplicity, we refer to the interface for SM’s communication with a messaging service as RT.

SM uses the following interface to interact with the Reliable Transport service (or a custom tickerplant).

.rt.sub[topic;position]    subscribe to topic starting from position
.rt.pub topic              register as publisher for topic
.rt.push message           publish a message
.rt.upd[message;position]  callback for receiving a message

Where

  • topic is the name of a topic (string)
  • position is a position in the stream (long)
  • message may be of arbitrary type: a custom tickerplant is likely to support a pair (table name;table data)

For a custom tickerplant, implement this interface in a client library (q file), which will be loaded at startup from the location specified in the KXI_RT_LIB environment variable.

For a basic tickerplant implementation, the RT client library could be:

// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
  .rt.push:{[nph;payload]
    if[type x:last payload; x:value flip x];
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;]; }

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]

.rt.sub:{[topic;startIdx]
  if[not 10h=type topic;'"topic must be a string"];

  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;

  //initialise our message counter
  .rt.idx:0;

  // === tick.q will call back to these ===
  upd::{[t;x]
    if[not type x; x:flip(cols .rt.schema t)!x]; // for log replay 
    if[t in .rt.NO_TIME_SYM; x:`time`sym _x]; 
    .rt.upd[(t;x);.rt.idx]; 
    .rt.idx+:1; };

  .u.end:{.rt.idx:.rt.date2startIdx x+1};

  //replay log file and continue the live subscription
  if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
  .rt.schema:(!/)flip res 0; // used to convert arrays to tables during log replay

  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; 
     .rt.recoverMultiDay[res[1];startIdx]]; }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}

.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L;
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually 
  // the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files where 
    ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file and revert 
  // to regular definition when you hit start index
  upd::{[startIdx;updo;t;x] 
    $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]
  }[startIdx;upd];
  //read all of all the log files except the last, where you read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files; }
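
The library above encodes a stream position as the date's digits multiplied by .rt.MAX_LOG_SZ, plus the message count within that day's log. The short example below works through that arithmetic. The first two definitions are copied from the library so that the example is self-contained; .da.posDay and .da.posOffset are hypothetical helpers added for illustration.

```q
// Copied from the library above
.rt.MAX_LOG_SZ:"j"$1e11
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}

// Position of the first message of a given day: 20210101 * 1e11
p0:.rt.date2startIdx 2021.01.01

// Position of the 43rd message of that day (offsets start at 0)
p:p0+42

// Hypothetical helpers that split a position back into its two parts
.da.posDay:{x div .rt.MAX_LOG_SZ}      // date as yyyymmdd digits
.da.posOffset:{x mod .rt.MAX_LOG_SZ}   // message count within the day
```

This is why .u.end in the library can move to the next day by calling .rt.date2startIdx on the following date: positions from different days never overlap as long as a day's log holds fewer than .rt.MAX_LOG_SZ messages.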