# Internal q publisher and subscriber

Internal q publishers and subscribers (such as the Stream Processor and Storage Manager) use the rt.qpk (also known as the "RT kdb Insights qpk"), publish messages to an RT stream and to receive messages from an RT stream.

## Publisher

The publisher should call:

p:.rt.pub["streamid"]


The returned p is a function to which q objects can be passed for publishing.

### Session name

Session names must be globally unique, as the session is used to generate the name of the directory in which the logs files are stored (both locally and in RT).

By default the rt.qpk combines the streamid and the host name to provide a unique session name (and therefore directory) for the log files during replication.

Path must survive reboots

If the hostname is stable (for example, as part of a stateful set) then this path must survive reboots.

However, if you have multiple publishers connected to the same streamid and running on the same host, you must manually override the session name to ensure each publisher uses a directory this globally unique. This can be done by setting .rt.pi to the required session name before calling .rt.pub.

Warning

There must never be more than one publisher running using the same session name at the same time.

### Deduplication

To enable deduplication of a stream the publisher needs to:

1. Set $RT_RAFT_CLUSTER to the endpoint prefix for the RT cluster, typically ${RT_TOPIC_PREFIX}<streamid> as described here.

2. When calling .rt.pub[argument], set argument to <dedupid>.dedup, rather than argument=streamid.

3. A message identifier must be set using .rt.id for each published message.

For example:

q)getenv RT_TOPIC_PREFIX
"kx-"
q)streamid:"mystream"
q)RT_RAFT_CLUSTER setenv (getenv RT_TOPIC_PREFIX),streamid
q)getenv RT_RAFT_CLUSTER
"kx-mystream"
q)pub_fun:.rt.pub[argument]
q)//pub_fn publishes message to the RT with streamid=mystream using deduplication with dedupid=trades


All session streams from different publishers who shared the same dedupid will be deduplicated. RT will keep track of the high watermark for each dedupid - where it sees a message with a lower identifier than the watermark then that message is discarded.

Note

Where you want to run multiple publishers on the same host then .rt.pi should be set differently for each publisher.

The message header consists of the following fields:

• on - the origin name, a symbol representing who originated this message.
• ts - the timestamp the message was recorded. Setting this to 0Np will have the publisher use the current real-time.
• id - the message identifier, should be unique for the origin, and have a distance from zero that is increasing by one with each message. For example: 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.
• to - reserved for future use.

The message header can be populated by setting the corresponding variable in the .rt namespace (e.g. .rt.id) before calling the returned function p when publishing a message.

## Subscriber

The rt.qpk monitors the replicated merged log files, using file-system events, responds to their changes and surfaces those changes to the subscriber.

### Pre-requisites

A publisher or subscriber to RT can use the rt.qpk file and add it as a dependency in the application:

{
"default": {
"entry": ["myapp.q"],
"depends": ["rt"]
}
}


## Subscribing

The subscriber needs to implement the following message callback:

callback:{[data;pos] ... }
.rt.sub["streamid"; pos; callback]


The callback arguments provide deserialized q messages and a "position" that can be used for restarting the subscriber from this point.

When calling .rt.sub the pos should be a valid position previously passed to callback; if no position is available then using :: is appropriate to refer to the beginning of the stream.

Note

The "position" is opaque and cannot be used as a message index.

### Other events

There are other events in the message stream that the user can be notified of with an event callback. These inform the user of exceptions where data loss can occur.

event details next position implication
badtail Message log file corruption is detected. Skips to the next log file. There is always data loss.
badmsg A message cannot be decoded. Skips to the next message. There is always data loss.
reset RT has been reset, deleting its log files and starting a new session. Skips to the new session. There is potential data loss and message duplication.
skip-forward The current position points to a log file that has been archived. Skips to the start of the first unarchived log file. There is always data loss.

These events are replay-able just like messages, i.e. re-subscribing from an earlier position will result in the same message/event sequence.

When one of these event occurs .rt.on_event function is called with the following arguments:

name details
event badtail, badmsg, reset or skip-forward
streamid The same streamid value used in the .rt.sub call
position 7h vector of two positions

The two values in the position vector depend on the event:

event position vector values
badtail the position of the corrupted message and the starting position of the next log file
badmsg the position of the corrupted message and the next message
reset the end position of the last message and the position of the first message after a reset
skip-forward the position in the archived log and the starting position of the next unarchived log file

The default implementation of .rt.on_event only prints a message to standard out, for example:

q).rt.on_event[reset;"streamid";130 17592186044416]
Reset event in streamid, rolling position from 130 to 17592186044416


This default implementation can be redefined or the user can supply a custom event handler with a dictionary:

q) .rt.sub["streamid";0;messageevent!({data;position] ...};{[event;position] ...})]


### Unsubscribing

The subscriber can decide to stop receiving new messages. There are two ways of using the .rt.unsub function:

1. .rt.unsub[] can be called from a message or event callback and that will stop the message stream for the subscription that the message/event was received on.

2. .rt.unsub[streamid] can be called at any point (inside a callback or not) with the streamid to unsubscribe from (as was passed to .rt.sub[streamid;pos;callback]).

### Pause and Resume

#### Pause

The subscriber can also decide to pause the delivery of incoming new messages while the subscribers local log files still continue to grow such that the delivery of messages to the subscriber can be resumed later.

There are two ways of doing this using the .rt.pause function:

1. .rt.pause[] can be called from a message or event callback and that will pause the message stream for the subscription the message/event was received on.

2. .rt.pause[streamid] can be called at any point (inside a callback or not) with the streamid to pause (as was passed to .rt.sub[streamid;pos;callback]).

#### Resume

The subscriber can resume a paused stream using the .rt.resume function.

.rt.resume[streamid] can be called at any point (inside a callback or not) with the streamid to resume (as was passed to .rt.sub[streamid;pos;callback]).

### Filtering

The rt.qpk provides an efficient way to filter messages that are generic lists with one or two leading symbols, e.g (upd;trade;data).

In order to apply message filtering a special message processing callback function has to be created.

An example subscription to all messages on a stream:

.rt.sub[stream;position;callback];


An example subscription to a subset of the data on a stream, in this case the trade and quote tables only:

cb:.rt.filter[{$[x=.b;y in tradequote;0b]};callback]; .rt.sub[stream;position; cb];  The filter will only deliver the messages to the subscriber that satisfies the criteria. ### Garbage collection of subscriber stream log files #### Adopt the RT garbage collection policy The RT Archiver manages the garbage collection of all merged stream logs inside RT based on a configurable policy. These archival options can be defined as part of the kdb Insights Configuration. By default the subscribers will adopt the garbage collection policy used by RT (opt-in). When log files are truncated in RT they are also truncated on all subscribers. #### Subscriber opt-out There are use cases where a subscriber may want to opt-out of the default behavior and independently manage the garbage collection of their local log files. To do this they need to set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=0 before calling .rt.sub[].

This decouples the subscriber from RT's global garbage collection policy. Therefore the subscriber must prune their own local log files as messages are consumed and no longer need to be replayed (say on a restart). RT log files are pruned using the API:

.rt.prune[streamid;pos]


where:

• streamid is the same parameter as was passed to .rt.sub[streamid;pos;callback]
• pos is the position returned from the subscriber callback

The .rt.prune[] function will truncate all log files which have been rolled over and where all the messages in the log file are before the specified position.

To do this the lead subscriber needs to set the environment variable \$REPLICATOR_EXCHANGE_ARCHIVED=1 before calling .rt.sub[]. It can then use the .rt.prune[] function as described and its truncation decisions will be reflected in RT and the other subscribers (assuming they haven't opted-out).