Reliable Transport (RT) quickstart guide
This guide shows how to launch RT together with a sample publisher and subscriber, with walkthroughs for both Docker Compose and Kubernetes.
For demonstration only
The kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images are samples for demonstration only; they are not supported by KX.
RT Cluster Size
RT supports a 1-node or 3-node cluster, set by the RT_REPLICAS environment variable:
- In a 3-node cluster, RT offers fault tolerance and high availability (HA). If one of the three nodes goes offline for a period, the remaining two nodes continue to send data to downstream subscribers.
- In a 1-node cluster, RT is still fault tolerant but not highly available. If the single RT node goes down, the publisher continues writing to its local RT log file. Once the RT node restarts, it retrieves any data on the publisher node that it has not yet received.
Docker login
To pull the kxi-rt, kxi-rt-q-pub-eval and kxi-rt-q-sub-eval images, you must first log in to the Docker registry:
docker login registry.dl.kx.com -u username -p password
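Passing the password with -p leaves it in shell history and process listings. Docker also accepts the password on standard input through its --password-stdin option. The sketch below wraps that in a small function; the function name and the KX_REGISTRY_USER and KX_REGISTRY_PASS variable names are assumptions made for this example, not names defined by KX.

```shell
# Hypothetical wrapper: log in to the KX registry without putting the
# password on the command line. Expects credentials in two environment
# variables (names assumed for this sketch).
kx_registry_login() {
  if [ -z "${KX_REGISTRY_USER:-}" ] || [ -z "${KX_REGISTRY_PASS:-}" ]; then
    echo "set KX_REGISTRY_USER and KX_REGISTRY_PASS first" >&2
    return 1
  fi
  # --password-stdin makes docker read the password from standard input
  printf '%s' "$KX_REGISTRY_PASS" |
    docker login registry.dl.kx.com -u "$KX_REGISTRY_USER" --password-stdin
}

# Usage (not run here): kx_registry_login
```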
Publisher
A Docker image is available that sends data to an RT stream. In this example, the publisher sends rows of a trade table to an RT stream.
The data is sent on a timer, using the native q timer function .z.ts.
A publisher starts a set of push_client replicators.
Replicator count
The number of replicators the publisher starts needs to match the size of the RT cluster it is sending data to.
Environment Variables
- RT_STREAM: The RT stream identifier. It is combined with RT_TOPIC_PREFIX to form the RT hostnames that the publisher communicates with.
- RT_TOPIC_PREFIX: The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n].
- RT_LOG_PATH: The directory where the publisher's message log is stored.
- RT_RAFT_CLUSTER: Relevant when using deduplication. Specifies the endpoint prefix for the RT cluster, e.g. rt-data. See the Deduplication section below.
- PUB_TIMER_FREQ: The timer interval, in milliseconds, at which messages are published to an RT stream.
- PUB_TIMER_MSG: The number of messages published every PUB_TIMER_FREQ milliseconds.
- RT_REPLICAS: The number of push_client replicators to start. This should match the replicaCount of the RT stream being published to. Supported values are 1 or 3.
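Taken together, the two timer variables set the nominal publish rate: PUB_TIMER_MSG messages every PUB_TIMER_FREQ milliseconds. A minimal shell sketch of that arithmetic follows; the helper name rt_pub_rate and the sample values are illustrative assumptions, not part of the sample image.

```shell
# Hypothetical helper: nominal messages per second for a given
# PUB_TIMER_MSG (messages per tick) and PUB_TIMER_FREQ (milliseconds per tick).
rt_pub_rate() {
  msgs=$1
  freq_ms=$2
  if [ "$freq_ms" -le 0 ]; then
    echo "PUB_TIMER_FREQ must be a positive number of milliseconds" >&2
    return 1
  fi
  # Integer arithmetic: messages per tick * ticks per second, rounded down
  echo $(( msgs * 1000 / freq_ms ))
}

# Example values (assumed): 10 messages every 500 ms
rt_pub_rate 10 500   # prints 20
```

This is only the configured rate; actual throughput also depends on how quickly the replicators can ship the log to the RT nodes.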
// schema
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
// variables
.pub.N:"J"$getenv`PUB_TIMER_MSG; // the number of messages to publish on each timer tick
.pub.timerFreq:"J"$getenv`PUB_TIMER_FREQ; // the timer interval in milliseconds
// functions
.fin.getData:{
  ([] time:.z.p; // current time in UTC
    sym:.pub.N?`AAPL`FDP`GOOG; // randomly choose from AAPL, FDP, GOOG
    exch:.pub.N?`NYSE`LSE; // randomly choose between NYSE and LSE
    side:.pub.N?`buy`sell; // randomly choose between buy and sell
    price:.pub.N?10.; // random value between 0 and 10
    size:.pub.N?10000; // random value between 0 and 10000
    tradeID:.pub.N?0Ng) // random guid
  }
.fin.timerFunction:{
  tradeData:.fin.getData .pub.N; // get random trade data
  .rt.publish (`.b; `trade; tradeData); // write data to the local log, which is replicated to the RT nodes
  }
.z.ts:.fin.timerFunction; // define the timer function
init:{
  .log.info "RT_LOG_PATH:", getenv`RT_LOG_PATH; // log the location to which the RT logs will be written
  RT_STREAM:getenv`RT_STREAM; // read in the env variable defined, type string
  .rt.publish:.rt.pub[RT_STREAM]; // create a connection to an RT stream
  system"t ",string .pub.timerFreq; // turn on the timer
  }
init[];
Push Client Replicators
Upon calling .rt.pub, a number of push_client replicators are started. These connect to the internal push_server replicators running on each of the RT sequencers. In a 3-node RT cluster, the endpoints they connect to are:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5002
Deduplication
Details on deduplication can be found in the RT deduplication documentation.
Subscriber
A Docker image is available that receives data from an RT stream. In this example, the subscriber receives data for a trade table from an RT stream.
A subscriber starts a set of pull_client replicators.
Replicator count
The number of replicators the subscriber starts must match the size of the RT cluster it is receiving data from.
Environment Variables
- RT_STREAM: The RT stream identifier. It is combined with RT_TOPIC_PREFIX to form the RT hostnames that the subscriber communicates with.
- RT_TOPIC_PREFIX: The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g. rt-. Intra-RT connections are made using ${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n].
- RT_LOG_PATH: The directory in the persistent volume where the publisher's message log is stored.
- RT_REPLICAS: The number of pull_client replicators to start. This should match the replicaCount of the RT stream being subscribed to. Supported values are 1 or 3.
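Because only cluster sizes of 1 and 3 are supported, it can help to check RT_REPLICAS before starting a publisher or subscriber container. A small sketch of such a check follows; the function name check_rt_replicas is hypothetical and not part of the sample images.

```shell
# Hypothetical pre-flight check: succeed only if the value is a supported
# RT cluster size (1 or 3); otherwise print a message to stderr and fail.
check_rt_replicas() {
  case "$1" in
    1|3) return 0 ;;
    *)
      echo "RT_REPLICAS must be 1 or 3, got '${1}'" >&2
      return 1
      ;;
  esac
}

# Usage: validate the current environment, treating an unset variable as empty.
# check_rt_replicas "${RT_REPLICAS:-}" || exit 1
```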
// schemas
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
events:([]event:();pos:());
// dictionaries
tab_counts:(`$())!"j"$();
// functions
.fin.upd.trade:{[x]
  insert[`trade;x];
  }
.fin.trade.unknown:{[t;x]
  t set update updateTS:.z.p from x; // store the unexpected table under its own name, stamped with the receive time
  show (t;x);
  }
.fin.timerFunction:{
  show tab_counts;
  }
// callback functions, upd and event_handler, called upon receiving a message from RT
upd:{[msg;pos]
  tab_counts[msg 1]+:count msg 2; // keep a count of the number of records per table in a dictionary
  if[msg[1] in tables[]; @[.fin.upd[msg 1]; msg 2; {show x}]]; // received a defined table
  if[not msg[1] in tables[]; .fin.trade.unknown[msg 1;msg 2]]; // received an unexpected update
  };
event_handler:{`events upsert 0N!`event`pos!(x;y)};
.z.ts:.fin.timerFunction;
init:{
  show`$"Start Subscription";
  RT_STREAM:getenv`RT_STREAM;
  .rt.sub[RT_STREAM; 0; `message`event!(upd;event_handler)]; // subscribe to the RT stream
  system "t 5000"; // set the timer interval to 5 seconds
  }
init[];
Pull Client Replicators
Upon calling .rt.sub with the arguments above, a number of pull_client replicators, based on the RT_REPLICAS environment variable, are started. These connect to the internal pull_server replicators running on each of the RT sequencers. In a 3-node RT cluster, the endpoints they connect to are:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5001
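The endpoint lists for both replicator types follow the same pattern and differ only in port: 5002 for push_client and 5001 for pull_client. The sketch below prints the list for a given configuration. The function name rt_endpoints and the sample stream name data are assumptions for illustration.

```shell
# Hypothetical helper: list the RT endpoints a client would connect to.
# Arguments: topic prefix, stream name, replica count, port.
rt_endpoints() {
  prefix=$1
  stream=$2
  replicas=$3
  port=$4
  i=0
  while [ "$i" -lt "$replicas" ]; do
    # One endpoint per RT node: ${RT_TOPIC_PREFIX}${RT_STREAM}-<ordinal>:<port>
    echo "${prefix}${stream}-${i}:${port}"
    i=$(( i + 1 ))
  done
}

# Push endpoints for a 3-node cluster (publisher side, port 5002)
rt_endpoints "rt-" "data" 3 5002
# Pull endpoints for a 1-node cluster (subscriber side, port 5001)
rt_endpoints "rt-" "data" 1 5001
```

With these sample values the first call lists rt-data-0:5002 through rt-data-2:5002, and the second lists only rt-data-0:5001.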