Skip to content

Reliable Transport (RT) Overview

RT is a microarchitecture used within the KX Insights Platform for ensuring the reliable streaming of messages. It is designed to satisfy both high availability and high performance requirements.

RT uses a sequencer and replication strategy to ensure that messages are moved as quickly as possible between publishers and subscribers. Its basic operation consists of the following steps:

  • Publishers write events to a transaction stream file on local disk (think of writing a tickerplant log)

  • The stream file is replicated between publisher and subscribers (think a streaming rsync of the above file)

  • Subscribers consume the transaction stream file once it has been replicated (think tickerplant log replay)

Note

  • If multiple publishers are writing to the same stream, they are merged into a single unified and ordered stream file for the subscribers to consume.

  • Multiple subscribers can replicate the same stream file and there is no requirement for them to do so within a certain time frame (i.e. they don't have to "keep up" with the publisher).

This results in full decoupling of publishers and subscribers (i.e. publishers don't have to deal with slow subscribers).

RT as a tick replacement

RT can be used as a tick replacement, but it also provides the following benefits:

  • Configurable High Availability (however there is always a HA / throughput tradeoff)

  • Slow subscribers cannot affect the publisher.

Definitions

  • stream - this is synonymous with an instance of RT, whether it is a 1,3,5,7... node RT implementation. All the messages sent to the stream are replicated, merged and sequenced before being passed to the consumers. This stream name is used to identify which RT cluster will receive the messages sent by a producer.

Note

Within the KX Insights Platform, RT is restricted to a 3-node implementation.

  • topic - this is a subset of the messages on the stream that originated from a single publisher, or set of publishers on different hosts if you wish to deduplicate the messages.

Publishers must use unique topics

Each publisher on a given host MUST use a different topic name as the topic name and host name are combined to provide a unique directory name for the stream files before they are merged.

Overview

We will now look at the various components in more detail. RT includes a low-level publishing layer, with higher-level components layered on top which can be aware of the networking topology needed to ingest data into KX Insights.

This diagram shows RT being used in a 3-node RAFT configuration. RT Ingestion

RT clients can operate as:

  • Publisher (or producer): writes to its own stream file directory. Insights currently provides a Java-based and a C-based publisher.
  • Subscriber (or consumer): read from a stream file directory.

And these clients leverage the RT components:

  • Replicators: continuously, rapidly, and securely transport a stream file directory from one machine to another.
  • Sequencer: read from multiple stream file directories and produce a single merged directory that can itself be consumed.
  • Archiver: part of the sequencer which manages garbage collection of old merged log files

Publishers

In RT the publisher essentially just records messages in a stream file. A publisher has the following characteristics:

  • Includes a low-level log-writer.
  • Does zero networking.
  • The stream files are written to a specific directory on the publisher's host.
  • The stream file names have the format "log.X.Y" - where “X” is the session number and “Y” is the roll-index.
  • The directory is preserved between runs, but is not shared by multiple log-writers.
  • The stream files are rolled every 1GB. When the stream file is rolled and has been replicated to all RT nodes it becomes eligible for garbage collection. This means that a loss of networking won't result in data loss (assuming the network does come back and the publisher has enough disk space to queue during the blackout).

At startup, the publisher does local filesystem checks on the stream file directory, creates a new “session” file and starts the replicators needed to move data from its own stream directory into KX Insights Platform for consumers such as the stream processor or the storage manager to consume.

For more details on how to publish data to an RT stream see:

Deduplication

Deduplication is available for publishers who are sending the same messages. Use cases include:

  • Failure and restart of a single publisher from the last checkpoint. This ensures any messages sent between the last publisher checkpoint and a failover are not passed down the stream twice.

  • Multi node publishing of the same messages, to ensure high availability. This ensures only one copy of each message is sent down the stream but allows for one of the publishers to fail without data loss or duplication.

To enable deduplication topic names should be suffixed with .dedup.

Note

There must never be more than one publisher running on a single host using the same topic name at the same time.

Subscribers

Subscribing, like publishing, involves the subscribers monitoring the replicated merged files, using file-system events, responding to their changes and surfacing those changes to the component consumer.

The "RT Insights qpk" provides a subscriber that a consumer, such as the stream processor, uses. This takes the form of q messages and a “position” that can be used for restarting the subscriber from this point. (The "position" is opaque and cannot be used as a message index).

Note

The “position” is opaque and cannot be used as a message index.

Note

The stream file reader, which is part of the subscriber, is nearly as fast as shared-memory, parallel-not-sequential reads and the best/lowest-latency performance is running the consumer on same node as the consensus.

For more details on how to subscribe to an RT stream see the Interface Guide

Replicators

The replicator's job is to transport messages from one machine to another machine and each replicator is made up of a client and a server. Each of the client SDKs (ODBC and Java) include a push_client replicator, which sends the messages to the Insights Platform's push_server replicator, which receives the messages. Replicators have the following characteristics:

  • Keep track of the restart position on the consumer side, not the publisher side, for zero data loss/gaps​. The consumer initiates recovery​ i.e. it knows the “position” to restart from​, but the producer opens the (TCP) connections.

  • Use inotify() to detect data with low latency​.

  • Use mutually authenticated TLS to encrypt traffic across network boundaries.

  • Support topologies where either the producer or consumer can have a fixed address.

  • The replicator is expected to run indefinitely.

Sequencers

Combining more than one stream file into a single output is the responsibility of a sequencer. The sequencer is entirely deterministic: it chooses a sequence of events from any number of input stream directories and combines them into a plausible order in another directory.

The sequencer uses RAFT, which is suitable for ⅗/7-node topologies, and has the advantage that as long as the majority of nodes are up (a quorum) the output stream file can be constructed and delivered to the consumers. This means that "tickerplant" can continue to run even if up to half the RAFT nodes stop running.

The RAFT sequencer nodes work together as follows:

  1. Using the RAFT algorithm, the sequencers form a cluster and elect a leader.
  2. The cluster nodes communicate with the leader about how much of each published stream file they have replicated (locally to themselves).
  3. Using the above information, the leader determines what new information can be appended to the merged file. It publishes this back to the follower nodes as a merge instruction.
  4. Using the publisher files (which the nodes have gotten externally to the RAFT process) and merging those files into a single file using the instructions supplied by the RAFT leader.
  5. Since all sequencers have the same files and merge them using the same instruction set, then all of the merged files output by the various sequencers will be identical.
  6. Using the rules set out by RAFT, if a leader fails, a new leader will be elected. In this way up to half the nodes can fail before the cluster as a whole stops functioning.

Once a merged file has been appended to, a pull replicator replicates the merged file for a subscriber to consume.

Archiver

The RT Archiver runs as part of the sequencer and drives the garbage collection of all merged logs - those in RT and other subscribers - based on a configurable policy. This policy can specify:

  • Retention period for merged RT log files in minutes. Rolled merged log files which contain messages older than this (based on the message timestamp) are garbage collected.
  • Maximum size of all log files in RT. Rolled merged log files which push the total size beyond this limit are garbage collected, oldest first.
  • Maximum percentage of the available disk space that will be used by RT. When this percentage is exceeded rolled merged log files are garbage collected, oldest first. If not configured the default value of 90% is used.

When a log file on the sequencer is marked for garbage collection, this information is propagated to the subscribers such that the log files are also garbage collected there.

When pruning, the following additional constraints exist:

  • The newest stream file is never pruned.
  • Stream files roll every 1GB (or every session, whichever comes first).
  • The time considered is only the first timestamp in a stream file, so some time beyond the expiration can actually pass before the stream file is pruned. I.e. it must roll first.

Warning

The size of the disk associated with each RT and consumer pod MUST be larger than the limit defined.

Note

These archival options can be defined as part of the Insights Configuration.

RT Cluster Ports

port type notes
4000 TCP Sequencer used for the prometheus sidecar
5000 TCP Load balancer push server replicator (exposed outside of Kubernetes)
5001 TCP Internal pull server
5002 TCP Internal push server
7000 UDP Raft (exposed to other RT nodes)
7100 TCP Raft (exposed to other RT nodes)
8000 UDP Sequencer (exposed to other RT nodes)
9000 UDP Watcher (exposed to other RT nodes)