Data Access configuration
In its most basic form, Data Access is a set of Docker images combined using minimal configuration. Below is an explanation of the images required, the configuration parameters that need to be defined, and some example configurations.
Images
There is one image per process type in the service. The architecture allows for multiple DAs to be run in parallel (upstream gateways can load balance as desired).
process | number | required | image |
---|---|---|---|
DA | many | Yes | kxi-da |
In addition, Data Access can optionally use KX Insights Service Discovery in order for processes to discover and connect with each other seamlessly (see the KXI Service Discovery documentation). Images required are as follows.
process | description | image |
---|---|---|
sidecar | Discovery sidecar. | kxi_sidecar |
discovery | Discovery client. Configure one, which all processes seamlessly connect to. | kxi-eureka-discovery |
proxy | Discovery proxy. | discovery_proxy |
Environment variables
The DA microservice relies on certain environment variables to be defined in the containers. The variables are described below.
variable | required | containers | description |
---|---|---|---|
KXI_NAME | Yes | DA | Process name. |
KXI_PORT | No | DA | Port. Can also be started with "-p $KXI_PORT" . |
KXI_SC | Yes | DA | Service Class type for data access (e.g. RDB, IDB, HDB). |
KXI_LOG_FORMAT | No | DA, sidecar | Message format (see qlog documentation). |
KXI_LOG_DEST | No | DA, sidecar | Endpoints (see qlog documentation). |
KXI_LOG_LEVELS | No | DA, sidecar | Component routing (see qlog documentation). |
KXI_ASSEMBLY_FILE | Yes | DA | Assembly yaml file. |
KXI_CONFIG_FILE | Yes | sidecar | Discovery configuration file (see KXI Service Discovery documentation). |
KXI_CUSTOM_FILE | No | DA | File containing custom code to load in DA processes. |
KXI_DAP_SANDBOX | No | DA | Whether this DAP is a sandbox. |
SBX_MAX_ROWS | No | DA | Maximum number of rows, per partitioned table, to store in memory. |
KXI_ALLOWED_SBX_APIS | No | DA | Comma-delimited list of sandbox APIs to allow in non-sandbox DAPs (ex: ".kxi.sql,.kxi.qsql"). |
KXI_DA_RELOAD_STAGGER | No | DA | Time in seconds between DAPs of the same class reloading after an EOX (default: 30). |
KXI_DA_USE_REAPER | No | DA | Whether to use KX Reaper and the object storage cache - follow the [configuration](#object-store-config) (default: false). |
KXI_MAX_RECORD_INTV | No | DA | Maximum number of records in an interval before triggering an emergency reload (default: unlimited). |
See example section below.
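As a minimal sketch, the required variables can be supplied through a docker-compose `environment` block. All values below are illustrative and should be adapted to your deployment:

```yaml
# Minimal environment for a single DA container (illustrative values).
environment:
  - KXI_NAME=hdb                 # Required: process name
  - KXI_SC=HDB                   # Required: service class (RDB, IDB or HDB)
  - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/sample-assembly.yaml   # Required: assembly yaml
  - KXI_PORT=5100                # Optional: listening port
  - KXI_DA_RELOAD_STAGGER=60     # Optional: stagger same-class reloads by 60s
```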
Object store config
The Data Access HDB processes are able to cache and reap object storage results to avoid repeated downloads of the same data.
Be sure to configure RT log archiving so that it does not overlap with the cache. If using RT and the RT log volume, size the RT log volume appropriately to make additional room for the object storage cache.
The following environment variables should be set:
variable | required | containers | description |
---|---|---|---|
KX_OBJSTOR_CACHE_PATH | Yes (unless Platform) | DA | Path to where the object storage cache should be. This uses the RT Log Volume in Platform. |
KX_OBJSTOR_CACHE_SIZE | Yes | DA | Size of the object storage cache in MB. Increase the RT Log Volume by this amount in Platform. |
For Platform, the RT Log Volume is used for the object storage cache. Since all RT Log Volumes must be sized identically for log archiving, increase the RT Log Volume by the object storage cache size. For example, for a 20Gi log volume and a desired 5Gi object storage cache, set the RT Log Volume size for the HDB to 25Gi, and set `KXI_DA_USE_REAPER` to `"true"` for the HDB DAP element.
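For instance, in a non-Platform docker-compose deployment the cache could be enabled with an environment fragment like the following (paths and sizes are illustrative):

```yaml
# Enable the object storage cache on an HDB DAP (illustrative values).
environment:
  - KXI_DA_USE_REAPER=true                 # Turn on reaping and caching
  - KX_OBJSTOR_CACHE_PATH=/data/objcache   # Not needed on Platform (RT Log Volume is used)
  - KX_OBJSTOR_CACHE_SIZE=5120             # Cache size in MB (here, 5Gi)
```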
Names
Data Access process names help determine the order in which RDB processes reload, to avoid all processes reloading at once. This is handled by Kubernetes StatefulSet configuration, which names Pods as `pod-name-<ordinal>`, and by Docker Compose, which names containers as `container-name_<ordinal>`. In cases where this naming convention isn't followed, either explicitly or via Kubernetes/Docker Compose, reloads are immediate with no staggering. Use `KXI_DA_RELOAD_STAGGER` to control the time period between reloads.
Assembly
The assembly configuration is a yaml file that defines the DA configuration, i.e. what data it is expected to offer and how it responds to queries. Assemblies are used in all KX Insights microservices.
field | required | description |
---|---|---|
name | Yes | Assembly name. |
description | No | Description of the assembly. |
labels | Yes | Labels (i.e. dimensions) along which the data is partitioned in the DAs, and possible values (see Labels). |
tables | Yes | Schema for tables to be loaded into DAs. |
bus | No | Messaging protocol to be used by streaming DAs. |
mounts | Yes | Mount points that a DA is expected to surface data for. In-memory mounts are referred to as `stream`. |
elements | Yes | Additional, service specific configuration (see Elements). |
See Labels/Elements or Example for example assembly yaml configurations. The assembly yaml file must be included in the Docker container.
Labels
Labels are used to define the DA purview, that is, the data it grants access to. If using the KX Insights Service Gateway, these are the values reported as the DAP's purview (see the "Service Gateway" page).
Below are some examples.
Example 1 - Provides FX data for America.
labels:
  region: amer
  assetClass: fx
Example 2 - Provides electrical, weekly billing for residential customers.
labels:
  sensorType: electric
  clientType: residential
  billing: weekly
Tables
A Table schema has the following structure:
`description`
: String describing the purpose of this table. Optional.

`type`
: String; one of {`splayed`, `partitioned`}.

`primaryKeys`
: List of names of primary key columns. Optional.

`partCol`
: Name of a column to be used for storage partitioning. Optional.

`blockSize`
: Integer; number of rows to keep in-memory before SM writes to disk. Optional.

`updTsCol`
: Name of the arrival timestamp column. Optional.

`columns`
: List of column schemas.
A column schema has the following structure:

`name`
: Name of the column.

`description`
: String describing the purpose of this column. Optional.

`type`
: Q type name.

`foreign`
: This column is a foreign key into another table in this assembly, of the form `table.column`. Optional.

`attrMem`
: String; column attribute when stored in memory. Optional.

`attrDisk`
: String; column attribute when stored on disk. Optional.

`attrOrd`
: String; column attribute when stored on disk with an `ordinal` partition scheme. Optional.

`attrObj`
: String; column attribute when stored in object store (e.g. S3). Optional.
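Putting the table and column fields together, a minimal schema entry might look like the following sketch (field names follow the lists above; the table and values are illustrative):

```yaml
tables:
  trade:
    description: Trade data
    type: partitioned
    partCol: realTime          # column used for storage partitioning
    columns:
      - name: sym
        type: symbol
        attrMem: grouped       # grouped attribute in memory
        attrDisk: parted       # parted attribute on disk
      - name: realTime
        type: timestamp
      - name: price
        type: float
```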
Bus
Data Access ingests data from an event stream; a Bus contains the information necessary to subscribe to that stream.
The `bus` section consists of a dictionary of bus entries. Each bus entry provides several fields:

`protocol`
: Short string indicating the protocol of the messaging system. Currently, the only valid choices for this protocol are `custom` and `rt`. A protocol of `custom` indicates that custom q code should be loaded from the path given by the environment variable `KXI_RT_LIB`. A protocol of `rt` indicates that the data access process will be using the Insights Realtime Transport protocol.

`topic`
: String indicating the subset of messages in this stream that consumers are interested in.

`nodes`
: List of one or more connection strings to machines/services which can be used for subscribing to this bus. In the case of the `custom` protocol, this list should contain a single `hostname:port` string.
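For instance, a bus entry using the `custom` protocol against a hypothetical tickerplant `tp` on port 5000 could be sketched as:

```yaml
bus:
  stream:
    protocol: custom      # load q code from the path in KXI_RT_LIB
    topic: dataStream     # subset of messages of interest
    nodes:
      - tp:5000           # single hostname:port for the custom protocol
```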
Mounts
Data Access can mount data from any of the supported tiers, each with its own locality and format. Loosely speaking, the type of a Data Access process is defined by the `type` of its mount: `stream` is similar to a traditional kdb+ RDB, `local` is equivalent to an HDB, and the `object` tier is unique to cloud-based storage.
The `mounts` section is a dictionary mapping user-defined names of storage locations to dictionaries with the following fields:

`type`
: String; one of {`stream`, `local`, `object`}.

`baseURI`
: String URI representing where that data can be mounted by other services. Presently this supports the `file://` URI scheme, or object storage URIs.

`partition`
: Partitioning scheme for this mount. One of:

    - `none`: do not partition; store in the order it arrives.
    - `ordinal`: partition by a numeric virtual column which increments according to a corresponding storage tier's `schedule` and resets when the subsequent tier (if any) rolls over.
    - `date`: partition by each table's `partCol` column, interpreted as a date.

`sym`
: (Object storage only) A `file://` URI or object storage URI path to a sym file.

`par`
: (Object storage only) A `file://` URI or object storage URI path to a par.txt file.

`storageURI`
: (Object storage only) An object storage URI that points to a database.
Notes:

- A mount of type `stream` must be `partition: none`.
- A mount of type `local` or `object` must be `partition: ordinal` or `partition: date`.
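A sketch of a `mounts` section covering the stream and local types, following the field list above (names and URIs are illustrative):

```yaml
mounts:
  rdb:
    type: stream
    baseURI: file://stream
    partition: none        # stream mounts must use partition: none
  hdb:
    type: local
    baseURI: file://data/db/hdb/current
    partition: date        # local/object mounts use ordinal or date
```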
Elements
Assemblies coordinate a number of processes and/or microservices, which we call the elements of the assembly. The `elements` section provides configuration details that are only relevant to individual services. This guide focuses on the configuration options for Data Access, which go in the `dap` entry of `elements`.
The `dap` element configuration has the following configuration parameters:

`sgArch`
: Architecture of the service gateway process. Supported values are `traditional` and `asymmetric`. Default is `asymmetric` if unspecified.

`rcEndpoints`
: List of `hostname:port` strings of known resource coordinators to connect to if the discovery service is unavailable.

`rcName`
: The name of the resource coordinator for the DAP to connect to, as defined by its `KXI_NAME` environment variable.

`smEndpoints`
: The `hostname:port` strings of the Storage Manager service for the data access process to connect to.

`tableLoad`
: How to populate in-memory database tables. Supported values are `empty`, `splay`, and `links`. Default behaviour is `empty`.

`mountName`
: Name of the mount from the `mounts` section of the assembly for the DA to mount and provide access to.

`mapPartitions`
: Whether a local mount should map partitions after a remount. See the kdb+ documentation.

`purview`
: Inclusive start, exclusive end purview for startup of the DA process.

`enforceSchema`
: Whether a stream DAP should validate all incoming table data against what's defined in the schema. There is a performance cost to having this enabled.
Within the assembly, this configuration sits under the `dap` element. Config that applies to all DAPs is indented one level above the `instances` themselves; it can also be overridden at the instance level.
elements:
  dap:
    # These configs apply to all DAs below
    rcName: sg_rc          # Used with discovery to determine resource coordinator to connect to
    instances:
      RDB:
        # Config specific to DAPs with a KXI_SC of RDB
        mountName: rdb     # Must match name of mount in "mounts" section
      IDB:
        # Config specific to DAPs with a KXI_SC of IDB
        mountName: idb
      HDB:
        # Config specific to DAPs with a KXI_SC of HDB
        mountName: hdb
Custom file
The DA processes load the q file pointed to by the `KXI_CUSTOM_FILE` environment variable. In this file, you can load any custom APIs/functions that you want accessible to the DA processes. Note that while DA only supports loading a single file, you can load other files from within this file using `\l` (allowing you to control load order). The current working directory (`pwd`) at load time is the base directory of the file.
This can be combined with the Service Gateway microservice (which allows custom aggregation functions) to create full custom API support within KX Insights (see "Service Gateway" for details).
Note: it's recommended to avoid the `.da*` namespaces, to avoid colliding with DA functions.
To make an API executable within DA, use the `.sgagg.registerAPI` API, whose signature is as follows.

* `api` - symbol - Aggregation function name.
* `metadata` - list|string|dictionary - Aggregation function metadata (see "SAPI - Metadata" documentation).
API functions MUST be registered with `.sgagg.registerAPI` in order to be invocable by the DA processes. See the Custom file example below. If using the Service Gateway microservice, you can see which APIs are available (and in which DAPs) with the `.kxi.getMeta` API (see "SG - APIs").
When creating custom analytics that access data, there is a helper function, `.kxi.selectTable`, which understands the data model within each DAP and can select from the tables necessary to return the appropriate records. Its interface is as follows:
Name | Type | Description |
---|---|---|
tn | symbol | Name of table to retrieve data from |
ts | timestamp[2] | Time period of interest |
wc | list[] | Where clause of what to select |
bc | dict/boolean | By clause for select |
cn | symbol | Names of columns to select for. Include any columns needed in aggregations |
agg | dict | Select clause/aggregations to apply to table |
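As a sketch only, a hypothetical custom API built on `.kxi.selectTable` might pass those six arguments like this (the `avPrice` name, table, and columns are illustrative, not part of the service):

```q
// Illustrative: average price by sym over a time window,
// using the .kxi.selectTable interface described above.
avPrice:{[startTS;endTS]
    .kxi.selectTable[`trade;                      / tn:  table name
        (startTS;endTS);                          / ts:  time period of interest
        ();                                       / wc:  no additional where clause
        enlist[`sym]!enlist`sym;                  / bc:  by clause (by sym)
        `sym`price;                               / cn:  columns needed
        enlist[`avgPrice]!enlist(avg;`price)]     / agg: aggregation to apply
    }
```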
Sandbox Mode
If a data access process is passed the environment variable `KXI_DAP_SANDBOX` with a value of "true", it starts in a "sandboxed" mode. In this mode the DAP does not initialize connections to the resource coordinator or Storage Manager. In addition, `local` mount types load any splayed tables into memory.

For `stream` mounts there is an additional environment variable, `SBX_MAX_ROWS`, which the DAP uses to limit the number of rows a partitioned table has in memory. When it's set, only the last `SBX_MAX_ROWS` records received/updated are kept in memory.
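For example, a sandboxed stream DAP capped at one million rows per partitioned table could be configured with an environment fragment like this (values are illustrative):

```yaml
environment:
  - KXI_DAP_SANDBOX=true    # do not connect to the resource coordinator or Storage Manager
  - SBX_MAX_ROWS=1000000    # keep only the most recent rows per partitioned table
```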
Example
Below is a sample configuration. We use a docker-compose yaml, but this can be adapted to other formats. Note: variables `${...}` are user-defined and based on your local directory structure/file names. Sections/lines marked `Optional` are optional.
Docker-compose
#
# Optional: Create volumes to include license/configuration in the containers.
#
x-vols: &vols
  volumes:
    - ${kx_license_dir}:/opt/kx/lic
    - ${cfg_dir}:/opt/kx/cfg
    - ${mnt_dir}:/data
    - ${custom_dir}:/opt/kx/custom   # Optional mount for loading custom code

#
# Optional: Create a network for processes to communicate.
#
x-kxnet: &kxnet
  networks:
    - kx

networks:
  kx:
    name: kx
    driver: bridge

#
# Services.
#
services:
  #
  # Realtime Database
  #
  rdb:
    image: kxi-da:0.8.0
    command: -p 5080
    environment:
      - KXI_NAME=rdb
      - KXI_SC=RDB
      - KXI_LOG_FORMAT=text            # Optional
      - KXI_LOG_LEVELS=default:trace   # Optional
      - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
      - KXI_RT_LIB=/opt/kx/cfg/docker/rt_tick_client_lib.q
      - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_rdb_code}.q   # Optional
    ports:
      - 5080-5084:5080
    deploy:
      mode: replicated
      replicas: 2
    <<: *vols    # Optional
    <<: *kxnet   # Optional

  #
  # Optional: RDB sidecar. Only required if using discovery; otherwise, may be omitted.
  #
  rdb_sidecar:
    image: kxi_sidecar:0.8.0
    environment:
      - KXI_CONFIG_FILE=/opt/kx/cfg/${rdb_sidecar_config_json}
      - KXI_LOG_LEVELS=default:debug   # Optional
    <<: *vols    # Optional
    <<: *kxnet   # Optional

  #
  # Intraday Database
  #
  idb:
    image: kxi-da:0.8.0
    command: -p 5090
    environment:
      - KXI_NAME=idb
      - KXI_SC=IDB
      - KXI_LOG_FORMAT=text            # Optional
      - KXI_LOG_LEVELS=default:trace   # Optional
      - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
      - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_idb_code}.q   # Optional
    ports:
      - 5090-5094:5090
    deploy:
      mode: replicated
      replicas: 2
    <<: *vols    # Optional
    <<: *kxnet   # Optional

  #
  # Historical Database
  #
  hdb:
    image: kxi-da:0.8.0
    command: -p 5100
    environment:
      - KXI_NAME=hdb
      - KXI_SC=HDB
      - KXI_LOG_FORMAT=text            # Optional
      - KXI_LOG_LEVELS=default:trace   # Optional
      - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
      - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_hdb_code}.q   # Optional
    ports:
      - 5100-5104:5100
    deploy:
      mode: replicated
      replicas: 2
    <<: *vols    # Optional
    <<: *kxnet   # Optional

  #
  # Optional: Eureka Service Discovery Registry. Only required if using discovery; otherwise, may be omitted.
  #
  eureka:
    image: kxi-eureka-discovery:0.8.0
    ports:
      - 9000:8761

  #
  # Optional: Discovery proxy. Only required if using discovery; otherwise, may be omitted.
  #
  proxy:
    image: discovery_proxy:0.8.0
    ports:
      - 4000:4000
    environment:
      - KXI_CONFIG_FILE=/opt/app/cfg/${proxy_config_json}
    command: -p 4000
Assembly
Here's an example assembly configuration where the Data Access processes are tagged with a `region` of "New York" and an `assetClass` of "stocks".
name: integration-env
description: Data access assembly configuration

labels:
  region: New York
  assetClass: stocks

tables:
  trade:
    description: Trade data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMemory: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: price
        description: Trade price
        type: float
      - name: size
        description: Trade size
        type: long
  quote:
    description: Quote data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMemory: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: bid
        description: Bid price
        type: float
      - name: ask
        description: Ask price
        type: float
      - name: bidSize
        description: Bid size
        type: long
      - name: askSize
        description: Ask size
        type: long

bus:
  stream:
    protocol: custom
    nodes: tp:5000
    topic: dataStream

mounts:
  rdb:
    type: stream
    uri: file://stream
    partition: none
  idb:
    type: local
    uri: file://data/db/idb/current
    partition: ordinal
  hdb:
    type: local
    uri: file://data/db/hdb/current
    partition: date

elements:
  dap:
    gwAssembly: gw-assembly
    instances:
      RDB:
        mountName: rdb
      IDB:
        mountName: idb
      HDB:
        mountName: hdb
RDB discovery sidecar
Config file, configured as per the KXI Service Discovery documentation.
{
  "connection": ":rdb:5080",
  "frequencySecs": 5,
  "discovery":
  {
    "registry": ":proxy:4000",
    "adaptor": "discEurekaAdaptor.q",
    "heartbeatSecs": 30,
    "leaseExpirySecs": 90
  }
}
Custom file
Each DA process can load a custom file for custom API support. For example,
// Sample DA custom file.
// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
\l subFolder/otherFile1.q
\l subFolder/otherFile2.q
//
//
// @desc Define a new API. Counts number of entries by specified columns.
//
// @param table   {symbol}           Table name.
// @param startTS {timestamp}        Start time (inclusive).
// @param endTS   {timestamp}        End time (exclusive).
// @param byCols  {symbol|symbol[]}  Column(s) to count by.
//
// @return {table} Count by specified columns.
//
countBy:{[table;startTS;endTS;byCols]
    ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
    }

// Register with the DA process.
.da.registerAPI[`countBy;
    .sapi.metaDescription["Define a new API. Counts number of entries by specified columns."],
    .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table name.")],
    .sapi.metaParam[`name`type`isReq`description!(`startTS;-12h;1b;"Start time (inclusive).")],
    .sapi.metaParam[`name`type`isReq`description!(`endTS;-12h;1b;"End time (exclusive).")],
    .sapi.metaParam[`name`type`isReq`description!(`byCols;-11 11h;1b;"Column(s) to count by.")],
    .sapi.metaReturn[`type`description!(98h;"Count by specified columns.")],
    .sapi.metaMisc[enlist[`safe]!enlist 1b]
    ]
// etc...