Internal q publisher and subscriber
Internal q publishers and subscribers (such as the Stream Processor and Storage Manager) use the rt.qpk interface (also known as the "RT kdb Insights qpk") to publish messages to an RT stream and to receive messages from an RT stream. External q publishers can also use the rt.qpk to communicate with an RT stream.
Publisher
Internal Publisher
The publisher should call:
q)p:.rt.pub (enlist`stream)!enlist "streamid";
The returned p is a function to which q objects can be passed for publishing.
q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t)
Call with a dictionary
Alternatively, .rt.pub can be called with a dictionary to override the defaults, for example:
q)t:`stream`path`topic_prefix`port!("stream";"/rt/logs";"rt-";5002)
q)p:.rt.pub t
Description of the dictionary for .rt.pub:
key | type | default | description |
---|---|---|---|
stream | string | n/a | RT stream name |
path | string | $RT_LOG_PATH | The parent path of RT log directories |
topic_prefix | string | $RT_TOPIC_PREFIX | The prefix of the RT sequencer node hostnames, placed right before $stream in $RT_TOPIC_PREFIX$stream-$n. |
port | int | 5002 | The port number of the internal pull servers |
publisher_id | string | n/a | Used to distinguish multiple publishers on the same host. The directory name will be $publisher_id.$hostname.$stream. |
dedup_id | string | n/a | Used in the sequencer to identify deduplicated input streams. The $stream suffix of the directory name changes to $dedup_id.dedup. |
cluster | list of strings | :$RT_TOPIC_PREFIX$stream-$n:$port | Completely overrides the logic that creates the connection URLs for the replicators by providing the URLs manually. |
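As a sketch of the cluster override, the publisher below passes the replicator URLs explicitly instead of letting the rt.qpk build them. The host names, the three-node layout and port 5002 are assumptions that follow the default $RT_TOPIC_PREFIX$stream-$n:$port pattern from the table; substitute the real URLs of your RT cluster.

```q
/ Hypothetical RT sequencer URLs following the :$RT_TOPIC_PREFIX$stream-$n:$port pattern
urls:(":rt-mystream-0:5002";":rt-mystream-1:5002";":rt-mystream-2:5002");
/ cluster replaces the default URL construction for the replicators
p:.rt.pub `stream`cluster!("mystream";urls);
p (`upd;`tab;([]time:3#.z.p;sym:`a`b`c))
```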
External Publisher
An external publisher can send data to RT via a set of load balancers. These load balancers need to be discovered; kdb Insights Enterprise relies on a service called the Information Service for this. When queried, the Information Service returns the load balancer endpoints that an external publisher can communicate with. The rt.qpk has built-in APIs that manage the querying of the Information Service. The user is required to go through some authentication steps, which are covered below.
Pre-requisites
- Deploy an assembly. An example assembly, sdk_sample_assembly, can be found here.
- Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
- Ensure you have an authenticated kdb Insights Enterprise client URL.
- Make sure that the kdb Insights Enterprise ingest endpoints (as defined by KXI_CONFIG_URL) are accessible.
Parameters
parameter | required | default | description |
---|---|---|---|
config_url | Mandatory | none | The URL that this program calls or reads to find the RT endpoint(s) it needs to connect to |
path | Mandatory | none | The local directory where the RT log files are written |
config_url
The config_url parameter can be set to the value of the KXI_CONFIG_URL environment variable to access kdb Insights Enterprise from outside the cluster:
q)url:getenv`KXI_CONFIG_URL;
q)params:`config_url`path!(url;"/tmp/rt/")
q)p:.rt.pub params
q)show t:([]time:3#.z.p;sym:`a`b`c)
time sym
---------------------------------
2023.04.03D16:50:36.914760000 a
2023.04.03D16:50:36.914760000 b
2023.04.03D16:50:36.914760000 c
q)p (`upd;`tab; t)
Maximum payload
RT throws an error if the message being sent is too large. This occurs when a publisher attempts to send a message larger than 1GB; the exact upper limit is 1073741783 bytes.
The example code below sends a message that is too large and shows the error that is thrown:
q)url:getenv`KXI_CONFIG_URL;
q)params:`config_url`path!(url;"/tmp/rt/")
q)p:.rt.pub params
q)data:(prd 3#1024)#0x00;
q)p data
'message too large
1GB payload limit
Messages that are larger than 1GB need to be broken into smaller ones before they are sent to RT.
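One way to do this for tabular data is to split the table into row chunks and publish each chunk as its own message. The sketch below is an illustration, not an rt.qpk API: chunks and pubChunked are hypothetical helpers, limit is an assumed safety margin well under the 1073741783-byte cap, and -22! (the uncompressed serialized length of an object) is used only as an estimate of the message size. Because it assumes rows are of similar size, a single row that is itself over the limit still cannot be sent.

```q
/ Assumed safety margin in bytes, well below RT's 1073741783-byte limit
limit:500000000;
/ Hypothetical helper: split table t into roughly equal row chunks so that
/ each (`upd;tab;chunk) message is estimated to stay under limit
chunks:{[tab;t]
  sz:-22!(`upd;tab;t);              / uncompressed serialized size of the whole message
  k:1|ceiling sz%limit;             / number of chunks needed (at least one)
  (1|ceiling count[t]%k) cut t}     / rows per chunk, then cut the table
/ Hypothetical helper: publish each chunk through p, as returned by .rt.pub
pubChunked:{[p;tab;t] {[p;tab;c] p (`upd;tab;c)}[p;tab] each chunks[tab;t];}
```

For example, pubChunked[p;`tab;bigTable] publishes bigTable as one or more upd messages on tab.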
Publisher id
A publisher's log directory names must be globally unique. By default, the rt.qpk combines the streamid and the host name to provide a unique directory name.
Path must survive reboots
If the hostname is stable (for example, as part of a stateful set) then this path must survive reboots.
However, if you have multiple publishers connected to the same streamid and running on the same host, you must also provide a publisher id to create unique directory names on that host. This can be done by setting publisher_id in the dictionary that .rt.pub is called with.
q) p: .rt.pub `stream`publisher_id!("mystream";"pub_a")
Publisher IDs must be unique on a host
There must never be more than one publisher running with the same publisher id on the same host at the same time.
Deduplication
To enable deduplication of a stream:
- The publisher needs to call .rt.pub with a dictionary and specify a dedup_id that identifies the deduplicated stream globally.
- A unique message identifier must be set in .rt.id before publishing each message. .rt.id needs to be monotonically increasing, and if multiple identical streams are merged by the same dedup_id, all publishers need to use the same .rt.id for the same message.
For example:
q)t:`topic_prefix`stream`dedup_id!("kx-";"mystream";"trade")
q)pub_fn:.rt.pub t
q)/ pub_fn publishes messages to RT with streamid=mystream, using deduplication with dedup_id=trade
All session streams from different publishers that share the same dedup_id are deduplicated. RT keeps track of the high watermark for each dedup_id: when it sees a message with an identifier lower than the watermark, that message is discarded.
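The dedup example above does not show .rt.id being set. A minimal sketch, assuming .rt.id accepts a long and is read when the message is published: msgid and pubDedup are hypothetical names. A local counter is only safe for a single publisher; redundant publishers sharing a dedup_id must derive the same id for the same message, for example from a sequence number carried by the upstream feed.

```q
pub_fn:.rt.pub `topic_prefix`stream`dedup_id!("kx-";"mystream";"trade");
/ Hypothetical global counter used as the message identifier
msgid:0;
/ Hypothetical wrapper: stamp each message with a monotonically increasing id
pubDedup:{[t]
  msgid::msgid+1;            / increase by one per message
  .rt.id::msgid;             / set the header field before publishing
  pub_fn (`upd;`trade;t)}
```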
Publisher IDs must be unique on a host
If you want to run multiple publishers on the same host, publisher_id must also be set differently for each publisher.
The message header
The message header consists of the following fields:
- on - the origin name, a symbol representing who originated this message.
- ts - the timestamp the message was recorded. Setting this to 0Np makes the publisher use the current real time.
- id - the message identifier. It should be unique for the origin, and its distance from zero should increase by one with each message. For example, 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.
- to - reserved for future use.
The message header can be populated by setting the corresponding variable in the .rt namespace (for example, .rt.id) before calling the returned function p when publishing a message.
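A sketch of populating the header before a publish, assuming p was returned by .rt.pub as in the first example and that .rt.id takes a long; the origin name and values are illustrative. validIds is a hypothetical helper that expresses the id rule above: the absolute values of consecutive ids must differ by exactly one.

```q
/ Header fields are read from the .rt namespace when p is called
.rt.on:`myfeed;             / origin name (illustrative)
.rt.ts:0Np;                 / null: the publisher uses the current real time
.rt.id:1;                   / message identifier
p (`upd;`tab;([]time:1#.z.p;sym:1#`a));

/ Hypothetical check: distance from zero increases by one per message
validIds:{all 1=1_deltas abs x};
validIds 1 -2 3 4           / 1b
validIds 1 2 -2 4           / 0b
```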
Subscriber
The rt.qpk monitors the replicated merged log files and surfaces messages to the subscriber as they become available. It does this by listening for file-system events and surfacing the resulting changes to the subscriber.
Pre-requisites
A publisher or subscriber to RT can use the rt.qpk by adding it as a dependency in the application:
{
"default": {
"entry": ["myapp.q"],
"depends": ["rt"]
}
}
Subscriber Callback
The subscriber needs to implement the following message callback:
callback:{[data;pos] ... }
.rt.sub `stream`position`callback!("streamid"; pos; callback);
The callback arguments provide deserialized q messages and a "position". The "position" represents the last message the subscriber received. This can then be used for restarting the subscriber from this point.
The position
is not a message index
The "position" is opaque and cannot be used as a message index.
When starting up, the subscriber can decide where in the stream to start receiving messages from:
- The first message available i.e. the oldest message that has not yet been archived.
- A particular "position" it has previously been given by the rt.qpk. This allows subscribers to restart from their last known position.
- The position of the message that was most recently made available to the subscriber. This allows subscribers to ignore all historic messages and only subscribe to new messages.
In the callback function, we can use the .rt.p.next_pos[] function, which returns the next position relative to the current position given by the y (pos) parameter of the callback function.
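message:([]msg:();pos:"j"$());
next_positions:([]pos:"j"$());
callback:{[data;pos]
  `message insert (enlist data;enlist pos);                         / store the message and its position
  `next_positions insert (enlist`pos)!enlist .rt.p.next_pos[]};     / store the next position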
The .rt.position_map function maps a given position to its log file info, as a list of (session; log_no; position), and vice versa:
log_file:.rt.position_map 52779779358720; / 3 3 0
position:.rt.position_map 3 3i; / 52779779358720
Subscribing from the beginning of the stream
To subscribe from the earliest position, call .rt.sub with the pos argument set to :: as follows:
callback:{[data;pos] ... }
.rt.sub `stream`position`callback!("streamid";::;callback);
Subscribing from a known position in the stream
To subscribe from a known position in the stream, call .rt.sub with the pos argument set to a valid position previously passed to callback, as follows:
callback:{[data;pos] ... }
.rt.sub `stream`position`callback!("streamid"; pos; callback);
The position
is not a message index
The "position" is opaque and cannot be used as a message index.
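A sketch of restarting from the last known position: the callback saves each position it receives to disk, and on startup the saved value (or :: for the earliest message) is passed to .rt.sub. posFile is a hypothetical location, and an upd function is assumed to be defined to apply (`upd;`tab;t) messages. Writing the position on every message keeps the example simple but is slow; a real subscriber might persist it less often and tolerate some replayed messages after a restart.

```q
/ Hypothetical file holding the last processed position
posFile:`:/tmp/mystream.pos;
callback:{[data;pos]
  value data;                  / apply the message; assumes upd is defined
  posFile set pos};            / remember the position just processed
/ Start from the earliest message on first run, otherwise from the saved position
start:$[()~key posFile;(::);get posFile];
.rt.sub `stream`position`callback!("streamid";start;callback);
```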
Subscribing from the latest message
To subscribe from the latest position in the stream, you can either:
- Call .rt.sub with the pos parameter replaced with the `latest symbol as follows:
  callback:{[data;pos] ... }
  .rt.sub `stream`position`callback!("streamid";`latest;callback);
- Use a 2-step approach:
  - Obtain the latest position via the .rt.get_latest_position API.
  - Pass the result, a numerical position value, to the .rt.sub API:
    callback:{[data;pos] ... }
    latest_pos:.rt.get_latest_position["streamid"];
    .rt.sub `stream`position`callback!("streamid";latest_pos;callback);
Optionally, you can also query the latest position (the last available message in the log) and the next position (the expected next position in the stream to subscribe from) for an absolute path:
apath:"/path/streamid";
latest_pos:.rt.get_latest_position[apath];
next_pos:.rt.get_next_position[apath];
The .rt.get_latest_position function also supports dictionary input. The input dictionary must contain the stream and peers parameters.
key | type | required | default | description |
---|---|---|---|---|
stream | string | Mandatory | None | RT stream. |
peers | list of strings | Mandatory | None | KXI-RT cluster. |
latest_pos:.rt.get_latest_position `stream`peers!("mystream";enlist ":rt-mystream-1:5001");
Call with a dictionary
Alternatively, .rt.sub can be called with a dictionary to override the default parameters.
Description of the dictionary for .rt.sub:
All subscription options must contain the position and callback parameters:
key | type | required | default | description |
---|---|---|---|---|
position | int | Mandatory | None | RT stream position. |
callback | function | Mandatory | None | RT stream callback function. |
There are 4 available options to include in the dictionary:
- Create the config_url from input parameters:
key | type | required | default | description |
---|---|---|---|---|
stream | string | Mandatory | None | RT stream name. |
path | string | Optional | $RT_LOG_PATH | The path of the RT log directory. |
port | int | Optional | 5001 | The port number the pull clients start on. |
topic_prefix | string | Optional | $RT_TOPIC_PREFIX | RT topic prefix name. |
- Use a pre-defined cluster as the cluster input parameter. Providing a cluster overrides the logic that creates the connection URLs for the replicators with manually provided URLs:
key | type | required | default | description |
---|---|---|---|---|
stream | string | Mandatory | None | RT stream name. |
cluster | list of strings | Optional | created by RT | The connection URLs. |
path | string | Optional | $RT_LOG_PATH | The path of the RT log directory. |
- Use an available config_url given by its path:
key | type | required | default | description |
---|---|---|---|---|
config_url | json file | Mandatory | None | The URL of the RT endpoint(s). |
path | string | Optional | $RT_LOG_PATH | The path of the RT log directory. |
- Use an absolute path (abs_path):
key | type | required | default | description |
---|---|---|---|---|
abs_path | string | Mandatory | None | The absolute path of the RT log directory. |
The key path can be relative or absolute when replicators are started, but the key abs_path must be an absolute path, and no replicators are started. For the key path, an absolute path is used as the RT log directory, while a relative path is appended to $RT_LOG_PATH and the concatenated path is used as the RT log directory.
Examples of the available options to call .rt.sub with a dictionary (in these examples $RT_LOG_PATH is /s):
- Call by creation of the config json file:
  param:`stream`path`position`callback!("data";"/tmp/stream";`latest;callback);
  .rt.sub param  / RT log path: `:/tmp/stream/72c0024dbc5e.data
  param:`stream`port`position`callback!("data";5001;0;callback);
  .rt.sub param  / RT log path: `:/s/72c0024dbc5e.data
- Call by application of a config json file:
  param:`config_url`path`position`callback!("file:///tmp/config.json";"stream/data";`latest;callback);
  .rt.sub param  / RT log path: `:/s/stream/data/72c0024dbc5e.data
- Call with a manually provided URL:
  cluster:enlist ":rt-data-0:5001";
  param:`stream`cluster`path`position`callback!("data";cluster;"stream/data";`latest;callback);
  .rt.sub param  / RT log path: `:/s/stream/data/72c0024dbc5e.data
- Call with an absolute path:
  param:`abs_path`position`callback!("/s/72c0024dbc5e.data";0;callback);
  .rt.sub param  / no replicators are started
Other events
There are other events in the message stream that the user can be notified of with an event callback. These inform the user of exceptions where data loss can occur.
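event | details | next position | implication |
---|---|---|---|
badtail | Message log file corruption is detected. | Skips to the next log file. | There is always data loss. |
badmsg | A message cannot be decoded. | Skips to the next message. | The affected message could not be deserialized by the q subscriber. There will be data loss of the affected message. |
reset | RT has been reset, deleting its log files and starting a new session. | Skips to the new session. | There is potential data loss and message duplication. |
skip-forward | The current position points to a log file that has been archived. | Skips to the start of the first unarchived log file. | There is always data loss. |
These events are replayable just like messages; for example, resubscribing from an earlier position results in the same message or event sequence.
When one of these events occurs, the .rt.on_event function is called with the following arguments:
name | details |
---|---|
event | badtail, badmsg, reset or skip-forward. |
streamid | The same streamid value used in the .rt.sub call. |
position | 7h vector of two positions. |
The two values in the position vector depend on the event:
event | position vector values |
---|---|
badtail | The position of the corrupted message and the starting position of the next log file. |
badmsg | The position of the corrupted message and the next message. |
reset | The end position of the last message and the position of the first message after a reset. |
skip-forward | The position in the archived log and the starting position of the next unarchived log file. |
The default implementation of .rt.on_event only prints a message to standard out, for example: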
These events are replayable just like messages, for example resubscribing from an earlier position results in the same message or event sequence.
When one of these events occurs, the .rt.on_event
function is called with the following arguments:
name | details |
---|---|
event |
badtail , badmsg , reset or skip-forward . |
streamid |
The same streamid value used in the .rt.sub call. |
position |
7h vector of two positions. |
The two values in the position
vector depend on the event
:
event | position vector values |
---|---|
badtail |
The position of the corrupted message and the starting position of the next log file. |
badmsg |
The position of the corrupted message and the next message. |
reset |
The end position of the last message and the position of the first message after a reset. |
skip-forward |
The position in the archived log and the starting position of the next unarchived log file. |
The default implementation of .rt.on_event
only prints a message to standard out, for example:
q).rt.on_event`event`dir`position!(`reset;"streamid";130 17592186044416)
Reset event in streamid, rolling position from 130 to 17592186044416
This default implementation can be redefined, or you can supply a custom event handler in the callback dictionary:
q).rt.sub `stream`position`callback!("streamid";0;`message`event!({[data;position] ...};{[event;position] ...}));
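A sketch of a custom event handler supplied through the callback dictionary, recording each data-loss event in a table and printing a line to standard out. events, onMsg and onEvent are hypothetical names; the handler assumes event arrives as a symbol and position as the two-element long vector described above, and onMsg assumes an upd function is defined.

```q
/ Hypothetical table of events seen on the stream
events:([]time:`timestamp$();event:`symbol$();fromPos:`long$();toPos:`long$());
onMsg:{[data;pos] value data};           / assumes upd is defined
onEvent:{[event;position]
  `events insert (.z.p;event;position 0;position 1);
  -1 "RT event ",string[event],": position ",string[position 0]," -> ",string position 1;};
.rt.sub `stream`position`callback!("streamid";0;`message`event!(onMsg;onEvent));
```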
Unsubscribing
The subscriber can decide to stop receiving new messages. There are several ways of using the .rt.unsub function:
- .rt.unsub[] can be called from a message or event callback, and stops the message stream for the subscription that the message or event was received on.
- .rt.unsub[streamid] can be called at any point, whether inside a callback or not. It takes the streamid that was provided when subscribing with .rt.sub[streamid;pos;callback], and unsubscribes from that streamid.
- .rt.unsub[param] can be called at any point, whether inside a callback or not. It takes the same param dictionary that was provided when subscribing with .rt.sub[param], and unsubscribes the subscription described by that dictionary.
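As a sketch of the first form, the callback below stops its own subscription after a fixed number of messages by calling .rt.unsub[] from inside the callback. n and maxMsgs are hypothetical globals used only for this example.

```q
n:0;                 / messages received so far
maxMsgs:1000;        / hypothetical cut-off
callback:{[data;pos]
  n::n+1;
  if[n>=maxMsgs; .rt.unsub[]]};   / stop the stream this message arrived on
.rt.sub `stream`position`callback!("streamid";::;callback);
```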
Pause and Resume
Pause
The subscriber can also pause the delivery of new incoming messages while its local log files continue to grow, so that delivery can be resumed later.
There are two ways of doing this using the .rt.pause function:
- .rt.pause[] can be called from a message or event callback, and pauses the message stream for the subscription that the message or event was received on.
- .rt.pause[param] can be called at any point, whether inside a callback or not. It takes the same param dictionary that was provided when subscribing with .rt.sub[param], and pauses delivery for that subscription.
Resume
The subscriber can resume a paused stream using the .rt.resume function:
.rt.resume[param] can be called at any point, whether inside a callback or not. It takes the same param dictionary that was provided when subscribing with .rt.sub[param], and resumes delivery for that subscription.
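Because .rt.pause[param], .rt.resume[param] and .rt.unsub[param] all take the dictionary used with .rt.sub, it is convenient to keep that dictionary in a variable. A sketch of the full lifecycle, assuming a callback is already defined; the stream name is illustrative.

```q
d:`stream`position`callback!("mystream";`latest;callback);
.rt.sub d;
.rt.pause d;       / delivery stops; the local log files keep growing
/ ... for example, while a downstream system is unavailable ...
.rt.resume d;      / delivery continues from where it was paused
.rt.unsub d;       / end the subscription
```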
Filtering
The rt.qpk provides an efficient way to filter messages that are generic lists with one or two leading symbols, such as (`upd;`trade;data).
To apply message filtering, create a special message-processing callback function with .rt.filter.
For example, to subscribe to all messages on a stream:
.rt.sub `stream`position`callback!("streamid";0;callback);
To subscribe to a subset of the data on a stream, using only the trade and quote tables, use the following example:
cb:.rt.filter[{$[x=`.b;y in `trade`quote;0b]};callback];
.rt.sub `stream`position`callback!("streamid";0;cb);
The filter only delivers the messages to the subscriber that satisfy the criteria.
The two types of filtering methods are:
- Filter with caching: to enable caching, the predicate has to return a boolean.
- Filter without caching: to disable caching, the predicate has to return a long or int 0 or 1.
When any part of the filtering configuration changes (for example, modifications to the list t that the second symbol is matched against in the example below), you can clear the cache to ensure the updated filtering rules are applied.
t:enlist`quote;
/ 1) enable caching - boolean predicate
f:.rt.filter[{$[x=`.b;y in t;0b]};callback];
.rt.sub `stream`position`callback!("streamid";0;f);
t,:`trade;
/ 2) clear cache
.rt.filter[f;`clear_cache];
/ 3) disable caching - integer predicate
f:.rt.filter[{$[(x=`.b)&y in t;1;0]};callback];
.rt.sub `stream`position`callback!("streamid";0;f);
Tracking
The rt.qpk provides a type of callback function that delivers only the position and peek info (metadata) of each message, without deserializing the whole message.
Alongside the position, the callback receives the metadata as a dictionary with the following keys:
member | type | description |
---|---|---|
ts | timestamp | The timestamp when the message was recorded. |
to | integer | The timeout of how long the client waits for a response. |
corr | long | The correlator of the kxi message. |
orig | string | The origin name, a symbol representing who originated this message. |
s0 | string | The first leading symbol for filtering. |
s1 | string | The second leading symbol for filtering. |
len | integer | The length of the message. |
To subscribe to tracking, define a tracker callback using the .rt.tracker function, where the x parameter is the metadata and y is the position:
q) tr:.rt.tracker{0N!(`msg;y;x)};
q) .rt.is_tracker[tr]
1b
q) d:`stream`position`callback!("mystream";`latest;tr);
q) .rt.sub d;
q) .rt.unsub d;
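A sketch of a tracker that monitors progress without deserializing message bodies: it keeps the last position and the largest lag between each message's ts and the time the tracker saw it. lastPos and maxLag are hypothetical globals, and the lag is only meaningful if ts and .z.p use the same clock (both UTC is assumed here).

```q
lastPos:0N;       / last position seen
maxLag:0D;        / largest observed lag, as a timespan
tr:.rt.tracker{[md;pos]
  lastPos::pos;
  maxLag::maxLag|.z.p-md`ts};   / md is the metadata dictionary
.rt.sub `stream`position`callback!("mystream";`latest;tr);
```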
Garbage collection of subscriber stream log files
Adopt the RT garbage collection policy
The RT Archiver manages the garbage collection of all merged stream logs inside RT based on a configurable policy.
These archival options can be defined as part of the kdb Insights Enterprise Configuration.
By default, the subscriber adopts the garbage collection policy used by RT (opt-in). When log files are truncated in RT they are also truncated on all subscribers.
Subscriber opt-out
As a subscriber you may want to opt-out of the default behavior and independently manage the garbage collection of your local log files.
RT garbage collection API
If you choose to decouple the subscriber from RT's garbage collection, you must use the .rt.prune API. Removing RT log files by other means causes issues with the subscriber ingesting the RT stream.
To opt out of the default behavior, set the following environment variables before calling .rt.sub.
$REPLICATOR_EXCHANGE_ARCHIVED=0
$REPLICATOR_TRUNCATE_ARCHIVED=0
The replicator is responsible for truncating the RT log files. Setting the environment variables in this way decouples the subscriber from RT's global garbage collection policy. Therefore, the subscriber must truncate its own local log files as messages are consumed and no longer need to be replayed (for example, on a restart). RT log files are truncated using the following API:
.rt.prune[param;pos]
where:
- param is the input dictionary that was passed to .rt.sub[param].
- pos is a position passed to the subscriber callback.
The .rt.prune function truncates all log files that have been rolled over and whose messages all precede the specified position. You can find more details on truncation in the Archiver documentation. The .rt.prune function call is asynchronous: it starts a thread that performs the truncation task and then returns.
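A sketch of an opted-out subscriber that prunes its own logs on a timer. Assumptions: setting the variables with setenv inside the q process before .rt.sub is sufficient (exporting them before starting q avoids relying on this), an upd function is defined, and safePos is a hypothetical position up to which messages no longer need to be replayed. For brevity it is advanced after every callback; a real subscriber would advance it only once the data is durably handled.

```q
`REPLICATOR_EXCHANGE_ARCHIVED setenv "0";
`REPLICATOR_TRUNCATE_ARCHIVED setenv "0";
safePos:0N;
callback:{[data;pos] value data; safePos::pos};   / assumes upd is defined
d:`stream`position`callback!("mystream";::;callback);
.rt.sub d;
/ Every 60 seconds, truncate rolled-over logs lying entirely before safePos
.z.ts:{if[not null safePos; .rt.prune[d;safePos]]};
system "t 60000";
```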
Lead subscriber
In another use case, one of the subscribers can observe when messages have been persisted downstream, either to a storage tier or another durable messaging system. Once this happens, those messages no longer need to be replayed to that subscriber or other subscribers.
A lead subscriber is required
This requires a deliberate design decision where one of the subscribers is configured as the lead.
In this scenario, the lead subscriber is responsible for driving the truncation of merged logs both in RT and the other subscribers.
The RT Archiver is still required
This would work alongside the RT Archiver which is still required to ensure RT itself doesn't run out of disk space if the lead subscriber fails. However, an early truncation decision by the lead subscriber takes precedence over the RT Archiver policy.
To do this, the lead subscriber must set the environment variable $REPLICATOR_EXCHANGE_ARCHIVED=1 before calling .rt.sub. It can then use the .rt.prune function, as described above, and its truncation decisions are reflected in RT and the other subscribers (assuming they haven't opted out).
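A sketch of the lead subscriber's side, under the same assumption that setenv before .rt.sub is sufficient and that a callback is already defined. onPersisted is a hypothetical hook that the application calls once everything up to pos has been written to the storage tier or other durable system.

```q
`REPLICATOR_EXCHANGE_ARCHIVED setenv "1";        / share truncation decisions
d:`stream`position`callback!("mystream";::;callback);
.rt.sub d;
/ Hypothetical hook: called once data up to pos is durably persisted downstream
onPersisted:{[pos] .rt.prune[d;pos]};
```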
Logging facilities
You can enable the logging feature of the rt.qpk by setting the environment variable RT_LOGLEVEL_CONSOLE to DEBUG or TRACE. With this setting, DEBUG level RT logs are generated when .rt.pub, .rt.sub or .rt.get_latest_position are called, and a one-line position log is created for .rt.sub with the latest position.
All logs contain the rt-q-client component name in the header info. Logs are in json format by default, but you can change this to txt format by setting the RT_LOGFORMAT_CONSOLE environment variable to txt.
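A sketch of turning on text-format debug logging from within a q session. This assumes the rt.qpk reads these variables when .rt.pub or .rt.sub is called rather than when it is loaded; exporting them in the shell before starting q avoids depending on that assumption.

```q
`RT_LOGLEVEL_CONSOLE setenv "DEBUG";
`RT_LOGFORMAT_CONSOLE setenv "txt";
p:.rt.pub (enlist`stream)!enlist "streamid";   / DEBUG logs are expected from here on
```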
You can also obtain log messages sent by the rt_helper application (an external binary library that supports all replicator-related tasks) from the kxi_c_sdk_logs folder under the path you set as the rt_helper_log_path member of the .rt.pub/.rt.sub input parameter dictionary. This path also appears in the RT logs.
param:`stream`position`callback`rt_helper_log_path!("data";`latest;callback;"/s/mystream/logs");
.rt.sub param  / rt_helper logs: /s/mystream/logs/kxi_c_sdk_logs