RT Internal Clients
Pre-requisites
A publisher or consumer that connects to RT from within the Insights Platform must use the rt.qpk file, added as a dependency of the application:
{
"default": {
"entry": ["myapp.q"],
"depends": ["rt"]
}
}
Downloading rt.qpk
You can get a pre-built rt.qpk from https://gitlab.com/kxdev/interop/rt/app-demo/-/packages, or by using q-packer:
qp pull gitlab.com/kxdev/interop/rt/app-demo/rt.qpk 0.2.2
API
The message header
The message header consists of the following fields:
- on: the origin name, a symbol representing who originated this message.
- ts: the timestamp the message was recorded. Setting this to 0Np will have the publisher use the current real time.
- id: the message identifier, which should be unique for the origin and have a distance from zero that increases by one with each message. For example, 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.
- to: the timeout (an integer) of how long the client will wait for a response. This field may be 0Ni to indicate no response is desired.
The message header can be populated in q by setting the corresponding variable in the .rt namespace (e.g., .rt.id).
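As a minimal sketch of the above, the header variables can be assigned directly before publishing. Only .rt.id is named explicitly in this document; .rt.on, .rt.ts and .rt.to are assumed to follow the same pattern. validIds is a hypothetical helper (not part of the RT API) that checks the id rule described in the field list.

```q
/ Populate the header; names other than .rt.id are assumed from the field list
.rt.on:`myorigin      / origin name (hypothetical value)
.rt.ts:0Np            / null timestamp: publisher uses the current time
.rt.id:1              / first message identifier
.rt.to:0Ni            / null timeout: no response desired

/ Hypothetical helper: true when the distance from zero grows by one each step
validIds:{all 1=1 _ deltas abs x}

validIds 1 -2 3 4     / 1b: distances from zero are 1 2 3 4
validIds 1 2 -2 4     / 0b: distance stays at 2, then jumps to 4
```

A check of this kind could be run in tests on the publisher side, since the sign of an id does not matter, only its absolute value.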
Basic Producer
The basic producer needs only:
p:.rt.pub["topic"]
p will be a function that takes messages to record.
The current contents of the header will be consulted when recording messages.
Naming convention for topics
Because the replicator combines the topic name and host name to provide a unique directory name for the stream files during replication, it is important to name the topic carefully.
As long as the topic name does not begin with a forward-slash (i.e. it is not set to a full path), the stream files will be stored in $RT_LOG_PATH/$hostname.$topic/.
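The directory rule above can be sketched as a small function. streamDir is a hypothetical helper, not part of the RT API, and returning a full-path topic unchanged is an assumption made for illustration.

```q
/ Hypothetical helper: where a producer's stream files would be stored
/ logPath stands in for $RT_LOG_PATH; all arguments are strings
streamDir:{[logPath;host;topic]
  $["/"=first topic;
    topic;                              / full path: used as given (assumption)
    logPath,"/",host,".",topic,"/"]}    / otherwise $RT_LOG_PATH/$hostname.$topic/

streamDir["/data/rt";"host-0";"trades"]           / "/data/rt/host-0.trades/"
streamDir["/data/rt";"host-0";"/fastssd/replay"]  / "/fastssd/replay"
```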
Path must survive reboots
If the hostname is stable (for example, as part of a stateful set) then this path must survive reboots.
The producer creates an index .rt.pi to get uniqueness for a topic, which by default only includes the hostname. The producer can set .rt.pi to something else before calling .rt.pub if they wish.
Multiple producers on a host
When there are multiple (independent) publishers on a host it is vital that each producer uses a different topic name. Therefore, if you are creating multiple (independent) publishers on a machine, they must set .rt.pi to something that makes the topic name unique, before calling .rt.pub.
Only one topic publisher per host
There must only be one publisher running on a host at any one time using a specific topic name.
Process ID
If the publisher workers neither checkpoint anything nor require deduplication between publishers, using the process ID can work:
.rt.pi:raze[system"hostname"],".",string .z.i;
Deduplication
If different instances of a publisher are sending the same message more than once, and in the same sequence, and you want to guarantee that the consumers only get the message once, RT can perform deduplication on a per topic basis, before the messages are merged and sequenced.
To enable deduplication, a topic name should be suffixed with .dedup.
Use cases include:
- Ensuring that when a Stream Processor fails and is restarted, any messages resent because they were published between the last checkpoint and the failure are not duplicated to the consumers.
- Two hosts publishing the same messages to support the failure of one of the publishers. RT will ensure only one copy of each message is sent to the consumers.
Stream directory naming
Even though the two hosts use the same topic name, they will have different stream directory names, as the directories are named $hostname.$topic.
Starting Replicators
As long as the topic name does not begin with a forward-slash, some replicators will be started to support some basic topologies:
- a pull_server on localhost:5001, intended to service internal sequencers and consumers that pull this stream.
- a number (currently 3) of push_client connecting to $RT_TOPIC_PREFIX$topic-$n:5002, where $n is the ordinal (0, 1, 2) and $RT_TOPIC_PREFIX should be a hostname prefix such as internal--.
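For illustration, the push_client targets described above can be derived as follows. pushTargets is a hypothetical helper, not part of the RT API, and the prefix and topic values are examples only.

```q
/ Hypothetical helper: build the addresses the push_client replicators connect to
/ prefix stands in for $RT_TOPIC_PREFIX; ordinals are 0, 1, 2 (currently 3 replicators)
pushTargets:{[prefix;topic] {[p;t;n] p,t,"-",string[n],":5002"}[prefix;topic] each til 3}

pushTargets["internal--";"trades"]
/ ("internal--trades-0:5002";"internal--trades-1:5002";"internal--trades-2:5002")
```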
Using a full path
If the topic name is set to a full path (and begins with a forward-slash) then the replicator will not be started automatically.
.rt.sub["/fastssd/replay"; .pos:@[get;`:/fastssd/checkpoint;0]] {.pos:y; upd ... }
.z.ts:{`:/fastssd/checkpoint set .pos}
lupd:.rt.pub["/fastssd/replay"]; / logged upd
Garbage collection of publisher logs
Publisher log files are automatically garbage collected when both of these conditions are met:
- The publisher's log file has rolled, for example from log.0.0 to log.0.1
- log.0.0 has been replicated to all the RT pods and merged
At this point log.0.0 is garbage collected on each of the RT pods. The archived flag is then propagated back to the publisher so that its local log.0.0 is also garbage collected.
Basic Consumer
The basic consumer needs to implement the following:
```q
Callback:{[data;pos] ... }
.rt.sub["topic"; pos; Callback]
```
The `pos` must be a position previously passed to `Callback`; if no position is available, use `::` to refer to the beginning of the stream.
If the topic is not a full path (and therefore does not begin with a forward-slash), the log file will be stored in `$RT_LOG_PATH/$topic/`.
!!! note
    Unlike the producer, this is not prefixed with a hostname.
Some replicators will also be started to support some basic topologies:
- a number (currently 3) of `pull_client` connecting to `$RT_TOPIC_PREFIX$topic-$n:5001` -- that is, the first `pull_server` in the basic producer or Sequencer producing this data. Again, `$n` is the ordinal (0,1,2) and `$RT_TOPIC_PREFIX` should be a hostname prefix such as `internal--`
#### Other events
There are other events in the message stream the user is notified of:
- `badtail`: when a message log corruption is detected
- `reset`: when the source has deleted its log files and starts a new session
These events inform the user of potential data loss. A `badtail` event always involves data loss.
A `reset` event may involve data loss and message duplication.
These events are replayable just like messages, i.e. re-subscribing from an earlier position will result in the same message/event sequence.
When an event occurs, the `.rt.on_event` function is called with the `event`, `topic` and `position` arguments. The `event` argument is one of `` `reset `` or `` `badtail ``, `topic` is the same value used in the `.rt.sub` call, and `position` is a 7h vector of:
- for `` `badtail ``: the position of the corrupted message and the starting position of the next log file
- for `` `reset ``: the end position of the last message and the position of the first message after the reset
The default implementation of `.rt.on_event` only prints a message to standard out, for example:
```q
q).rt.on_event[`reset;"/tmp/test";130 17592186044416]
Reset event in /tmp/test, rolling position from 130 to 17592186044416
```
This shared default implementation can be redefined, or the user can supply a custom event handler with a dictionary:
q) .rt.sub["/tmp/test";0;`message`event!({[data;position] ...};{[event;position] ...})]
After delivering the event, the stream position moves on to position[1] and the next message is delivered, unless the user stops the stream.
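A minimal sketch of redefining the shared handler is shown here. It assumes only the three-argument signature described above; the events table and its column names are hypothetical.

```q
/ Hypothetical table recording every reset/badtail event seen by this process
events:([] event:`symbol$(); topic:`symbol$(); startPos:`long$(); nextPos:`long$())

/ Replace the default handler: record the event instead of printing it
.rt.on_event:{[event;topic;position]
  `events insert (event;`$topic;position 0;position 1);
  }

/ Simulate the call RT would make for a reset event
.rt.on_event[`reset;"/tmp/test";130 17592186044416]
events
```

Keeping the two positions side by side makes it possible to review afterwards where the stream may have lost or duplicated data.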
Unsubscribing
The subscriber can decide to stop receiving new messages. There are two ways of doing this:
- From a message or event callback, the user can simply call .rt.unsub[].
- Call .rt.unsub[topic] using the same topic argument as in the .rt.sub[topic;pos;callback] call.
Garbage collection of subscriber stream log files
RT Archiver
The RT Archiver manages the garbage collection of all merged stream logs inside RT based on a configurable policy.
These archival options can be defined as part of the Insights Configuration.
By default the subscribers will adopt the garbage collection policy used by RT (opt-in). When log files are truncated in RT they are also truncated on all subscribers.
Subscriber opt-out
There are use cases where a subscriber may want to opt out of the default behavior and independently manage the garbage collection of their local log files. To do this they need to set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=0 before calling .rt.sub[].
This decouples the subscriber from RT's global garbage collection policy. Therefore the subscriber must prune their own local log files as messages are consumed and no longer need to be replayed (say on a restart). RT log files are pruned using the API:
.rt.prune[x;pos]
where:
- x is the same topic parameter supplied to .rt.sub[]
- pos is the position returned from the subscriber callback
The .rt.prune[] function will truncate all log files which have been rolled over and where all the messages in the log file are before the specified position.
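The selection rule applied by .rt.prune[] can be modelled as below. This is a simplified illustration only: the logs table, its columns and the small integer positions are hypothetical, and prunable is not part of the RT API.

```q
/ Hypothetical view of a subscriber's log files
/ lastPos: position of the last message in the file; rolled: file has been rolled over
logs:([] file:`log.0.0`log.0.1`log.0.2; lastPos:100 250 400; rolled:110b)

/ Files eligible for truncation: rolled over and entirely before pos
prunable:{[t;pos] exec file from t where rolled, lastPos<pos}

prunable[logs;300]   / `log.0.0`log.0.1
```

In this model the file still being written (log.0.2) is never selected, whatever position is supplied, because it has not yet rolled over.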
Lead subscriber
There is one specific use case where one of the subscribers can observe when messages have been persisted downstream either to a storage tier or another durable messaging system. After this has occurred those messages are no longer required to be replayed to it or other subscribers.
Note
This needs to be done as a deliberate design decision where one of the subscribers can be configured as the lead.
In this scenario that lead subscriber will drive the truncation of merged logs both in RT and the other subscribers. Note this would work alongside the RT Archiver which is still required to ensure RT itself doesn't run out of disk space should the lead subscriber fail. However, an early truncation decision by the lead subscriber will take precedence over the RT Archiver policy.
To do this the lead subscriber needs to set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=1 before calling .rt.sub[]. It can then use the .rt.prune[] function as described, and its truncation decisions will be reflected in RT and the other subscribers (assuming they haven't opted out).