Interface

Storage Manager (SM) interacts with the following:

  • Data Access (DA) service (or its replacement, e.g. set of RDB or HDB processes)
  • Reliable Transport (RT) service (or its replacement, e.g. a custom tickerplant)

Interface with DA (or its replacement)

SM exposes APIs for registration and querying the write-down status, supporting both q-IPC and REST (over HTTP) interfaces.

Registration

To receive "reload" signals, a DA client registers with SM by invoking the register API.

For q-IPC, a client makes a synchronous call to the .sm.api.register function (in the sm service), which takes the following parameters:

| parameter | datatype | description |
| --- | --- | --- |
| mount | symbol | name of a mount (from assembly configuration) |
| sync | boolean | indicates whether "reload" signals should be sent synchronously; should be set to 0b for "stream"; setting to 1b for local IDB or HDB mounts prevents the DA process from losing its on-disk database in the middle of a query (acknowledging that long queries can block SM processing) |
| callback | symbol | name of the function invoked on each "reload" signal; takes a dictionary argument |

The result is a dictionary that is the last "reload" signal for the given mount.
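For example, a DA client might register over q-IPC as follows (a minimal sketch: the SM host and port, the "stream" mount name, and the .da.onReload callback are illustrative, not part of the API):

// open a handle to the SM process (host:port are placeholders)
h:hopen`:sm-host:10001

// callback invoked by SM on each "reload" signal; receives a dictionary
.da.onReload:{[params] show params}

// synchronous registration; sync should be 0b for a "stream" mount
// the result is the last "reload" signal for this mount
lastSignal:h(`.sm.api.register;`stream;0b;`.da.onReload)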

For REST, a client makes a synchronous request to the http://host:port/register endpoint, which expects a Registration JSON object containing:

| element | datatype | description |
| --- | --- | --- |
| mount | string | see above |
| sync | boolean | see above |
| callback | string | URL of the REST endpoint to be invoked on each "reload" signal |

The result is a ReloadSignal JSON object containing the last "reload" signal for the given mount.
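From q, the equivalent REST registration could be issued with .Q.hp; the host, port, and callback URL below are placeholders:

// POST a Registration JSON object; the response is a ReloadSignal JSON object
body:.j.j`mount`sync`callback!("stream";0b;"http://da-host:8080/reload")
sig:.j.k .Q.hp["http://host:port/register";"application/json";body]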

Querying the write-down status

A non-registered client can invoke the status API to get the status of all mounts.

For q-IPC, a client makes a synchronous call to the niladic .sm.api.getStatus function (in the sm service).

The result is a table containing the following columns:

  • mount (symbol) - name of the mount
  • params (dictionary) - current status of the mount (values sent in the last "reload" signal for the mount)
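For example (SM host and port are placeholders):

h:hopen`:sm-host:10001

// apply the niladic function to the dummy argument ::
status:h(`.sm.api.getStatus;::)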

For REST, a client makes a synchronous request to the http://host:port/status endpoint.

The result is an array of MountStatus JSON objects, each containing:

| element | datatype | description |
| --- | --- | --- |
| mount | string | name of the mount (from assembly configuration) |
| params | object | current status of the corresponding mount (as a dictionary) |
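From q, the endpoint can be queried with .Q.hg (host and port are placeholders, as above):

// GET the write-down status of all mounts as parsed JSON
statuses:.j.k .Q.hg"http://host:port/status"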

"Reload" signal

This signal is sent to registered DA clients that need to reload their mounted database. For a stream-based mount (RDB-like DA clients) this means purging old table records from memory.

The signal is sent via the callback specified in the register API call. It is a dictionary whose content varies depending on the type of the mount specified at registration (a callback sketch follows the list below).

  • common entries for all mount types:
    • ts (timestamp) - time when the migration (e.g. EOI or EOD) processing was started by SM
    • minTS (timestamp) - inclusive purview start time (for the stream mount: 1 ns + maxTS of the next mount)
  • for non-stream-based mounts (IDB/HDB):
    • maxTS (timestamp) - inclusive purview end time
  • for stream-based mounts (RDB):
    • startTS, endTS (timestamp) - parameters of the corresponding _prtnEnd signal
    • pos (long) - stream position of the corresponding _prtnEnd signal

The signal is duplicated as a table sent via RT; this table contains multiple rows corresponding to all affected mounts for a given tier-migration event:

  • Table name = _reload
  • Columns:
    • mount (symbol) - name of the mount
    • params - the dictionary argument of the q-IPC signal for the corresponding mount
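For instance, an RDB-like DA client's callback might purge everything older than minTS (a sketch; it assumes in-memory tables with a time column comparable to minTS):

.da.onReload:{[params]
  ts:params`minTS;
  // functional delete: drop all rows with time earlier than the
  // new inclusive purview start, for every in-memory table
  {[tn;ts] ![tn;enlist(<;`time;ts);0b;`symbol$()]}[;ts] each tables`.;
  }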

"Stream partition end" signal

This signal is used by only one type of DA process - the one whose mount is next to the "stream" mount (IDB-like DA processes in a setup with an IDB mount, HDB-like DA processes otherwise) - in order to "draw a line in the sand" (e.g. by starting a new bucket for new incoming records) for records to be purged from memory on the subsequent corresponding "reload" signal from SM.

Format: one-row table (for compatibility with custom TPs), sent via RT

  • Table name = _prtnEnd ("prtn" = stream partition; underscore to avoid name collisions with customers' tables)
  • Columns:
    • startTS - inclusive start timestamp of the stream partition (SM time when it published the previous signal)
    • endTS - exclusive end timestamp of the stream partition (SM current time)
    • opts - dictionary of additional options (for internal SM use)

Since the timestamps in this signal are based on the current time assigned by SM, they should only be used for correlation with the corresponding "reload" signal (and, possibly, to check that the time ranges of subsequent "stream partition end" signals are adjacent) - not for comparison with the timestamps used for partitioning (partCol).
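A sketch of how the DA process next to the stream mount might consume this signal; the bucketing mechanics (.da.bucket and .da.prtnEndTS) are assumptions, only the signal's columns come from the table above:

// invoked for each _prtnEnd row received from the stream
.da.onPrtnEnd:{[startTS;endTS;opts]
  // remember where the partition ends so the matching "reload"
  // signal can purge exactly these records, then start a new bucket
  .da.prtnEndTS:endTS;
  .da.bucket+:1;
  }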

Interface with RT (or its replacement)

SM uses the following interface to interact with the Reliable Transport service (or a custom tickerplant); a usage sketch follows the list:

  • .rt.sub[topic;position] - subscribe to a topic (string) starting from a given position (long) in the stream
  • .rt.pub[topic] - register as a publisher for a topic
  • .rt.push[message] - publish a message (arbitrary type; a custom tickerplant is likely to support a pair (table name;table data))
  • .rt.upd[message;position] - callback for receiving a message
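For example, a subscriber and a publisher might use this interface as follows (the topic name and message content are illustrative):

// subscriber: define the callback, then subscribe from position 0
.rt.upd:{[msg;pos] 0N!(pos;first msg)}
.rt.sub["topic";0]

// publisher: register for the topic, then push a (table name;table data) pair
.rt.pub["topic"]
.rt.push(`trade;([]time:1#.z.n;sym:1#`AAPL;price:1#100.))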

For a custom tickerplant, this interface needs to be implemented in a client library (a q file), which is loaded at startup from the location specified in the KXI_RT_LIB environment variable. For the basic tickerplant implementation, available at https://github.com/KxSystems/kdb-tick, the RT client library can be the following code:

// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
  .rt.push:{[nph;payload]
    // decompose a table payload into its list of column values
    if[type x:last payload; x:value flip x];
    // prepend null time/sym columns for internal tables that lack them
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;];
  }

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]

.rt.sub:{[topic;startIdx] 
  if[not 10h=type topic;'"topic must be a string"];

  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;

  //initialise our message counter
  .rt.idx:0; 

  // === tick.q will call back to these ===
  upd::{[t;x] if[t in .rt.NO_TIME_SYM; x:`time`sym _x]; .rt.upd[(t;x);.rt.idx]; .rt.idx+:1;};
  .u.end:{.rt.idx:.rt.date2startIdx x+1};

  //replay log file and continue the live subscription
  if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";

  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; .rt.recoverMultiDay[res[1];startIdx]];
  }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11

//map a date to the starting stream index for that day
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}

.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L; 
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file and revert to regular definition when you hit start index
  upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
  //read all of all the log files except the last, where you read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files; 
  }