Service Gateway interface
There are two types of processes that can interact with the SG: client processes making API calls, and data access processes (DAPs) servicing requests (e.g. RDB/IDB/HDB).
Client
To make an API call to the SG from a q process, do the following:
1. hopen a connection to a GW process.
2. Issue an API call by passing a 4-element list with the following elements:
- apiName - symbol - API name.
- args - dictionary - Named arguments to the API. These can include:
  - table - symbol - Table name.
  - startTS - timestamp - Start time (inclusive) -- used for routing.
  - endTS - timestamp - End time (exclusive) -- used for routing.
  - inputTZ - symbol - Timezone of startTS and endTS.
  - outputTZ - symbol - Timezone of API response.
  - DAP labels -- used for routing.
  - API-specific arguments.
- callback - symbol - Dyadic callback function to be invoked on request completion in the async case (see below).
- opts - dictionary - Optional out-of-band information to accompany the request (see "header" page). This MUST be an untyped dictionary. For no opts, use (0#`)!().
API requests can be made synchronously or asynchronously. In the async case, upon completion of the request, the GW uses the hopened handle to invoke the callback function with the following arguments:
- hdr - dictionary - Response header (see "header" page).
- payload - any - Response payload.
Example
h:hopen`:gwHost:gwPort;
myCallbackFn:{[hdr;payload] ...};
/ Async -- `myCallbackFn` will be invoked.
neg[h](`myAPI;`startTS`endTS`region`assetClass`arg1`arg2!(-0Wp;0Wp;`amer;`equity;1;"abc");`myCallbackFn;`logCorr`appCorr!("request_1";10));
/ Sync -- response is a two-element list: (hdr;payload).
res:h(`myAPI;`startTS`endTS`region`assetClass`arg1`arg2!(-0Wp;0Wp;`amer;`equity;1;"abc");`myCallbackFn;`logCorr`appCorr!("request_1";10));
res 0 / Header
res 1 / Payload
Note that, whether or not the API itself uses them, all API calls may include startTS, endTS and any DAP labels. If not included, these default to "all", i.e.
arg | default |
---|---|
startTS | -0Wp |
endTS | 0Wp |
labels | All available labels |
For example, suppose DAPs are labeled by exchange and assetClass, and the list of available labels is:
exchange | assetClass |
---|---|
nyse | equity |
nyse | futures |
tsx | equity |
Some examples of defaulting:
- No routing args is equivalent to querying for all available exchanges/asset classes/time, i.e.

exchange | assetClass | startTS | endTS |
---|---|---|---|
nyse | equity | -0Wp | 0Wp |
nyse | futures | -0Wp | 0Wp |
tsx | equity | -0Wp | 0Wp |

- If specifying exchange=nyse and startTS=2021.01.31D, then assetClass defaults to `equity`futures and endTS defaults to 0Wp.
- If specifying assetClass=futures and endTS=2021.10.06D, then exchange defaults to `nyse (as there is no tsx-futures) and startTS defaults to -0Wp.
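The defaulting rules above can be sketched in q as follows. This is only an illustration of the described behaviour, not the SG's actual implementation: the `labels` table and the `fillDefaults` helper are hypothetical names introduced here. Note that the sketch defaults each missing label independently, so it returns the distinct remaining values per label rather than the exact valid combinations.

```q
/ Hypothetical table of available label combinations (taken from the example above)
labels:([]exchange:`nyse`nyse`tsx;assetClass:`equity`futures`equity)

/ Hypothetical helper: fill in any missing routing args with their "all" defaults
fillDefaults:{[labels;args]
  given:cols[labels] inter key args;                          / labels the caller supplied
  matched:{[a;t;c] t where t[c] in a c}[args]/[labels;given]; / keep label rows matching them
  missing:cols[labels] except given;                          / labels left unspecified
  dflt:missing!distinct each matched missing;                 / all remaining values per label
  (`startTS`endTS!(-0Wp;0Wp)),dflt,args}                      / caller's args take precedence

r1:fillDefaults[labels;`exchange`startTS!(`nyse;2021.01.31D)]
r2:fillDefaults[labels;`assetClass`endTS!(`futures;2021.10.06D)]
```

Here `r1` carries assetClass as `equity`futures and endTS as 0Wp, while `r2` carries exchange as a one-item list containing `nyse and startTS as -0Wp.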
Discovery
If your client process uses kdb Insights Discovery, Gateway processes can be found via discovery. The following sample code will select all active GWs:
sn:`$"KXI-SG-GW" / GW service name
select from .com_kx_sd.getServices[]where serviceName=sn,status=`UP
REST
Optionally, API requests for the pre-configured APIs (see "API" page) can be made via the REST interface if it is configured to run (see "Configuration") and the DAPs implement the APIs in question (i.e. you are using the kdb Insights Data Access microservice, or your own custom DAPs with your implementations of the APIs -- see "API" page for details).
Data Access Process
In order to integrate with the GW microservice, a DAP must do the following (all functions are documented below and an example is provided):
- Register with the Resource Coordinator. This can be done by opening a connection to the Resource Coordinator (hopen) and invoking .sgrc.registerDAP with its data purview, metadata, schemas and partitions (see Purview, Metadata, Schema and Partitions, respectively). The purview and partitions can be updated by calling .sgrc.updDapStatus.
- On a failed DAP registration or update to status, the Resource Coordinator will call the function .da.registrationErr on the DAP with a response header that describes the error.
- Execute APIs invoked by the client. The GW asynchronously invokes the following function on the DAP, which must be defined:
  - .da.execute - Executes an API.
    - apiName - symbol - API name.
    - hdr - dictionary - Request header (see "header" page).
    - args - dictionary - Named API arguments.
In response to this, the DAP must:
- Send a response header and a response payload to the correct Aggregator by invoking .sgagg.onPartial on the host/port specified by hdr.agg.
- Send a message to the Resource Coordinator indicating that it is available to do more work by invoking .sgrc.onPartial on the Resource Coordinator with an appropriate response header.
Of particular interest in the request header are the following keys:
- pvVer - long - Purview version (see Purview-and-Routing).
- refVintage - long - Reference vintage (see Reference-Vintage).
The DAP should compare these values against its current purview version and reference vintage, since they are the version/vintage that the RC used to make its routing decisions. If the numbers do not match, a response with reply code VERSION (13) can be returned to the RC. This triggers an automatic retry of the request with the updated purview version and reference vintage.
Note
The DAP is expected to respond even in the event of an error. Proper error handling should be implemented in order to guarantee this.
If using the kdb Insights Data Access microservice, all of the above is automatically implemented; the DA only needs to be configured with its purview and the name of the SG that it should connect to (see "Data Access" configuration).
Purview and Routing
The purview describes the portion of the data each individual DAP offers and it is the means by which the SG makes its routing decisions (see also Reference-Vintage). The purview is a dictionary with the following keys:
- ver - long - Purview version number (used for keeping track of purview changes, not routing).
- startTS - timestamp - Start time covered by DAP (inclusive).
- endTS - timestamp - End time covered by DAP (exclusive).
- Any other label values the SG should use in making routing decisions.
DAPs register and update their purviews with the RC using the APIs described below, and the RC makes routing decisions based on request args vs. DAP purviews. If the request args contain the table key, then table properties are taken into account. The routing logic is summarized in the following table.
isPartitioned | isSharded | routing strategy |
---|---|---|
true | true | Route to each distinct purview across time. |
false | true | Route to one DAP per distinct purview. |
false | false | Route to any one DAP. |
true | false | Not supported. |
If the table is unspecified, then the routing defaults to the isPartitioned=true and isSharded=true case.
For example, suppose DAPs are labelled by city and sensorType, and the registered DAPs are
dap | city | sensorType | startTS | endTS |
---|---|---|---|---|
dap1 | toronto | gas | -0Wp | 0Wp |
dap2 | toronto | electric | -0Wp | 0Wp |
dap3 | montreal | gas | -0Wp | 2021.06.01D |
dap4 | montreal | gas | 2021.05.01D | 0Wp |
dap5 | montreal | electric | -0Wp | 0Wp |
dap6 | vancouver | gas | -0Wp | 0Wp |
dap7 | vancouver | electric | -0Wp | 0Wp |
- If we receive a request for a partitioned table, or the table is not specified, with args `startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto`montreal;`gas), then the RC sends the request to all relevant labels and across time as follows:

dap | args |
---|---|
dap1 | `startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto;`gas) |
dap3 | `startTS`endTS`city`sensorType!(2021.05.10D;2021.06.01D;`montreal;`gas) |
dap4 | `startTS`endTS`city`sensorType!(2021.06.01D;2021.06.15D;`montreal;`gas) |

- If we receive a request for a sharded, non-partitioned table (i.e. a non-timeseries table that is distributed across assemblies), with args `city`sensorType!(`toronto`montreal;`gas), then the RC splits the request across relevant DAPs, but ignores time:

dap | args |
---|---|
dap1 | `city`sensorType!`toronto`gas |
dap3 or dap4 (but not both) | `city`sensorType!`montreal`gas |

- If we receive a request for a non-sharded table (i.e. a table that is identical across all assemblies), with args `city`sensorType!(`toronto`montreal;`gas), then the RC sends to either dap1, dap3, or dap4, but only one of them.
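As a rough illustration of the first (partitioned) case, the q sketch below selects the DAPs whose labels and time range overlap the request and clips each DAP's window to the request window. It is a simplification under stated assumptions: the `daps` table holds only the three gas DAPs that appear in the worked example, the names `cand` and `clipped` are hypothetical, and the RC's own logic additionally resolves overlaps between DAPs (which is why dap4 ends up with only 2021.06.01D onwards in the example); that step is not shown here.

```q
/ Hypothetical view of the relevant registered purviews (from the worked example)
daps:([]dap:`dap1`dap3`dap4;city:`toronto`montreal`montreal;sensorType:`gas`gas`gas;
  startTS:(-0Wp;-0Wp;2021.05.01D);endTS:(0Wp;2021.06.01D;0Wp))

/ Request args from the partitioned example
req:`startTS`endTS`city`sensorType!(2021.05.10D;2021.06.15D;`toronto`montreal;`gas)

/ Candidates: labels match and [startTS,endTS) overlaps the request window
cand:select from daps where city in req`city,sensorType in req`sensorType,startTS<req`endTS,endTS>req`startTS

/ Clip each candidate's window to the request window (overlap resolution not shown)
clipped:update startTS:startTS|req`startTS,endTS:endTS&req`endTS from cand
```

After clipping, every candidate starts at 2021.05.10D, and dap3 ends at 2021.06.01D, matching the dap3 row of the example.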
Things to note:
- Any purview args that are not specified default to "all". In the case of time, the defaults are startTS=-0Wp and endTS=0Wp.
- Any additional arguments (not related to purview) are sent unchanged to all DAPs participating in the request.
- If the purviews of two DAPs overlap, the SG will not send the same portion of a request to both of them.
- Once the SG enlists a DAP to service a request, it is marked unavailable until it reports that it has completed (.sgrc.onPartial). If no DAPs are available to service all or part of a request, the SG sends out what it can, and enqueues the portions that can't be completed until a DAP registers or updates itself as being able to satisfy (wholly or partially) the outstanding portion of the request.
- If different DAPs register with different sets of labels, the SG fills in missing labels with nulls.
- A DAP must register with at least one label.
On the Aggregator
With the exception of the pre-configured APIs (see "API" page), the only supported aggregation function is raze. This will be expanded upon in future versions. Thus, all API results returned to the aggregator should be "raze-able" (i.e. raze(res 0; res 1;...)).
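To make "raze-able" concrete, here is a minimal q sketch with two made-up partial results that share a schema; the table and column names are purely illustrative.

```q
/ Two hypothetical partial results, as might be returned by two different DAPs
res0:([]sym:`a`b;price:1 2f)
res1:([]sym:enlist`c;price:enlist 3f)

/ Partials with the same schema raze into a single table
combined:raze(res0;res1)
```

Because both partials have identical columns and types, `combined` is a single three-row table. Partials with differing schemas would not combine this cleanly.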
Reference vintage
Reference vintage is a high watermark of the latest reference data received by a DAP. The RC uses this in making routing decisions. Assumptions:
- DAPs with the same purview (assembly labels) have a consistent reference vintage, meaning that equal reference vintages mean both DAPs have exactly the same reference data, and a DAP with a higher reference vintage is more up to date.
- DAPs update on every reference vintage update.
The RC only sends to DAPs that are at the latest reference vintage among all DAPs of the same purview (i.e. those with the same assembly labels). If a request requires a particular DAP that is at a lower reference vintage, the request is queued until the DAP's reference vintage catches up. E.g. suppose we have two DAPs of the same purview:
dap | startTS | endTS | reference vintage |
---|---|---|---|
dap1 | -0Wp | 2021.05.10D | 10 |
dap2 | 2021.05.10D | 0Wp | 11 |
Then request portions from -0Wp to 2021.05.10D will be queued until dap1 reaches reference vintage 11, while dap2 can continue to service request portions from 2021.05.10D to 0Wp.
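The eligibility rule can be sketched in q as below. This is an illustration only, not the RC's code: the `daps` table, its `asm` column (standing in for "same assembly labels") and the name `eligible` are assumptions made for the example.

```q
/ Hypothetical view of the two DAPs from the example above
daps:([]dap:`dap1`dap2;asm:`myAssembly`myAssembly;
  startTS:(-0Wp;2021.05.10D);endTS:(2021.05.10D;0Wp);refVintage:10 11)

/ Only DAPs at the highest reference vintage within their assembly are eligible
eligible:select from daps where refVintage=(max;refVintage)fby asm
```

With the data above only dap2 is eligible, so portions of a request that fall in dap1's time range would wait until dap1 reports reference vintage 11.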
Metadata
DAPs may register with metadata information on the APIs they offer, which is subsequently surfaced through the .kxi.getMeta API (see "API" page). The metadata returned is described in the getMeta API.
Partitions
DAPs must register with partition information. Currently, partitioning is assumed to be by date. A DAP's partitions are given as a table with one of the following schemas:
column | type | description |
---|---|---|
date | date | Date of the partition. |
startTS | timestamp | Inclusive start time of data within the partition. |
endTS | timestamp | Exclusive end time of data within the partition. |
OR
column | type | description |
---|---|---|
min_date | date | Inclusive minimum partition date. |
max_date | date | Inclusive maximum partition date. |
Note that in the latter schema, each date partition is assumed to cover the entire date in question.
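For illustration, the two partition table layouts could be built in q as follows. The dates and the variable names `prtnsByDate` and `prtnsByRange` are made up for this sketch.

```q
d:2021.05.09 2021.05.10

/ First form: one row per date partition with explicit time bounds
prtnsByDate:([]date:d;startTS:"p"$d;endTS:"p"$d+1)

/ Second form: an inclusive range of date partitions, each assumed to cover its whole date
prtnsByRange:([]min_date:enlist first d;max_date:enlist last d)
```

In the first form endTS is exclusive, so each row here ends at midnight of the following day.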
Discovery
The DAP can use discovery to find the RC it should report to. If the DAP uses discovery, the following sample code can be used to discover the RC:
name:`myRC; / RC's KXI_NAME environment variable
sn:`$"KXI-SG-RC"; / RC service name
select from .com_kx_sd.getServices[]where serviceName=sn,status=`UP,name=metadata@\:`name
APIs
Below are the processes a DAP can connect to and the APIs that it can call.
- Resource Coordinator
  - .sgrc.registerDAP - Registers DAP as an API target to service requests. Takes a single dictionary parameter with the following keys (ALL of which are necessary).
    - addr - symbol - Host and port in the form `:host:1234.
    - avail - boolean - Flag indicating if the DAP is available to service requests or not.
    - purview - dictionary - See Purview.
    - asm - symbol - Assembly name -- used to identify a family of like DAPs.
    - instance - symbol - Instance type (e.g. RDB, IDB, HDB).
    - metadata - table|() - Metadata table (see Metadata) or an empty list to provide no metadata.
    - schema - table - Schema table (see Schema).
    - prtns - table - Partition information (see Partitions).
    - refVintage - long - Long integer representing a high-watermark of the last reference data ingested. Typically stream position, but can be any incrementing value.
    Providing no metadata has no ill effects on query execution; however, the DAP/assembly will come up empty on a .kxi.getMeta call (see "API" page). Note that it is assumed that all DAPs within the same assembly (asm) share the same table schemas.
  - .sgrc.updDapStatus - Takes a single dictionary parameter with the following keys (all of which are optional).
    - avail - boolean - Flag indicating if the DAP is available to service requests or not.
    - purview - dictionary - See Purview.
    - prtns - table - See Partitions.
    - refVintage - long - Long integer representing a high-watermark of the last reference data ingested. Typically stream position, but can be any incrementing value.
  - .sgrc.onPartial - Informs the RC that we are available for more work following an API request.
    - hdr - dictionary - Response header (see "header" page). Must contain, at minimum, the following keys:
      - rc - byte - Response code.
      - ac - byte - Application code.
      - All other keys present in the request header.
    Additionally, the DAP may use this notification to inform the RC that it was unable to send its response to the Aggregator. This is done by including the following key in the response header:
      - sendErr - any - Value is irrelevant.
    If the sendErr key is present in the response header and hdr.rc is non-zero, the RC attempts to notify the Aggregator/GW of the failure in order to issue a response to the client.
- Aggregator
  - .sgagg.onPartial - Forwards this DAP's partial results to the Aggregator for aggregation. This function should be executed on the process pointed to by agg in the request header (this request header value is a symbol of the form `:host:port).
    - hdr - dictionary - Response header (see "header" page). Must contain, at minimum, the following keys:
      - rc - short - Response code.
      - ac - short - Application code.
      - All other keys present in the request header.
    - payload - any - Response payload.
Example
The following is a very simple example of a DAP implementation.
//
// Purview - a DAP covers one possible combination of the SG's labels, and some temporal
// interval. For example, this could be an RDB that covers all of today.
//
// Metadata - Contains metadata on available functions in DAP (optional).
//
// Schema - Contains table/schema info (optional).
//
// Partitions - Partitions in the DAP.
//
purview:`ver`region`assetClass`startTS`endTS!(1;`amer;`equity;"p"$.z.D;0Wp)
params:(0#`)!()
params[`myAPI]:(
    `name`type`description`isReq!(`x;-9 9h;"param 1";1b);
    `name`type`description`isReq`default!(`y;10h;"param 2";0b;""));
// etc...
// Only one row is shown, hence `enlist`; with several rows use flip(row1;row2;...).
metadata:flip
    (`fn    ,`custom ,`description ,`params      ,`return)!flip enlist
    (`myAPI ;1b      ;"My API."    ;params`myAPI ;`type`description!(98h;"Return value"))
// etc...
columns:(0#`)!();
columns[`trade]:flip
    (`column  ,`typ ,`attrMem ,`attrIDisk ,`attrDisk ,`isSerialized ,`fk)!flip(
    (`sym     ;11h  ;`        ;`g         ;`p        ;0b            ;`instrument.sym);
    (`time    ;12h  ;`        ;`          ;`         ;0b            ;`);
    (`price   ;9h   ;`        ;`          ;`         ;0b            ;`);
    (`size    ;7h   ;`        ;`          ;`         ;0b            ;`);
    (`updTime ;12h  ;`        ;`          ;`         ;0b            ;`))
// etc...
// Only one row is shown, hence `enlist`; with several rows use flip(row1;row2;...).
schema:flip
    (`table ,`typ         ,`pkCols   ,`updTsCol ,`sortColsMem ,`sortColsIDisk ,`sortColsDisk ,`columns)!flip enlist
    (`trade ;`partitioned ;`sym`time ;`updTime  ;`$()         ;enlist`updTime ;`sym`time     ;columns`trade)
// etc...
partitions:enlist enlist[`date]!enlist .z.D // We contain data for only today, say
refVintage:0 // To be updated on every reference data update
// Register with the RC.
rc:hopen`:rcHost:1234 // Connect to the RC (alternatively, RC can be found via discovery)
neg[rc](`.sgrc.registerDAP;(!). flip(
    (`addr;       hsym`$string[.z.h],":",string system"p");
    (`avail;      1b);
    (`purview;    purview);
    (`asm;        `myAssembly);
    (`instance;   `RDB);
    (`metadata;   metadata);   // Alternatively () for no metadata
    (`schema;     schema);
    (`prtns;      partitions);
    (`refVintage; refVintage)));
//
// Purview can be updated. For example, at the start of an EOD, a DAP may wish to
// announce itself as unavailable until the EOD ends. Upon EOD completion, it can
// roll its date (and its version number) forward and announce itself as available
// again.
//
onEodStart:{[]
    neg[rc](`.sgrc.updDapStatus;enlist[`avail]!enlist 0b);
    }
onEodEnd:{[]
    purview[`ver]+:1;                                      / Roll our version number forward
    purview[`startTS]+:1D;                                 / Roll the date forward
    prtns:enlist enlist[`date]!enlist"d"$purview`startTS;  / Roll partition date forward
    neg[rc](`.sgrc.updDapStatus;`avail`purview`prtns!(1b;`ver`startTS#purview;prtns))
    }
//
// API execution. This function gets invoked by the GW on an API request. The DAP is
// obligated to respond to the Agg with its result, and the RC with an update that
// it available for more work. It should do this even in the event of an error.
//
// Note the name and signature of the function are important.
//
.da.execute:{[apiName;hdr;args]
    //
    // The hdr includes, among other things, `pvVer` and `refVintage`, which are
    // the purview version and reference vintage numbers the RC used to make its
    // selection. If the numbers are out of date, we should NOT proceed as normal,
    // since our data may no longer match the portion of the request that the SG
    // needs from us. We should error or trigger a retry.
    //
    res:$[(purview[`ver]=hdr`pvVer)&refVintage=hdr`refVintage;
        @[{[f;a]value[f]a}[apiName];args;`execErr];   / Look up the API by name and run it as normal
        `pvRvMismatch];                                / Purview version or reference vintage mismatch
    //
    // The Agg and RC expect a response header with, at minimum:
    //  - the keys that were present in the request header, and
    //  - response and application codes (where 0 represents OK/no errors).
    //
    // Note: Error mapping in the Agg NYI.
    //
    respHdr:hdr,$[
        res~`execErr;      `rc`ac!10 10h;   / General error code
        res~`pvRvMismatch; `rc`ac!13 30h;   / Version mismatch error
        `rc`ac!0 0h];                       / OK
    //
    // Send response to the Agg. It is good practice to check whether the send was
    // successful. In the event that the send failed, we can notify the RC who
    // can complete the request on our behalf.
    //
    // Note, the Agg we are to report to is in hdr`agg.
    //
    sent:.[;(hdr`agg;respHdr;res);0b]{
        h:hopen x;                          / Open connection to agg
        neg[h](`.sgagg.onPartial;y;z);      / Send response
        neg[h][];                           / Flush pending async messages before closing
        hclose h;                           / Alternatively, cache this for future responses
        1b                                  / Success
        };
    //
    // Notify the RC of completion so that it can select us for more work.
    // If the send to the Agg failed, we can use this opportunity to inform
    // the RC of the failure with the `sendErr` key in the response header.
    //
    if[not sent;respHdr[`sendErr]:1b];      / Include sendErr (note: value is arbitrary)
    neg[rc](`.sgrc.onPartial;respHdr);      / Notify RC of completion
    }
//
// Implementation of APIs.
//
...