Reliable Transport (RT) quickstart guide
This guide shows how to launch RT together with a sample publisher and subscriber, with examples for both Docker Compose and Kubernetes.
For demonstration only
The kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images are sample images for demonstration only; they are not supported by KX.
RT Cluster Size
RT supports a 1-node or 3-node cluster, selected by the RT_REPLICAS environment variable:
- In a 3-node cluster, RT offers fault tolerance and high availability (HA). If one of the three nodes goes offline for a period, the remaining two nodes can continue to send data to downstream subscribers.
- In a 1-node cluster, fault tolerance is still present, but the cluster is not highly available. If the single RT node goes down, the publisher continues writing to its local RT log file. Once the RT node has restarted, it obtains any data on the publisher node that it has not yet received.
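Because only two cluster sizes are supported, it can help to validate RT_REPLICAS up front. The q sketch below reads the variable, defaults to 3 when it is unset (as the sample publisher further down does), and rejects any other size. The name .qs.replicas is hypothetical and is not part of the RT API.

```q
// Hypothetical helper, not part of the RT API:
// read RT_REPLICAS, default to 3, and accept only the supported sizes 1 and 3.
.qs.replicas:{[]
  r:3^"J"$getenv`RT_REPLICAS;   / unset or non-numeric -> null -> filled with 3
  if[not any r=1 3; '"RT_REPLICAS must be 1 or 3"];
  r
  }
```

Failing early here gives a clearer error than a publisher or subscriber whose replicator count does not match the cluster.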
Docker registry login
To pull the relevant images (kxi-rt, kxi-rt-q-pub-eval and kxi-rt-q-sub-eval), you must first log in to the Docker registry:
docker login registry.dl.kx.com -u username -p password
Docker Requirements
The RT image is built as an x64 rockylinux:8 container, but it uses functionality that requires Linux kernel 4.18.0 or later. Because containers share the kernel of the Docker host, the Docker host itself must run such a kernel in order to run the RT container. Examples of Linux distributions with a 4.18.0 kernel include:
- CentOS 8 or later
- Red Hat Enterprise Linux (RHEL) 8 or later
- Ubuntu 18.04 or later
Using WSL2 as the Docker host is also supported, but we recommend updating the WSL2 kernel to the latest version.
Note
Running the RT container is only supported where the docker host is running Linux on an x64 architecture.
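To check a Docker host against the 4.18.0 minimum, you can compare its uname -r output. The q sketch below assumes a release string of the usual major.minor.patch[-suffix] form (for example 4.18.0-348.el8.x86_64); .qs.kernelOK is a hypothetical helper, not part of any KX tooling.

```q
// Hypothetical helper: does a `uname -r` style release string meet the 4.18.0 minimum?
.qs.kernelOK:{[v]
  n:3#("J"$"." vs first "-" vs v),0 0 0;   / major, minor, patch; missing parts padded with 0
  4018000<=sum 1000000 1000 1*n            / compare as a single weighted number
  }
/ usage on the Docker host: .qs.kernelOK first system"uname -r"
```

The weighted sum assumes minor and patch numbers below 1000, which holds for mainline kernel releases.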
Publisher
A Docker image is available that sends data to an RT stream. In this example, the publisher sends data for a table named trade to an RT stream.
The data is sent once at startup, when the image launches, and then on a timer, using the q timer handler .z.ts.
A publisher starts a set of push_client replicators.
Replicator count
The number of replicators the publisher starts needs to match the size of the RT cluster it is sending data to.
Environment Variables
- RT_STREAM: An RT stream identifier. Used together with RT_TOPIC_PREFIX to build the hostnames of the RT nodes that the publisher communicates with.
- RT_TOPIC_PREFIX: The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n].
- RT_LOG_PATH: The directory where the publisher's message log is stored.
- RT_RAFT_CLUSTER: Relevant when using deduplication. Specifies the endpoint prefix for the RT cluster, e.g. rt-data. See the Deduplication section below.
- PUB_TIMER_FREQ: The frequency, in milliseconds, at which messages are published to the RT stream.
- PUB_TIMER_MSG: The number of trade records published every PUB_TIMER_FREQ milliseconds.
- RT_REPLICAS: The number of push_client replicators to start. This should match the replicaCount of the RT stream being published to. Supported values are 1 or 3.
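Because each timer tick publishes a single batch of PUB_TIMER_MSG trade rows, the steady-state publish rate is PUB_TIMER_MSG × 1000 / PUB_TIMER_FREQ rows per second, on top of the one batch sent at startup. A small q sketch of that arithmetic, using illustrative values rather than defaults from the image (.qs.rowsPerSec is a hypothetical name):

```q
// Hypothetical helper: rows per second for a given batch size and timer period (ms)
.qs.rowsPerSec:{[msgs;freqMs] msgs*1000%freqMs}
.qs.rowsPerSec[10;500]   / 10 rows every 500 ms -> 20f rows per second
```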
// schema
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
// variables
.pub.N:"J"$getenv`PUB_TIMER_MSG; // the number of trade rows to publish on each timer tick
.pub.timerFreq:"J"$getenv`PUB_TIMER_FREQ; // the timer frequency, in milliseconds
// functions
.fin.getData:{[n]
  ([] time:n#.z.p; // current time in UTC, one per row
    sym:n?`AAPL`FDP`GOOG; // random choice of AAPL, FDP or GOOG
    exch:n?`NYSE`LSE; // random choice of NYSE or LSE
    side:n?`buy`sell; // random choice of buy or sell
    price:n?10.; // random float in [0, 10)
    size:n?10000; // random long in [0, 10000)
    tradeID:n?0Ng) // random GUIDs
  }
.fin.timerFunction:{
  tradeData:.fin.getData .pub.N; // generate .pub.N rows of random trade data
  .fin.publish (`.b; `trade; tradeData); // write data to the local log, which is replicated to the RT nodes
  }
.z.ts:.fin.timerFunction; // define the timer function
init:{
  .log.info "RT_LOG_PATH:", getenv`RT_LOG_PATH; // log the location to which the RT logs will be written
  stream:getenv`RT_STREAM; // RT stream name, as a string
  path:getenv`RT_LOG_PATH; // directory for the publisher's RT log
  prefix:getenv`RT_TOPIC_PREFIX; // hostname prefix of the RT nodes, as a string
  replicas:3^"I"$getenv`RT_REPLICAS; // number of replicas, defaults to 3 if not defined
  params:`stream`path`topic_prefix`replicas!(stream;path;prefix;replicas);
  .fin.publish:.rt.pub params; // connect to the RT stream by starting the push_client replicators
  .fin.timerFunction[]; // publish .pub.N rows at startup
  system"t ",string .pub.timerFreq; // turn on the timer
  }
init[];
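The publisher sends whatever .fin.getData returns, so a quick local check is to compare the meta of a generated batch with the empty trade schema, which catches a type slip (for example an int size column instead of a long) before anything is published. A minimal sketch: gen stands in for .fin.getData, and .qs.conforms is a hypothetical helper, not part of the RT API.

```q
// Sketch: check that generated rows match the trade schema (column names and types)
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
gen:{[n] ([] time:n#.z.p; sym:n?`AAPL`FDP`GOOG; exch:n?`NYSE`LSE; side:n?`buy`sell; price:n?10.; size:n?10000; tradeID:n?0Ng)};
.qs.conforms:{[schema;rows] (meta schema)~meta rows};
.qs.conforms[trade;gen[5]]   / 1b when names and types line up
```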
Push Client Replicators
When .rt.pub is called, it starts a number of push_client replicators, set by the RT_REPLICAS environment variable. These connect to the internal push_server replicators running on each of the RT sequencers. For a 3-node RT cluster, the endpoints they connect to are:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5002
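The endpoint list follows a fixed pattern, so it can be derived from the same variables. The q sketch below builds ${RT_TOPIC_PREFIX}${RT_STREAM}-<i>:<port> for each node; .qs.endpoints is a hypothetical helper, and rt- and data are example values. Passing port 5001 instead gives the pull_client endpoints listed in the Subscriber section.

```q
// Hypothetical helper, not part of the RT API:
// endpoints ${RT_TOPIC_PREFIX}${RT_STREAM}-<i>:<port> for i in 0 .. replicas-1
.qs.endpoints:{[prefix;stream;replicas;port]
  (prefix,stream,"-"),/:(string til replicas),\:":",string port
  }
.qs.endpoints["rt-";"data";3;5002]   / ("rt-data-0:5002";"rt-data-1:5002";"rt-data-2:5002")
```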
Deduplication
Details on deduplication can be found here
Subscriber
A Docker image is available that receives data from an RT stream. In this example, the subscriber receives data for a table named trade from an RT stream.
A subscriber starts a set of pull_client replicators.
Replicator count
The number of replicators the subscriber starts needs to match the size of the RT cluster it receives data from.
Environment Variables
- RT_STREAM: An RT stream identifier. Used together with RT_TOPIC_PREFIX to build the hostnames of the RT nodes that the subscriber communicates with.
- RT_TOPIC_PREFIX: The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n].
- RT_LOG_PATH: The directory in the persistent volume where the message log is stored.
- RT_REPLICAS: The number of pull_client replicators to start. This should match the replicaCount of the RT stream being subscribed to. Supported values are 1 or 3.
// schemas
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
events:([]event:();pos:());
// dictionaries
tab_counts:(`$())!"j"$();
// functions
.fin.upd.trade:{[x]
  insert[`trade;x];
  }
.fin.trade.unknown:{[t;x]
  t set update updateTS:.z.p from x; // store the unexpected table, stamped with its receive time
  show t;
  }
.fin.timerFunction:{
  show tab_counts;
  }
// callback functions, upd and event_handler, called upon receiving a message from RT
upd:{[msg;pos]
  tab_counts[msg 1]+:count msg 2; // keep a count of the number of records per table in a dictionary
  if[msg[1] in tables[]; @[.fin.upd[msg 1]; msg 2; {show x}]]; // received a defined table
  if[not msg[1] in tables[]; .fin.trade.unknown[msg 1;msg 2]]; // received an unexpected update
  };
event_handler:{`events upsert 0N!`event`pos!(x;y)};
.z.ts:.fin.timerFunction;
init:{
  show`$"Start Subscription";
  RT_STREAM:getenv`RT_STREAM;
  .rt.sub[RT_STREAM; 0; `message`event!(upd;event_handler)]; // subscribe to the RT stream
  system "t 5000"; // set the timer value to be 5 seconds
  }
init[];
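The upd callback can be exercised without an RT stream by calling it with a hand-built message. The self-contained sketch below assumes messages have the (header; table name; data) shape that the sample publisher sends, e.g. (`.b;`trade;rows). It uses a reduced trade schema for illustration only, and it amends tab_counts through @[`tab_counts;...] so that the global dictionary is updated from inside the function.

```q
// Sketch: replay hand-built messages through a cut-down upd, no RT required
trade:([] time:`timestamp$(); sym:`$(); price:`float$());   / reduced schema, illustration only
tab_counts:(`$())!"j"$();
upd:{[msg;pos]
  @[`tab_counts; msg 1; :; (0^tab_counts msg 1)+count msg 2];   / running row count per table
  if[msg[1] in tables[]; msg[1] insert msg 2];                  / append rows for known tables
  };
rows:([] time:2#.z.p; sym:`AAPL`GOOG; price:1.5 2.5);
upd[(`.b;`trade;rows);0];
upd[(`.b;`trade;rows);1];
show tab_counts   / trade| 4
```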
Pull Client Replicators
When .rt.sub is called with the arguments above, it starts a number of pull_client replicators, set by the RT_REPLICAS environment variable. These connect to the internal pull_server replicators running on each of the RT sequencers. For a 3-node RT cluster, the endpoints they connect to are:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5001