
# Docker reference deployment

A reference deployment using Docker Compose is provided below.

## Pre-requisites

Pulling the images requires a login with:

```bash
docker login registry.dl.kx.com -u <username> -p <password>
```

## Directory structure

Before running this example, create the following directory structure:

```
db/     # Empty directory where the database will be stored on disk
cfg/    # Directory for configuration files
    assembly.yaml
    rt_tick_client_lib.q
src/    # Directory for custom API code
    agg.q
    da.q
kdb-tick/   # Clone of kdb-tick for a tickerplant service
    sym/
    tick/{r.q, u.q, sym.q}
    tick.q
.env
docker-compose-sg.yaml
docker-compose-da.yaml
docker-compose-sm.yaml
docker-compose-tp.yaml
```
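The skeleton can be scaffolded with a few commands. This is a sketch: the clone URL is the public KxSystems/kdb-tick repository (an assumption, not stated on this page), and the touched files are placeholders to be replaced with the listings given below.

```shell
# Create the empty directories used by the deployment
mkdir -p db cfg src

# Placeholder files; replace their contents with the listings below
touch cfg/assembly.yaml cfg/rt_tick_client_lib.q src/agg.q src/da.q .env

# kdb-tick supplies the tickerplant (requires network access):
# git clone https://github.com/KxSystems/kdb-tick.git kdb-tick
```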

### Write permissions

The `db/` and `kdb-tick/sym/` directories on the host must be writable by the SM and TP containers, which run as the user `nobody`.
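One blunt way to grant this is world-write via `chmod`, sketched below; a stricter setup could instead `chown` the directories to `nobody`.

```shell
# Make the host directories writable by the containers' "nobody" user
mkdir -p db kdb-tick/sym
chmod -R 0777 db kdb-tick/sym
```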

- The sources for `agg.q` and `da.q` for this example are provided here.
- `kdb-tick/` is cloned from the KX GitHub.
- The `sym.q` schema must include the `_prtnEnd` and `_reload` tables, as in:

```q title="kdb-tick/tick/sym.q"
// internal tables
// with time and sym columns added by RT client for compatibility
(`$"_prtnEnd")set ([] time:"n"$(); sym:`$(); signal:`$(); endTS:"p"$(); opts:())
(`$"_reload")set ([] time:"n"$(); sym:`$(); mount:`$(); params:())

trade:([] time:"n"$(); sym:`$(); realTime:"p"$(); price:"f"$(); size:"j"$())
quote:([] time:"n"$(); sym:`$(); realTime:"p"$(); bid:"f"$(); ask:"f"$(); bidSize:"j"$(); askSize:"j"$())
```

## Run

To run, execute the following:

```bash
docker-compose \
    -f docker-compose-tp.yaml \
    -f docker-compose-da.yaml \
    -f docker-compose-sm.yaml \
    -f docker-compose-sg.yaml \
    up
```
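The same file list works with the other Compose subcommands. For example, a detached start and later teardown (standard Docker Compose flags, not part of the original listing; requires a running Docker daemon):

```shell
# Collect the file flags once; $files is deliberately unquoted so it word-splits
files="-f docker-compose-tp.yaml -f docker-compose-da.yaml -f docker-compose-sm.yaml -f docker-compose-sg.yaml"

docker-compose $files up -d      # start in the background
docker-compose $files logs -f sm # follow the Storage Manager logs
docker-compose $files down       # stop and remove the containers
```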

## Environment

Each of the Docker Compose files below uses a `.env` file specifying the images and licenses to be used.

```bash title=".env"
# Images and license
KDB_LICENSE_B64="[MASKED]"
kxi_sg_gw=registry.dl.kx.com/kxi-sg-gw:1.2.0
kxi_sg_rc=registry.dl.kx.com/kxi-sg-rc:1.2.0
kxi_sg_agg=registry.dl.kx.com/kxi-sg-agg:1.2.0
kxi_sm_single=registry.dl.kx.com/kxi-da-single:1.2.0
kxi_da_single=registry.dl.kx.com/kxi-da-single:1.2.0
kxi_q=registry.dl.kx.com/qce:3.1.0

# Paths
custom_da_dir="$PWD/src/da"
custom_agg_dir="$PWD/src/agg"
cfg_dir="$PWD/cfg"
mnt_dir="$PWD/db"
tp_dir="$PWD/kdb-tick/sym"
custom_dir="./custom"
```



## Assembly file

The *Assembly file* is the main business configuration for the database. Table schemas and logical process configuration are defined here. See [assembly configuration](./configuration_dap.html#assembly) for more information.

```yaml title="cfg/assembly.yaml"

name: integration-env
description: Data access assembly configuration
labels:
  region: New York
  assetClass: stocks

tables:
  trade:
    description: Trade data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    sortColsOrd: sym
    sortColsDisk: sym
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMem: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: price
        description: Trade price
        type: float
      - name: size
        description: Trade size
        type: long

  quote:
    description: Quote data
    type: partitioned
    blockSize: 10000
    prtnCol: realTime
    sortColsOrd: sym
    sortColsDisk: sym
    columns:
      - name: time
        description: Time
        type: timespan
      - name: sym
        description: Symbol name
        type: symbol
        attrMem: grouped
        attrDisk: parted
        attrOrd: parted
      - name: realTime
        description: Real timestamp
        type: timestamp
      - name: bid
        description: Bid price
        type: float
      - name: ask
        description: Ask price
        type: float
      - name: bidSize
        description: Bid size
        type: long
      - name: askSize
        description: Ask size
        type: long

bus:
  stream:
    protocol: custom
    nodes: tp:5010
    topic: dataStream

mounts:
  rdb:
    type: stream
    baseURI: file://stream
    partition: none
  idb:
    type: local
    baseURI: file:///data/idb
    partition: ordinal
  hdb:
    type: local
    baseURI: file:///data/hdb
    partition: date

elements:
  dap:
    gwAssembly: gw-assembly
    smEndpoints: sm:10001
    instances:
      dap:
        mountList: [rdb, idb, hdb]

  sm:
    description: Storage manager
    source: stream
    tiers:
      - name: stream
        mount: rdb
      - name: idb
        mount: idb
        schedule:
          freq: 0D00:10:00 # every 10 minutes
      - name: hdb1
        mount: hdb
        schedule:
          freq: 1D00:00:00 # every day
          snap:   01:35:00 # at 1:35 AM
        retain:
          time: 2 days
          rows: 200000
      - name: hdb2
        mount: hdb
        store: file:///data/hdbtier2
        retain:
          time: 5 weeks
          size: 2 TB
          rows: 10000000
      - name: hdb3
        mount: hdb
        store: file:///data/hdbtier3
        retain:
          time: 3 months
          size: 1 PB
          rows: 20000000
    disableDiscovery: true  # Disables registering with discovery
```

## Docker compose

Each subservice of the database is deployed here as a separate Docker Compose file for clarity. These could equally be combined into a single Docker Compose file.
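If a single file is preferred, Docker Compose can merge the fragments itself: `docker-compose config` resolves variables and validates the combined configuration. A sketch, assuming the `.env` file and the four fragments are present in the working directory:

```shell
# Merge the four fragments into one resolved, validated compose file
docker-compose \
    -f docker-compose-tp.yaml \
    -f docker-compose-da.yaml \
    -f docker-compose-sm.yaml \
    -f docker-compose-sg.yaml \
    config > docker-compose.yaml
```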

### Service Gateway

The Service Gateway configures the three containers that make up the gateway.

```yaml title="docker-compose-sg.yaml"
x-vols: &vols
  volumes:
    - ${cfg_dir}:/opt/kx/cfg
    - ${custom_agg_dir}:/opt/kx/custom # Optional mount for custom APIs

networks:
  kx:
    name: kx

services:
  sgrc:
    image: ${kxi_sg_rc}
    env_file: .env
    environment:
      - KXI_NAME=sg_rc
      - KXI_PORT=5050
    networks: [kx]
    <<: *vols

  sgagg:
    image: ${kxi_sg_agg}
    env_file: .env
    environment:
      - KXI_NAME=sg_agg
      - KXI_PORT=5060
      - KXI_SG_RC_ADDR=sgrc:5050
      - KXI_CUSTOM_FILE=/opt/kx/custom/custom.q # Optional for custom APIs
    deploy: # Optional: deploy multiple replicas.
      mode: replicated
      replicas: 3
    networks: [kx]
    <<: *vols

  sggw:
    image: ${kxi_sg_gw}
    env_file: .env
    environment:
      - GATEWAY_QIPC_PORT=5040
      - GATEWAY_HTTP_PORT=8080
      - KXI_SG_RC_ADDR=sgrc:5050
    networks: [kx]
    deploy: # Optional: deploy multiple replicas.
      mode: replicated
      replicas: 1
    <<: *vols
```

### Data Access Processes

A set of Data Access Processes is configured, each of which connects to the Resource Coordinator of the Service Gateway launched above.

```yaml title="docker-compose-da.yaml"

networks:
  kx:
    name: kx

services:
  dap:
    image: ${kxi_da_single}
    command: -p 5080
    env_file: .env
    environment:
      - KXI_NAME=dap
      - KXI_SC=dap
      - KXI_PORT=5080
      - KXI_RT_LIB=/opt/kx/cfg/rt_tick_client_lib.q
      - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly.yaml
      - KXI_SG_RC_ADDR=sgrc:5050
      - KXI_CUSTOM_FILE=/opt/kx/custom/custom.q # Optional for custom APIs
    volumes:
      - ${cfg_dir}:/opt/kx/cfg
      - ${mnt_dir}:/data
      - ${tp_dir}:/logs
      - ${custom_da_dir}:/opt/kx/custom # Optional mount for custom APIs
    networks: [kx]
```

### Storage Manager

The Storage Manager is configured as a single container, allowing connections by Data Access Processes configured above.

```yaml title="docker-compose-sm.yaml"
networks:
  kx:
    name: kx

services:
  sm:
    image: ${kxi_sm_single}
    env_file: .env
    command: -p 10001
    environment:
      - KXI_NAME=sm
      - KXI_SC=SM
      - KXI_ASSEMBLY_FILE=/opt/kx/cfg/assembly.yaml
      - KXI_RT_LIB=/opt/kx/cfg/rt_tick_client_lib.q
    volumes:
      - ${mnt_dir}:/data
      - ${tp_dir}:/logs
      - ${cfg_dir}:/opt/kx/cfg
    networks: [kx]
```


### Tickerplant

A standard tickerplant is put in front of the Storage Manager and Data Access Processes to provide durable data ingestion.

!!! note "Reliable Transport"

    Within *KX Insights Platform*, the transport used is *KX Insights Reliable Transport* rather than a tickerplant. This allows for fault-tolerance and durable messaging despite potentially unreliable network connections. When using a standard tickerplant instead, the interface must adhere to the API expected by RT.

``` yaml title="docker-compose-tp.yaml"

networks:
  kx:
    name: kx

services:
  tp:
    image: ${kxi_q}
    env_file: .env
    command: tick.q sym /logs/ -p 5010
    working_dir: /opt/kx/tick
    volumes:
      - ${tp_dir}:/logs
      - ${tp_dir}/..:/opt/kx/tick
    networks: [kx]
```

## Custom API code

Custom code is entirely optional, but allows custom APIs to be defined within Data Access Processes and Aggregators. These APIs are then registered with the Service Gateway to provide clients unified access across tiers.

```q title="src/da/custom.q"
// Sample DA custom file.

// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
/ \l subFolder/otherFile1.q
/ \l subFolder/otherFile2.q

//
// @desc Define a new API. Counts number of entries by specified columns.
//
// @param table    {symbol}           Table name.
// @param byCols   {symbol|symbol[]}  Column(s) to count by.
// @param startTS  {timestamp}        Start time (inclusive).
// @param endTS    {timestamp}        End time (exclusive).
//
// @return {table} Count by specified columns.
//
countBy:{[table;startTS;endTS;byCols]
    ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
    }

// Register with the DA process.
.da.registerAPI[`countBy;
    .sapi.metaDescription["Define a new API. Counts number of entries by specified columns."],
    .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table name.")],
    .sapi.metaParam[`name`type`isReq`description!(`byCols;-11 11h;1b;"Column(s) to count by.")],
    .sapi.metaParam[`name`type`isReq`description!(`startTS;-12h;1b;"Start time (inclusive).")],
    .sapi.metaParam[`name`type`isReq`description!(`endTS;-12h;1b;"End time (exclusive).")],
    .sapi.metaReturn[`type`description!(98h;"Count by specified columns.")],
    .sapi.metaMisc[enlist[`safe]!enlist 1b]
    ]
```

```q title="src/agg/custom.q"

// Sample Agg custom file.

// Can load other files within this file. Note that the current directory
// is the directory of this file (in this example: /opt/kx/custom).
/ \l subFolder/otherFile1.q
/ \l subFolder/otherFile2.q

//
// @desc An override to the default ping aggregation function. Instead of doing a raze,
// we just take the min (so true indicates all targets successful).
//
// @param res   {boolean[]} Results from the DAPs.
//
// @return      {boolean}   Min of all DAP results.
//
pingAggOverride:{[res]
    .sapi.ok min res
    }


//
// @desc Agg function that does a plus join on a list of tables.
//
// @param tbls  {table[]}   List plus-joinable tables.
//
// @return      {table}     Plus join.
//
pjAgg:{[tbls]
    .sapi.ok (pj/)tbls
    }


//
// @desc Agg function that does an average daily count by sym.
//
// @param tbls  {table[]}   List of tables with `sym`date`cnt columns.
//
// @return      {table}     Average count by sym
//
avAgg:{[tbls]
    res:select sum cnt by sym,date from raze 0!'tbls; / Join common dates
    .sapi.ok select avg cnt by sym from res / Average
    }


//
// In order to be usable, aggregation functions MUST be registered with the Agg process. When registering,
// one can also set the aggregation function as the default aggregation function for one or more APIs.
// For example, suppose we had an API defined in the DAPs that performs a "count by" operation on a table:
//
// countBy:{[table;startTS;endTS;byCols]
//     ?[table;enlist(within;`realTime;(startTS;endTS-1));{x!x,:()}byCols;enlist[`cnt]!enlist(count;`i)]
//     }
//
// We can then register our aggregation functions as follows:
//
.sgagg.registerAggFn[`pingAggOverride;
    .sapi.metaDescription["Custom override to .kxi.ping"],
    .sapi.metaParam[`name`type`description!(`res;1h;"List of booleans indicating ping was successful")],
    .sapi.metaReturn[`type`description!(-1h;"The worst of all results")];
    `$()
    ]

.sgagg.registerAggFn[`pjAgg;
    .sapi.metaDescription["Plus join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The plus join (over) of the tables");
    `countBy]; // Register as default aggregation function for this API

.sgagg.registerAggFn[`avAgg;
    .sapi.metaDescription["Average join aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average join (over) of the tables");
    `$()
    ]

//
// Note also that an aggregation function can be the default aggregation function for multiple APIs. E.g.
//  .sgagg.registerAggFn[`myAggFn;();`api1`api2`api3]
//
```

## RT client library

To connect to the tickerplant, both the Data Access Processes and the Storage Manager are configured with a custom script defining connection details and providing the necessary `.rt.*` APIs expected by the services.

```q title="cfg/rt_tick_client_lib.q"
// === internal tables without time/sym columns ===
.rt.NO_TIME_SYM:`$("_prtnEnd";"_reload")

// === rt publish and push functions ===
.rt.push:{'"cannot push unless you have called .rt.pub first"}; // will be overridden

.rt.pub:{[topic]
    if[not 10h=type topic;'"topic must be a string"];
    h:neg hopen hsym`$getenv`KXI_RT_NODES;
    .rt.push:{[nph;payload]
        if[type x:last payload; x:value flip x];
        if[(t:first payload)in .rt.NO_TIME_SYM; x:(count[first x]#'(0Nn;`)),x];
        nph(`.u.upd;t;x);}[h;];
    .rt.push }

// === rt update and subscribe ===

if[`upd in key `.; '"do not define upd: rt+tick will implement this"];
if[`end in key `.u; '"do not define .u.end: rt+tick will implement this"];

if[not type key`.rt.upd; .rt.upd:{[payload;idx] '"need to implement .rt.upd"}];

.rt.sub:{[topic;startIdx;uf]
    if[not 10h=type topic;'"topic must be a string"];

    //connect to the tickerplant
    h:hopen hsym`$getenv`KXI_RT_NODES;

    //initialise our message counter
    .rt.idx:0;

    // === tick.q will call back to these ===
    upd::{[uf;t;x]
        if[not type x; x:flip(cols .rt.schema t)!x]; // for log replay
        if[t in .rt.NO_TIME_SYM; x:`time`sym _x];
        uf[(t;x);.rt.idx];
        .rt.idx+:1; }[uf];

    .u.end:{.rt.idx:.rt.date2startIdx x+1};

    //replay log file and continue the live subscription
    if[null startIdx;startIdx:0W]; // null means follow only, not start from beginning

    //subscribe
    res:h "(.u.sub[`;`]; .u `i`L; .u.d)";
    .rt.schema:(!/)flip res 0; // used to convert arrays to tables during log replay

    //if start index is less than current index, then recover
    if[startIdx<.rt.idx:(.rt.date2startIdx res 2)+res[1;0];
        .rt.recoverMultiDay[res[1];startIdx]];
    }

//100 billion records per day
.rt.MAX_LOG_SZ:"j"$1e11;

.rt.date2startIdx:{("J"$(string x) except ".")*.rt.MAX_LOG_SZ};

.rt.recoverMultiDay:{[iL;startIdx]
    //iL - index and Log (as can be fed into -11!)
    i:first iL; L:last iL;
    //get all files in the same folder as the tp log file
    files:key dir:first pf:` vs last L;
    //get the name of the logfile itself
    fileName:last pf;
    //get all the lognameXXXX.XX.XX files (logname is sym by default - so usually
    //the files are of the form sym2021.01.01, sym2021.01.02, sym2021.01.03, etc)
    files:files where files like (-10_ string fileName),"*";
    //from those files, get those with dates in the range we are interested in
    files:` sv/: dir,/:asc files where ("J"$(-10#/:string files) except\: ".")>=startIdx div .rt.MAX_LOG_SZ;
    //set up upd to skip the first part of the file and revert to regular definition when you hit start index
    upd::{[startIdx;updo;t;x]
        $[.rt.idx>=startIdx; [upd::updo; upd[t;x]]; .rt.idx+:1]}[startIdx;upd];
    //read all of all the log files except the last, where you read up to 'i'
    files:0W,/:files;
    files[(count files)-1;0]:i;
    //reset .rt.idx for each new day and replay the log file
    {.rt.idx:.rt.date2startIdx "D"$-10#string x 1; -11!x}each files;
    };

//100 billion updates per day - 1e11
//e.g. 2021.06.10 -> 20210610*1e11
```
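The log-index scheme in `.rt.date2startIdx` gives every date its own block of `.rt.MAX_LOG_SZ` (1e11) indices, so a single message index encodes both the day and the position within that day's log. A quick numeric sketch:

```shell
# 2021.06.10 with the dots removed is 20210610; each day owns 1e11 indices,
# so that day's log starts at index 20210610 * 100000000000.
date_digits=20210610
max_log_sz=100000000000
start_idx=$((date_digits * max_log_sz))
echo "$start_idx"               # prints 2021061000000000000

# Recover the day back from an index part-way through that day's log:
idx=$((start_idx + 12345))
echo $((idx / max_log_sz))      # prints 20210610
```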