Storage Manager interface
Storage Manager (SM) interacts with two entities:
- A KX Insights Data Access (DA) service or its equivalent, e.g. a set of RDB or HDB processes
- A Reliable Transport based messaging service or its equivalent, e.g. a custom tickerplant
Interface with Data Access processes
SM communicates with either the KX Insights Data Access service or a custom equivalent responsible for serving data from the database. For simplicity, we refer to this entity as DA in the interface definition.
SM exposes APIs for registration and for querying the writedown status, supporting both q IPC and REST (over HTTP) interfaces.
Registration
To receive the reload signal, a client DA registers with SM by invoking the register API with three parameters:
- mount: name of a mount from the assembly configuration
- sync: whether reload signals should be sent synchronously (q IPC only)
- callback: function name (IPC) or URL (REST)
Set sync to
- false for stream mounts
- true for local IDB or HDB mounts, which prevents the DAP from losing its on-disk database in the middle of a query (long queries can block SM processing)
Client makes a synchronous call:
.sm.api.register[mount;sync;callback]
Where
- mount is a symbol
- sync is a boolean
- callback is the name of the function (unary, takes a dictionary argument) invoked on each reload signal (symbol)

The response is a dictionary of the last reload signal for the given mount, or an error symbol if one occurs.
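For example, a DA client serving a stream mount might register as follows; a minimal sketch in which the mount name stream and the callback body are illustrative:

// illustrative reload callback: sig is the reload-signal dictionary
.da.onReload:{[sig] show sig}
// register for asynchronous reload signals on the stream mount
lastSig:.sm.api.register[`stream;0b;`.da.onReload]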
Client makes a POST request to the http://host:port/register endpoint with a Registration JSON object in which
- mount is a string
- callback is the URL of the REST endpoint to be invoked on each reload signal

The response is a ReloadSignal JSON object of the last reload signal for the given mount.
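As a sketch, the same registration could be issued from q with .Q.hp; the host names, port, and the /reload callback path below are assumptions:

// JSON body with the mount name and our callback endpoint (values illustrative)
body:.j.j `mount`callback!("hdb";"http://da-host:8080/reload")
// POST to SM's register endpoint and parse the ReloadSignal response
sig:.j.k .Q.hp["http://sm-host:10001/register";.h.ty`json;body]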
The following errors may be returned to a q IPC call:
- mount if the specified mount name is not valid
- state if SM is not ready to process the call
- callback if the callback argument is missing

A REST call may return:
- 404 if the specified mount name is not valid
- 503 if SM is not ready to process the call
- 400 if the callback argument is missing
Querying the writedown status
An unregistered client can use the status API to get the status of all mounts.
The API returns
- mount: name of the mount
- params: current status of the mount, i.e. the values sent in the last reload signal for the mount
Client makes a synchronous call:
.sm.api.getStatus[]
The response is a table with columns mount (symbol) and params (dictionary).
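For instance, a client could read off the current purview end of an HDB mount; a small sketch in which the mount name hdb is an assumption:

st:.sm.api.getStatus[]                            // table of mount and params
first[exec params from st where mount=`hdb]`maxTS // purview end of the hdb mount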
Client makes a GET request to the http://host:port/status endpoint.
The response is an array of MountStatus JSON objects, each containing mount (string) and params (dictionary object).
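Or from q over REST, a minimal sketch using .Q.hg (host and port assumed):

// GET the status of all mounts and parse the JSON array
.j.k .Q.hg"http://sm-host:10001/status"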
Reload signal
The reload signal is sent to registered DA clients that need to reload their mounted database. For stream-based mounts (RDB-like DA clients) this means purging old table records from memory.
The signal is sent via the callback specified to the registration API. This signal is a dictionary whose content varies according to the type of the mount specified to the registration API.
Common entries for all mount types:
- ts: time when the migration (e.g. EOI or EOD) processing was started by SM (timestamp)
- minTS: inclusive purview start time; for a stream mount this is 1 ns + the maxTS of the next mount (timestamp)

For non-stream-based mounts (IDB/HDB):
- maxTS: inclusive purview end time (timestamp)

For stream-based mounts (RDB):
- startTS, endTS: parameters of the corresponding _prtnEnd signal (timestamp)
- pos: stream position of the corresponding _prtnEnd signal (long)
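For a stream-based mount, the registered callback would typically use minTS to purge records that are now served by the next mount. A minimal sketch, assuming every in-memory table has a time column named time:

// delete from each in-memory table the rows now covered by the next mount
.da.onReload:{[sig]
 {[m;t]![t;enlist(<;`time;m);0b;`$()]}[sig`minTS;] each tables[];
 }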
The reload signal is also sent via RT as a table named _reload (so that it can be processed during replay). The rows in this table correspond to all affected mounts for a given tier-migration event, and the schema is
- mount: name of mount (symbol)
- params: the dictionary argument of the q IPC signal for the corresponding mount
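A subscriber replaying the stream can apply the same logic when a _reload table arrives; a sketch, with .da.onReload as in the earlier example and the mount name stream assumed:

// invoke the reload callback for each row concerning the mount we serve
.da.onReloadTable:{[tbl] .da.onReload each exec params from tbl where mount=`stream}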
Stream partition end signal
This signal is used by just one type of DAP – the one whose mount is next to the ‘stream’ mount (IDB-like DAPs if an IDB mount is set up, or HDB-like DAPs otherwise) – in order to ‘draw a line in the sand’ (e.g. by starting a new bucket for new incoming records) for the records to be purged from memory on the subsequent corresponding reload signal from SM.
The signal consists of a one-row table (for compatibility with custom tickerplants), sent via RT.
The table name is _prtnEnd (prtn for stream partition; the underscore avoids name collisions with customers’ tables) and its schema is:
- startTS: inclusive start timestamp of the stream partition (SM time when it published the previous signal)
- endTS: exclusive end timestamp of the stream partition (SM current time)
- opts: dictionary of additional options (for internal SM use)
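A DAP consuming this signal might do no more than record where the new bucket starts, deferring the purge to the subsequent reload signal. A minimal sketch; the names are illustrative:

// row: the single row of the _prtnEnd table, as a dictionary
.da.onPrtnEnd:{[row]
 // records at or after endTS belong to the next stream partition
 .da.bucketStart:row`endTS;
 }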
Comparing timestamps
Since the timestamps in this signal are based on the current time assigned by SM, they should only be used for correlation with the corresponding reload signal (and, possibly, to check that the time ranges of subsequent stream partition end signals are adjacent) – not for comparison with the timestamps used for partitioning (prtnCol).
Interface with a messaging service
For simplicity, we refer to the interface for SM’s communication with a messaging service as RT.
SM uses the following interface to interact with the Reliable Transport service (or a custom tickerplant).
.rt.sub[topic;position;uf]  subscribe to topic starting from position
.rt.pub[topic]              register as publisher for topic
pf[message]                 publish a message; this function is returned by .rt.pub
uf[message;position]        callback for receiving a message; this function is provided to .rt.sub as the uf argument

where
- topic is the name of a topic (string)
- position is a position in the stream (long)
- uf is a callback function for receiving topic-specific messages (function|symbol)
- message may be of arbitrary type: a custom tickerplant is likely to support a pair (table name; table data)
For a custom tickerplant, implement this interface in a client library (q file), which will be loaded at startup from the location specified in the KXI_RT_LIB environment variable.
For the basic tickerplant implementation, the RT client library could be:
// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")
// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden
.rt.pub:{[topic]
if[not 10h=type topic;'"topic must be a string"];
h:neg hopen hsym`$getenv `KXI_RT_NODES;
.rt.push:{[nph;payload]
  if[type x:last payload; x:value flip x]; // a table is converted to a list of column values
  if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x]; // internal tables get null time/sym columns prepended for tick.q
nph(`.u.upd;t;x);}[h;];
.rt.push }
// === rt update and subscribe ===
if[`upd in key `.; '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]
if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]
.rt.sub:{[topic;startIdx;uf]
if[not 10h=type topic;'"topic must be a string"];
//connect to the tickerplant
h:hopen hsym`$getenv `KXI_RT_NODES;
//initialise our message counter
.rt.idx:0;
// === tick.q will call back to these ===
upd::{[uf;t;x]
if[not type x; x:flip(cols .rt.schema t)!x]; // for log replay
if[t in .rt.NO_TIME_SYM; x:`time`sym _x];
uf[(t;x);.rt.idx];
.rt.idx+:1; }[uf];
 .u.end:{.rt.idx:.rt.date2startIdx x+1}; // at end of day, jump the counter to the next day's base index
//replay log file and continue the live subscription
if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning
//subscribe
res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
.rt.schema:(!/)flip res 0; // used to convert arrays to tables during log replay
//if start index is less than current index, then recover
if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0];
.rt.recoverMultiDay[res[1];startIdx]]; }
//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}
.rt.recoverMultiDay:{[iL;startIdx]
//iL - index and Log (as can be fed into -11!)
i:first iL; L:last iL;
//get all files in the same folder as the tp log file
files:key dir:first pf:` vs last L;
//get the name of the logfile itself
fileName:last pf;
//get all the lognameXXXX.XX.XX files (logname is sym by default - so usually
// the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
files:files where files like (-10_ string fileName),"*";
//from those files, get those with dates in the range we are interested in
files:` sv/: dir,/:asc files where
("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
//set up upd to skip the first part of the file and revert
// to regular definition when you hit start index
upd::{[startIdx;updo;t;x]
$[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]
}[startIdx;upd];
//read all of all the log files except the last, where you read up to 'i'
files:0W,/:files; files[(count files)-1;0]:i;
//reset .rt.idx for each new day and replay the log file
{.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files; }
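A brief usage sketch of this client library; the topic, table, and callback below are illustrative:

// publisher side: register for the topic, then push (table name;table data) pairs
pf:.rt.pub["trades"]
pf(`trade;([]time:1#.z.n;sym:1#`FDP;price:1#101.5))
// subscriber side: receive each message together with its stream position
.rt.sub["trades";0;{[msg;pos] 0N!(pos;first msg)}]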