Skip to content

Service Gateway Configuration

In its most basic form, the SG is a set of Docker images that are combined using minimal configuration. Below is an explanation of the images required, what configuration parameters need to be defined, and an some example configurations.

Images

There is one image per process type in the SG. The SG architecture allows for multiple of any process. The table below summarizes the process types

process description image
GW Receives requests and responds to requests. kxi-sg-gw
RC Routes requests to DAPs. kxi-sg-rc
Agg Aggregates responses from DAPs. kxi-sg-agg

In addition, the SG can optionally use KX Insights Discovery in order for processes to discover and connect with each other seamlessly (see "Discovery"). Images required are as follows.

process description image
sidecar Discovery sidecar (for the RC). kxi_sidecar
discovery Discovery client. Configure one, which all processes seamlessly connect to. kxi-eureka-discovery
proxy Discovery proxy. discovery_proxy

Environment variables

The GW microservice relies on certain environment variables to be defined in the containers. The variables are described below.

variable required containers description
KXI_NAME Yes RC, Agg Unique Process name.
KXI_PORT Yes RC, Agg Port.
KXI_LOG_FORMAT No RC, Agg, sidecar Message format (see "Logging" page).
KXI_LOG_DEST No RC, Agg, sidecar Endpoints (see "Logging" page).
KXI_LOG_LEVELS No all Component routing (see "Logging" page).
DISCOVERY_PROXY No GW Discovery proxy address (not required if not using discovery).
GATEWAY_QIPC_PORT Yes GW Gateway port for QIPC traffic.
GATEWAY_HTTP_PORT Yes GW Gateway port for HTTP traffic.
KXI_CONFIG_FILE Yes sidecar Discovery configuration file (see "Discovery" page).
KXI_CUSTOM_FILE No Agg File containing custom code to load into Agg process.
KXI_GC_FREQ No RC, Agg Frequency in milliseconds to run garbage collect in a timer (default 600000, set to 0 to disable).
KXI_SG_TIMEOUT No RC, Agg Default request timeout in milliseconds (default is 30,000).
KXI_SG_DISC No RC Multi-RC discovery mode (see Multi-RC.
KXI_SG_RC_ADDR Yes GW, Agg host:port of RC process to connect to.
KXI_SG_CONN_TIMEOUT No RC, Agg Timeout on connection open.
KDB_LICENSE_B64 Yes all KDB license.
KXI_SG_BUFFER_INITIAL No GW Initial size in bytes to allocate per connected aggregator. Default 2MB
KXI_SG_BUFFER_RETAIN No GW Maximum size in bytes to hold on to per connected aggregator. Default 2MB
KXI_AUTH_DISABLED No GW Set KXI_AUTH_DISABLED=0 to use the gateway with Keycloak.
KXI_OIDC_JSON_PATH No GW OIDC JSON used for mapping Keycloak authentication endpoints.
KXI_SG_MAX_RETRY No RC Maximum number of retries the RC is willing to do for retryable errors.
KXI_ALLOWED_SBX_APIS No RC Comma-delimited list of sandbox APIs to allow in non-sandbox RCs (ex: ".kxi.sql,.kxi.qsql").
KXI_SG_REQ_DEL_FREQ No RC, Agg Time in milliseconds to run the request table delete routine (default is 10,000).
KXI_SG_QUEUE_DEL_FREQ No RC Time in milliseconds to run the queue table delete routine (default is 10,000).
KXI_SAPI_HB_FREQ No RC, Agg Time in milliseconds to run the heartbeat to connected processes (default is 30,000).
KXI_SAPI_HB_TOL No RC, Agg Number of heartbeat intervals a process can miss before being disconnected (default is 2).
KXI_SG_MULTI_RC_TO_MAX_WAIT No RC On a timeout of a multi-RC request, number of 10-second iterations to wait for timeout details from all RCs (default 1).

See example section below.

Multi RC

Multiple RCs can work together spread request load. The means by which RCs discover each other is specified by the KXI_SG_DISC environment variable. Currently supported discovery methods are

  • kubernetes

    RCs discover each other via the Kubernetes control plane (this discovery method only works if using Kubernetes). This mode allows for dynamic scalability, but at requires polling the control plane to discover new RCs as they come up. The following label must be added to the pod's metadata.labels in order for RC containers to properly identify each other:

    kind: Pod
    metadata:
      labels:
        app.kubernetes.io/name: "resource-coordinator" # Required label
    ...
    spec:
      containers:
      - ...
        ports:
        - name: arbitrary-name
          containerPort: 5050 # Required port
          protocol: TCP
    

    Note that the resource coordinator must be the last container in the pod other than the optional sidecar.

    Note also that the RC pod(s)'s Service Account need the necessary privileges to call the Kubernetes control plane API. This can be achieved by running the following command:

    kubectl create rolebinding default-view --clusterrole=view --serviceaccount=$NAMESPACE:default --namespace=$NAMESPACE
    

    where $NAMESPACE is your cluster's namespace.

Kubernetes secrets

To setup image pull secrets, and license secrets:

Create a new secret named kx-repo-access.

kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com

Create and apply new secret named kx-license-info.

$ cat kx-license-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>

$ kubectl apply -f kx-license-secret.yaml

Custom file

The default aggregation for any API is raze. However, the Agg processes will load the q file pointed to by the KXI_CUSTOM_FILE environment variable. In this file, you can:

  • define/register custom aggregation functions,
  • define which aggregation functions to invoke for which APIs,
  • define metadata pertaining to the aggregation function (which is subsequently query-able using the .kxi.getMeta API, see API page for details).

Note that while SG only supports loading a single file, you can load other files from within this file using \l (allowing you to control load order). The current working directory (pwd) at load time is the base directory of the file.

This can be combined with the Data Access microservice (which allows custom function definitions in the DAPs) to create full custom API support within KX Insights (see "Data Access" for details).

Note: It's recommended to avoid .sg* and .sapi namespaces to avoid colliding with SG functions.

Aggregation functions can be anything. Their input is a list of the output of the corresponding API defined in the DAPs. The output is a double consisting of a response header (dictionary) and response payload (anything). See header for details on response headers.

To define which aggregation function to use for an API, use the .sgagg.registerAggFn API, whose signature is as follows.

  • aggFn - symbol - Aggregation function name.
  • metadata - list|string|dictionary - Aggregation function metadata (see "SAPI - Metadata" documentation).
  • apis - symbol|symbol[] - API(s) for which this should be the default aggregation function.

Aggregation function MUST be registered with .sgagg.registerAggFn in order to be invoke-able by the Aggregator. See Custom file example below for an example.

Deferring responses

In some situations, it may be necessary to defer a response, e.g. data is incomplete and we need to retry. In such situations, the aggregation function can

  1. Save partial results to the SAPI context (see below).
  2. Returns a .sapi.defer response (see header).
  3. Recover saved context values on the response.
  4. Continue aggregation.

An example is provided below (Custom file example).

The SAPI context APIs are as follows:

  • .sapi.getCtx - Gets stored context values.

    • Parameters:
      • k - symbol|symbol[]|:: Key(s) to get, or null for the entire set of key value pairs.
    • Output:
      • (any|dictionary) Value(s) or full context dictionary.
  • .sapi.setCtx - Sets context values.

    • Parameters:
      • k - symbol|symbol[] Key(s) to set.
      • v - any Value(s) to set.
  • .sapi.addToCtx - Adds to an existing context list/table.

    • Parameters:
      • k - symbol|symbol[] Key(s) to add to.
      • v - any Value(s) to add.

Example

See the deployment example for an example of running in Docker.

RC discovery sidecar

Config file (see "Discovery" for more details):

{
    "connection": ":sgrc:5050",
    "frequencySecs": 5,
    "discovery":
    {
        "registry": ":proxy:4000",
        "adaptor": "discEurekaAdaptor.q",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
}

Discovery proxy config

Proxy config file (see "Discovery" for more details):

{
    "discovery":
    {
        "registry": ":eureka:8761",
        "adaptor": "discEurekaAdaptor.q"
    }
}

Custom file example

The Agg process can load a custom code file, wherein you can define custom aggregation functions and define API to aggregation function mappings for APIs. Below is an example file, as some example API calls that exercise the custom code. Note that aggregations functions must return a response header in addition to the response payload. Here we use the .sapi response functions (see header for details).

// Sample Agg custom file.

// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
\l subFolder/otherFile1.q
\l subFolder/otherFile2.q

//
// @desc An override to the default ping aggregation function. Instead of doing a raze,
// we just take the min (so true indicates all targets successful).
//
// @param res   {boolean[]}             Results from the DAPs.
//
// @return      {(dictionary;boolean)}  Response header + min of all DAP results.
//
pingAggOverride:{[res]
    .sapi.ok min res
    }


//
// @desc Agg function that does a plus join on a list of tables.
//
// @param tbls  {table[]}               List plus-joinable tables.
//
// @return      {(dictionary;table)}    Response header + plus join of tables.
//
pjAgg:{[tbls]
    .sapi.ok(pj/)tbls
    }


//
// @desc Agg function that does an average daily count by sym.
//
// @param tbls  {table[]}               List of tables with ``` `sym`date`cnt``` columns.
//
// @return      {(dictionary;table)}    Response header + average count by sym.
//
avAgg:{[tbls]
    res:select sum cnt by sym,date from raze 0!'tbls; / Join common dates
    .sapi.ok select avg cnt by sym from res / Average
    }


//
// @desc Aggregates trade data, but defers if not enough responses.
//
// @param data  {list[]}                    Partial responses from `.kxi.getData`.
//
// @return      {dictionary;table|list)}    Response header and payload or sub-request if deferring.
//
aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data

    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail

    if[100<count tbl; / If we have enough data
        :.sapi.ok tbl]; / Succeed

    //
    // Not enough data.
    //
    .sapi.setCtx[`prevData;tbl]; / Store existing values for later
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`.resume.aggMinTrade;()!()] / Defer
    }


//
// @desc Resume function for `aggMinTrade`.
//
// @param data  {list[]}                    Partial responses from `.kxi.getData`.
//
// @return      {dictionary;table|list)}    Response header and payload or sub-request if re-deferring.
//
.resume.aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data

    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail

    prevData:.sapi.getCtx`tradeData; / Recover previous data

    if[100<count[prevData]+count tbl; / Now if we have enough data
        :sapi.ok prevData,tbl] / Succeed

    //
    // Still not enough data.
    //
    .sapi.addToCtx[`prevData;tbl]; / Accumulate
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`resume.aggMinTrade;()!()] / Redefer
    }


//
// In order to be usable, aggregation functions MUST be registered with the Agg process. When registering,
// one can also set the aggregation function as the default aggregation function for one or more APIs.
// For example, Suppose we had an API defined in the DAPs that peforms a "count by" operation on a table:
//
// countBy:{[table;startTS;endTS;byCols]
//     ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
//     }
//
// We can then register our aggregations functions thusly:
//
.sgagg.registerAggFn[`pingAggOverride;
    .sapi.metaDescription["Custom override to .kxi.ping"],
    .sapi.metaParam[`name`type`description!(`res;1h;"List of booleans indicating ping was successful")],
    .sapi.metaReturn`type`description!(-1h;"The worst of all results"];
    `$()
    ]

.sgagg.registerAggFn[`pjAgg;
    .sapi.metaDescription["Plus join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The plus join (over) of the tables");
    `countBy // Register as default aggregation function for this API
    ]

.sgagg.registerAggFn[`avAgg;
    .sapi.metaDescription["Average join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average join (over) of the tables");
    `$()
    ]

.sgagg.registerAggFn[`custom;
    .sapi.metaDescription["Gets more than 100 trade data rows"],
    .sapi.metaParam[`name`type`description!(`data;0h;"getData response")],
    .sapi.metaReturn`type`description(98h;"100+ rows of trade data");
    `$()
    ]


//
// Note also that an aggregation function can be default aggregation function for multiple APIs. E.g.
//  .sgagg.registerAggFn[`myAggFn;();`api1`api2`api3]
//

Calls to the GW can now reference these new agg functions. Note that agg function mappings can always be overridden with the aggFn key in the request header.

q)h:hopen`:gwHost:5678 // Connect to GW

// Call ping without specifying the agg function. Defaults to raze.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;(0#`)!())
111b

// Call with an override; we'll now just get the min value.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;``aggFn!("";`pingAggOverride))
1b

// Call to our countBy API without specifying the aggregation function. Defaults to `pjAgg` as specified
// in the Agg's mapping.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym);(0#`)!())
sym    | cnt
-------| ------
AAPL.N | 103212
MSFT.OQ| 100213
..

// Call to our countBy API and specify an aggFn override.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym`date);``aggFn!("";`avAgg))
sym    | cnt
-------| ------
AAPL.N | 344.5
MSFT.OQ| 301.22
..

Tuning the log level

The kxi-sg-gw log level can be configured with `KXI_LOG_LEVELS as above.

At runtime, the log level can be dynamically adjusted RESTfully by POSTing to the /log endpoint.

curl -X POST "https://${HOSTNAME}/log?level=debug

Garbage Collection

To configure garbage collection for the aggregator and resource-coordinator, set the garbage collection flag.

The default garbage collection mode is deferred.

For the service-gateway, message buffers are allocated per connected aggregator.

Upon connection, each thread is allocated KXI_SG_BUFFER_INITIAL bytes. Message buffers will expand to fit any response up to the 2GB limit. After returning a response, the buffers will shrink back to KXI_SG_BUFFER_RETAIN bytes.

By default the KXI_SG_BUFFER_INITIAL and the KXI_SG_BUFFER_RETAIN are 2MB.