
Reliable Transport (RT) Overview

The kdb Insights Reliable Transport (RT) is a microarchitecture for ensuring the reliable streaming of messages. It is designed to satisfy both high availability and high performance requirements. It is built upon the Raft consensus algorithm.

See here for more details on how to publish data to an instance of RT, also known as a kdb Insights Stream within kdb Insights Enterprise.

Overview

Unlike a 'traditional' messaging system which uses direct IPC (either synchronous or asynchronous with an ACK mechanism) to send messages between the publishers, broker and subscribers, RT uses log files. Publishers asynchronously write their messages to a local log file which is automatically replicated to RT and then sequenced and merged. The merged log files are then replicated automatically to the subscribers where the messages are read asynchronously from the local log file and consumed.

RT uses a sequencer and replication strategy to ensure that messages are moved as quickly as possible between publishers and subscribers. Its basic operation consists of the following steps:

  1. Publishers use a kdb Insights interface to asynchronously write messages to log files in a session directory on local disk (think of writing a tickerplant log).

  2. The session directory is replicated from the publisher to the RT nodes. Raft requires a majority of nodes (a quorum) to reach consensus, so an RT cluster needs an odd number of nodes (3, 5, 7, ...). The combination of Raft and the sequencer creates a merged log directory, present on each RT node.

  3. If multiple publishers are writing to the same stream, they are merged into a single unified and ordered stream by the sequencer.

  4. Subscribers can use the q or C interface to consume the merged stream. Upon subscribing, the replicator automatically copies the merged log directory to the subscriber node, where the messages are read asynchronously from the now-local merged log files using the APIs in the rt-qpk.

    Note

    Multiple subscribers can consume the merged stream and there is no requirement for them to do so within a certain time frame (i.e. they don't have to "keep up" with the publisher).

This results in full decoupling of publishers and subscribers (i.e. publishers don't have to deal with slow subscribers).
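The decoupling can be illustrated with a toy model in which the "stream" is just a file and each subscriber keeps its own read position. This is a minimal sketch under simplifying assumptions: it uses newline-delimited text rather than the RT log format, skips replication and sequencing entirely, and the `publish` and `consume` functions are hypothetical, not the kdb Insights interfaces.

```python
import os
import tempfile


def publish(log_path, message):
    """Append one newline-terminated message to the local log file."""
    with open(log_path, "ab") as log:
        log.write(message.encode("utf-8") + b"\n")


def consume(log_path, position):
    """Read complete messages from `position`; return (messages, new position)."""
    with open(log_path, "rb") as log:
        log.seek(position)
        data = log.read()
    end = data.rfind(b"\n") + 1   # ignore a trailing partial message, if any
    messages = [line.decode("utf-8") for line in data[:end].splitlines()]
    return messages, position + end


log_path = os.path.join(tempfile.mkdtemp(), "log.0.0")  # hypothetical file name

publish(log_path, "trade AAPL 100")
publish(log_path, "trade MSFT 200")

# A fast subscriber reads immediately; the publisher never waits for anyone.
fast_msgs, fast_pos = consume(log_path, 0)

publish(log_path, "trade GOOG 300")
more, fast_pos = consume(log_path, fast_pos)
fast_msgs += more

# A slow subscriber starts later and still sees every message, in order.
slow_msgs, slow_pos = consume(log_path, 0)
```

The publisher only ever appends to the file, so a subscriber that falls behind affects nothing except its own read position.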

RT vs tick

RT is similar to tick, but it also provides the following benefits:

  • A fault-tolerant system which can sustain an RT node going offline.

  • Since the publisher & subscriber are decoupled from one another and from the RT nodes, there is no concern about a slow subscriber.

Definitions

  • stream - this is synonymous with an instance of RT, regardless of whether it is a 3, 5, 7, ... node RT implementation. All the messages sent to the stream are replicated, merged and sequenced before being passed to the consumers.

  • session - this is the identifier for a single publisher, which is physically represented as a directory that messages are written to.

  • stream id - applicable to a kdb Insights Enterprise deployment only. Since there can be multiple Streams in a kdb Insights Enterprise, the stream id (currently known in the UI as a Sub Topic) is used by external publishers to identify the stream to send data to.

  • publisher: a publisher writes messages to a session which are pushed to a stream.

    Publishers must use unique session names

    Each publisher on a given host MUST use a different session name to provide a unique directory name for the log files before they are merged.

  • subscriber: a subscriber consumes the merged messages from a stream.

  • sequencer: the sequencer listens to publishers and sequences the incoming messages into a single message stream, creating a global order of messages. The order of messages from a given publisher will not change after sequencing.

Interacting with RT

Details on how clients can send and receive messages into the stream are available as follows:

RT Architecture

RT includes a low-level publishing layer, with higher-level components layered on top which can be aware of the networking topology needed to stream data.

RT supports a 1 or 3 node cluster:

  • In a 3 node cluster, RT can offer fault tolerance and high availability (HA). If one of the three nodes were to go offline for a period, the remaining two nodes could continue to send data to downstream subscribers.

    This diagram shows RT being used in a 3-node Raft configuration within kdb Insights Enterprise.

    [Figure: RT Ingestion]

  • In a 1 node cluster, fault tolerance is still present, but the cluster is not highly available. If the single RT node were to go down, the publisher would continue writing to its local RT log file. Once the RT node has restarted, it would obtain any data present on the publisher node which it has not yet received.

Components

We will now look at the various components that make up the RT microservice:

  • Replicators: continuously, rapidly, and securely transport a log directory from one machine to another.
  • Sequencers: read from multiple session directories and produce a single merged log directory that can itself be consumed.
  • Archiver: drives the garbage collection of all merged logs.

Replicators

The replicator's job is to transport messages from one machine to another and each replicator is made up of a client and a server.

For publishing, each of the client interfaces includes a push_client replicator, which sends the messages to the push_server replicator, which receives them. Similarly, for subscribing, the interfaces include a pull_client replicator, which receives the messages from the pull_server replicator.

Replicators have the following characteristics:

  • Keep track of the restart position on the receiving side, not the sending side, for zero data loss/gaps. The receiver initiates recovery, i.e. it knows the "position" to restart from.
  • The client opens the (TCP) connections.
  • Use inotify() to detect new data with low latency.
  • Use mutually authenticated TLS to encrypt traffic across network boundaries.
  • The replicator is expected to run indefinitely.
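The receiver-side restart position described above can be illustrated with a small in-memory simulation. This is only a sketch of the idea, not the RT replicator: the `Sender` and `Receiver` classes, the `max_bytes` chunk size and the `max_chunks` parameter are hypothetical names invented for the example, and no network, TLS or inotify is involved.

```python
class Sender:
    """Holds the source log. It keeps no state about what a receiver has."""

    def __init__(self):
        self.log = bytearray()

    def append(self, data):
        self.log += data

    def read_from(self, position, max_bytes=4):
        # Serve whatever range the receiver asks for (hypothetical chunk size).
        return bytes(self.log[position:position + max_bytes])


class Receiver:
    """Owns the restart position: it is simply the size of the local copy."""

    def __init__(self):
        self.copy = bytearray()

    def position(self):
        return len(self.copy)

    def pull(self, sender, max_chunks=None):
        # Request data starting at our own position until caught up,
        # or until max_chunks is reached (used here to fake a dropped link).
        chunks = 0
        while max_chunks is None or chunks < max_chunks:
            chunk = sender.read_from(self.position())
            if not chunk:
                break
            self.copy += chunk
            chunks += 1


sender = Sender()
sender.append(b"abcdefghij")

receiver = Receiver()
receiver.pull(sender, max_chunks=1)   # connection "drops" after one chunk
position_after_drop = receiver.position()

receiver.pull(sender)                 # receiver resumes from its own position
```

Because the receiver derives the restart position from what it has actually stored, a reconnect can neither skip nor duplicate bytes in this model.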

Sequencers

Combining more than one session into a single output is the responsibility of a sequencer. The sequencer is entirely deterministic: it chooses a sequence of events from any number of input session directories and merges them into a plausible order in the output stream.

The sequencer uses Raft, which is suitable for 3, 5, 7, ... node topologies, and has the advantage that as long as a majority of nodes (a quorum) are up, the merged output log file can be constructed and delivered to the subscribers. This means that RT can continue to run as long as fewer than half of the Raft nodes stop running.
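The quorum arithmetic can be worked through in a few lines. The function names below are illustrative only; the majority rule itself is the standard Raft rule.

```python
def quorum(nodes):
    """Smallest number of nodes that forms a majority."""
    return nodes // 2 + 1


def tolerated_failures(nodes):
    """Nodes that can be down while a majority is still available."""
    return nodes - quorum(nodes)


for n in (1, 3, 5, 7):
    print(f"{n} node(s): quorum={quorum(n)}, tolerated failures={tolerated_failures(n)}")
```

A 3 node cluster therefore tolerates one node being down, while a 1 node cluster tolerates none.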

The Raft sequencer nodes work together as follows:

  1. Using the Raft algorithm, the sequencers form a cluster and elect a leader.
  2. The cluster nodes communicate with the leader about the log files in each session directory they have available locally (replicated from each publisher).
  3. Using the above information, the leader determines what new messages can be appended to the merged stream. It publishes this back to the follower nodes as a merge instruction.
  4. Using the publisher log files (in the session directories) and the merge instruction (supplied by the Raft leader), messages are merged to the output stream.
  5. Since all sequencers have the same session log files and merge them using the same instruction set, the merged outputs are identical on every cluster node.
  6. Using the rules set out by Raft, if a leader fails, a new leader will be elected. In this way the cluster as a whole keeps functioning as long as a majority of the nodes remain up.

Once a merged file has been appended to, a pull replicator replicates the merged file to the subscribers to consume.
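The reason identical inputs and identical merge instructions yield identical merged logs can be shown with a toy model. The instruction format used here, a list of `(session, count)` pairs, is an assumption made for illustration; the actual format of RT merge instructions is not described in this document.

```python
def apply_merge_instructions(sessions, instructions):
    """Build a merged stream from per-session message lists.

    sessions:     dict of session name -> messages in publisher order
    instructions: list of (session name, count) pairs, meaning "append the
                  next `count` messages from that session" (hypothetical format)
    """
    cursors = {name: 0 for name in sessions}
    merged = []
    for name, count in instructions:
        start = cursors[name]
        end = start + count
        if end > len(sessions[name]):
            raise ValueError(f"session {name!r} has no message at index {end - 1}")
        merged.extend((name, msg) for msg in sessions[name][start:end])
        cursors[name] = end
    return merged


sessions = {"pubA": ["a1", "a2", "a3"], "pubB": ["b1", "b2"]}
instructions = [("pubA", 1), ("pubB", 2), ("pubA", 2)]

# Three "nodes" apply the same instructions to the same session logs.
node_outputs = [apply_merge_instructions(sessions, instructions) for _ in range(3)]
```

The function has no source of nondeterminism, so every node produces the same output, and each publisher's messages keep their original relative order.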

Archiver

The RT Archiver runs as part of the sequencer and drives the garbage collection of all merged log files - those in RT and replicated to other subscribers - based on a configurable policy. This policy can specify:

  • Retention period for merged log files in minutes. Merged log files which contain messages older than this (based on the message timestamp) are garbage collected.
  • Maximum size of all log files in RT. Merged log files which push the total size beyond this limit are garbage collected, oldest first.
  • Maximum percentage of the available disk space that will be used by RT. When this percentage is exceeded merged log files are garbage collected, oldest first. If not configured the default value of 90% is used.

When a merged log file on the sequencer is marked for garbage collection, this information is propagated to the subscribers such that the log files are also garbage collected there. Garbage collection has the effect of pruning or truncating the log files. The log files still exist but they are of size 0.

When pruning, the following additional constraints exist:

  • The newest merged log file is never pruned.
  • Log files roll every 1GB (or every session, whichever comes first). Only rolled log files are eligible for garbage collection.
  • The time considered is only the first timestamp in a log file, so some time beyond the expiration can actually pass before the log file is pruned, i.e. it must roll first.
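The interaction of the retention period, the size limit and the constraints above can be sketched as a selection function. This is a simplified model under stated assumptions, not the Archiver's implementation: the `LogFile` fields and the `select_for_pruning` function are hypothetical, timestamps are plain minute counts, and the disk percentage rule is omitted.

```python
from dataclasses import dataclass


@dataclass
class LogFile:
    name: str
    first_ts_min: int   # timestamp of the first message, in minutes (hypothetical unit)
    size: int           # size in bytes
    rolled: bool        # only rolled files are eligible


def select_for_pruning(files, now_min, retention_min, limit_bytes):
    """Return the names of the files to prune, oldest first."""
    ordered = sorted(files, key=lambda f: f.first_ts_min)
    # The newest file is never pruned, and only rolled files are eligible.
    eligible = [f for f in ordered[:-1] if f.rolled]

    # Time rule: judged on the first timestamp in each file only.
    pruned = [f for f in eligible if now_min - f.first_ts_min > retention_min]

    # Size rule: keep pruning oldest first until the total fits the limit.
    total = sum(f.size for f in files if f not in pruned)
    for f in eligible:
        if total <= limit_bytes:
            break
        if f not in pruned:
            pruned.append(f)
            total -= f.size
    return [f.name for f in pruned]


GIB = 1024 ** 3
files = [
    LogFile("log.0.0", 0, GIB, True),
    LogFile("log.0.1", 5000, GIB, True),
    LogFile("log.0.2", 9000, GIB, True),
    LogFile("log.0.3", 11000, GIB // 2, False),  # current file, not yet rolled
]
to_prune = select_for_pruning(files, now_min=12000, retention_min=10080, limit_bytes=2 * GIB)
```

In this example `log.0.0` is selected by the time rule and `log.0.1` by the size rule, while the unrolled newest file is left alone.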

Minimum disk size

The size of the disk associated with each RT and consumer pod MUST be larger than the limit defined.

Note

These archival options can be defined as part of the kdb Insights Configuration.

The sample values below would see the merged log files retained for 7 days. However, if the total disk space consumed by them goes above 90Gi, or 90% of the available space, eligible merged log files are removed, oldest first.

{
    "time": 10080,
    "limit": "90Gi",
    "disk": 90
}
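As a quick sanity check on these sample values, the snippet below converts them into days and bytes. It assumes that the `Gi` suffix is a binary unit (1Gi = 1024^3 bytes), as in Kubernetes-style quantities; `parse_size` is a hypothetical helper, not part of any RT API.

```python
import json

# Binary unit suffixes (assumption: "Gi" means 1024**3 bytes).
UNITS = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3, "Ti": 1024 ** 4}


def parse_size(text):
    """Convert a quantity such as "90Gi" to bytes (hypothetical helper)."""
    for suffix, factor in UNITS.items():
        if text.endswith(suffix):
            return int(text[: -len(suffix)]) * factor
    return int(text)


policy = json.loads('{"time": 10080, "limit": "90Gi", "disk": 90}')

retention_days = policy["time"] / (60 * 24)   # "time" is in minutes
limit_bytes = parse_size(policy["limit"])
disk_percent = policy["disk"]

print(retention_days, limit_bytes, disk_percent)
```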

Log File Meta Data

RT provides a logHistory API which returns meta data about the RT log files that have been garbage collected. This allows you to see a history of the log files that were written to.
In a situation where a user has archived many RT log files, this API gives the user an indication of which log files were being written to over time.

The meta data includes:

  • The log file name.
  • The time of the first message added to that log file.
  • The position of the first message added to that log file.

To use the API, the user must port-forward port 6000 of one of the applicable RT pods to their localhost. For example, where remote port 6000 has been port-forwarded to localhost port 6000:

curl http://127.0.0.1:6000/logHistory
[{
    "ll": ":/s/out/OUT/log.0.1",
    "dir": ":/s/out/OUT",
    "lp": ["2023-04-05T11:26:49.466529804", 1073741824],
    "t": "OUT",
    "logtime": "2023-04-05T11:26:49.466529804",
    "logpos": 1073741824
}, {
    "ll": ":/s/out/OUT/log.0.2",
    "dir": ":/s/out/OUT",
    "lp": ["2023-04-05T14:00:14.979905121", 2147483648],
    "t": "OUT",
    "logtime": "2023-04-05T14:00:14.979905121",
    "logpos": 2147483648
}]
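A response like the one above can be summarised with a few lines of Python. The mapping of fields to meaning (`ll` as the log file path, `logtime` as the time of the first message, `logpos` as its position) is inferred from the sample output and the list of meta data, and treating `logpos` as a byte offset is an assumption. The response is embedded as a string so that the sketch runs without a port-forward.

```python
import json

# Sample response copied from the example above.
response = """
[{
    "ll": ":/s/out/OUT/log.0.1",
    "dir": ":/s/out/OUT",
    "lp": ["2023-04-05T11:26:49.466529804", 1073741824],
    "t": "OUT",
    "logtime": "2023-04-05T11:26:49.466529804",
    "logpos": 1073741824
}, {
    "ll": ":/s/out/OUT/log.0.2",
    "dir": ":/s/out/OUT",
    "lp": ["2023-04-05T14:00:14.979905121", 2147483648],
    "t": "OUT",
    "logtime": "2023-04-05T14:00:14.979905121",
    "logpos": 2147483648
}]
"""

entries = json.loads(response)

summary = []
for entry in entries:
    name = entry["ll"].rsplit("/", 1)[-1]       # file name without the directory
    position_gib = entry["logpos"] / 1024 ** 3  # assumes logpos is in bytes
    summary.append((name, entry["logtime"], position_gib))

for name, first_time, position_gib in summary:
    print(f"{name}: first message at {first_time}, position {position_gib:g} GiB")
```

Under that assumption, the two sample positions fall on 1 GiB boundaries, which matches the 1GB roll size described in the Archiver section.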