DA-SM interface

The following section describes the interface for DA's communication with either KX Insights Storage Manager or a custom equivalent that is responsible for data writedown in an application.

Storage registration

The data access process will invoke .sm.api.register, passing its mount name, a boolean indicating whether it should receive signals synchronously, and a callback function. Data Access (DA) can find a Storage Manager process either through discovery, by looking for a process with a serviceName of SM, or by reaching out to the endpoints defined in the assembly config at smEndpoints. Through this connection DA will receive and process the Reload signal.
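
As an illustration, a registration call over qIPC might look like the following sketch; the endpoint, mount name and callback body are assumptions for this example:

// Hypothetical registration of a DA process with SM over qIPC;
// the endpoint `:sm:10001, mount name `stream and callback are illustrative.
h:hopen `:sm:10001;                                  / SM endpoint (discovery or smEndpoints)
h(`.sm.api.register; `stream; 0b; {.da.onReload x})  / mount name, sync flag, reload callback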

Reload signal

This signal is sent only to DA processes that need to reload their mounted database. For the "stream mount" (RDB-like DA processes) this means purging old table records from memory, as sketched after the format list below.

  • Format:
    • Invoked in a DA process (via qIPC) as the lambda specified at DA registration, which takes a dictionary:
      • common entries for all DA processes:
        • ts (timestamp) - time when the migration (e.g. EOI or EOD) processing was started by SM
        • minTS (timestamp) - inclusive purview start time (for stream mount: 1 ns + maxTS of the next mount)
      • for non-stream-mounted DA processes (IDB/HDB):
        • maxTS (timestamp) - inclusive purview end time
      • for stream-mounted DA processes (RDB):
        • startTS, endTS (timestamp) - parameters of the corresponding _prtnEnd signal
        • pos (long) - stream position of the corresponding _prtnEnd signal
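
As a sketch, a stream-mounted DA process could act on this dictionary as follows; the trade table and realTime column are assumptions for illustration:

// Hypothetical reload callback for a stream-mounted (RDB-like) DA process;
// the `trade table and `realTime column are illustrative.
.da.onReload:{[sig]
  // purge records that now fall before the new inclusive purview start
  delete from `trade where realTime < sig[`minTS];
  }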

DA-SG interface

The following section describes the interface for DA's communication with either KX Insights Service Gateway or a custom equivalent that is responsible for managing client query requests in an application.

Resource Coordinator registration

  • The data access process will invoke .sgrc.registerDAP, passing its current availability and its purview as arguments. Data Access Processes (DAPs) can find the Resource Coordinator process either through discovery, by looking for a process with a serviceName of KXI-SG-RC and a metadata key name matching the name defined in the DA's rcName element config, or, if discovery is not being used, by reaching out to the endpoints defined in the assembly config at rcEndpoints. Through this connection DA will send availability and purview updates by invoking .sgrc.updDapStatus with its availability and current purview as arguments.

On the registration call, the Resource Coordinator can trigger .da.registrationErr to indicate whether the registration was successful. On TYPE, MISMATCH, or DOMAIN errors, DA will log the error and not retry registering with that process, as these errors are usually caused by a configuration issue.
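
A hypothetical registration and status update might look like the following; the endpoint and, in particular, the purview encoding are assumptions:

// Illustrative DAP registration with the Resource Coordinator over qIPC;
// the endpoint and purview values are assumptions, not a prescribed format.
h:hopen `:kxi-sg-rc:5050;
h(`.sgrc.registerDAP; 1b; `startTS`endTS!(2021.01.01D0;0Wp))   / availability, purview
h(`.sgrc.updDapStatus; 1b; `startTS`endTS!(2021.01.01D0;0Wp))  / subsequent updates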

API execution

When a gateway invokes an API on a data access process it does so by calling .da.execute with three arguments.

  • Format:

    • Invoked in a DA process (via qIPC) as the lambda .da.execute, which takes three arguments:

      • api - Name of API to call.
      • hdr - Header dictionary.
      • args - Dictionary of arguments to call API with. Missing arguments will be replaced with (::) when calling API.
    • The response is a two-element list, where the first element is a response dictionary indicating success or failure of the request, and the second element is the payload from the API itself, which varies by API (see the sketch below).
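
For illustration, a direct call from a gateway process might look like the following; the endpoint, API name and argument names are assumptions:

// Hypothetical gateway-side invocation of an API on a DAP;
// endpoint, API name and argument names are illustrative.
h:hopen `:dap:5080;
res:h(`.da.execute; `getData; ()!(); `table`startTS!(`trade;2021.01.01D0));
hdr:res 0;     / response dictionary indicating success or failure
payload:res 1; / API-specific payload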

SG architecture

Two types of GW architectures are supported, symmetric and asymmetric, and the choice must be set in the element config of the data access process under sgArch. This setting controls how the data access process responds to an API request that calls .da.execute.

If sgArch is set to symmetric, then the response from .da.execute is returned directly to the calling gateway (obtained via .z.w) upon completion, by invoking .sgagg.onPartial, which must be defined in the gateway that DA sends its payload to.

If sgArch is set to asymmetric, then the gateway must set an agg key in the hdr argument of .da.execute, defining the endpoint where DA should send the payload. Upon completing execution of the API, DA will send the response header and payload to the process defined in agg by calling .sgagg.onPartial, which must be defined on the aggregator.
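
The two modes can be sketched as follows; .da.sgArch and .da.respond are assumed names for illustration, not part of the documented interface:

// Sketch of response dispatch by architecture; .da.sgArch, .da.respond,
// hdr and payload are assumed names for illustration only.
.da.respond:{[hdr;payload]
  $[`symmetric~.da.sgArch;
    neg[.z.w](`.sgagg.onPartial;hdr;payload);                 / symmetric: reply to the caller
    [h:neg hopen hdr`agg; h(`.sgagg.onPartial;hdr;payload)]]  / asymmetric: send to the aggregator
  }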

Custom purviews

Users can establish custom purview keys in a process by adding labels to the assembly. These labels become associated with the data access process and get pushed anywhere purviews are pushed.
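
For example, a hypothetical assembly could attach labels such as the following (names and values are illustrative):

labels:
  region: nyc
  assetClass: equities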

DA-RT interface

Data Access uses the following interface to interact with the Reliable Transport service (or a custom tickerplant); a usage sketch follows the list:

  • .rt.sub[topic;position] - subscribe to a topic (string) starting from a given position (long) in the stream
  • .rt.pub[topic] - register as a publisher for a topic
  • .rt.push[message] - publish a message (arbitrary type; a custom tickerplant is likely to support a pair (table name;table data))
  • .rt.upd[message;position] - callback for receiving a message
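
A brief illustration of the interface under these definitions; the topic name, table and callback body are assumptions for the example:

// Illustrative use of the RT interface; topic and table names are examples.
.rt.upd:{[msg;pos] 0N!(pos;msg)};  / receive callback: print position and message
.rt.sub["dataStream";0];           / subscribe from the start of the stream
.rt.pub["dataStream"];             / register as a publisher on the topic
.rt.push(`trade;([]time:1#.z.p;sym:1#`AAPL;price:1#100f)) / publish one record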

The subscription topic can be set with the RT_SOURCE environment variable or in the assembly. If using the assembly, a stream bus needs to be set with the appropriate topic. An example is shown below, where the topic is called dataStream:

bus:
  stream:
    protocol: custom
    nodes: tp:5000
    topic: dataStream

For a custom tickerplant, this interface needs to be implemented in a client library (a q file), which is loaded at startup from the location specified in the KXI_RT_LIB environment variable. For the basic tickerplant implementation, available at https://github.com/KxSystems/kdb-tick, the RT client library can be the following code:

// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"} // will be overridden

.rt.pub:{[topic]
  if[not 10h=type topic;'"topic must be a string"];
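  // open an async handle to the tickerplant node(s)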
  h:neg hopen hsym`$getenv `KXI_RT_NODES;
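  // redefine .rt.push to forward messages to the tickerplant's .u.upd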
  .rt.push:{[nph;payload]
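    // a table payload is flipped into column-list form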
    if[type x:last payload; x:value flip x];
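    // internal tables get null time and sym columns prepended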
    if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
    nph(`.u.upd;t;x);}[h;];
  }

// === rt update and subscribe ===

if[`upd in key `.;  '"do not define upd: rt+tick will implement this"]
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"]

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}]

.rt.sub:{[topic;startIdx]
  if[not 10h=type topic;'"topic must be a string"];

  //connect to the tickerplant
  h:hopen hsym`$getenv `KXI_RT_NODES;

  //initialise our message counter
  .rt.idx:0;

  // === tick.q will call back to these ===
  upd::{[t;x] if[t in .rt.NO_TIME_SYM; x:`time`sym _x]; .rt.upd[(t;x);.rt.idx]; .rt.idx+:1;};
  .u.end:{.rt.idx:.rt.date2startIdx x+1};

  //replay log file and continue the live subscription
  if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

  //subscribe
  res:h "(.u.sub[`;`]; .u `i`L; .u.d)";

  //if start index is less than current index, then recover
  if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0]; .rt.recoverMultiDay[res[1];startIdx]];
  }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ}

.rt.recoverMultiDay:{[iL;startIdx]
  //iL - index and Log (as can be fed into -11!)
  i:first iL; L:last iL;
  //get all files in the same folder as the tp log file
  files:key dir:first pf:` vs last L;
  //get the name of the logfile itself
  fileName:last pf;
  //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
  files:files where files like (-10_ string fileName),"*";
  //from those files, get those with dates in the range we are interested in
  files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
  //set up upd to skip the first part of the file and revert to regular definition when you hit start index
  upd::{[startIdx;updo;t;x] $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
  //read all of all the log files except the last, where you read up to 'i'
  files:0W,/:files; files[(count files)-1;0]:i;
  //reset .rt.idx for each new day and replay the log file
  {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
  }