Service Gateway interface

Two types of processes can interact with the SG:

  • client processes making API calls
  • data access processes (DAPs) servicing requests (e.g. RDB, IDB, HDB)

Client

To make an API call to the SG from a q process

  1. hopen a connection to a GW process
  2. issue an API call by passing a 4-item list:

apiName   API name (symbol)
args      named arguments to the API (dictionary)
callback  binary callback function to be invoked on request completion in the async case (see below) (symbol)
opts      optional out-of-band information to accompany the request

args must include at least

startTS   Start time (inclusive) (timestamp)
endTS     End time (exclusive)  (timestamp)

and any other assembly label values.

opts must be an untyped dictionary; for no opts, use (0#`)!().
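As a minimal sketch, the four items can be assembled as follows. The API name, callback name and label values are placeholders for illustration, not names defined by the SG.

```q
/ Placeholder API name and arguments, for illustration only
args:`startTS`endTS`region`assetClass!(-0Wp;0Wp;`amer;`equity)
opts:(0#`)!()                          / empty untyped dictionary: no out-of-band information
req:(`myAPI;args;`myCallbackFn;opts)   / apiName; args; callback; opts
```

The list req is what would be sent over the handle to the GW, either synchronously or asynchronously.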

API requests can be synchronous or asynchronous. On completion of an asynchronous request, the GW uses the hopened handle to invoke the callback function with two arguments:

  1. response header (dictionary)
  2. response payload

Examples:

h:hopen`:gwHost:gwPort;
myCallbackFn:{[hdr;payload] ...};

Async: myCallbackFn will be invoked.

neg[h](`myAPI;
  `startTS`endTS`region`assetClass`arg1`arg2!(-0Wp;0Wp;`amer;`equity;1;"abc");
  `myCallbackFn;
  `logCorr`appCorr!("request_1";10))

Sync: response is a two-item list: header and payload

res:h(`myAPI;
  `startTS`endTS`region`assetClass`arg1`arg2!(-0Wp;0Wp;`amer;`equity;1;"abc");
  `myCallbackFn;
  `logCorr`appCorr!("request_1";10))

Regardless of whether the API itself uses the argument, all API calls must include startTS, endTS, and all labels in the SG’s assembly definition.

You may choose to target one or multiple labels: the SG will route the request to DAPs covering all specified labels, i.e. the Cartesian product of them.

For example, if the labels of our assembly were:

labels:
  region: amer,emea,apac
  assetClass: equity,futures

all API calls would have to include startTS, endTS, region and assetClass.

Multiple regions, one asset class:

args:`startTS`endTS`region`assetClass`other`args!(-0Wp;0Wp;`amer`emea;`futures;1;0b)

The SG will route the request to DAPs covering `amer`futures and `emea`futures.
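The label combinations in this example can be enumerated with cross. This is only an illustration of the Cartesian product, not the SG's own routing code, and the variable name targets is hypothetical.

```q
args:`startTS`endTS`region`assetClass`other`args!(-0Wp;0Wp;`amer`emea;`futures;1;0b)
/ (),x turns an atom label value into a one-item list before taking the product
targets:((),args`region) cross (),args`assetClass
/ targets is a list of (region;assetClass) pairs, one per label combination
```

Here targets holds the two pairs named above: amer with futures, and emea with futures.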

Discovery

If your client process uses KX Insights Discovery, Gateway processes can be found by discovery.

Select all active GWs:

sn:`$"KXI-SG-GW" / GW service name
select from .com_kx_sd.getServices[] where serviceName=sn,status=`UP

REST

Optionally, API requests for the pre-configured APIs can be made through the REST interface, if it is configured to run and the DAPs implement the APIs in question (i.e. you are using the KX Insights Data Access microservice, or your own custom DAPs with your own implementations of the APIs).

Data Access process

To integrate with the GW microservice, a DAP must

  • register with the Resource Coordinator

    Open a connection to the Resource Coordinator (hopen) and invoke .sgrc.registerDAP with its data purview. The purview can be updated by calling .sgrc.updDapStatus.

  • execute APIs invoked by the client

    The GW asynchronously invokes the following function on the DAP, which must be defined:

.da.execute[apiName;hdr;args]

Where

  • apiName is an API name (symbol)
  • hdr is a request header (dictionary)
  • args are named API arguments (dictionary)

the DAP executes the named API and

  • sends a response header and a response payload to the correct Aggregator by invoking .sgagg.onPartial on the host/port specified by hdr.agg
  • sends a message to the Resource Coordinator indicating it is available to do more work, by invoking .sgrc.onPartial on the Resource Coordinator with an appropriate response header

The DAP must respond even in the event of an error. Proper error handling should be implemented in order to ensure this.
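One way to guarantee a response is to run the API under protected evaluation, so that an error becomes a value the DAP can report rather than a signal that aborts the handler. The sketch below assumes a hypothetical API myAPI and a hypothetical helper runApi; neither is part of the SG.

```q
/ Hypothetical API that signals an error when a required argument is missing
myAPI:{[args] if[not `x in key args;'"missing x"]; 2*args`x}

/ Run the named API under protected evaluation; on error return a tagged pair
/ instead of letting the signal escape, so a response can still be built
runApi:{[apiName;args] @[value apiName;args;{(`execErr;x)}]}

runApi[`myAPI;`x`y!3 4]        / the API result
runApi[`myAPI;(1#`y)!1#4]      / (`execErr;"missing x")
```

The caller can then test for the `execErr tag and set the response and application codes accordingly before replying to the Aggregator and the Resource Coordinator.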

If using the KX Insights Data Access microservice, all of the above is automatically implemented; the DA need only be configured with its purview and the name of the SG it should connect to.

Purview

A purview describes the portion of the data each individual DAP offers and is the means by which the SG makes its routing decisions. The purview is a dictionary with the following keys:

ver       Purview version number (long)
          used for keeping track of purview changes, not routing
startTS   Start time covered by DAP (inclusive) (timestamp)
endTS     End time covered by DAP (exclusive) (timestamp)

and all other label values defined in the assembly.

DAPs register and update their purviews with the RC using the APIs described below. Every request must include all the purview keys except ver. The RC chooses DAPs that service the request and splits up the arguments as necessary. For example, suppose our SG assembly labels were:

labels:
  city: toronto,montreal,vancouver
  sensorType: gas,electric

Then every request must include startTS and endTS and one or multiple of each city and sensorType. Likewise, every DAP must register itself with startTS and endTS, and one city and one sensorType. Suppose the DAPs registered with the following purviews (ver omitted):

dap   city       sensorType  startTS      endTS
dap1  toronto    gas         -0Wp         0Wp
dap2  toronto    electric    -0Wp         0Wp
dap3  montreal   gas         -0Wp         2021.06.01D
dap4  montreal   gas         2021.05.01D  0Wp
dap5  montreal   electric    -0Wp         0Wp
dap6  vancouver  gas         -0Wp         0Wp
dap7  vancouver  electric    -0Wp         0Wp

If a request arrived for

`startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto`montreal;`gas)

the RC would split the request and send it to the DAPs as follows:

dap1   `startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto;`gas)
dap3   `startTS`endTS`city`sensorType!(2021.05.10D;2021.06.01D;`montreal;`gas)
dap4   `startTS`endTS`city`sensorType!(2021.06.01D;2021.06.15D;`montreal;`gas)

Any additional arguments (not related to purview) are sent unchanged to all DAPs participating in the request.

The SG will not send the same portion of a request to two different DAPs, even if their purviews overlap.
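The split in the example above can be reproduced with a few lines of qSQL. This is a rough sketch of the idea only, not the RC's actual algorithm: it assumes the purviews sit in a hypothetical table daps, and that within each label combination the rows are ordered so that DAPs covering earlier data come first.

```q
/ Purviews of the DAPs named in the split above (illustrative data)
daps:([] dap:`dap1`dap2`dap3`dap4;
  city:`toronto`toronto`montreal`montreal;
  sensorType:`gas`electric`gas`gas;
  startTS:(-0Wp;-0Wp;-0Wp;2021.05.01D);
  endTS:(0Wp;0Wp;2021.06.01D;0Wp))

req:`startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto`montreal;`gas)
reqCity:(),req`city; reqType:(),req`sensorType   / label values as lists
rs:req`startTS; re:req`endTS                     / requested time window

/ 1. keep DAPs whose labels match and whose time range intersects the request
cand:select from daps where city in reqCity, sensorType in reqType, startTS<re, endTS>rs
/ 2. clip each DAP's range to the requested window
cand:update startTS:startTS|rs, endTS:endTS&re from cand
/ 3. remove overlap: start each DAP where the previous one in its group ended
cand:update startTS:startTS|prev endTS by city,sensorType from cand
```

With these inputs cand contains dap1, dap3 and dap4, with dap4 starting at 2021.06.01D, which matches the split shown above.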

Once the SG enlists a DAP to service a request, it is marked unavailable until it reports that it has completed (.sgrc.onPartial). If no DAPs are available to service all or part of a request, the SG sends out what it can, and enqueues the portions that can’t be completed until a DAP registers or updates itself as able to satisfy (wholly or partially) the outstanding portion of the request.

On the aggregator

With the exception of the pre-configured APIs, the only supported aggregation function is raze. (Future versions will support more aggregators.)

So all API results returned to the aggregator should be ‘raze-able’ (i.e. raze(res 0; res 1;...)).
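For instance, if each DAP returns a table with the same columns, the partial results raze into a single table. The tables below are made-up data.

```q
/ Hypothetical partial results from two DAPs
res:(([] sym:`a`b; px:1 2f);([] sym:1#`c; px:1#3f))
agg:raze res      / a single table with the rows of both partials
count agg         / 3
```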

Metadata

DAPs may register with metadata on the APIs they offer, which is then surfaced through the .kxi.getMeta API. Metadata is a table with the following columns:

column  type          description                                               example
fn      symbol        Function/API name.                                        `myAPI
custom  boolean       Whether this is a custom (true) or built-in (false) API.  0b
params  dictionary[]  Parameters to the API (see "SAPI - metadata").            See "SAPI - metadata"
return  dictionary    Return of the API (see "SAPI - metadata").                See "SAPI - metadata"

The custom field is conventionally used in the KX Insights DA microservice to distinguish between built-in KXI APIs and user-defined custom APIs. If you are implementing your own DAP, this field can be used at your discretion.

Schema

DAPs may register with schema information, which is then surfaced through the .kxi.getMeta API. The schema is a table with the following columns:

column         type      description                                                  example
table          symbol    Table name.                                                  `trade
typ            symbol    Table type.                                                  `partitioned
pkCols         symbol[]  Primary key columns.                                         `sym`time
updTsCol       symbol    Update time column.                                          `updTime
prtnCol        symbol    Column the data is partitioned on.                           `time
sortColsMem    symbol[]  Columns the data is sorted on in memory (e.g. RDB).          ,`updTime
sortColsIDisk  symbol[]  Columns the data is sorted on on-intraday-disk (e.g. IDB).   `sym`updTime
sortColsDisk   symbol[]  Columns the data is sorted on on-disk (e.g. HDB).            `sym`time
columns        table     Table columns (see below).                                   See below.

Each columns cell is a table with the following columns:

column        type     description                               example
column        symbol   Column name.                              `price
typ           short    KDB+ type.                                9h
attrMem       symbol   In-memory attribute (e.g. RDB).           `
attrIDisk     symbol   On-intraday-disk attribute (e.g. IDB).    `s
attrDisk      symbol   On-disk attribute (e.g. HDB).             `p
isSerialized  boolean  Whether column is serialized.             0b
fk            symbol   Foreign table information.                `instrument.sym

The schema is strictly enforced when registering with the RC (see .sgrc.registerDAP), but the columns schema in individual cells is not.

Discovery

The DAP can use discovery to find the RC to report to, for example:

name:`mySG; / "name" in SG assembly definition
sn:`$"KXI-SG-RC"; / RC service name
select from .com_kx_sd.getServices[]
  where serviceName=sn, status=`UP, name=metadata@\:`gwa

The uniqueness of the RC process guarantees there will be only one result from this select.

APIs

Processes a DAP can connect to and the APIs it can call:

Resource Coordinator
  .sgrc.registerDAP   register DAP as an API target to service requests
  .sgrc.updDapStatus  update current target status
  .sgrc.onPartial     tell RC we are available for more work

Aggregator
  .sgagg.onPartial    forward the DAP's partial results for aggregation

Resource Coordinator

.sgrc.registerDAP[host;port;avail;purview;asm;metadata;schema]

Where

  • host is the DAP host (symbol)
  • port is the DAP port (int)
  • avail is whether the DAP is available to service requests (boolean)
  • purview is the purview (dictionary)
  • asm is an assembly name, used to identify a family of like DAPs (symbol)
  • metadata is a table of metadata, or an empty list
  • schema is a schema table, or an empty list

registers DAP as an API target to service requests.

Omitting metadata or schema information has no ill effects on query execution, but the DAP/assembly will come up empty on a .kxi.getMeta call. All DAPs within the same assembly (asm) are assumed to share the same table schema.

.sgrc.updDapStatus[avail;purview]

Where

  • avail is whether the DAP is available to service requests (boolean)
  • purview is the purview (dictionary)

updates current target status.

Certain subsets of the purview keys are allowed for efficiency.

The allowed combinations are:

  • all the keys specified above
  • only ver, startTS, and endTS, if no other purview dimensions have changed
  • none, if the purview has not changed, and we want only to update availability

.sgrc.onPartial hdr

Where hdr is a response header (dictionary) containing at least the following keys

rc  response code (byte)
ac  application code (byte)

and all other keys present in the request header, informs the RC that we are available for more work following an API request.

The DAP may also use the API to notify the RC it was unable to send its response to the Aggregator. This is done by including a sendErr key in the response header. (The value is ignored.)

If the sendErr key is present in the response header and hdr.rc is non-zero, the RC attempts to notify the Aggregator/GW of the failure in order to issue a response to the client.

Aggregator

.sgagg.onPartial[hdr;payload]

Where

  • hdr is a response header (dictionary)
  • payload is the response payload (any)

forwards this DAP’s partial results to the Aggregator for aggregation. This function should be executed on the process pointed to by agg in the request header. (The value is a symbol of the form `:host:port.)

hdr must contain at least the following keys

rc  response code (byte)
ac  application code (byte)

and all other keys present in the request header.

Example

Suppose the SG assembly file is

name: mySG
labels:
  region: amer,emea,apac
  assetClass: equity,futures
elements:
  rc:
    host: rcHost
    port: 1234

The following is a very simple example of a DAP implementation.

//
// Purview - a DAP covers one possible combination of the SG's labels, 
// and some temporal interval. For example, this could be an RDB 
// that covers all of today.
//
// Metadata - Contains metadata on available functions in DAP (optional).
//
// Schema - Contains table/schema info (optional).
//
purview:`ver`region`assetClass`startTS`endTS!(1;`amer;`equity;"p"$.z.D;0Wp)

params:(0#`)!()
params[`myAPI]:(
    `name`type`description`isReq!(`x;-9 9h;"param 1";1b);
    `name`type`description`isReq`default!(`y;10h;"param 2";0b;""));

// etc...

metadata:flip
    (`fn    ,`custom    ,`description   ,`params        ,`return)!flip enlist
    (`myAPI ;1b         ;"My API."      ;params`myAPI   ;`type`description!(98h;"Return value"))
    // etc... (for several APIs, pass flip a list of such rows instead of enlist)

columns:(0#`)!();
columns[`trade]:flip
    (`column    ,`typ   ,`attrMem   ,`attrIDisk ,`attrDisk  ,`isSerialized  ,`fk)!flip(
    (`sym       ;11h    ;`          ;`g         ;`p         ;0b             ;`instrument.sym);
    (`time      ;12h    ;`          ;`          ;`          ;0b             ;`);
    (`price     ;9h     ;`          ;`          ;`          ;0b             ;`);
    (`size      ;7h     ;`          ;`          ;`          ;0b             ;`);
    (`updTime   ;12h    ;`          ;`          ;`          ;0b             ;`))

// etc...

schema:flip
    (`table ,`typ           ,`pkCols    ,`updTsCol  ,`sortColsMem   ,`sortColsIDisk   ,`sortColsDisk  ,`columns)!flip enlist
    (`trade ;`partitioned   ;`sym`time  ;`updTime   ;`$()           ;enlist`updTime   ;`sym`time      ;columns`trade)
    // etc... (for several tables, pass flip a list of such rows instead of enlist)

rc:hopen`:rcHost:1234 // Connect to the RC (alternatively, RC can be found via discovery)
neg[rc](`.sgrc.registerDAP;.z.h;system"p";1b;purview;`myAssembly;metadata;schema) // Register with the RC

// Note that if we don't want to provide metadata/schema, we could register with the RC thusly:
//  neg[rc](`.sgrc.registerDAP;.z.h;system"p";1b;purview;`myAssembly;();())

//
// Purview can be updated. For example, at the start of an EOD, a DAP may wish to
// announce itself as unavailable until the EOD ends. Upon EOD completion, it can
// roll its date (and its version number) forward and announce itself as available
// again.
//
onEodStart:{[] neg[rc](`.sgrc.updDapStatus;0b;()!()); }

onEodEnd:{[]
    purview[`ver]+:1; / Roll our version number forward
    purview[`startTS]+:1D; / Roll the date forward
    neg[rc](`.sgrc.updDapStatus;1b;`ver`startTS`endTS#purview) } / ver, startTS and endTS are sent together

//
// API execution. 
// This function gets invoked by the GW on an API request. The DAP is
// obligated to respond to the Agg with its result, and the RC with an update that
// it is available for more work. It should do this even in the event of an error.
//
// Note the name and signature of the function are important.
//
.da.execute:{[apiName;hdr;args]
    //
    // The hdr includes, among other things, `pvVer`, which is the version number
    // the RC used to make its selection. If the number is out of date, we should
    // NOT proceed as normal, since we may no longer cover the portion of the
    // request that the SG needs from us. We should error or trigger a retry
    // (Note: automatic retries NYI).
    //
    res:$[purview[`ver]=hdr`pvVer;
        @[value apiName;args;`execErr]; / Look up the API by name and run it; return `execErr on error
        `pvErr]; / Purview versions don't match

    //
    // The Agg and RC expect a response header with, at minimum:
    //  - the keys that were present in the request header, and
    //  - response and application codes (where 0 represents OK/no errors).
    //
    // Note: Error mapping in the Agg NYI.
    //
    respHdr:hdr,
        $[res~`execErr;
            `rc`ac!10 10h; / General error code
        res~`pvErr;
            `rc`ac!13 30h; / Version mismatch error
            `rc`ac!0 0h]; / OK

    //
    // Send response to the Agg. It is good practice to check whether the send was
    // successful. In the event that the send failed, we can notify the RC who
    // can complete the request on our behalf.
    //
    // Note, the Agg we are to report to is in hdr`agg.
    //
    sent:.[;(hdr`agg;respHdr;res);0b]{
        h:hopen x; / Open connection to agg
        neg[h](`.sgagg.onPartial;y;z); / Send response header (y) and payload (z)
        hclose h; / Alternatively, cache this for future responses
        1b / Success
        };

    //
    // Notify the RC of completion so that it can select us for more work.
    // If the send to the Agg failed, we can use this opportunity to inform
    // the RC of the failure with the `sendErr` key in the response header.
    //
    if[not sent;respHdr[`sendErr]:1b];   / Include sendErr (value is arbitrary)
    neg[rc](`.sgrc.onPartial;respHdr); } / Notify RC of completion

//
// Implementation of APIs.
//
...