
Data Access interface

The interface for DA’s communication with KX Insights Storage Manager, or a custom equivalent responsible for data writedown in an application.

Storage registration

The data-access process invokes .sm.api.register, passing its mount name, a boolean indicating whether it should receive signals synchronously, and a callback function. Data Access (DA) can find a Storage Manager process either through discovery, looking for a process with a serviceName of SM, or by reaching out to the endpoints defined in the assembly config at smConn. Through this connection DA receives and processes the Reload signal.
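
As a hedged illustration, a registration call over an open handle to SM might look like this; the endpoint, mount name, and callback (passed here by name) are placeholders, not documented values.

/ illustrative only: register with SM; endpoint, mount name,
/ and callback name are placeholders
h:hopen`:localhost:10001                     / endpoint from smConn or discovery
h(`.sm.api.register;`stream;0b;`daReload)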

Reload signal

The Reload signal is sent only to DA processes that need to reload their mounted database. For the ‘stream mount’ (RDB-like DA processes) this means purging old table records from memory.

It is invoked in a DA process (via q IPC) as the callback lambda specified at DA registration, which takes a dictionary with the keys below; a sketch of such a callback follows the listings.

Common entries for all DA processes:

ts       time when the migration (e.g. EOI or EOD) processing
         was started by SM (timestamp)
minTS    inclusive purview start time (timestamp)
         (for stream mount: 1 ns + maxTS of the next mount)

For non-stream mounted DA processes (IDB/HDB):

maxTS    inclusive purview end time (timestamp)

For ‘stream mounted’ DA processes (RDB):

startTS  parameter of the corresponding _prtnEnd signal (timestamp)
endTS    parameter of the corresponding _prtnEnd signal (timestamp)
pos      stream position of the corresponding _prtnEnd signal (long)
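
A minimal sketch of such a callback for a stream-mount (RDB-like) process, assuming records carry a hypothetical realTime column:

/ illustrative only: the realTime column and table-wide purge are assumptions
daReload:{[args]
  minTS:args`minTS;                          / new inclusive purview start
  / functional delete: drop rows that now belong to the next mount
  {[m;t]![t;enlist(<;`realTime;m);0b;`symbol$()]}[minTS]each tables[];
  }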

DA-GW interface

This is the interface for DA’s communication with either KX Insights Service Gateway, or a custom equivalent responsible for managing client query requests in an application.

Gateway registration

The Data Access (DA) process invokes .sgrc.registerDAP, passing its current availability and its purview as arguments.

DA can find this gateway process through either

  • discovery, looking for a process with a serviceName of KXI-RC with a metadata key gwa matching the name defined in the DA’s gwAssembly element config
  • reaching out to the endpoints defined in the assembly config at gwEndpoints

Through this connection DA sends availability and purview updates by invoking .sgrc.updDapStatus with arguments of availability and its current purview.
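
As a hedged illustration, registration and a later status update over an open handle hRC might look like this; the purview shape shown is an assumption.

/ illustrative only: availability is a boolean; purview shape is assumed
purview:`startTS`endTS!(.z.p-1D;.z.p)
hRC(`.sgrc.registerDAP;1b;purview)
hRC(`.sgrc.updDapStatus;1b;@[purview;`endTS;:;.z.p])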

API execution

When a gateway invokes an API on a data-access process, it does so by calling .da.execute, a lambda invoked in the DA process (via q IPC) which takes three arguments:

api    name of API to call
hdr    header dictionary
args   dictionary of arguments to call API with
       Missing arguments are replaced with `(::)`

and returns a two-item list:

  1. response dictionary indicating success or failure of the request
  2. payload from the API itself, according to the API
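
For illustration, a gateway-side invocation might take the shape below; the handle, API name, header keys, and argument values are placeholders, and how the result is delivered depends on the gateway architecture described next.

/ illustrative only: a gateway invoking an API on a DAP
neg[hDAP](`.da.execute;
  `getData;                                  / api
  enlist[`corr]!enlist"abc-123";             / hdr (placeholder key)
  `table`startTS!(`trade;.z.p-1D));          / args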

Gateway architecture

Two types of gateway architecture are supported: symmetric and asymmetric. The architecture is set in the element config of the Data Access process under gwArch, and controls how the Data Access process responds to an API request that calls .da.execute.

If gwArch is set to symmetric, the response from .da.execute is returned directly to the gateway on completion, using .z.w, and invokes .sgagg.onPartial, which must be defined in the gateway to which DA sends its payload.

If gwArch is set to asymmetric, the gateway must set an agg key in the hdr argument of .da.execute, defining the endpoint to which DA should send the payload. Upon completing execution of the API, DA sends the response header and payload to the process defined in agg and then calls .sgagg.onPartial, which must be defined on the aggregator.
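
On the receiving side, a hedged stub for the aggregator might look like the following; the (header;payload) argument shape is an assumption, not a documented signature.

/ hypothetical stub; argument shape is an assumption
.sgagg.onPartial:{[hdr;payload]
  -1"partial received: ",string count payload;
  }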

Custom purviews

You can establish custom purview keys in a process by adding labels to the assembly. These labels become associated with the data-access process and are included anywhere purviews are pushed.

Get purviews

The .da.getPurviews API gets purview information from data-access processes and returns the current state of their purviews. It is invoked in the Data Access operator process (via q IPC), if enabled, with a dictionary of argument keys to filter on, and returns records for the DA processes that satisfy all specified constraints.

Example keys:

id        identifier of data-access process
avail     process availability
startTS   inclusive purview start time
endTS     inclusive purview end time
ver       purview version
host      host of data-access process
port      port of data-access process

and any assembly-specific keys.

The response is a table matching the filters with all purview keys and connection information as columns.
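
As an illustration, a filtered call over a handle hOp to the operator process might look like this; the handle and filter values are placeholders.

/ illustrative only: purviews of currently available DA processes
hOp(`.da.getPurviews;enlist[`avail]!enlist 1b)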

DA-RT interface

Data Access uses this interface to interact with Reliable Transport (RT) service (or a custom tickerplant):

.rt.sub[topic;position]     subscribe to a topic (string) 
                            starting from a given position (long) in the stream
.rt.pub[topic]              register as a publisher for a topic
.rt.push[message]           publish a message; arbitrary type: a custom tickerplant 
                            is likely to support a pair (table name;table data)
.rt.upd[message;position]   callback for receiving a message
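
For example, a subscriber might define its callback and then subscribe from the start of a stream; the topic and processing logic here are placeholders.

/ illustrative only: define the receive callback, then subscribe
.rt.upd:{[payload;pos]
  t:first payload; x:last payload;           / e.g. (table name;table data)
  / ... process message at stream position pos ...
  };
.rt.sub["trade";0]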

For a custom tickerplant, implement this interface in a client library (q file), to be loaded at startup from the location specified in the KXI_RT_LIB environment variable.

For the basic tickerplant implementation, the RT client library can be:

// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
  .rt.push:{[nph;payload]
    / a table payload is sent as a list of column values
    if[type x:last payload; x:value flip x];
    / internal tables get null time and sym columns prepended
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;]; }

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]

.rt.sub:{[topic;startIdx]
  if[not 10h=type topic;'"topic must be a string"];

  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;

  //initialize our message counter
  .rt.idx:0;

  // === tick.q will call back to these ===
  upd::{[t;x] 
    if[t in .rt.NO_TIME_SYM; x:`time`sym _x];
    .rt.upd[(t;x);.rt.idx];
    .rt.idx+:1; };

  / at end of day, jump the counter to the next day's starting index
  .u.end:{.rt.idx:.rt.date2startIdx x+1};

  //replay log file and continue the live subscription
  // null means follow only, not start from beginning
  if[null startIdx;startIdx:0W]; 

  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";

  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0];
    .rt.recoverMultiDay[res[1];startIdx]]; }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11

//map a date to that day's first stream index, e.g. 2021.01.02 -> 20210102*1e11
.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}

.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L;
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files
  //(logname is sym by default - so usually the files are of the form
  //sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files 
    where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file 
  //and revert to regular definition when you hit start index
  upd::{[startIdx;updo;t;x] 
      $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]
    }[startIdx;upd];
  //read each log file in full except the last, which is read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files; }