
Reliable Transport (RT) quickstart guide

This guide shows how to launch RT together with a sample publisher and subscriber, with examples for both Docker Compose and Kubernetes.

For demonstration only

The kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images are samples for demonstration only; they are not supported by KX.

RT Cluster Size

RT supports a 1-node or 3-node cluster, as set by the RT_REPLICAS environment variable:

  • In a 3-node cluster, RT offers fault tolerance and high availability (HA). If one of the three nodes goes offline for a period, the remaining two nodes continue to send data to downstream subscribers.

  • In a 1-node cluster, RT is still fault tolerant but not highly available. If the single RT node goes down, the publisher continues writing to its local RT log file. Once the RT node restarts, it retrieves any data on the publisher node that it has not yet received.

Docker registry login

To pull the kxi-rt, kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images, you must first log in to the KX Docker registry:

docker login registry.dl.kx.com -u username -p password
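Passing the password with -p leaves it in the shell history and visible in the process list. Docker can instead read the password from standard input via its --password-stdin flag. The sketch below wraps that pattern; the helper name kx_registry_login and the variable KX_REGISTRY_PASSWORD are hypothetical choices for illustration.

```shell
# kx_registry_login USERNAME: log in to the KX registry without putting the
# password on the command line. KX_REGISTRY_PASSWORD is a hypothetical
# variable name chosen for this sketch.
kx_registry_login() {
  if [ -z "${KX_REGISTRY_PASSWORD:-}" ]; then
    echo "KX_REGISTRY_PASSWORD is not set" >&2
    return 1
  fi
  # --password-stdin makes docker read the password from standard input,
  # so it does not appear in shell history or in the process arguments.
  printf '%s' "$KX_REGISTRY_PASSWORD" | docker login registry.dl.kx.com -u "$1" --password-stdin
}
```

For example, after setting KX_REGISTRY_PASSWORD in the current shell, run `kx_registry_login username`.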

Docker Requirements

The RT image is built as an x64 rockylinux:8 container but uses functionality that requires Linux kernel 4.18.0 or later. Because containers use the Docker host's kernel, the Docker host must run such a kernel to run the RT container. Examples of Linux distributions with a suitable kernel include:

  • CentOS 8 or later
  • Red Hat Enterprise Linux (RHEL) 8 or later
  • Ubuntu 18.04 or later

Using WSL2 as the Docker host is also supported, though updating the WSL2 kernel to the latest version is recommended.

Note

Running the RT container is only supported where the docker host is running Linux on an x64 architecture.
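The host requirements above (x64 architecture, Linux kernel 4.18.0 or later) can be checked from a shell on the Docker host before pulling any images. This is a sketch rather than a KX-supplied tool: the function names are hypothetical, and it assumes GNU coreutils, whose `sort -V` orders version strings so that, for example, 4.9.0 sorts before 4.18.0.

```shell
# version_ge A B: exit status 0 if version A >= version B.
version_ge() {
  # sort -V orders version strings; if B sorts first (or A equals B), A >= B.
  [ "$(printf '%s\n%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

# check_rt_host [ARCH] [KERNEL_RELEASE]: defaults to the current host's values.
check_rt_host() {
  local arch kernel
  arch=${1:-$(uname -m)}
  kernel=${2:-$(uname -r)}
  kernel=${kernel%%-*}   # drop the distro suffix, e.g. 4.18.0-513.el8.x86_64 -> 4.18.0
  if [ "$arch" != "x86_64" ]; then
    echo "unsupported architecture: $arch (x64 required)"
    return 1
  fi
  if ! version_ge "$kernel" 4.18.0; then
    echo "kernel $kernel is older than 4.18.0"
    return 1
  fi
  echo "ok: $arch, kernel $kernel"
}
```

Calling `check_rt_host` with no arguments checks the machine it runs on; passing explicit values, such as `check_rt_host x86_64 4.15.0-213-generic`, is handy for testing.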

Publisher

The kxi-rt-q-pub-eval image sends data to an RT stream. In this example, the publisher sends rows of a table named trade to the stream. Data is sent once at startup, when the container launches, and then periodically on a timer, using the q timer function .z.ts.

A publisher will start a set of push_client replicators.

Replicator count

The number of replicators the publisher starts needs to match the size of the RT cluster it is sending data to.

Environment Variables

  • RT_STREAM The RT stream identifier. Together with RT_TOPIC_PREFIX, it is used to create the RT hostnames that the publisher communicates with.
  • RT_TOPIC_PREFIX The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n]
  • RT_LOG_PATH The directory where the publisher's message log is stored.
  • RT_RAFT_CLUSTER Relevant only when using deduplication. Specifies the endpoint prefix for the RT cluster, e.g. rt-data. See the Deduplication section below.
  • PUB_TIMER_FREQ The interval, in milliseconds, at which messages are published to the RT stream.
  • PUB_TIMER_MSG The number of messages published every PUB_TIMER_FREQ milliseconds.
  • RT_REPLICAS The number of push_client replicators to start. This should match the replicaCount of the RT stream being published to. Supported values are 1 or 3.

// schema
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());

// variables
.pub.N:"J"$getenv`PUB_TIMER_MSG;  // the number of messages to publish on each timer tick (no `first`, which would keep only the first digit)
.pub.timerFreq:"J"$getenv`PUB_TIMER_FREQ;  // the timer interval in milliseconds

// functions
.fin.getData:{[n]
    ([] time:n#.z.p;  // current time in UTC, one timestamp per row
        sym:n?`AAPL`FDP`GOOG;  // randomly choose among AAPL, FDP and GOOG
        exch:n?`NYSE`LSE;  // randomly choose between NYSE and LSE
        side:n?`buy`sell;  // randomly choose between buy and sell
        price:n?10.;  // random float in the range [0, 10)
        size:n?10000;  // random long in the range 0 to 9999
        tradeID:n?0Ng)  // random GUIDs
    }
.fin.timerFunction:{
  tradeData:.fin.getData .pub.N;      // get random trade data
  .fin.publish (`.b; `trade; tradeData);  // write data to local log which will be replicated to RT nodes
  }
.z.ts:.fin.timerFunction; // define the timer function

init:{
  .log.info "RT_LOG_PATH:", getenv`RT_LOG_PATH;  // log the location to which the RT logs will be written
  stream:getenv`RT_STREAM;      // read in the env variable defined, type string
  path:getenv`RT_LOG_PATH;
  prefix:getenv`RT_TOPIC_PREFIX;      // read in the env variable defined, type string
  replicas:3^"I"$getenv`RT_REPLICAS;  // number of replicas, defaults to 3 if RT_REPLICAS is not set
  params:`stream`path`topic_prefix`replicas!(stream;path;prefix;replicas);
  .fin.publish:.rt.pub params; // create connection to a RT stream by starting replicators.
  .fin.timerFunction[]; // publish .pub.N messages at startup
  system"t ",string .pub.timerFreq;  // turn on the timer
  }
init[];
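The publisher code above parses PUB_TIMER_FREQ and PUB_TIMER_MSG with "J"$getenv, which yields a null if a variable is unset, so it can be worth validating the environment before the container starts. The following is a hypothetical pre-flight helper (validate_pub_env is not part of RT) that checks the variables listed earlier and reports the resulting publish rate, PUB_TIMER_MSG × 1000 / PUB_TIMER_FREQ rows per second. The example values in the comment are illustrative.

```shell
# validate_pub_env: sanity-check the sample publisher's environment variables.
# Hypothetical helper; RT_REPLICAS defaults to 3, as in the publisher code.
validate_pub_env() {
  local var val replicas
  for var in RT_STREAM RT_TOPIC_PREFIX RT_LOG_PATH PUB_TIMER_FREQ PUB_TIMER_MSG; do
    eval "val=\${$var:-}"   # indirect lookup of the variable named in $var
    if [ -z "$val" ]; then
      echo "missing $var"
      return 1
    fi
  done
  for var in PUB_TIMER_FREQ PUB_TIMER_MSG; do
    eval "val=\$$var"
    case "$val" in
      *[!0-9]*) echo "$var must be a positive integer, got $val"; return 1 ;;
      *[1-9]*) ;;   # digits only and not all zeros
      *) echo "$var must be a positive integer, got $val"; return 1 ;;
    esac
  done
  replicas=${RT_REPLICAS:-3}
  case "$replicas" in
    1|3) ;;
    *) echo "RT_REPLICAS must be 1 or 3, got $replicas"; return 1 ;;
  esac
  # Rows per second = PUB_TIMER_MSG * 1000 / PUB_TIMER_FREQ (integer division).
  echo "ok: $PUB_TIMER_MSG rows every $PUB_TIMER_FREQ ms" \
    "(about $((PUB_TIMER_MSG * 1000 / PUB_TIMER_FREQ)) rows/s) to $replicas replicas"
}

# Example (illustrative values):
#   export RT_STREAM=data RT_TOPIC_PREFIX=rt- RT_LOG_PATH=/tmp/rt/pub
#   export PUB_TIMER_FREQ=500 PUB_TIMER_MSG=10 RT_REPLICAS=3
#   validate_pub_env   # ok: 10 rows every 500 ms (about 20 rows/s) to 3 replicas
```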

Push Client Replicators

Calling .rt.pub starts a set of push_client replicators, one per RT node. Each connects to the internal push_server replicator running on one of the RT sequencers. For a 3-node RT cluster, the endpoints they connect to are:

  • ${RT_TOPIC_PREFIX}${RT_STREAM}-0:5002
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-1:5002
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-2:5002
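The endpoint pattern above extends to either cluster size: one endpoint per RT node, indexed from 0 to RT_REPLICAS-1. A small shell sketch that prints them, which may help when checking hostnames while debugging connectivity; rt_endpoints is a hypothetical helper and the example values are illustrative.

```shell
# rt_endpoints PORT: print ${RT_TOPIC_PREFIX}${RT_STREAM}-<i>:PORT
# for i = 0 .. RT_REPLICAS-1 (RT_REPLICAS defaults to 3 here).
rt_endpoints() {
  local port=$1 replicas=${RT_REPLICAS:-3} i=0
  while [ "$i" -lt "$replicas" ]; do
    printf '%s%s-%d:%s\n' "$RT_TOPIC_PREFIX" "$RT_STREAM" "$i" "$port"
    i=$((i + 1))
  done
}

# Example (illustrative values): RT_TOPIC_PREFIX=rt- RT_STREAM=data RT_REPLICAS=3
#   rt_endpoints 5002   # rt-data-0:5002, rt-data-1:5002, rt-data-2:5002
```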

Deduplication

For details on deduplication, see the RT deduplication documentation.

Subscriber

The kxi-rt-q-sub-eval image receives data from an RT stream. In this example, the subscriber receives data for a table named trade.

A subscriber will start a set of pull_client replicators.

Replicator count

The number of replicators the subscriber starts needs to match the size of the RT cluster it is subscribing to.

Environment Variables

  • RT_STREAM The RT stream identifier. Together with RT_TOPIC_PREFIX, it is used to create the RT hostnames that the subscriber communicates with.
  • RT_TOPIC_PREFIX The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n]
  • RT_LOG_PATH The directory in the persistent volume where the subscriber's message log is stored.
  • RT_REPLICAS The number of pull_client replicators to start. This should match the replicaCount of the RT stream being subscribed to. Supported values are 1 or 3.

// schemas 
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
events:([]event:();pos:());

// dictionaries 
tab_counts:(`$())!"j"$();

// functions
.fin.upd.trade:{[x]
  insert[`trade;x];
  }
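.fin.trade.unknown:{[t;x]
  t set update updateTS:.z.p from x;  // store the unexpected table under its own name, stamped with the receive time
  show (t; x);  // print the table name and the received rows (msg is not in scope here)
  }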
.fin.timerFunction:{
  show tab_counts;
  }


// callback functions upd and event_handler, called upon receiving a message or an event from RT
upd:{[msg;pos]
  tab_counts[msg 1]+:count msg 2;  // keep a count of the number of records per table in a dictionary
  if[msg[1] in tables[]; @[.fin.upd[msg 1]; msg 2; {show x}]];  // received a defined table
  if[not msg[1] in tables[]; .fin.trade.unknown[msg 1;msg 2]];  // received an unexpected update
  };
event_handler:{`events upsert 0N!`event`pos!(x;y)};

.z.ts:.fin.timerFunction

init:{
  show`$"Start Subscription";
  RT_STREAM:getenv`RT_STREAM;
  .rt.sub[RT_STREAM; 0; `message`event!(upd;event_handler)];  // subscribe to the RT stream
  system "t 5000"; // set the timer value to be 5 seconds
  }
init[];

Pull Client Replicators

Upon calling .rt.sub with the arguments above, the subscriber starts a number of pull_client replicators based on the RT_REPLICAS environment variable. Each connects to the internal pull_server replicator running on one of the RT sequencers. For a 3-node RT cluster, the endpoints they connect to are:

  • ${RT_TOPIC_PREFIX}${RT_STREAM}-0:5001
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-1:5001
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-2:5001
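To confirm that a subscriber's host can reach each pull_server endpoint listed above, one option is a plain TCP probe. This is a minimal sketch assuming bash (for its /dev/tcp redirection) and GNU coreutils `timeout` are installed. It only shows that a TCP connection to port 5001 can be opened, not that RT is healthy, and the function names are hypothetical.

```shell
# probe_endpoint HOST PORT: succeed if a TCP connection opens within 2 seconds.
probe_endpoint() {
  timeout 2 bash -c ': < "/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# probe_pull_endpoints: probe ${RT_TOPIC_PREFIX}${RT_STREAM}-<i>:5001 for each RT node.
# Prints one line per endpoint; returns non-zero if any endpoint is unreachable.
probe_pull_endpoints() {
  local replicas=${RT_REPLICAS:-3} i=0 host rc=0
  while [ "$i" -lt "$replicas" ]; do
    host="${RT_TOPIC_PREFIX}${RT_STREAM}-$i"
    if probe_endpoint "$host" 5001; then
      echo "reachable   $host:5001"
    else
      echo "unreachable $host:5001"
      rc=1
    fi
    i=$((i + 1))
  done
  return "$rc"
}
```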