Service Gateway Configuration

In its most basic form, the SG is a set of Docker images that are combined using minimal configuration. Below is an explanation of the images required, the configuration parameters that need to be defined, and some example configurations.

Images

There is one image per process type in the SG. The SG architecture allows multiple instances of any process type. The table below summarizes the process types:

process | description | image
--- | --- | ---
GW | Receives requests and responds to requests. | kxi-sg-gw
RC | Routes requests to DAPs. | kxi-sg-rc
Agg | Aggregates responses from DAPs. | kxi-sg-agg

In addition, the SG can optionally use kdb Insights Discovery in order for processes to discover and connect with each other seamlessly (see "Discovery"). Images required are as follows:

process | description | image
--- | --- | ---
sidecar | Discovery sidecar (for the RC). | kxi_sidecar
discovery | Discovery client. Configure one, which all processes seamlessly connect to. | kxi-eureka-discovery
proxy | Discovery proxy. | discovery_proxy

Environment variables

The SG relies on certain environment variables being defined in the containers. The variables are described below:

variable | required | containers | description
--- | --- | --- | ---
KXI_NAME | Yes | RC, Agg | Unique process name.
KXI_PORT | Yes | RC, Agg | Port.
KXI_LOG_FORMAT | No | RC, Agg, sidecar | Message format (see "Logging" page).
KXI_LOG_DEST | No | RC, Agg, sidecar | Endpoints (see "Logging" page).
KXI_LOG_LEVELS | No | all | Component routing (see "Logging" page).
DISCOVERY_PROXY | No | GW | Discovery proxy address (not required if not using discovery).
GATEWAY_QIPC_PORT | Yes | GW | Gateway port for QIPC traffic.
GATEWAY_HTTP_PORT | Yes | GW | Gateway port for HTTP traffic.
KXI_CONFIG_FILE | Yes | sidecar | Discovery configuration file (see "Discovery" page).
KXI_CUSTOM_FILE | No | Agg | File containing custom code to load into Agg process.
KXI_GC_FREQ | No | RC, Agg | Frequency in milliseconds to run garbage collect in a timer (default 600000, set to 0 to disable).
KXI_SG_TIMEOUT | No | GW, RC, Agg | Default request timeout in milliseconds (default is 30,000).
KXI_SG_DISC | No | RC | Multi-RC discovery mode.
KXI_SG_RC_ADDR | Yes | GW, Agg | host:port of RC process to connect to.
KXI_SG_CONN_TIMEOUT | No | RC, Agg | Timeout on connection open.
KDB_LICENSE_B64 | Yes | all | KDB license.
KXI_RC_SERVICE_NAME | No | GW | The key that the gateway will look for to find Resource Coordinators within the discovery registry. Default is KXI-SG-RC.
KXI_SG_RC_LABEL_SELECTOR | No | GW | Allow the gateway to find Resource Coordinators by label selector. Required for Kubernetes-based discovery.
KXI_SG_DA_LABEL_SELECTOR | No | GW | Allow the gateway to find data-access pods by label selector.
KXI_SG_BUFFER_INITIAL | No | GW | Initial size in bytes to allocate per connected aggregator. Default 2MB.
KXI_SG_BUFFER_RETAIN | No | GW | Maximum size in bytes to hold on to per connected aggregator. Default 2MB.
KXI_AUTH_DISABLED | No | GW | Set KXI_AUTH_DISABLED=0 to use the gateway with Keycloak.
KXI_OIDC_JSON_PATH | No | GW | OIDC JSON used for mapping Keycloak authentication endpoints.
KXI_SG_MAX_RETRY | No | RC | Maximum number of retries the RC is willing to do for retryable errors.
KXI_ALLOWED_SBX_APIS | No | RC | Comma-delimited list of sandbox APIs to allow in non-sandbox RCs (ex: ".kxi.sql,.kxi.qsql").
KXI_SG_REQ_DEL_FREQ | No | RC, Agg | Time in milliseconds to run the request table delete routine (default is 10,000).
KXI_SG_QUEUE_DEL_FREQ | No | RC | Time in milliseconds to run the queue table delete routine (default is 10,000).
KXI_SAPI_HB_FREQ | No | RC, Agg | Time in milliseconds to run the heartbeat to connected processes (default is 30,000).
KXI_SAPI_HB_TOL | No | RC, Agg | Number of heartbeat intervals a process can miss before being disconnected (default is 2).
KXI_SG_MULTI_RC_TO_MAX_WAIT | No | RC | On a timeout of a multi-RC request, number of 10-second iterations to wait for timeout details from all RCs (default 1).
KXI_ENABLE_FLUSH | No | RC, Agg | Set to "true" to enable async flush on messages from RC to DA/Agg and from Agg to GW (default "false").
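
As a rough illustration of the variables marked as required above, the sketch below shows a Docker Compose style fragment with one RC, one Agg, and one GW. The service names, port numbers, image paths, and tags are assumptions made for illustration. Only the variable names, the image names, and the registry host come from this page.

```yaml
# Sketch only: service names, ports, image paths and tags are assumptions.
services:
  sgrc:
    image: registry.dl.kx.com/kxi-sg-rc:<version>    # assumed path and tag
    environment:
      KXI_NAME: sg-rc                       # hypothetical unique process name
      KXI_PORT: "5050"                      # hypothetical port
      KDB_LICENSE_B64: "${KDB_LICENSE_B64}" # taken from the host environment
  sgagg:
    image: registry.dl.kx.com/kxi-sg-agg:<version>   # assumed path and tag
    environment:
      KXI_NAME: sg-agg                      # hypothetical unique process name
      KXI_PORT: "5060"                      # hypothetical port
      KXI_SG_RC_ADDR: "sgrc:5050"           # host:port of the RC service above
      KDB_LICENSE_B64: "${KDB_LICENSE_B64}"
  sggw:
    image: registry.dl.kx.com/kxi-sg-gw:<version>    # assumed path and tag
    environment:
      GATEWAY_QIPC_PORT: "5040"             # hypothetical QIPC port
      GATEWAY_HTTP_PORT: "8080"             # hypothetical HTTP port
      KXI_SG_RC_ADDR: "sgrc:5050"           # host:port of the RC service above
      KDB_LICENSE_B64: "${KDB_LICENSE_B64}"
```

Optional variables such as KXI_SG_TIMEOUT or KXI_LOG_LEVELS could be added to the same environment blocks.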

Service Discovery

Kubernetes RBAC

For microservices deployed in Kubernetes, the RC and GW pods should be configured with permissions to get, list, and watch pods.

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: insights-sg-gateway
  labels:
    app.kubernetes.io/name: sg-gateway
    insights.kx.com/serviceName: insights-sg-gateway
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "watch", "list"]

A rolebinding should be used to tie the role to a service account.
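
A minimal sketch of such a RoleBinding is shown here. The binding name, service account name, and namespace are hypothetical. Only the Role name is taken from the Role defined above.

```yaml
# Sketch only: binding name, service account, and namespace are assumptions.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: insights-sg-gateway          # assumed binding name
subjects:
- kind: ServiceAccount
  name: sg-gateway-sa                # hypothetical service account used by the pods
  namespace: default                 # assumed namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: insights-sg-gateway          # the Role defined above
```

The pods would then need to run under that service account (spec.serviceAccountName) for the permissions to apply.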

Service Gateway

The Service Gateway detects Resource Coordinators using the following hierarchy of options:

  • Look for KXI_SG_RC_ADDR.
    • This option is used to prototype and hardcode an RC host and port.
  • Failing that, look for KXI_SG_RC_LABEL_SELECTOR and use the Kubernetes control plane.
    • Requires RBAC for get, watch, and list pods.
    • Additionally, KXI_SG_DA_LABEL_SELECTOR should be set to DA labels to enable qsql functionality.
    • This is the only option that supports multiple Resource Coordinators.
  • Failing that, look for DISCOVERY_PROXY.
    • This method uses the kdb Insights Discovery registry to look for KXI-SG-RC entries.
    • Usable outside of Kubernetes.
Eureka

With DISCOVERY_PROXY, the service gateway will connect to the first service registered as KXI-SG-RC (configurable with KXI_RC_SERVICE_NAME). If the connection is ever dropped, the gateway will re-read from the registry and reconnect.

Kubernetes Control Plane

KXI_SG_RC_LABEL_SELECTOR should be set to a distinct label and value pair that is only set on RC pods. For example: app.kubernetes.io/name=resource-coordinator. Note the use of the equal sign to denote the value.

The label selector can be a complex filter for equalities and inequalities. Refer to Kubernetes labels and selectors for the syntax. For example, environment=production,app.kubernetes.io/name=resource-coordinator would filter for Resource Coordinators that also have the label environment: production.
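
As a sketch, the selector from the example above might be passed to the GW container as follows. This is only a fragment of a container spec, and it assumes the GW reads the variable from its container environment as described in the table of environment variables.

```yaml
# Fragment of a GW container spec (surrounding fields omitted).
env:
- name: KXI_SG_RC_LABEL_SELECTOR
  # Matches pods carrying both labels; the comma acts as a logical AND.
  value: "environment=production,app.kubernetes.io/name=resource-coordinator"
```

The same selector string can be checked against a cluster with kubectl get pods -l before it is used in the deployment.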

Multiple RCs

Only the Kubernetes control plane discovery method supports multiple RCs.

Resource Coordinator

kdb Insights Service Discovery

A discovery JSON configuration file should be provided to the sidecar container. See our example on discovery proxy configuration.

Kubernetes Control Plane

Multiple RCs can work together to spread request load. RCs may discover each other by setting KXI_SG_DISC=kubernetes.

This setting allows for dynamic scalability, but requires polling the control plane to discover new RCs as they come up. The label app.kubernetes.io/name: resource-coordinator must be added to the RC pod's metadata.labels in order for RC containers to properly identify each other.

kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: "resource-coordinator" # Required label
...
spec:
  containers:
  - ...
    ports:
    - name: arbitrary-name
      containerPort: 5050 # Required port
      protocol: TCP

Note that the Resource Coordinator must be the last container in the pod other than the optional sidecar.

Kubernetes secrets

To set up image pull secrets and license secrets:

Create a new secret named kx-repo-access.

kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com

Create and apply a new secret named kx-license-info.

$ cat kx-license-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>

$ kubectl apply -f kx-license-secret.yaml
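
One way these two secrets could then be consumed is sketched below. The pod name, container name, image path, and tag are assumptions. The secret names and the license key match the commands above, and KDB_LICENSE_B64 is the license variable listed in the environment variable table.

```yaml
# Sketch only: pod name, container name, image path and tag are assumptions.
apiVersion: v1
kind: Pod
metadata:
  name: sg-gateway                   # hypothetical pod name
spec:
  imagePullSecrets:
  - name: kx-repo-access             # registry credentials created above
  containers:
  - name: gateway                    # hypothetical container name
    image: registry.dl.kx.com/kxi-sg-gw:<version>   # assumed path and tag
    env:
    - name: KDB_LICENSE_B64
      valueFrom:
        secretKeyRef:
          name: kx-license-info      # license secret created above
          key: license
```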

Custom file

The default aggregation for any API is raze. However, the Agg processes will load the q file pointed to by the KXI_CUSTOM_FILE environment variable. In this file, you can:

  • define/register custom aggregation functions,
  • define which aggregation functions to invoke for which APIs,
  • define metadata pertaining to the aggregation function (which is subsequently query-able using the .kxi.getMeta API, see API page for details).

Note that while SG only supports loading a single file, you can load other files from within this file using \l (allowing you to control load order). The current working directory (pwd) at load time is the base directory of the file.

This can be combined with the Data Access microservice (which allows custom function definitions in the DAPs) to create full custom API support within kdb Insights (see "Data Access" for details).

Note: It's recommended to avoid .sg* and .sapi namespaces to avoid colliding with SG functions.

Aggregation functions can be anything. Their input is a list of the outputs of the corresponding API defined in the DAPs. Their output is a two-element list consisting of a response header (dictionary) and a response payload (anything). See header for details on response headers.

To define which aggregation function to use for an API, use the .sgagg.registerAggFn API, whose signature is as follows.

  • aggFn - symbol - Aggregation function name.
  • metadata - list|string|dictionary - Aggregation function metadata (see "SAPI - Metadata" documentation).
  • apis - symbol|symbol[] - API(s) for which this should be the default aggregation function.

Aggregation functions MUST be registered with .sgagg.registerAggFn in order to be invokable by the Aggregator. See Custom file example below for an example.

Deferring responses

In some situations, it may be necessary to defer a response, e.g. data is incomplete and we need to retry. In such situations, the aggregation function can:

  1. Save partial results to the SAPI context (see below).
  2. Return a .sapi.defer response (see header).
  3. Recover saved context values on the response.
  4. Continue aggregation.

An example is provided below (Custom file example).

The SAPI context APIs are as follows:

  • .sapi.getCtx - Gets stored context values.

    • Parameters:
      • k - symbol|symbol[]|:: Key(s) to get, or null for the entire set of key value pairs.
    • Output:
      • (any|dictionary) Value(s) or full context dictionary.
  • .sapi.setCtx - Sets context values.

    • Parameters:
      • k - symbol|symbol[] Key(s) to set.
      • v - any Value(s) to set.
  • .sapi.addToCtx - Adds to an existing context list/table.

    • Parameters:
      • k - symbol|symbol[] Key(s) to add to.
      • v - any Value(s) to add.

Example

See the deployment example for an example of running in Docker.

RC discovery sidecar

Config file (see Discovery Configuration for more details):

{
    "connection": ":sgrc:5050",
    "frequencySecs": 5,
    "discovery":
    {
        "registry": ":proxy:4000",
        "adaptor": "discEurekaAdaptor.q",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
}

Discovery proxy config

Proxy config file (see Discovery Configuration for more details):

{
    "discovery":
    {
        "registry": ":eureka:8761",
        "adaptor": "discEurekaAdaptor.q"
    }
}

Custom file example

The Agg process can load a custom code file, wherein you can define custom aggregation functions and map APIs to aggregation functions. Below is an example file, as well as some example API calls that exercise the custom code. Note that aggregation functions must return a response header in addition to the response payload. Here we use the .sapi response functions (see header for details).

// Sample Agg custom file.

// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
\l subFolder/otherFile1.q
\l subFolder/otherFile2.q

//
// @desc An override to the default ping aggregation function. Instead of doing a raze,
// we just take the min (so true indicates all targets successful).
//
// @param res   {boolean[]}             Results from the DAPs.
//
// @return      {(dictionary;boolean)}  Response header + min of all DAP results.
//
pingAggOverride:{[res]
    .sapi.ok min res
    }


//
// @desc Agg function that does a plus join on a list of tables.
//
// @param tbls  {table[]}               List of plus-joinable tables.
//
// @return      {(dictionary;table)}    Response header + plus join of tables.
//
pjAgg:{[tbls]
    .sapi.ok(pj/)tbls
    }


//
// @desc Agg function that does an average daily count by sym.
//
// @param tbls  {table[]}               List of tables with `sym`date`cnt columns.
//
// @return      {(dictionary;table)}    Response header + average count by sym.
//
avAgg:{[tbls]
    res:select sum cnt by sym,date from raze 0!'tbls; / Join common dates
    .sapi.ok select avg cnt by sym from res / Average
    }


//
// @desc Aggregates trade data, but defers if not enough responses.
//
// @param data  {list[]}                    Partial responses from `.kxi.getData`.
//
// @return      {(dictionary;table|list)}   Response header and payload or sub-request if deferring.
//
aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data

    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail

    if[100<count tbl; / If we have enough data
        :.sapi.ok tbl]; / Succeed

    //
    // Not enough data.
    //
    .sapi.setCtx[`prevData;tbl]; / Store existing values for later
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`.resume.aggMinTrade;()!()] / Defer
    }


//
// @desc Resume function for `aggMinTrade`.
//
// @param data  {list[]}                    Partial responses from `.kxi.getData`.
//
// @return      {(dictionary;table|list)}   Response header and payload or sub-request if re-deferring.
//
.resume.aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data

    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail

    prevData:.sapi.getCtx`prevData; / Recover previous data (stored under `prevData)

    if[100<count[prevData]+count tbl; / Now if we have enough data
        :.sapi.ok prevData,tbl]; / Succeed

    //
    // Still not enough data.
    //
    .sapi.addToCtx[`prevData;tbl]; / Accumulate
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`.resume.aggMinTrade;()!()] / Redefer
    }


//
// In order to be usable, aggregation functions MUST be registered with the Agg process. When registering,
// one can also set the aggregation function as the default aggregation function for one or more APIs.
// For example, suppose we had an API defined in the DAPs that performs a "count by" operation on a table:
//
// countBy:{[table;startTS;endTS;byCols]
//     ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
//     }
//
// We can then register our aggregation functions thusly:
//
.sgagg.registerAggFn[`pingAggOverride;
    .sapi.metaDescription["Custom override to .kxi.ping"],
    .sapi.metaParam[`name`type`description!(`res;1h;"List of booleans indicating ping was successful")],
    .sapi.metaReturn`type`description!(-1h;"The worst of all results");
    `$()
    ]

.sgagg.registerAggFn[`pjAgg;
    .sapi.metaDescription["Plus join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The plus join (over) of the tables");
    `countBy // Register as default aggregation function for this API
    ]

.sgagg.registerAggFn[`avAgg;
    .sapi.metaDescription["Average join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average join (over) of the tables");
    `$()
    ]

.sgagg.registerAggFn[`aggMinTrade;
    .sapi.metaDescription["Gets more than 100 trade data rows"],
    .sapi.metaParam[`name`type`description!(`data;0h;"getData response")],
    .sapi.metaReturn`type`description!(98h;"100+ rows of trade data");
    `$()
    ]


//
// Note also that an aggregation function can be default aggregation function for multiple APIs. E.g.
//  .sgagg.registerAggFn[`myAggFn;();`api1`api2`api3]
//

Calls to the GW can now reference these new agg functions. Note that agg function mappings can always be overridden with the aggFn key in the request header.

q)h:hopen`:gwHost:5678 // Connect to GW

// Call ping without specifying the agg function. Defaults to raze.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;(0#`)!())
111b

// Call with an override; we'll now just get the min value.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;``aggFn!("";`pingAggOverride))
1b

// Call to our countBy API without specifying the aggregation function. Defaults to `pjAgg` as specified
// in the Agg's mapping.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym);`;(0#`)!())
sym    | cnt
-------| ------
AAPL.N | 103212
MSFT.OQ| 100213
..

// Call to our countBy API and specify an aggFn override.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym`date);`;``aggFn!("";`avAgg))
sym    | cnt
-------| ------
AAPL.N | 344.5
MSFT.OQ| 301.22
..

Tuning the log level

The kxi-sg-gw log level can be configured with KXI_LOG_LEVELS as above.

At runtime, the log level can be dynamically adjusted RESTfully by POSTing to the /log endpoint.

curl -X POST "https://${HOSTNAME}/log?level=debug"

Garbage Collection

To configure garbage collection for the aggregator and resource-coordinator, set the garbage collection flag.

The default garbage collection mode is deferred.

For the service-gateway, message buffers are allocated per connected aggregator.

Upon connection, each thread is allocated KXI_SG_BUFFER_INITIAL bytes. Message buffers will expand to fit any response up to the 2GB limit. After returning a response, the buffers will shrink back to KXI_SG_BUFFER_RETAIN bytes.

By default, both KXI_SG_BUFFER_INITIAL and KXI_SG_BUFFER_RETAIN are 2MB.
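
As an illustration, a deployment that regularly returns larger responses might raise both values. The fragment below is a sketch of a GW container's environment. The chosen sizes are arbitrary examples, and the byte counts assume 1MB means 1,048,576 bytes.

```yaml
# Fragment of a GW container spec (surrounding fields omitted).
# The sizes are arbitrary examples, not recommendations.
env:
- name: KXI_SG_BUFFER_INITIAL
  value: "4194304"    # 4 * 1048576 bytes allocated per aggregator on connection
- name: KXI_SG_BUFFER_RETAIN
  value: "8388608"    # 8 * 1048576 bytes kept per aggregator after a response
```

A larger retained size trades memory held per connected aggregator against fewer buffer expansions on subsequent large responses.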