Reliable Transport (RT) quickstart guide
This guide is intended to allow users to launch RT plus a sample publisher and subscriber. There are illustrations for doing so using Docker Compose and Kubernetes.
For demonstration only
The kxi-rt-q-pub-eval
and kxi-rt-q-sub-eval
images are sample images for demonstration only, they are not supported by KX.
Docker login
In order to be able to pull down the relevant images kxi-rt
, kxi-rt-q-pub-eval
and kxi-rt-q-sub-eval
, logging into a docker registry is required.
docker login registry.dl.kx.com -u username -p password
Publisher
A docker image is available which will send data to an RT stream. In this example, the publisher is sending data, as part of a table trade
to an RT stream.
The data is sent on a timer, using KX's native timer function .z.ts
.
A publisher will start a set of push_client
replicators.
Replicator count
The number of replicators the publisher starts needs to match the size of the RT cluster it is sending data to.
Environment Variables
RT_STREAM
An RT stream identifier, this is used along with theRT_TOPIC_PREFIX
, to create the RT hostnames that the publisher is to communicate with.RT_TOPIC_PREFIX
The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g.rt-
. Intra-RT connections are made using${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n]
RT_LOG_PATH
The directory where tpublisher's message log is stored.RT_RAFT_CLUSTER
Relevant when using deduplication. Used to specify the endpoint prefix for the RT cluster, e.g.rt-data
. See Deduplication' section below.PUB_TIMER_FREQ
the frequency (in milliseconds) at which messages are published to a RT stream.PUB_TIMER_MSG
the amount of messages which are published everyPUB_TIMER_FREQ
milliseconds.RT_REPLICAS
the number ofpush_client
repicators to start. This should match thereplicaCount
of the RT stream that is being published to. Supported values are 1 or 3.
// schema
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
// variables
.pub.N:"J"$first getenv`PUB_TIMER_MSG; // the number of messages to publish on a timer
.pub.timerFreq:"J"$getenv`PUB_TIMER_FREQ; // the frequency at which the messages are published
// functions
.fin.getData:{
([] time:.z.p; // current time in UTC
sym:.pub.N?`AAPL`FDP`GOOG; // randomly choose a value between AAPL, GOOG, FDP
exch:.pub.N?`NYSE`LSE; // randomly choose between NYSE or LSE
side:.pub.N?`buy`sell; // randomly decide between buy and sell
price:.pub.N?10.; // random value between 0 and 10.
size:.pub.N?10000; // random value between 0 and 10000
tradeID:.pub.N?0Ng) // random guid.
}
.fin.timerFunction:{
tradeData:.fin.getData .pub.N; // get random trade data
.rt.publish (`.b; `trade; tradeData); // write data to local log which will be replicated to RT nodes
}
.z.ts:.fin.timerFunction; // define the timer function
init:{
.log.info "RT_LOG_PATH:", getenv`RT_LOG_PATH; // log the location to which the RT logs will be written
RT_STREAM:getenv`RT_STREAM; // read in the env variable defined, type string
.rt.publish:.rt.pub[RT_STREAM]; // create connection to a RT stream
system"t ",string .pub.timerFreq; // turn on the timer
}
init[];
Push Client Replicators
Upon calling .rt.pub
, a number of push_client replicators will be started. These will connect to the internal a number of push_server replicators running on each of the RT sequencers. In the case of a 3 node RT cluster, the endpoints that connenct to will be:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5002
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5002
Deduplication
Details on deduplication can be found here
Subscriber
A docker image is available which will receive data to a RT stream. In this example, the subscriber is receiving data for a table trade
from a RT stream.
A subscriber will start a set of pull_client replicators.
Replicator count
The number of replicators it starts needs to match the size of the RT cluster it is sending data to.
Environment Variables
RT_STREAM
An RT stream identifier, this is used along with theRT_TOPIC_PREFIX
, to create the RT hostnames that the subscriber is to communicate with.RT_TOPIC_PREFIX
The prefix used to locate other nodes in the RT cluster via hostnames or DNS, e.g.rt-
. Intra-RT connections are made using${RT_TOPIC_PREFIX}${RT_STREAM}-[0 ... n]
RT_LOG_PATH
The directory in the persistent volume where publisher's message log is stored.RT_REPLICAS
the number ofpull_client
repicators to start. This should match thereplicaCount
of the RT stream that is being subscribed to. Supported values are 1 or 3.
// schemas
trade:([] time:`timestamp$(); sym:`$(); exch:`$(); side:`$(); price:`float$(); size:`long$(); tradeID:`guid$());
events:([]event:();pos:());
// dictionaries
tab_counts:(`$())!"j"$();
// functions
.fin.upd.trade:{[x]
insert[`trade;x];
}
.fin.trade.unknown:{[t;x]
msg[1] set update updateTS:.z.p from x;
show msg;
}
.fin.timerFunction:{
show tab_counts;
}
// callback functions, upd and event_handlers, called upon receiving a message from RT
upd:{[msg;pos]
tab_counts[msg 1]+:count msg 2; // keep a count of the number of records per table in a dictionary
if[msg[1] in tables[]; @[.fin.upd[msg 1]; msg 2; {show x}]]; // received a defined table
if[not msg[1] in tables[]; .fin.trade.unknown[msg 1;msg 2]]; // received an unexpected update
};
event_handler:{`events upsert 0N!`event`pos!(x;y)};
.z.ts:.fin.timerFunction
init:{
show`$"Start Subscription";
RT_STREAM:getenv`RT_STREAM;
.rt.sub[RT_STREAM; 0; `message`event!(upd;event_handler)]; // subscribe to the RT stream
system "t 5000"; // set the timer value to be 5 seconds
}
init[];
Pull Client Replicators
Upon calling .rt.sub
with the arguments above, a number, based on the RT_REPLICAS
environment variable, of pull_client replicators will be started. These will connect to the internal pull_server replicators running on each of the RT sequencers. In the case of a 3 node RT cluster, the endpoints that connenct to will be:
${RT_TOPIC_PREFIX}${RT_STREAM}-0:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-1:5001
${RT_TOPIC_PREFIX}${RT_STREAM}-2:5001