Data Access configuration

In its most basic form, Data Access is a set of Docker images that are combined using minimal configuration. Below is an explanation of the images required, the configuration parameters that need to be defined, and some example configurations.

Images

There is one image per process type in the service. The architecture allows for multiple DAs to be run in parallel (upstream gateways can load balance as desired).

| process | number | required | image |
|---|---|---|---|
| DA | many | Yes | kxi-da |

In addition, Data Access can optionally use KX Insights Service Discovery in order for processes to discover and connect with each other seamlessly (see the KXI Service Discovery documentation). Images required are as follows.

| process | description | image |
|---|---|---|
| sidecar | Discovery sidecar. | kxi_sidecar |
| discovery | Discovery client. Configure one, which all processes seamlessly connect to. | kxi-eureka-discovery |
| proxy | Discovery proxy. | discovery_proxy |

Environment variables

The DA microservice relies on certain environment variables to be defined in the containers. The variables are described below.

| variable | required | containers | description |
|---|---|---|---|
| KXI_NAME | Yes | DA | Process name. |
| KXI_PORT | No | DA | Port. The process can also be started with "-p $KXI_PORT". |
| KXI_SC | Yes | DA | Service Class type for data access (e.g. RDB, IDB, HDB). |
| KXI_LOG_FORMAT | No | DA, sidecar | Message format (see qlog documentation). |
| KXI_LOG_DEST | No | DA, sidecar | Endpoints (see qlog documentation). |
| KXI_LOG_LEVELS | No | DA, sidecar | Component routing (see qlog documentation). |
| KXI_ASSEMBLY_FILE | Yes | DA | Assembly yaml file. |
| KXI_CONFIG_FILE | Yes | sidecar | Discovery configuration file (see KXI Service Discovery documentation). |
| KXI_CUSTOM_FILE | No | DA | File containing custom code to load in DA processes. |
| KXI_DAP_SANDBOX | No | DA | Whether this DAP is a sandbox. |
| SBX_MAX_ROWS | No | DA | Maximum number of rows, per partitioned table, to store in memory. |
| KXI_ALLOWED_SBX_APIS | No | DA | Comma-delimited list of sandbox APIs to allow in non-sandbox DAPs (e.g. ".kxi.sql,.kxi.qsql"). |
| KXI_DA_RELOAD_STAGGER | No | DA | Time in seconds between DAPs of the same class reloading after an EOX (default: 30). |
| KXI_DA_USE_REAPER | No | DA | Whether to use KX Reaper and the object storage cache; see Object store config (default: false). |
| KXI_MAX_RECORD_INTV | No | DA | Maximum number of records in an interval before triggering an emergency reload (default: unlimited). |

See the Example section below.

Object store config

The Data Access HDB processes are able to cache and reap object storage results to avoid repeated downloads of the same data.

Be sure to configure RT log archiving to not overlap with the cache

If using RT and the RT log volume, be sure to size the RT log volume appropriately to make additional room for the object storage cache.

The following environment variables should be set:

| variable | required | containers | description |
|---|---|---|---|
| KX_OBJSTOR_CACHE_PATH | Yes (unless Platform) | DA | Path to where the object storage cache should be. This uses the RT Log Volume in Platform. |
| KX_OBJSTOR_CACHE_SIZE | Yes | DA | Size of the object storage cache in MB. Increase the RT Log Volume by this amount in Platform. |

For Platform, the RT Log Volume is used for the object storage cache. Since all RT Log Volumes must be sized identically for log archiving, increase the RT Log Volume by the object storage cache size. For example, for a 20Gi log volume and a desired 5Gi object storage cache, set the RT Log Volume size for the HDB to 25Gi and set KXI_DA_USE_REAPER to "true" for the HDB DAP element.
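
Outside Platform, the same settings are passed as environment variables. The fragment below is a minimal sketch of how an HDB service in a docker-compose file might enable a 5Gi cache. The cache path /data/objcache is a hypothetical location chosen for illustration, and the value 5120 assumes that 5Gi is expressed as 5 x 1024 MB.

```yaml
# Illustrative docker-compose fragment: HDB DAP with the object storage cache enabled.
services:
    hdb:
        image: kxi-da:0.8.0
        environment:
            - KXI_NAME=hdb
            - KXI_SC=HDB
            - KXI_DA_USE_REAPER=true
            - KX_OBJSTOR_CACHE_PATH=/data/objcache # Hypothetical path; pick a volume with enough free space
            - KX_OBJSTOR_CACHE_SIZE=5120 # Cache size in MB (assumes 5Gi = 5 x 1024 MB)
```

The remaining required variables (such as KXI_ASSEMBLY_FILE) are omitted here for brevity.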

Names

Data Access process names help determine the order in which RDB processes reload, so that the processes do not all reload at once. The naming is handled by Kubernetes StatefulSet configuration, which names Pods as pod-name-<ordinal>, and by Docker Compose, which names containers as container-name_<ordinal>. If this naming convention is not followed, either explicitly or via Kubernetes/Docker Compose, the reloads are immediate with no staggering. Use the KXI_DA_RELOAD_STAGGER environment variable to control the time period between reloads.

Assembly

The assembly configuration is a yaml file that defines the DA configuration, i.e. what data it is expected to offer and how it responds to queries. Assemblies are used in all KX Insights microservices.

| field | required | description |
|---|---|---|
| name | Yes | Assembly name. |
| description | No | Description of the assembly. |
| labels | Yes | Labels (i.e. dimensions) along which the data is partitioned in the DAs, and possible values (see Labels). |
| tables | Yes | Schema for tables to be loaded into DAs. |
| bus | No | Messaging protocol to be used by streaming DAs. |
| mounts | Yes | Mount points that a DA is expected to surface data for. In-memory mounts are referred to as stream. |
| elements | Yes | Additional, service-specific configuration (see Elements). |

See Labels/Elements or Example for example assembly yaml configurations. The assembly yaml file must be included in the Docker container.

Labels

Labels define the DA purview, that is, the data that it grants access to. If using the KX Insights Service Gateway, these are the values reported as the DAP's purview (see "Service Gateway" page).

Below are some examples.

Example 1 - Provides FX data for America.

labels:
    region: amer
    assetClass: fx

Example 2 - Provides electrical, weekly billing for residential customers.

labels:
    sensorType: electric
    clientType: residential
    billing: weekly

Tables

A Table schema has the following structure:

  • description: String describing the purpose of this table. Optional.
  • type: String; one of {splayed, partitioned}.
  • primaryKeys: List of names of primary key columns. Optional.
  • partCol: Name of a column to be used for storage partitioning. Optional.
  • blockSize: Integer; Number of rows to keep in-memory before SM writes to disk. Optional.
  • updTsCol: Name of the arrival timestamp column. Optional.
  • columns: List of column schemas.

A column schema has the following structure:

  • name: Name of the column.
  • description: String describing the purpose of this column. Optional.
  • type: Q type name.
  • foreign: This column is a foreign key into another table in this assembly of the form table.column. Optional.
  • attrMem: String; column attribute when stored in memory. Optional.
  • attrDisk: String; column attribute when stored on disk. Optional.
  • attrOrd: String; column attribute when stored on disk with an ordinal partition scheme. Optional.
  • attrObj: String; column attribute when stored in Object store (e.g. S3). Optional.

Bus

Data Access ingests data from an event stream; a Bus contains the information necessary to subscribe to that stream.

The bus section consists of a dictionary of bus entries. Each bus entry provides several fields:

  • protocol: Short string indicating the protocol of the messaging system. Currently, the only valid choices for this protocol are custom and rt. A protocol of custom indicates that custom Q code should be loaded from the path given by the environment variable KXI_RT_LIB. A protocol of rt indicates that the data access process will use the Insights Realtime Transport protocol.
  • topic: String indicating the subset of messages in this stream that consumers are interested in.
  • nodes: List of one or more connection strings to machines/services which can be used for subscribing to this bus. In the case of the custom protocol, this list should contain a single hostname:port string.
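
As a sketch of the fields above, the fragment below shows a single bus entry using the custom protocol, with nodes written as a one-item list. The entry name stream, the topic dataStream, and the tp:5000 address are taken from the example assembly later on this page and are placeholders for your own values.

```yaml
# Illustrative bus section with one entry using the custom protocol.
bus:
  stream:
    protocol: custom   # custom Q code is loaded from the path in KXI_RT_LIB
    topic: dataStream  # subset of messages consumers are interested in
    nodes:
      - tp:5000        # custom protocol: a single hostname:port string
```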

Mounts

Data Access can mount data from any of the supported tiers, each with its own locality and format. Loosely speaking, the type of a Data Access process is defined by the type of its mount: stream is similar to a traditional kdb+ RDB, and local is equivalent to an HDB. The object tier is unique to cloud-based storage.

The Mounts section is a dictionary mapping user-defined names of storage locations to dictionaries with the following fields:

  • type: String; one of {stream, local, object}.
  • baseURI: String URI representing where that data can be mounted by other services. Presently this supports the file:// URI schema, or object storage URIs.
  • partition: Partitioning scheme for this mount. One of:
      • none: do not partition; store in the order it arrives.
      • ordinal: partition by a numeric virtual column which increments according to a corresponding storage tier's schedule and resets when the subsequent tier (if any) rolls over.
      • date: partition by each table's partCol column, interpreted as a date.
  • sym: (Object storage only) A file:// URI or object storage URI path to a sym file.
  • par: (Object storage only) A file:// URI or object storage URI path to a par.txt file.
  • storageURI: (Object storage only) An object storage URI that points to a database.

Notes:

  • A mount of type stream must be partition:none.
  • A mount of type local or object must be partition:ordinal or partition:date.
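
The two rules above can be illustrated with a minimal mounts section. This is a sketch only: it uses the baseURI field name as listed in this section, and the mount names and file:// paths are placeholders borrowed from the example assembly later on this page.

```yaml
# Illustrative mounts section showing the allowed type/partition pairings.
mounts:
  rdb:
    type: stream
    baseURI: file://stream
    partition: none      # stream mounts must use partition: none
  idb:
    type: local
    baseURI: file://data/db/idb/current
    partition: ordinal   # local mounts may use ordinal ...
  hdb:
    type: local
    baseURI: file://data/db/hdb/current
    partition: date      # ... or date
```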

Elements

Assemblies coordinate a number of processes and/or microservices, which we call elements of the assembly. The elements section provides configuration details which are only relevant to individual services. This guide will focus on the configuration options for Data Access, which go in the dap entry of elements.

The dap element configuration has the following configuration parameters:

  • sgArch: Architecture of the service gateway process. Supported values are traditional and asymmetric. Default is asymmetric if unspecified.
  • rcEndpoints: List of hostname:port strings of known resource coordinators to connect to if the discovery service is unavailable.
  • rcName: The name of the resource coordinator for the DAP to connect to, as defined by its KXI_NAME environment variable.
  • smEndpoints: The hostname:port strings of the storage manager service for the data access process to connect to.
  • tableLoad: How to populate in-memory database tables. Supported values are empty, splay, and links. Default is empty.
  • mountName: Name of the mount from the mounts section of the assembly for DA to mount and provide access to.
  • mapPartitions: Whether a local mount should map partitions after a remount. See the kdb+ documentation.
  • purview: Inclusive start, exclusive end purview for startup of the DA process.
  • enforceSchema: Whether a stream DAP should validate all incoming table data against what is defined in the schema. There is a performance cost to having this enabled.

Within the assembly, per-process configuration is structured under the instances entry of the dap element. Configuration that applies to all DAPs is placed one level above the instances themselves, and can be overridden at the instance level.

elements:
  dap:
    # These configs apply to all DA below
    rcName: sg_rc # Used with discovery to determine resource coordinator to connect to
    instances:
      RDB:
        # Config specific to DAPs with a KXI_SC of RDB
        mountName: rdb # Must match name of mount in "mounts" section
      IDB:
        # Config specific to DAPs with a KXI_SC of IDB
        mountName: idb
      HDB:
        # Config specific to DAPs with a KXI_SC of HDB
        mountName: hdb
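
To illustrate the override behaviour described above, the sketch below sets tableLoad once for all DAPs and overrides it for a single instance. The choice of splay for the HDB instance is an assumption made purely for illustration, not a recommended setting.

```yaml
# Illustrative dap element: a shared setting overridden at the instance level.
elements:
  dap:
    rcName: sg_rc
    tableLoad: empty       # applies to every instance below unless overridden
    instances:
      RDB:
        mountName: rdb     # inherits tableLoad: empty
      HDB:
        mountName: hdb
        tableLoad: splay   # assumed override, shown only to demonstrate precedence
```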

Custom file

The DA processes load the q file pointed to by the KXI_CUSTOM_FILE environment variable. In this file, you can load any custom APIs/functions that you want accessible by the DA processes. Note that while DA only supports loading a single file, you can load other files from within this file using \l (allowing you to control load order). The current working directory (pwd) at load time is the base directory of the file.

This can be combined with the Service Gateway microservice (which allows custom aggregation functions) to create full custom API support within KX Insights (see "Service Gateway" for details).

Note: It's recommended to avoid .da* namespaces to avoid colliding with DA functions.

To make an API executable within DA, use the .da.registerAPI API, whose signature is as follows.

  • api - symbol - API function name.
  • metadata - list|string|dictionary - API function metadata (see "SAPI - Metadata" documentation).

API functions MUST be registered with .da.registerAPI in order to be invocable by the DA processes. See the Custom file example below.

If using the Service Gateway microservice, you can use the .kxi.getMeta API to see which APIs are available, and in which DAP (see "SG - APIs").

When creating custom analytics that access data, the helper function .kxi.selectTable, which understands the data model within each DAP, can help select from the tables necessary to return the appropriate records. Its interface is as follows:

| Name | Type | Description |
|---|---|---|
| tn | symbol | Name of table to retrieve data from |
| ts | timestamp[2] | Time period of interest |
| wc | list[] | Where clause of what to select |
| bc | dict/boolean | By clause for select |
| cn | symbol | Names of columns to select for. Include any columns needed in aggregations |
| agg | dict | Select clause/aggregations to apply to table |

Sandbox Mode

If a data access process is passed the environment variable KXI_DAP_SANDBOX with a value of "true", it is started in a "sandboxed" mode. In this mode the DAP does not initialize connections to the resource coordinator or the storage manager. In addition, local mount types load any splayed tables into memory.

For stream mounts there is an additional environment variable, SBX_MAX_ROWS, which the DAP uses to limit the number of rows a partitioned table has in memory. When it is set, only the last SBX_MAX_ROWS records received/updated are kept in memory.
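
As a sketch of the two variables above, the docker-compose fragment below starts a stream DAP in sandbox mode. The service name rdb_sandbox and the row limit of 100000 are hypothetical values chosen for illustration.

```yaml
# Illustrative docker-compose fragment: a sandboxed stream DAP.
services:
    rdb_sandbox:
        image: kxi-da:0.8.0
        command: -p 5080
        environment:
            - KXI_NAME=rdb_sandbox # Hypothetical process name
            - KXI_SC=RDB
            - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
            - KXI_DAP_SANDBOX=true # No connections to the resource coordinator or storage manager
            - SBX_MAX_ROWS=100000 # Hypothetical limit; keeps only the last 100000 rows per partitioned table
```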

Example

Below is a sample configuration. We use a docker-compose yaml, but this can be adapted to other formats. Note: variables ${...} are user-defined and based on your local directory structure/file names. Sections/lines marked Optional are optional.

Docker-compose

#
# Optional: Create volumes to include license/configuration in the containers.
#
x-vols: &vols
    volumes:
        - ${kx_license_dir}:/opt/kx/lic
        - ${cfg_dir}:/opt/kx/cfg
        - ${mnt_dir}:/data
        - ${custom_dir}:/opt/kx/custom # Optional mount for loading custom code

#
# Optional: Create a network for processes to communicate.
#
x-kxnet: &kxnet
    networks:
        - kx

networks:
    kx:
        name: kx
        driver: bridge

#
# Services.
#
services:

    #
    # Realtime Database
    #
    rdb:
        image: kxi-da:0.8.0
        command: -p 5080
        environment:
            - KXI_NAME=rdb
            - KXI_SC=RDB
            - KXI_LOG_FORMAT=text # Optional
            - KXI_LOG_LEVELS=default:trace # Optional
            - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
            - KXI_RT_LIB=/opt/kx/cfg/docker/rt_tick_client_lib.q
            - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_rdb_code}.q # Optional
        ports:
            - 5080-5084:5080
        deploy:
            mode: replicated
            replicas: 2
        <<: [*vols, *kxnet] # Optional

    #
    # Optional: RDB sidecar. Only required if using discovery, otherwise, may be omitted.
    #
    rdb_sidecar:
        image: kxi_sidecar:0.8.0
        environment:
            - KXI_CONFIG_FILE=/opt/kx/cfg/${rdb_sidecar_config_json}
            - KXI_LOG_LEVELS=default:debug # Optional
        <<: [*vols, *kxnet] # Optional

    #
    # Intraday Database
    #
    idb:
        image: kxi-da:0.8.0
        command: -p 5090
        environment:
            - KXI_NAME=idb
            - KXI_SC=IDB
            - KXI_LOG_FORMAT=text # Optional
            - KXI_LOG_LEVELS=default:trace # Optional
            - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
            - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_idb_code}.q # Optional
        ports:
            - 5090-5094:5090
        deploy:
            mode: replicated
            replicas: 2
        <<: [*vols, *kxnet] # Optional

    #
    # Historical Database
    #
    hdb:
        image: kxi-da:0.8.0
        command: -p 5100
        environment:
            - KXI_NAME=hdb
            - KXI_SC=HDB
            - KXI_LOG_FORMAT=text # Optional
            - KXI_LOG_LEVELS=default:trace # Optional
            - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly/${assembly_file_yaml}
            - KXI_CUSTOM_FILE=/opt/kx/custom/${custom_hdb_code}.q # Optional
        ports:
            - 5100-5104:5100 # Container port matches "-p 5100" above
        deploy:
            mode: replicated
            replicas: 2
        <<: [*vols, *kxnet] # Optional

    #
    # Optional: Eureka Service Discovery Registry. Only required if using discovery, otherwise, may be omitted.
    #
    eureka:
        image: kxi-eureka-discovery:0.8.0
        ports:
            - 9000:8761

    #
    # Optional: Discovery proxy. Only required if using discovery, otherwise, may be omitted.
    #
    proxy:
        image: discovery_proxy:0.8.0
        ports:
            - 4000:4000
        environment:
            - KXI_CONFIG_FILE=/opt/app/cfg/${proxy_config_json}
        command: -p 4000

Assembly

Here's an example assembly configuration where the Data Access processes are tagged with a region of "New York" and an assetClass of "stocks".

name: integration-env
description: Data access assembly configuration
labels:
  region: New York
  assetClass: stocks

tables:
  trade:
    description: Trade data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMemory: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: price
        description: Trade price
        type: float
      - name: size
        description: Trade size
        type: long

  quote:
    description: Quote data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMemory: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: bid
        description: Bid price
        type: float
      - name: ask
        description: Ask price
        type: float
      - name: bidSize
        description: Bid size
        type: long
      - name: askSize
        description: Ask size
        type: long

bus:
  stream:
    protocol: custom
    nodes: tp:5000
    topic: dataStream

mounts:
  rdb:
    type: stream
    uri: file://stream
    partition: none
  idb:
    type: local
    uri: file://data/db/idb/current
    partition: ordinal
  hdb:
    type: local
    uri: file://data/db/hdb/current
    partition: date

elements:
  dap:
    gwAssembly: gw-assembly
    instances:
      RDB:
        mountName: rdb
      IDB:
        mountName: idb
      HDB:
        mountName: hdb

RDB discovery sidecar

The sidecar configuration file is written as described in the KXI Service Discovery documentation.

{
    "connection": ":rdb:5080",
    "frequencySecs": 5,
    "discovery":
    {
        "registry": ":proxy:4000",
        "adaptor": "discEurekaAdaptor.q",
        "heartbeatSecs": 30,
        "leaseExpirySecs": 90
    }
}

Custom file

Each DA process can load a custom file for custom API support. For example,

// Sample DA custom file.

// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
\l subFolder/otherFile1.q
\l subFolder/otherFile2.q

//
// @desc Define a new API. Counts number of entries by specified columns.
//
// @param table     {symbol}            Table name.
// @param startTS   {timestamp}         Start time (inclusive).
// @param endTS     {timestamp}         End time (exclusive).
// @param byCols    {symbol|symbol[]}   Column(s) to count by.
//
// @return          {table}             Count by specified columns.
//
countBy:{[table;startTS;endTS;byCols]
    ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
    }

// Register with the DA process.
.da.registerAPI[`countBy;
    .sapi.metaDescription["Define a new API. Counts number of entries by specified columns."],
    .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table name.")],
    .sapi.metaParam[`name`type`isReq`description!(`byCols;-11 11h;1b;"Column(s) to count by.")],
    .sapi.metaParam[`name`type`isReq`description!(`startTS;-12h;1b;"Start time (inclusive).")],
    .sapi.metaParam[`name`type`isReq`description!(`endTS;-12h;1b;"End time (exclusive).")],
    .sapi.metaReturn[`type`description!(98h;"Count by specified columns.")],
    .sapi.metaMisc[enlist[`safe]!enlist 1b]
    ]

// etc...