Service Gateway Configuration
In its most basic form, the SG is a set of Docker images that are combined using minimal configuration. Below is an explanation of the images required, the configuration parameters that need to be defined, and some example configurations.
Images
There is one image per process type in the SG. The SG architecture allows for multiple instances of any process type. The table below summarizes the process types.
process | description | image |
---|---|---|
GW | Receives requests and responds to requests. | kxi-sg-gw |
RC | Routes requests to DAPs. | kxi-sg-rc |
Agg | Aggregates responses from DAPs. | kxi-sg-agg |
In addition, the SG can optionally use KX Insights Discovery in order for processes to discover and connect with each other seamlessly (see "Discovery"). Images required are as follows.
process | description | image |
---|---|---|
sidecar | Discovery sidecar (for the RC). | kxi_sidecar |
discovery | Discovery client. Configure one, which all processes seamlessly connect to. | kxi-eureka-discovery |
proxy | Discovery proxy. | discovery_proxy |
Environment variables
The SG microservice relies on certain environment variables being defined in the containers. The variables are described below.
variable | required | containers | description |
---|---|---|---|
KXI_NAME | Yes | RC, Agg | Unique Process name. |
KXI_PORT | Yes | RC, Agg | Port. |
KXI_LOG_FORMAT | No | RC, Agg, sidecar | Message format (see "Logging" page). |
KXI_LOG_DEST | No | RC, Agg, sidecar | Endpoints (see "Logging" page). |
KXI_LOG_LEVELS | No | all | Component routing (see "Logging" page). |
DISCOVERY_PROXY | No | GW | Discovery proxy address (not required if not using discovery). |
GATEWAY_QIPC_PORT | Yes | GW | Gateway port for QIPC traffic. |
GATEWAY_HTTP_PORT | Yes | GW | Gateway port for HTTP traffic. |
KXI_CONFIG_FILE | Yes | sidecar | Discovery configuration file (see "Discovery" page). |
KXI_CUSTOM_FILE | No | Agg | File containing custom code to load into Agg process. |
KXI_GC_AFTER_AGG | No | Agg | Whether to garbage collect after aggregation or not (default true). |
KXI_SG_TIMEOUT | No | RC, Agg | Default request timeout in milliseconds (default is 30,000). |
KXI_SG_DISC | No | RC | Multi-RC discovery mode (see "Multi RC"). |
KXI_SG_RC_ADDR | Yes | GW, Agg | host:port of RC process to connect to. |
KXI_SG_CONN_TIMEOUT | No | RC, Agg | Timeout on connection open. |
KDB_LICENSE_B64 | Yes | all | KDB license. |
KXI_SG_BUFFER_INITIAL | No | GW | Initial size in bytes to allocate per connected aggregator. Default 2MB |
KXI_SG_BUFFER_RETAIN | No | GW | Maximum size in bytes to hold on to per connected aggregator. Default 2MB |
KXI_AUTH_DISABLED | No | GW | Set KXI_AUTH_DISABLED=0 to use the gateway with Keycloak. |
KXI_OIDC_JSON_PATH | No | GW | OIDC JSON used for mapping Keycloak authentication endpoints. |
KXI_SG_MAX_RETRY | No | RC | Maximum number of retries the RC is willing to do for retryable errors. |
KXI_ALLOWED_SBX_APIS | No | RC | Comma-delimited list of sandbox APIs to allow in non-sandbox RCs (ex: ".kxi.sql,.kxi.qsql"). |
See example section below.
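As a quick sanity check before starting a container, the required variables from the table can be verified in the shell. The sketch below covers the Agg container only. The function name missing_agg_env is hypothetical and not part of the SG; the list of names is taken from the rows marked as required for Agg (or for all containers) above.

```shell
# Hypothetical helper: prints the name of every required Agg variable
# that is unset or empty in the current environment.
missing_agg_env() {
  for name in KXI_NAME KXI_PORT KXI_SG_RC_ADDR KDB_LICENSE_B64; do
    # Look up the value of the variable whose name is stored in $name
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "$name"
    fi
  done
}

# Usage: missing_agg_env   (no output means all four variables are set)
```

The same pattern can be adapted for the GW or RC by swapping in the names the table marks as required for those containers.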
Multi RC
Multiple RCs can work together to spread request load. The means by which RCs discover each other is specified by the KXI_SG_DISC environment variable. Currently supported discovery methods are:
- kubernetes
  RCs discover each other via the Kubernetes control plane (this discovery method only works if using Kubernetes). This mode allows for dynamic scalability, but requires polling the control plane to discover new RCs as they come up. The following label must be added to the pod's metadata.labels in order for RC containers to properly identify each other:
  kind: Pod
  metadata:
    labels:
      app.kubernetes.io/name: "resource-coordinator" # Required label
  ...
  spec:
    containers:
    - ...
      ports:
      - name: arbitrary-name
        containerPort: 5050 # Required port
        protocol: TCP
Note that the resource coordinator must be the last container in the pod other than the optional sidecar.
Note also that the Service Account of the RC pod(s) needs the necessary privileges to call the Kubernetes control plane API. This can be achieved by running the following command:
kubectl create rolebinding default-view --clusterrole=view --serviceaccount=$NAMESPACE:default --namespace=$NAMESPACE
where $NAMESPACE is your cluster's namespace.
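To confirm that the binding took effect, kubectl auth can-i can impersonate the service account. This is a minimal sketch, assuming the RC pods run under the namespace's default service account (as in the rolebinding command) and that listing pods is representative of what the RC needs from the control plane, which the text here does not state explicitly. Both helper names are hypothetical.

```shell
# Builds the impersonation identity for the default service account of a namespace.
sa_identity() {
  printf 'system:serviceaccount:%s:default' "$1"
}

# Asks the API server whether that service account may list pods in its namespace.
# Prints "yes" or "no"; requires kubectl access to the cluster.
rc_can_list_pods() {
  kubectl auth can-i list pods \
    --as="$(sa_identity "$1")" \
    --namespace="$1"
}

# Usage: rc_can_list_pods "$NAMESPACE"
```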
Kubernetes secrets
To set up image pull secrets and license secrets:
Create a new secret named kx-repo-access.
kubectl create secret docker-registry kx-repo-access \
    --docker-username=<username> \
    --docker-password=<password> \
    --docker-server=registry.dl.kx.com
Create and apply a new secret named kx-license-info.
$ cat kx-license-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kx-license-info
type: Opaque
stringData:
  license: <base64 license string>
$ kubectl apply -f kx-license-secret.yaml
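If the license is held as a file, a single-line base64 string can be produced as sketched below. This assumes the placeholder in the secret expects the base64 encoding of the license file contents, which this page does not spell out. The function name and the example path are illustrative only.

```shell
# Encodes a file as base64 on a single line
# (some base64 builds wrap their output, so newlines are stripped).
encode_license() {
  base64 < "$1" | tr -d '\n'
}

# Usage (path is illustrative): encode_license /path/to/kc.lic
```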
Custom file
The default aggregation for any API is raze. However, the Agg processes will load the q file pointed to by the KXI_CUSTOM_FILE environment variable. In this file, you can:
* define/register custom aggregation functions,
* define which aggregation functions to invoke for which APIs,
* define metadata pertaining to the aggregation function (which is subsequently queryable using the .kxi.getMeta API, see API page for details).
Note that while SG only supports loading a single file, you can load other files from within this file using \l (allowing you to control load order). The current working directory (pwd) at load time is the base directory of the file.
This can be combined with the Data Access microservice (which allows custom function definitions in the DAPs) to create full custom API support within KX Insights (see "Data Access" for details).
Note: It's recommended to avoid the .sg* and .sapi namespaces to avoid colliding with SG functions.
Aggregation functions can be anything. Their input is a list of the outputs of the corresponding API defined in the DAPs. Their output is a two-element list consisting of a response header (dictionary) and a response payload (anything). See header for details on response headers.
To define which aggregation function to use for an API, use the .sgagg.registerAggFn API, which takes the following parameters.
* aggFn - symbol - Aggregation function name.
* metadata - list|string|dictionary - Aggregation function metadata (see "SAPI - Metadata" documentation).
* apis - symbol|symbol[] - API(s) for which this should be the default aggregation function.
Aggregation functions MUST be registered with .sgagg.registerAggFn in order to be invokable by the Aggregator. See the Custom file example below for an example.
Deferring responses
In some situations, it may be necessary to defer a response, e.g. when data is incomplete and a retry is needed. In such situations, the aggregation function can:
- Save partial results to the SAPI context (see below).
- Return a .sapi.defer response (see header).
- Recover saved context values on the response.
- Continue aggregation.
An example is provided below (Custom file example).
The SAPI context APIs are as follows:
- .sapi.getCtx - Gets stored context values.
  - Parameters:
    - k - symbol|symbol[]|:: Key(s) to get, or null for the entire set of key value pairs.
  - Output:
    - (any|dictionary) Value(s) or full context dictionary.
- .sapi.setCtx - Sets context values.
  - Parameters:
    - k - symbol|symbol[] Key(s) to set.
    - v - any Value(s) to set.
- .sapi.addToCtx - Adds to an existing context list/table.
  - Parameters:
    - k - symbol|symbol[] Key(s) to add to.
    - v - any Value(s) to add.
Example
Below is a sample configuration in two flavours: docker-compose and Kubernetes. Note: variables ${...} are user-defined and based on your local directory structure/file names. Sections/lines marked Optional are optional. Note in particular:
- ${registry}: Registry from which you download images (e.g. "gcr.io/cloudpak").
- ${*_version}: Version of the image. Format is #.#.#.
Docker-compose
Kubernetes
RC discovery sidecar
Config file (see "Discovery" for more details):
{
  "connection": ":sgrc:5050",
  "frequencySecs": 5,
  "discovery":
  {
    "registry": ":proxy:4000",
    "adaptor": "discEurekaAdaptor.q",
    "heartbeatSecs": 30,
    "leaseExpirySecs": 90
  }
}
Discovery proxy config
Proxy config file (see "Discovery" for more details):
{
  "discovery":
  {
    "registry": ":eureka:8761",
    "adaptor": "discEurekaAdaptor.q"
  }
}
Custom file example
The Agg process can load a custom code file, wherein you can define custom aggregation functions and map APIs to aggregation functions. Below is an example file, as well as some example API calls that exercise the custom code.
Note that aggregation functions must return a response header in addition to the response payload. Here we use the .sapi response functions (see header for details).
// Sample Agg custom file.
// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
\l subFolder/otherFile1.q
\l subFolder/otherFile2.q
//
// @desc An override to the default ping aggregation function. Instead of doing a raze,
// we just take the min (so true indicates all targets successful).
//
// @param res {boolean[]} Results from the DAPs.
//
// @return {(dictionary;boolean)} Response header + min of all DAP results.
//
pingAggOverride:{[res]
    .sapi.ok min res
    }
//
// @desc Agg function that does a plus join on a list of tables.
//
// @param tbls {table[]} List of plus-joinable tables.
//
// @return {(dictionary;table)} Response header + plus join of tables.
//
pjAgg:{[tbls]
    .sapi.ok(pj/)tbls
    }
//
// @desc Agg function that does an average daily count by sym.
//
// @param tbls {table[]} List of tables with `sym`date`cnt columns.
//
// @return {(dictionary;table)} Response header + average count by sym.
//
avAgg:{[tbls]
    res:select sum cnt by sym,date from raze 0!'tbls; / Join common dates
    .sapi.ok select avg cnt by sym from res / Average
    }
//
// @desc Aggregates trade data, but defers if not enough responses.
//
// @param data {list[]} Partial responses from `.kxi.getData`.
//
// @return {(dictionary;table|list)} Response header and payload or sub-request if deferring.
//
aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data
    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail
    if[100<count tbl; / If we have enough data
        :.sapi.ok tbl]; / Succeed
    //
    // Not enough data.
    //
    .sapi.setCtx[`prevData;tbl]; / Store existing values for later
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`.resume.aggMinTrade;()!()] / Defer
    }
//
// @desc Resume function for `aggMinTrade`.
//
// @param data {list[]} Partial responses from `.kxi.getData`.
//
// @return {(dictionary;table|list)} Response header and payload or sub-request if re-deferring.
//
.resume.aggMinTrade:{[data]
    t:.sgagg.getData data; / Execute normal getData aggregation
    hdr:first t; / Response header
    tbl:last t; / Table data
    if[.RC.OK<>first[t]`rc; / If the response is no good
        :.sapi.response[()!();(hdr`rc;hdr`ac;"Failed to aggregate getData: ",hdr`ai);()]]; / Fail
    prevData:.sapi.getCtx`prevData; / Recover previous data (same key as stored by aggMinTrade)
    if[100<count[prevData]+count tbl; / Now if we have enough data
        :.sapi.ok prevData,tbl]; / Succeed
    //
    // Still not enough data.
    //
    .sapi.addToCtx[`prevData;tbl]; / Accumulate
    .sapi.defer[`.kxi.getData;`table`startTS`agg!(`trade;1+max tbl`time;cols tbl);`.resume.aggMinTrade;()!()] / Redefer
    }
//
// In order to be usable, aggregation functions MUST be registered with the Agg process. When registering,
// one can also set the aggregation function as the default aggregation function for one or more APIs.
// For example, suppose we had an API defined in the DAPs that performs a "count by" operation on a table:
//
// countBy:{[table;startTS;endTS;byCols]
//     ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
//     }
//
// We can then register our aggregation functions as follows:
//
.sgagg.registerAggFn[`pingAggOverride;
    .sapi.metaDescription["Custom override to .kxi.ping"],
    .sapi.metaParam[`name`type`description!(`res;1h;"List of booleans indicating ping was successful")],
    .sapi.metaReturn`type`description!(-1h;"The worst of all results");
    `$()
    ]
.sgagg.registerAggFn[`pjAgg;
    .sapi.metaDescription["Plus join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The plus join (over) of the tables");
    `countBy // Register as default aggregation function for this API
    ]
.sgagg.registerAggFn[`avAgg;
    .sapi.metaDescription["Average join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average join (over) of the tables");
    `$()
    ]
.sgagg.registerAggFn[`aggMinTrade;
    .sapi.metaDescription["Gets more than 100 trade data rows"],
    .sapi.metaParam[`name`type`description!(`data;0h;"getData response")],
    .sapi.metaReturn`type`description!(98h;"100+ rows of trade data");
    `$()
    ]
//
// Note also that an aggregation function can be default aggregation function for multiple APIs. E.g.
// .sgagg.registerAggFn[`myAggFn;();`api1`api2`api3]
//
Calls to the GW can now reference these new agg functions. Note that agg function mappings can always be overridden with the aggFn key in the request header.
q)h:hopen`:gwHost:5678 // Connect to GW
// Call ping without specifying the agg function. Defaults to raze.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;(0#`)!())
111b
// Call with an override; we'll now just get the min value.
q)last h(`.kxi.ping;`region`assetClass`startTS`endTS!(`amer`emea`apac;`equity;-0Wp;0Wp);`;``aggFn!("";`pingAggOverride))
1b
// Call to our countBy API without specifying the aggregation function. Defaults to `pjAgg` as specified
// in the Agg's mapping.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym);`;(0#`)!())
sym | cnt
-------| ------
AAPL.N | 103212
MSFT.OQ| 100213
..
// Call to our countBy API and specify an aggFn override.
q)last h(`countBy;`region`assetClass`startTS`endTS`table`byCols!(`amer;`equity;-0Wp;0Wp;`trade;`sym`date);`;``aggFn!("";`avAgg))
sym | cnt
-------| ------
AAPL.N | 344.5
MSFT.OQ| 301.22
..
Tuning the log level
The kxi-sg-gw log level can be configured with KXI_LOG_LEVELS as above.
At runtime, the log level can be dynamically adjusted RESTfully by POSTing to the /log endpoint.
curl -X POST "https://${HOSTNAME}/log?level=debug"
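For repeated use, the request can be wrapped in a small shell function. This is only a sketch built on the /log endpoint and level query parameter described in this section; the function names are hypothetical, and no level values other than debug are confirmed here.

```shell
# Builds the log-level URL for a gateway host and level.
log_level_url() {
  printf 'https://%s/log?level=%s' "$1" "$2"
}

# POSTs the new level; --fail makes curl exit non-zero on an HTTP error status.
set_gw_log_level() {
  curl --fail --silent --show-error -X POST "$(log_level_url "$1" "$2")"
}

# Usage: set_gw_log_level "$HOSTNAME" debug
```

Checking curl's exit status makes it easier to spot a rejected request when the call is scripted.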
Garbage Collection
To configure garbage collection for the aggregator and resource-coordinator, set the garbage collection flag.
The default garbage collection mode is deferred.
For the service-gateway, message buffers are allocated per connected aggregator.
Upon connection, each thread is allocated KXI_SG_BUFFER_INITIAL bytes. Message buffers will expand to fit any response up to the 2GB limit.
After returning a response, the buffers will shrink back to KXI_SG_BUFFER_RETAIN bytes.
By default, both KXI_SG_BUFFER_INITIAL and KXI_SG_BUFFER_RETAIN are 2MB.
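The buffer behaviour can be illustrated with a little arithmetic. The function below is a simplified model of the description in this section, not SG code: it assumes a buffer grows to the response size when needed and is then capped at the retain size, and it takes 2MB to mean 2097152 bytes. The function name is hypothetical.

```shell
# Simplified model (an assumption, not SG code): given the current buffer size,
# the retain limit, and a response size (all in bytes), print the peak size
# while the response is held and the size kept afterwards.
buffer_sizes() {
  current=$1; retain=$2; response=$3
  peak=$current
  if [ "$response" -gt "$peak" ]; then peak=$response; fi
  after=$peak
  if [ "$after" -gt "$retain" ]; then after=$retain; fi
  echo "$peak $after"
}

# 2MB initial and retain sizes, 10MB response
buffer_sizes 2097152 2097152 10485760
```

With the default sizes and a 10MB response, this prints 10485760 2097152: the buffer grows to hold the response and is then cut back to the retain size.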