Reliable Transport (RT) quickstart guide

This guide shows how to launch RT together with a sample publisher and subscriber. It includes examples for both Docker Compose and Kubernetes.

For demonstration only

The kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images are sample images for demonstration only. They are not supported by KX.

Docker login

To pull the kxi-rt, kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images, you must first log in to the Docker registry.

docker login registry.dl.kx.com -u username -p password

Publisher

A Docker image is available which sends data to an RT stream. In this example, the publisher sends rows of a table named trade to an RT stream. The data is sent on a timer, using q's native timer function .z.ts.

A publisher will start a set of push_client replicators.

Replicator count

The number of replicators the publisher starts needs to match the size of the RT cluster it is sending data to.

Environment Variables

  • RT_STREAM An RT stream identifier. It is used along with RT_TOPIC_PREFIX to create the RT hostnames that the publisher communicates with.
  • RT_TOPIC_PREFIX The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n].
  • RT_LOG_PATH The directory where the publisher's message log is stored.
  • RT_RAFT_CLUSTER Relevant when using deduplication. Specifies the endpoint prefix for the RT cluster, e.g. rt-data. See the Deduplication section below.
  • PUB_TIMER_FREQ The frequency (in milliseconds) at which messages are published to an RT stream.
  • PUB_TIMER_MSG The number of messages published every PUB_TIMER_FREQ milliseconds.
  • RT_REPLICAS The number of push_client replicators to start. This should match the replicaCount of the RT stream that is being published to. Supported values are 1 or 3.

// schema
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());

// variables
.pub.N:"J"$getenv`PUB_TIMER_MSG;  // the number of messages to publish on a timer (parse the whole string, not just its first character)
.pub.timerFreq:"J"$getenv`PUB_TIMER_FREQ;  // the frequency at which the messages are published

// functions
.fin.getData:{
    ([] time:.z.p; // current time in UTC
        sym:.pub.N?`AAPL`FDP`GOOG;  // randomly choose a value between AAPL, GOOG, FDP
        exch:.pub.N?`NYSE`LSE;  // randomly choose between NYSE or LSE
        side:.pub.N?`buy`sell;  // randomly decide between buy and sell
        price:.pub.N?10.;  // random value between 0 and 10.
        size:.pub.N?10000;  // random value between 0 and 10000
        tradeID:.pub.N?0Ng)  // random guid.
    }
.fin.timerFunction:{
  tradeData:.fin.getData .pub.N;      // get random trade data
  .rt.publish (`.b; `trade; tradeData);  // write data to local log which will be replicated to RT nodes
  }
.z.ts:.fin.timerFunction; // define the timer function

init:{
  .log.info "RT_LOG_PATH:", getenv`RT_LOG_PATH;  // log the location to which the RT logs will be written
  RT_STREAM:getenv`RT_STREAM;      // read in the env variable defined, type string
  .rt.publish:.rt.pub[RT_STREAM]; // create connection to a RT stream
  system"t ",string .pub.timerFreq;  // turn on the timer
  }
init[];
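
The environment variables described above can be set and sanity-checked before the publisher starts. The following is a minimal sketch rather than part of RT: the values (rt-, data, /tmp/rt-logs, 1000, 10) are placeholders chosen for illustration, and check_publisher_env is a hypothetical helper. It only confirms that the required variables are present, that the two timer settings are whole numbers, and that RT_REPLICAS is one of the supported values (1 or 3).

```shell
#!/bin/sh
# Placeholder values for illustration only; adjust them to match your RT deployment.
export RT_TOPIC_PREFIX="rt-"
export RT_STREAM="data"
export RT_LOG_PATH="/tmp/rt-logs"   # assumed location, not an RT default
export PUB_TIMER_FREQ="1000"        # publish every 1000 ms
export PUB_TIMER_MSG="10"           # messages per timer tick
export RT_REPLICAS="3"              # must match the RT stream's replicaCount

# check_publisher_env: hypothetical helper (not provided by RT).
# Returns 0 when the publisher's environment looks consistent, 1 otherwise.
check_publisher_env() {
    # Every variable the sample publisher reads must be non-empty.
    for name in RT_TOPIC_PREFIX RT_STREAM RT_LOG_PATH PUB_TIMER_FREQ PUB_TIMER_MSG RT_REPLICAS; do
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "missing: $name" >&2
            return 1
        fi
    done
    # The guide states that only 1 or 3 replicas are supported.
    case "$RT_REPLICAS" in
        1|3) ;;
        *) echo "RT_REPLICAS must be 1 or 3, got: $RT_REPLICAS" >&2; return 1 ;;
    esac
    # The timer settings are parsed as integers by the publisher.
    for name in PUB_TIMER_FREQ PUB_TIMER_MSG; do
        eval "value=\$$name"
        case "$value" in
            *[!0-9]*) echo "$name must be a whole number, got: $value" >&2; return 1 ;;
        esac
    done
    return 0
}

check_publisher_env && echo "publisher environment looks consistent"
```

RT_RAFT_CLUSTER is left out of the check because it is only relevant when deduplication is in use.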

Push Client Replicators

Calling .rt.pub starts a number of push_client replicators. These connect to the push_server replicators running on each of the RT sequencers. In a 3-node RT cluster, they connect to the following endpoints:

  • ${RT_TOPIC_PREFIX}${RT_STREAM}-0:5002
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-1:5002
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-2:5002
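
To see how these endpoint names are derived for a given configuration, the expansion can be reproduced in a few lines of shell. This is an illustrative sketch only: rt_endpoints is a hypothetical helper rather than an RT command, and the values rt-, data and 3 are example settings.

```shell
#!/bin/sh
# rt_endpoints PORT: hypothetical helper (not provided by RT).
# Prints one host:port line per replica, following the
# ${RT_TOPIC_PREFIX}${RT_STREAM}-<index> naming pattern described in this guide.
rt_endpoints() {
    port="$1"
    i=0
    while [ "$i" -lt "${RT_REPLICAS:-3}" ]; do
        echo "${RT_TOPIC_PREFIX}${RT_STREAM}-${i}:${port}"
        i=$((i + 1))
    done
}

# Example values for illustration.
RT_TOPIC_PREFIX="rt-"
RT_STREAM="data"
RT_REPLICAS=3

rt_endpoints 5002
# prints:
# rt-data-0:5002
# rt-data-1:5002
# rt-data-2:5002
```

With RT_REPLICAS=1 the same call prints only rt-data-0:5002, which is why RT_REPLICAS needs to match the size of the RT cluster.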

Deduplication

Details on deduplication can be found in the RT deduplication documentation.

Subscriber

A Docker image is available which receives data from an RT stream. In this example, the subscriber receives data for a table named trade from an RT stream.

A subscriber will start a set of pull_client replicators.

Replicator count

The number of replicators the subscriber starts needs to match the size of the RT cluster it is receiving data from.

Environment Variables

  • RT_STREAM An RT stream identifier. It is used along with RT_TOPIC_PREFIX to create the RT hostnames that the subscriber communicates with.
  • RT_TOPIC_PREFIX The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n]
  • RT_LOG_PATH The directory in the persistent volume where publisher's message log is stored.
  • RT_REPLICAS The number of pull_client replicators to start. This should match the replicaCount of the RT stream that is being subscribed to. Supported values are 1 or 3.

// schemas 
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
events:([]event:();pos:());

// dictionaries 
tab_counts:(`$())!"j"$();

// functions
.fin.upd.trade:{[x]
  insert[`trade;x];
  }
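.fin.trade.unknown:{[t;x]
  t set update updateTS:.z.p from x;  // store the unexpected table under its own name, stamped with the receive time
  show t;  // report the name of the unexpected table
  }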
.fin.timerFunction:{
  show tab_counts;
  }


// callback functions, upd and event_handlers, called upon receiving a message from RT
upd:{[msg;pos]
  tab_counts[msg 1]+:count msg 2;  // keep a count of the number of records per table in a dictionary
  if[msg[1] in tables[]; @[.fin.upd[msg 1]; msg 2; {show x}]];  // received a defined table
  if[not msg[1] in tables[]; .fin.trade.unknown[msg 1;msg 2]];  // received an unexpected update
  };
event_handler:{`events upsert 0N!`event`pos!(x;y)};

.z.ts:.fin.timerFunction

init:{
  show`$"Start Subscription";
  RT_STREAM:getenv`RT_STREAM;
  .rt.sub[RT_STREAM; 0; `message`event!(upd;event_handler)];  // subscribe to the RT stream
  system "t 5000"; // set the timer value to be 5 seconds
  }
init[];

Pull Client Replicators

Calling .rt.sub with the arguments above starts a number of pull_client replicators, set by the RT_REPLICAS environment variable. These connect to the pull_server replicators running on each of the RT sequencers. In a 3-node RT cluster, they connect to the following endpoints:

  • ${RT_TOPIC_PREFIX}${RT_STREAM}-0:5001
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-1:5001
  • ${RT_TOPIC_PREFIX}${RT_STREAM}-2:5001
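
Before starting the subscriber, it can be useful to confirm that each of these endpoints is reachable from the subscriber's container or pod. The sketch below is one way to do that and is not part of RT: probe and check_pull_endpoints are hypothetical helpers, and the default probe assumes that a netcat (nc) binary supporting the -z and -w options is installed and that a plain TCP connection is enough to show the port is open.

```shell
#!/bin/sh
# probe HOST PORT: hypothetical helper (not provided by RT).
# Assumption: netcat (nc) is installed and supports -z (connect without
# sending data) and -w (timeout in seconds). Swap in another tool if it does not.
probe() {
    nc -z -w 2 "$1" "$2" >/dev/null 2>&1
}

# check_pull_endpoints: hypothetical helper (not provided by RT).
# Probes port 5001 on every RT node named by the
# ${RT_TOPIC_PREFIX}${RT_STREAM}-<index> pattern.
# Returns 0 only if every endpoint answered.
check_pull_endpoints() {
    failed=0
    i=0
    while [ "$i" -lt "${RT_REPLICAS:-3}" ]; do
        host="${RT_TOPIC_PREFIX}${RT_STREAM}-${i}"
        if probe "$host" 5001; then
            echo "ok          ${host}:5001"
        else
            echo "unreachable ${host}:5001"
            failed=1
        fi
        i=$((i + 1))
    done
    return "$failed"
}
```

With RT_TOPIC_PREFIX, RT_STREAM and RT_REPLICAS already set in the environment, running check_pull_endpoints prints one status line per RT node and returns a non-zero status if any node could not be reached.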