Interface
The following section describes the interface for DA's communication with either the KX Insights Storage Manager or a custom equivalent that is responsible for data writedown in an application.
Storage registration
The data access process will invoke `.sm.api.register`, passing its mount name, a boolean indicating whether it should receive signals synchronously, and a callback function. Data Access (DA) can find a Storage Manager process either through discovery, looking for a process with a `serviceName` of `SM`, or by reaching out to the endpoints defined in the assembly config at `smEndpoints`. Through this connection DA will receive and process the `Reload` signal.
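As a sketch, a DA registration might look like the following (the mount name, sync flag, and callback body are illustrative; only the argument order follows the description above):

```q
/ illustrative callback, invoked when SM delivers a signal such as Reload
onSignal:{[sig] -1 "received signal with purview start ",string sig`minTS;}

/ register with Storage Manager: mount name, sync flag, callback
.sm.api.register[`stream; 0b; onSignal]
```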
Reload signal
This signal is sent only to DA processes that need to reload their mounted database. For the "stream mount" (RDB-like DA processes) this means purging old table records from memory.
- Format:
  - Invoked in a DA process (via qIPC) as a lambda (specified at DA registration) which takes a dictionary with the following entries:
    - common entries for all DA processes:
      - `ts` (timestamp) - time when the migration (e.g. EOI or EOD) processing was started by SM
      - `minTS` (timestamp) - inclusive purview start time (for stream mount: 1 ns + `maxTS` of the next mount)
    - for non-stream mounted DA processes (IDB/HDB):
      - `maxTS` (timestamp) - inclusive purview end time
    - for "stream mounted" DA processes (RDB):
      - `startTS`, `endTS` (timestamp) - parameters of the corresponding `_prtnEnd` signal
      - `pos` (long) - stream position of the corresponding `_prtnEnd` signal
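To make the dictionary shape concrete, here is a sketch of a reload callback for a stream-mounted (RDB-like) DA process; the table name `trade` and the purge logic are illustrative only:

```q
/ illustrative reload handler for a stream-mounted DA process
reload:{[sig]
  / sig has keys ts, minTS, startTS, endTS, pos (per the format above)
  / purge records that have migrated to the next mount down
  delete from `trade where time < sig`minTS;
  }
```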
DA-SG interface
The following section describes the interface for DA's communication with either the KX Insights Service Gateway or a custom equivalent that is responsible for managing client query requests in an application.
Resource Coordinator registration
- The data access process will invoke `.sgrc.registerDAP`, passing its current `availability` and its `purview` as arguments. Data Access Processes (DAPs) can find the resource coordinator process through discovery, looking for a process with a `serviceName` of `KXI-SG-RC` with a metadata key `name` matching the name defined in the DA's `rcName` element config. If discovery is not being used, then the DA will reach out to the endpoints defined in assembly config at `rcEndpoints`. Through this connection DA will send availability and purview updates by invoking `.sgrc.updDapStatus` with arguments of `availability` and its current `purview`.
On the registration call the resource coordinator can trigger `.da.registrationErr` to state whether the registration was successful. On TYPE, MISMATCH, or DOMAIN errors, DA will log the error and not retry registering with that process, as these errors are usually caused by a configuration issue.
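As a sketch, registration and a later status update might look like this (the boolean availability flag and the `startTS`/`endTS` purview shape shown are assumptions for illustration, not a documented schema):

```q
/ register this DAP with the resource coordinator
.sgrc.registerDAP[1b; `startTS`endTS!(2021.01.01D00:00:00; 0Wp)]

/ later: push an updated availability and purview
.sgrc.updDapStatus[0b; `startTS`endTS!(2021.01.01D00:00:00; 0Wp)]
```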
API execution
When a gateway invokes an API on a data access process, it does so by calling `.da.execute` with three arguments.
- Format:
  - Invoked in a DA process (via qIPC) as a lambda, `.da.execute`, which takes three arguments:
    - `api` - Name of the API to call.
    - `hdr` - Header dictionary.
    - `args` - Dictionary of arguments to call the API with. Missing arguments will be replaced with `(::)` when calling the API.
  - Response will be a two element list, where the first element is a response dictionary indicating success or failure of the request, and the second element is the payload from the API itself, which can vary depending on the API.
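From the gateway side, a call might look like the following over an open qIPC handle `h` to the DAP (the API name, header keys, and argument dictionary here are illustrative, not required values; an `agg` header applies only to an asymmetric setup, described below):

```q
/ h: handle to the DA process, opened with hopen
res:h (`.da.execute; `getData; enlist[`agg]!enlist `:localhost:5060;
       `table`startTS!(`trade; .z.p-0D01:00:00));
/ res 0 - response dictionary indicating success or failure
/ res 1 - payload returned by the API
```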
SG architecture
Two types of GW architectures are supported, `symmetric` and `asymmetric`, and the type must be set in the element config of the data access process under `sgArch`. This setting controls how the data access process responds to an API request that calls `.da.execute`.
If `sgArch` is set to `symmetric`, then the response from `.da.execute` will be returned directly to the gateway upon completion, using `.z.w`, and it will invoke `.sgagg.onPartial`, which must be defined in the gateway that DA is sending its payload to.

If `sgArch` is set to `asymmetric`, then the gateway must set an `agg` key in the `hdr` argument of `.da.execute` defining the endpoint where DA should send the payload. Upon completing execution of the API, DA will send the response header and payload to the process defined in `agg` and then call `.sgagg.onPartial`, which must be defined on the aggregator.
Custom purviews
Users can establish custom purview keys in a process by adding labels to the assembly. These labels become associated with the data access process and get pushed anywhere purviews are pushed.
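As an illustration, a hypothetical assembly fragment defining a `region` label (the key and value here are examples only; the label then travels with the process's purview):

```yaml
labels:
  region: amer
```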
DA-RT interface
Data Access uses the following interface to interact with the Reliable Transport service (or a custom tickerplant):
* `.rt.sub[topic;position]` - subscribe to a topic (string) starting from a given position (long) in the stream
* `.rt.pub[topic]` - register as a publisher for a topic
* `.rt.push[message]` - publish a message (arbitrary type; a custom tickerplant is likely to support a pair (table name;table data))
* `.rt.upd[message;position]` - callback for receiving a message
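Putting the interface together, a minimal subscriber sketch (the topic name, start position, and table handling are illustrative; the callback must be defined before subscribing):

```q
/ define the message callback before subscribing;
/ messages are assumed here to be (table name; table data) pairs
.rt.upd:{[msg;pos]
  t:first msg; x:last msg;
  t upsert x;            / append to the pre-existing global table of that name
  }

/ subscribe to the topic from the beginning of the stream
.rt.sub["dataStream"; 0]
```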
The subscription topic can be set with the `RT_SOURCE` environment variable or in the assembly. If using the assembly, a `stream` bus needs to be set with the appropriate topic. An example follows where the topic is called `dataStream`:
bus:
  stream:
    protocol: custom
    nodes: tp:5000
    topic: dataStream
For a custom tickerplant, this interface needs to be implemented in a client library (q file), which will be loaded at startup from the location specified in the `KXI_RT_LIB` environment variable. For the basic tickerplant implementation, available at https://github.com/KxSystems/kdb-tick, the RT client library can be the following code:
// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")
// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden
.rt.pub:{[topic]
if[not 10h=type topic;'"topic must be a string"];
h:neg hopen hsym`$getenv `KXI_RT_NODES;
.rt.push:{[nph;payload]
if[type x:last payload; x:value flip x];
if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
nph(`.u.upd;t;x);}[h;];
}
// === rt update and subscribe ===
if[`upd in key `.; '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]
if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]
.rt.sub:{[topic;startIdx]
if[not 10h=type topic;'"topic must be a string"];
//connect to the tickerplant
h:hopen hsym`$getenv `KXI_RT_NODES;
//initialise our message counter
.rt.idx:0;
// === tick.q will call back to these ===
upd::{[t;x] if[t in .rt.NO_TIME_SYM; x:`time`sym _x]; .rt.upd[(t;x);.rt.idx]; .rt.idx+:1;};
.u.end:{.rt.idx:.rt.date2startIdx x+1};
//replay log file and continue the live subscription
if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning
//subscribe
res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
//if start index is less than current index, then recover
if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; .rt.recoverMultiDay[res[1];startIdx]];
}
//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}
.rt.recoverMultiDay:{[iL;startIdx]
//iL - index and Log (as can be fed into -11!)
i:first iL; L:last iL;
//get all files in the same folder as the tp log file
files:key dir:first pf:` vs last L;
//get the name of the logfile itself
fileName:last pf;
//get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
files:files where files like (-10_ string fileName),"*";
//from those files, get those with dates in the range we are interested in
files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
//set up upd to skip the first part of the file and revert to regular definition when you hit start index
upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
//read all of all the log files except the last, where you read up to 'i'
files:0W,/:files; files[(count files)-1;0]:i;
//reset .rt.idx for each new day and replay the log file
{.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
}