Internal q publisher and subscriber

Internal q publishers and subscribers (such as the Stream Processor and Storage Manager) use the rt.qpk interface (also known as the "RT kdb Insights qpk") to publish messages to an RT stream and to receive messages from an RT stream. External q publishers can also use the rt.qpk to communicate with an RT stream.

Publisher

Internal Publisher

The publisher should call:

q)p:.rt.pub["streamid"]

The returned p is a function to which q objects can be passed for publishing.

q)show t:([]time:3#.z.p;sym:`a`b`c)
time                          sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t) 

Call with a dictionary

Alternatively, .rt.pub can also be called with a dictionary to override defaults, for example:

q)t:`stream`path`topic_prefix`replicas`port!("stream";"/rt/logs";"rt-";3;5002)
q)p:.rt.pub t

Description of the dictionary for .rt.pub:

key | type | default | description
--- | --- | --- | ---
stream | string | n/a | RT stream name
path | string | $RT_LOG_PATH | The parent path of RT log directories
topic_prefix | string | $RT_TOPIC_PREFIX | The prefix of the RT sequencer node hostnames, that is, the part of $RT_TOPIC_PREFIX$stream-$n that comes right before $stream.
replicas | int | .rt.replicas/$RT_REPLICAS/3 | The replica count of the RT sequencer the client is publishing to. .rt.replicas is initialized from $RT_REPLICAS when the library is loaded, and defaults to 3.
port | int | 5002 | The port number of the internal pull servers
publisher_id | string | n/a | Used to distinguish multiple publishers on the same host. The directory name will be $publisher_id.$hostname.$stream.
dedup_id | string | n/a | Used in the sequencer to identify deduplicated input streams. The $stream suffix of the directory name will change to $dedup_id.dedup.
cluster | list of strings | :$RT_TOPIC_PREFIX$stream-$n:$port | Completely overrides the logic that creates the connection URLs for the replicators by providing the URLs manually.

External Publisher

An external publisher can send data to RT via a set of load balancers. These load balancers need to be discovered; kdb Insights Enterprise relies on a service called the Information Service for this. When queried, the Information Service returns the load balancer endpoints that an external publisher can communicate with. The rt.qpk has built-in APIs that manage the querying of the Information Service. The user is required to go through some authentication steps, which are covered below.

Pre-requisites

  • Deploy an assembly. An example assembly sdk_sample_assembly can be found here.
  • Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
  • Ensure you have an authenticated kdb Insights Enterprise client URL.
  • Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.

Parameters

parameter | required | default | description
--- | --- | --- | ---
config_url | Mandatory | none | The URL that this program calls or reads to find the RT endpoint(s) it needs to connect to
path | Mandatory | none | The local directory where the RT log files are written

configUrl

The config_url parameter can be set to the value of KXI_CONFIG_URL to access kdb Insights Enterprise from outside the cluster.

q)url:getenv`KXI_CONFIG_URL;
q)params:`config_url`path!(url;"/tmp/rt/")
q)p:.rt.pub params
q)show t:([]time:3#.z.p;sym:`a`b`c)
time                          sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t) 

Maximum payload

RT throws an error if a publisher attempts to send a message larger than 1GB. To be exact, the upper limit is 1073741783 bytes.

The example code below sends a message that is too large and shows the error that is thrown:

q)p:.rt.pub["streamid"];
q)data:(prd 3#1024)#0x00;
q)p data
'message too large

1GB payload limit

Messages that are larger than 1GB will need to be broken into smaller ones before they are sent to RT.
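The splitting described above can be sketched as follows. This is a minimal illustration, not part of rt.qpk: the helper name publishChunks and the batch size are assumptions, the (`upd;`tab;data) message shape is borrowed from the earlier examples, and p stands for the function returned by .rt.pub.

```q
// publish table t in batches of at most n rows, one message per batch
// p: publish function (as returned by .rt.pub); n: rows per message
publishChunks:{[p;n;t]
  chunks:n cut t;                          // list of tables, each with at most n rows
  {[p;c] p (`upd;`tab;c)}[p] each chunks;  // send each batch as its own message
  count chunks}                            // number of messages sent
```

A call such as publishChunks[p;100000;t] would then replace a single p (`upd;`tab;t). Row count is only a proxy for message size, so n has to be chosen so that each serialized batch stays under the limit; -22!x, which reports the serialized length of x in bytes, may help when picking a value.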

Publisher id

A publisher's log directory names must be globally unique. By default the rt.qpk combines the streamid and the host name to provide a unique directory name.

Path must survive reboots

If the hostname is stable (for example, as part of a stateful set) then this path must survive reboots.

However, if you have multiple publishers connected to the same streamid and running on the same host, you also have to provide a publisher id to create unique directory names on the same host. This can be done by setting `publisher_id in the dictionary .rt.pub is called with.

q) p: .rt.pub `stream`publisher_id!("mystream";"pub_a")

Publisher IDs must be unique on a host

There must never be more than one publisher using the same publisher id running on the same host at the same time.

Deduplication

To enable deduplication of a stream:

  1. The publisher needs to call .rt.pub with a dictionary and specify a `dedup_id that identifies the deduplicated stream globally
  2. A unique message identifier must be set in .rt.id before publishing each message
  3. .rt.id needs to be monotonically increasing and, if multiple identical streams are merged by the same dedup_id, all publishers need to use the same .rt.id for the same message.

For example:

q)t:`topic_prefix`stream`dedup_id!("kx-";"mystream";"trade")
q)pub_fn: .rt.pub t
q)//pub_fn publishes messages to RT with streamid=mystream using deduplication with dedup_id=trade

All session streams from different publishers that share the same dedup_id will be deduplicated. RT keeps track of the high watermark for each dedup_id; when it sees a message with a lower identifier than the watermark, that message is discarded.

Publisher IDs must be unique on a host

If you want to run multiple publishers on the same host, `publisher_id should also be set differently for each publisher.

The message header

The message header consists of the following fields:

  • on - the origin name, a symbol representing who originated this message.
  • ts - the timestamp the message was recorded. Setting this to 0Np will have the publisher use the current real-time.
  • id - the message identifier, should be unique for the origin, and have a distance from zero that is increasing by one with each message. For example: 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.
  • to - reserved for future use.

The message header can be populated by setting the corresponding variable in the .rt namespace (e.g. .rt.id) before calling the returned function p when publishing a message.
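The id rule and the header variable can be illustrated with a short sketch. validIds and pubWithId are hypothetical helper names used only for illustration; .rt.id is the variable named above, and p is assumed to be the function returned by .rt.pub.

```q
// check the id rule: absolute values must increase by exactly one
validIds:{[ids] all 1=1_deltas abs ids}

validIds 1 -2 3 4    // 1b
validIds 1 2 -2 4    // 0b

// set the message id in the header, then publish
// p: publish function (as returned by .rt.pub)
pubWithId:{[p;id;msg] .rt.id:id; p msg}
```

With deduplication enabled, every publisher of the same logical message would pass the same id to pubWithId, for example a sequence number taken from the upstream feed.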

Subscriber

The rt.qpk monitors the replicated merged log files, and surfaces messages to the subscriber as they are available. It does this by using file-system events, responding to their changes and surfacing those changes to the subscriber.

Pre-requisites

A publisher or subscriber to RT can use the rt.qpk file and add it as a dependency in the application:

{
  "default": {
    "entry": ["myapp.q"],
    "depends": ["rt"]
  }
}

Subscriber Callback

The subscriber needs to implement the following message callback:

callback:{[data;pos] ... }
.rt.sub["streamid"; pos; callback]

The callback arguments provide deserialized q messages and a "position". The "position" represents the last message the subscriber received. This can then be used for restarting the subscriber from this point.

The position is not a message index

The "position" is opaque and cannot be used as a message index.

When starting up, the subscriber can decide where in the stream to start receiving messages from:

  1. The first message available i.e. the oldest message that has not yet been archived.
  2. A particular "position" it has been previously given by the rt.qpk. This allows subscribers to restart from their last known position.
  3. A position of the message that was most recently made available to the subscriber. This allows subscribers to ignore all historic messages and only subscribe to new messages.
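One way to support restarting from the last known position is to store the position handed to the callback and read it back at startup. The sketch below is an assumption-laden illustration: the file location and the helper names loadPos and savePos are hypothetical, and writing to disk on every message is shown only for simplicity.

```q
// hypothetical location for the saved position
posFile:`:/tmp/rt_sub_position

// return the saved position, or :: (start of the stream) if none was saved
loadPos:{[f] $[()~key f; (::); get f]}

// store the position once the message has been handled
savePos:{[f;pos] f set pos}

callback:{[data;pos] show data; savePos[posFile;pos];}

// with rt.qpk loaded, the subscription would then be started as:
// .rt.sub["streamid"; loadPos posFile; callback]
```

Whether the message at the saved position is delivered again after a restart is not stated here, so a subscriber taking this approach should tolerate a repeated message.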

Subscribing from the beginning of the stream

To subscribe to the earliest position, call .rt.sub with the pos argument set to :: as follows:

callback:{[data;pos] ... }
.rt.sub["streamid"; ::; callback]

Subscribing from a known position in the stream

To subscribe to a known position in the stream, call .rt.sub with the pos argument set to a valid position previously passed to callback as follows:

callback:{[data;pos] ... }
.rt.sub["streamid"; pos; callback]

The position is not a message index

The "position" is opaque and cannot be used as a message index.

Subscribing from the latest message

To subscribe to the latest position in the stream, you can either:

  • Call .rt.sub with the pos parameter set to the symbol `latest as follows:

    callback:{[data;pos] ... }
    .rt.sub["streamid";`latest; callback]
    
  • Use a 2 step approach:

    1. Obtain the latest position via the .rt.get_latest_position API
    2. Pass the result, a numerical position value, to the .rt.sub API:

      callback:{[data;pos] ... }
      latest_pos: .rt.get_latest_position["streamid"]
      .rt.sub["streamid"; latest_pos; callback]
      

Other events

There are other events in the message stream that the user can be notified of with an event callback. These inform the user of exceptions where data loss can occur.

event | details | next position | implication
--- | --- | --- | ---
badtail | Message log file corruption is detected. | Skips to the next log file. | There is always data loss.
badmsg | A message cannot be decoded. | Skips to the next message. | The affected message could not be deserialized by the q subscriber. There will be data loss of the affected message.
reset | RT has been reset, deleting its log files and starting a new session. | Skips to the new session. | There is potential data loss and message duplication.
skip-forward | The current position points to a log file that has been archived. | Skips to the start of the first unarchived log file. | There is always data loss.

These events are replay-able just like messages, i.e. re-subscribing from an earlier position will result in the same message/event sequence.

When one of these events occurs, the .rt.on_event function is called with the following arguments:

name | details
--- | ---
event | badtail, badmsg, reset or skip-forward
streamid | The same streamid value used in the .rt.sub call
position | 7h (long) vector of two positions

The two values in the position vector depend on the event:

event | position vector values
--- | ---
badtail | the position of the corrupted message and the starting position of the next log file
badmsg | the position of the corrupted message and the next message
reset | the end position of the last message and the position of the first message after a reset
skip-forward | the position in the archived log and the starting position of the next unarchived log file

The default implementation of .rt.on_event only prints a message to standard out, for example:

q).rt.on_event[`reset;"streamid";130 17592186044416]
Reset event in streamid, rolling position from 130 to 17592186044416

This default implementation can be redefined or the user can supply a custom event handler with a dictionary:

q) .rt.sub["streamid";0;`message`event!({[data;position] ...};{[event;position] ...})]
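As a sketch of redefining the default handler, the following replaces .rt.on_event with a version that writes to standard error. The helper fmtEvent and the message wording are illustrative assumptions; the three arguments are those listed in the table above. The redefinition is assumed to run after rt.qpk has been loaded so that it is not overwritten.

```q
// build a one-line description of an RT event
// position is the two-item vector described above: (from; to)
fmtEvent:{[event;streamid;position]
  "RT event ",string[event]," on ",streamid,": ",(string position 0)," -> ",string position 1}

// replace the default handler; log to stderr instead of stdout
.rt.on_event:{[event;streamid;position] -2 fmtEvent[event;streamid;position];}

fmtEvent[`reset;"streamid";130 17592186044416]
// "RT event reset on streamid: 130 -> 17592186044416"
```

Since every one of these events can imply data loss, a production handler would more likely raise an alert or trigger a recovery step than only log.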

Unsubscribing

The subscriber can decide to stop receiving new messages. There are two ways of using the .rt.unsub function:

  • .rt.unsub[] can be called from a message or event callback; this stops the message stream for the subscription that the message/event was received on.

  • .rt.unsub[streamid] can be called at any point (inside a callback or not) with the streamid to unsubscribe from (as was passed to .rt.sub[streamid;pos;callback]).

Pause and Resume

Pause

The subscriber can also pause the delivery of incoming messages. While delivery is paused, the subscriber's local log files continue to grow, so the delivery of messages to the subscriber can be resumed later.

There are two ways of doing this using the .rt.pause function:

  • .rt.pause[] can be called from a message or event callback and that will pause the message stream for the subscription the message/event was received on.

  • .rt.pause[streamid] can be called at any point (inside a callback or not) with the streamid to pause (as was passed to .rt.sub[streamid;pos;callback]).

Resume

The subscriber can resume a paused stream using the .rt.resume function.

.rt.resume[streamid] can be called at any point (inside a callback or not) with the streamid to resume (as was passed to .rt.sub[streamid;pos;callback]).
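A possible pattern, sketched under assumptions, is to pause a stream around a slow maintenance task and always resume it afterwards, even if the task fails. withPaused is a hypothetical helper; only .rt.pause and .rt.resume come from the rt.qpk.

```q
// pause delivery on a stream, run job, and always resume afterwards
// job: a function of one (ignored) argument
withPaused:{[streamid;job]
  .rt.pause streamid;
  r:@[job;(::);{[e] -2 "job failed: ",e; `failed}];  // trap errors so resume still runs
  .rt.resume streamid;
  r}
```

For example, withPaused["streamid";{system "sleep 5"}] would hold back delivery for roughly five seconds while messages continue to accumulate in the local log files.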

Filtering

The rt.qpk provides an efficient way to filter messages that are generic lists with one or two leading symbols, e.g. (`upd;`trade;data).

In order to apply message filtering a special message processing callback function has to be created.

An example subscription to all messages on a stream:

.rt.sub[stream;position;callback];

An example subscription to a subset of the data on a stream, in this case the trade and quote tables only:

cb:.rt.filter[{$[x=`.b;y in `trade`quote;0b]};callback];
.rt.sub[stream;position;  cb];

The filter delivers to the subscriber only the messages that satisfy the criteria.

Two filtering methods are available:

  • filter with caching: to enable caching, the predicate must return a boolean
  • filter without caching: to disable caching, the predicate must return a long or int (0 or 1)

The cache can be cleared when any part of the filtering changes (for example, when the list that the second symbol is matched against changes, as in the example below).

t:enlist`quote;
// 1) enable caching - boolean predicate
f:.rt.filter[{$[x=`.b;y in t;0b]};callback];
.rt.sub[stream;0;f];

t,:`trade;
// 2) clear cache
.rt.filter[f;`clear_cache];

// 3) disable caching - integer predicate
f:.rt.filter[{$[(x=`.b)&y in t;1;0]};callback];
.rt.sub[stream;0;f];

Garbage collection of subscriber stream log files

Adopt the RT garbage collection policy

The RT Archiver manages the garbage collection of all merged stream logs inside RT based on a configurable policy.

These archival options can be defined as part of the kdb Insights Configuration.

By default the subscribers will adopt the garbage collection policy used by RT (opt-in). When log files are truncated in RT they are also truncated on all subscribers.

Subscriber opt-out

As a subscriber you may want to opt-out of the default behavior and independently manage the garbage collection of your local log files.

RT garbage collection API

If you choose to decouple the subscriber from RT's garbage collection you must use the API .rt.prune. Removing RT log files through other means will cause issues with the subscriber ingesting the RT stream.

To opt-out of the default behaviour, you need to set the following environment variables before calling .rt.sub.

  • $REPLICATOR_EXCHANGE_ARCHIVED=0
  • $REPLICATOR_TRUNCATE_ARCHIVED=0

The replicator is responsible for truncating the RT log files. Setting the environment variables in this way will decouple the subscriber from RT's global garbage collection policy. Therefore the subscriber must truncate their own local log files as messages are consumed and no longer need to be replayed (say on a restart). RT log files are truncated using the API:

.rt.prune[streamid;pos]

where:

  • streamid is the same parameter as was passed to .rt.sub[streamid;pos;callback]
  • pos is the position returned from the subscriber callback

The .rt.prune function truncates all log files that have been rolled over and in which every message is before the specified position. Further details on truncation are available here. The .rt.prune call is asynchronous: it starts a thread that performs the truncation and then returns.
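The opt-out flow can be sketched as below. Setting the variables from inside the q process with setenv is an assumption (they could equally be set in the environment that starts the process); the .app namespace, the pruning interval, and the "streamid" name are illustrative only.

```q
// opt out of RT-driven truncation; must happen before .rt.sub is called
`REPLICATOR_EXCHANGE_ARCHIVED setenv enlist "0"
`REPLICATOR_TRUNCATE_ARCHIVED setenv enlist "0"

.app.seen:0        // messages handled so far
.app.every:1000    // prune once per this many messages (illustrative)

// process the message first, then prune periodically up to the current position
callback:{[data;pos]
  .app.seen+:1;
  if[0=.app.seen mod .app.every; .rt.prune["streamid";pos]];
  }

// with rt.qpk loaded:
// .rt.sub["streamid"; ::; callback]
```

Pruning only after the data has been handled, and only every so often, keeps the number of truncation threads low while still bounding local disk usage.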

Lead subscriber

Another use case is where one of the subscribers can observe when messages have been persisted downstream either to a storage tier or another durable messaging system. After this has occurred those messages are no longer required to be replayed to it or other subscribers.

A lead subscriber is required

This needs to be done as a deliberate design decision where one of the subscribers can be configured as the lead.

In this scenario that lead subscriber will drive the truncation of merged logs both in RT and the other subscribers.

The RT Archiver is still required

This would work alongside the RT Archiver which is still required to ensure RT itself doesn't run out of disk space should the lead subscriber fail. However, an early truncation decision by the lead subscriber will take precedence over the RT Archiver policy.

To do this the lead subscriber needs to set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=1 before calling .rt.sub. It can then use the .rt.prune function as described above and its truncation decisions will be reflected in RT and the other subscribers (assuming they haven't opted-out).