
Messaging

Messaging is a publish-subscribe framework bundled with KX Stream. It runs in a server-client model with a messaging server process acting as the discovery service for processes that consume or publish data.

Processes register their topics and the server matches them with corresponding publisher or subscriber processes. Subscription topics are specified in JSON with a table and optional column filters. Both publishers and subscribers can specify filters. Helper APIs for generating JSON are described in the Template API guide.

Two Messaging server instances are bundled with Stream and can be run from the DS_launch_MS_{A|B} workflows. Most Stream process templates contain a messagingServer instance parameter. To enable a process for messaging, set this parameter to the DS_MESSAGING_SERVER:DS config parameter, which contains the names of the two Messaging server instances. On startup, the process then registers with the Messaging servers and can publish and subscribe.

Example topics

| Topic | Description |
|---|---|
| dxQuote | Subscribe to or publish all messages in the dxQuote table. No filtering is applied at the publisher. |
| {"dxQuote":{"sym":"EURUSD"}} | Subscribe to the dxQuote table where sym=`EURUSD |
| {"dxQuote":{"sym":"EURUSD","src":"FD"}} | Subscribe to the dxQuote table filtered on multiple columns: sym=`EURUSD and src=`FD |

In addition to matching on topics, Messaging has the concept of a channel: a symbol tag that adds an extra layer to distinguish between otherwise identical topics. For example, if two tickerplants were running hot-hot, a subscriber would want to receive the data only once. This is done by giving the two tickerplants different channels, so that the subscriber is matched only with the publisher that shares its channel. When a publisher and subscriber have the same channel and their topics overlap, the Messaging server matches them and initiates the handshakes.

Subscription topics

When two processes have been matched and the handshakes completed, subscription state is initialized on the publisher process. Whenever that process calls the publisher APIs, the data being published is checked against its current list of subscriptions. Each API takes a table name and data as parameters: the publisher first finds the subscriptions for that table, then applies any subscription filters to the data and publishes the results.
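The publish path just described can be modelled in a few lines of plain q. This is a hypothetical sketch under simplifying assumptions, not the KX Stream implementation: the `subs` dictionary and the `applyFilter` and `publish` functions are invented names, and every filter is treated bulk-style (one `in` constraint per column).

```q
/ hypothetical subscription state on a publisher (not KX Stream internals)
/ subscriber id -> (table name; column filter dictionary); ()!() means "no filter"
subs:1 2 3!(
  (`dxQuote; `sym`src!(`EURUSD;`FD));
  (`dxQuote; ()!());
  (`dxTrade; enlist[`sym]!enlist `USDJPY))

/ apply one subscription filter: each filtered column must be "in" its allowed values
applyFilter:{[f;x] $[0=count f; x; ?[x;{(in;x;enlist (),y)}'[key f;value f];0b;()]]}

/ find subscriptions for table t first, then filter the data for each of them
/ returns subscriber id -> filtered batch, dropping empty batches
publish:{[t;x]
  s:where t=first each subs;
  r:s!applyFilter[;x] each last each subs s;
  (where 0<count each r)#r}

dxQuote:([] sym:`EURUSD`GBPUSD`EURUSD; src:`FD`FD`Kx; bid:1.08 1.27 1.09)
r:publish[`dxQuote;dxQuote]   / subscriber 1 gets 1 row, subscriber 2 gets all 3
```

Subscriber 3 is never considered because its subscription is for a different table, which is the "same table first" step in the text.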

There are multiple filtered topic modes that define how the publisher sends the data. The following sections describe them in detail. The APIs for each mode are detailed in the Template API Guide.

Segmented

In segmented mode, when multiple filter values are applied to the same column, the topic is split into separate subscriptions on the publisher, so three column values result in three separate topics. If the publisher sends data with mixed column values, the data is filtered and published in separate batches, one per topic. This mode suits subscribers that want to perform bulk operations on one value at a time.

Multiple filters on one column:

{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":"FD"}}

Results in:

dxQuote where sym=`EURUSD, src=`FD
dxQuote where sym=`GBPUSD, src=`FD

Multiple filters on more than one column:

{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":["FD","Kx"]}}

Results in:

dxQuote where sym=`EURUSD, src=`FD
dxQuote where sym=`GBPUSD, src=`FD
dxQuote where sym=`EURUSD, src=`Kx
dxQuote where sym=`GBPUSD, src=`Kx
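The segmented expansion can be reproduced with q's `cross`, which forms the Cartesian product of the filter values. The sketch below is hypothetical: `segTopics` and `segPublish` are illustrative names, not part of the Template API, and it assumes a filter is held as a dictionary of column name to allowed symbols.

```q
/ hypothetical: expand a segmented filter into one topic per combination of values
/ each filter column becomes a one-column table; cross forms the Cartesian product
segTopics:{[f] (cross/) {flip (enlist x)!enlist (),y}'[key f;value f]}

/ publish one batch per segment, dropping segments with no matching rows
segPublish:{[f;x]
  b:{[x;r] ?[x;{(=;x;enlist y)}'[key r;value r];0b;()]}[x] each segTopics f;
  b where 0<count each b}

segTopics `sym`src!(`EURUSD`GBPUSD;`FD)       / 2 topics
segTopics `sym`src!(`EURUSD`GBPUSD;`FD`Kx)    / 4 topics

dxQuote:([] sym:`EURUSD`GBPUSD`USDJPY`EURUSD; src:`FD`FD`FD`Kx; bid:1.08 1.27 150.1 1.09)
b:segPublish[`sym`src!(`EURUSD`GBPUSD;`FD`Kx);dxQuote]   / 3 non-empty batches of 1 row each
```

The GBPUSD/Kx segment has no matching rows in this sample, so only three batches are produced.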

Bulk

In bulk mode, when multiple filter values are applied to the same column, the topic is not split. In this mode, subscribers will receive data filtered as specified but with all messages in one batch per topic.

Multiple filters on one column:

{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":"FD"}}

Results in:

dxQuote where sym in `EURUSD`GBPUSD, src=`FD

Multiple filters on more than one column:

{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":["FD","Kx"]}}

Results in:

dxQuote where sym in `EURUSD`GBPUSD, src in `FD`Kx
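In bulk mode the same filter becomes a single query with one `in` constraint per column. A minimal sketch using a q functional select, assuming the filter is a dictionary of column name to allowed symbols; `bulkPublish` is a hypothetical name, not a Template API function.

```q
/ hypothetical sketch of bulk mode: one batch per topic, one "in" constraint per column
/ enlist (),y quotes the value list so the functional select treats it as a constant
bulkPublish:{[f;x] ?[x;{(in;x;enlist (),y)}'[key f;value f];0b;()]}

dxQuote:([] sym:`EURUSD`GBPUSD`USDJPY`EURUSD; src:`FD`FD`FD`Kx; bid:1.08 1.27 150.1 1.09)
bulkPublish[`sym`src!(`EURUSD`GBPUSD;`FD);dxQuote]       / one batch of 2 rows
bulkPublish[`sym`src!(`EURUSD`GBPUSD;`FD`Kx);dxQuote]    / one batch of 3 rows
```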

Sharded

This mode is supported for subscriptions only and is used when subscribers want to shard (split) data by a single column across multiple processes. For instance, for load balancing, one process might take responsibility for stocks beginning A-K and a second for L-Z. The important point is that the complete list of instruments might not be known ahead of time and could change, and maintaining that list while dynamically updating subscriptions could be tricky to implement.

Instead, Messaging allows the developer to specify a shard column and a regular-expression condition, evaluated with the q like operator, to implement the split. See code.kx.com: Cookbook/Regular expressions.

This can be combined with other column filters, which are treated as bulk.

Shard filter only:

{"dxQuote":{"sym":[".q.like","[a-hA-H]*"]}}

Results in:

dxQuote where sym like "[a-hA-H]*"

Shard and multiple column filter:

{"dxQuote":{"sym":[".q.like","[a-hA-H]*"],"src":["FD","KX"]}}

Results in:

dxQuote where sym like "[a-hA-H]*", src in `FD`KX

Filters are currently supported only for symbol columns.
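The benefit of sharding by pattern is that new instruments are routed without any subscription change. A minimal sketch with two hypothetical shard patterns (A-K and L-Z, as in the load-balancing example), applied with q's `like` together with a bulk `src` filter; the data and variable names are invented for illustration.

```q
/ hypothetical shard patterns for two subscriber processes
shardA:"[a-kA-K]*"      / syms starting with a-k or A-K
shardB:"[l-zL-Z]*"      / syms starting with l-z or L-Z

dxQuote:([] sym:`EURUSD`GBPUSD`USDJPY`AUDUSD`NZDUSD; src:`FD`FD`FD`KX`FD; bid:1.08 1.27 150.1 0.66 0.61)

/ shard filter combined with a bulk column filter, as in the second example above
a:select from dxQuote where sym like shardA, src in `FD`KX   / EURUSD GBPUSD AUDUSD
b:select from dxQuote where sym like shardB, src in `FD`KX   / USDJPY NZDUSD
```

Every sym lands in exactly one shard, and a sym that first appears tomorrow is picked up by the matching pattern with no change to either subscription.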

Topic hierarchy

The table below shows the result of matching each publisher topic type against each subscriber topic type described in the previous sections. Rows are publisher types; columns are subscriber types.

Subscribers have two other modes that aren’t supported for publishers:

  • sharding is supported only for subscribers
  • subscribers can also send a blank topic (empty string) to subscribe for all tables with no filters
| Publisher / Subscriber | blank | no filter | segmented | bulk | shard |
|---|---|---|---|---|---|
| No filter | Match with no filter | Match with no filter | Segmented match of all syms | Bulk match of all syms | Shard match of all syms |
| Segmented | Segmented match of all syms | Segmented match of all syms | Segmented match of overlapping syms | Segmented match of overlapping syms | Segmented match of overlapping syms |
| Bulk | Bulk match of all syms | Bulk match of all syms | Segmented match of overlapping syms | Bulk match of overlapping syms | Bulk match of overlapping syms |

Combinations of publisher types (vertical) and subscriber types (horizontal)
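Two aspects of the table can be expressed in q: the mode of the resulting match, transcribed directly from the table, and which syms a match covers, where an unfiltered side matches "all syms" and two filtered sides match only their intersection. This is a hypothetical model for reasoning about subscriptions, not the matching code inside the Messaging server; `sc`, `modes` and `overlap` are invented names.

```q
/ resulting match mode, transcribed from the table (rows: publisher, columns: subscriber)
sc:`blank`none`seg`bulk`shard
modes:`none`seg`bulk!(sc!`none`none`seg`bulk`shard; sc!`seg`seg`seg`seg`seg; sc!`bulk`bulk`seg`bulk`bulk)

/ syms covered by a match; an empty list stands for "no filter", i.e. all syms
overlap:{[pub;sub] $[0=count pub; sub; 0=count sub; pub; pub inter sub]}

modes[`bulk][`seg]                        / `seg
overlap[`EURUSD`GBPUSD;`EURUSD`USDJPY]    / ,`EURUSD
overlap[`EURUSD`GBPUSD;()]                / `EURUSD`GBPUSD
```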

Reconnections

Connection between components

When a process first tries to connect to a server component, no timeout is applied by default: if the component is busy, the process hangs until it becomes available. The initial connection is configurable with three modes:

  • off - the initial connection is made with no timeout
  • timeout - the initial connection is made with the timeout given by .dm.i.initRetryTimeout; the callback .dm.cb.connectionFailed is invoked on failure
  • retry - the initial connection is made with a timeout and, if it fails, retried on a timer; if it still fails after the maximum number of retries, .dm.cb.connectionFailed is invoked
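The retry mode can be pictured as a timer-driven loop around `hopen` with a timeout. The sketch below is a hypothetical illustration in plain q, not the KX Stream code: `tryOpen`, `retryTick` and the state dictionary are invented, the target, timeout and retry limit are illustrative values, and in Stream this behavior is controlled by .dm.i.initRetryMode, .dm.i.initRetryTimeout and the .dm.cb.connectionFailed callback instead.

```q
/ hypothetical sketch of the "retry" initial-connection mode (not KX Stream internals)
/ hopen (hsym;ms) opens a connection with a timeout; any error is trapped as 0Ni
tryOpen:{[hp;ms] @[hopen;(hp;ms);{[e] 0Ni}]}

/ one timer tick: try to connect; after maxRetries failures, call onFail and stop
retryTick:{[st;onFail]
  h:tryOpen[st`target;st`timeout];
  if[not null h; :st,`handle`done!(h;1b)];
  st[`attempts]+:1;
  if[st[`attempts]>=st`maxRetries; onFail st`target; :st,enlist[`done]!enlist 1b];
  st}

/ connection state; the target, timeout and retry limit are illustrative values
st:`target`timeout`attempts`maxRetries`handle`done!(`:localhost:5010;500;0;10;0Ni;0b)

/ drive it from the q timer, e.g. \t 10000 for a 10-second retry interval
.z.ts:{if[not st`done; st::retryTick[st;{-2 "connection failed: ",string x}]]}
```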

When a connection to another component drops, by default one of the processes will try to reconnect on a timer. The reconnect behavior is configurable via enhanced instance parameters. The table of parameters below details what options are available.

| Parameter | Type | Default | Description |
|---|---|---|---|
| .dm.i.retryPriority | int | 0 | Determines which end of a dropped connection performs the reconnection |
| .dm.i.retryInterval | int | 10000 | Interval between reconnect attempts (milliseconds) |
| .dm.i.retryTimeout | int | 500 | hopen connection timeout (milliseconds) |
| .dm.i.retries | int | 10 | Maximum number of reconnect attempts |
| .dm.i.initRetryTimeout | long | 30000 | Timeout for the initial connection |
| .dm.i.initRetryMode | symbol | off | Initial connection mode: off, timeout or retry |

.dm.i.retryPriority

This instance parameter determines which process reconnects when a messaging connection is dropped. The reconnecting process is chosen as follows:

  • if either process has a null priority (0N), neither process reconnects
  • otherwise, the process with the lower priority reconnects
  • if the priorities are equal, the process with the higher messaging ID reconnects

The table below shows example .dm.i.retryPriority values for processes A and B and the resulting reconnection initiator (A is assumed to have the higher messaging ID).

| A | B | Initiator |
|---|---|---|
| 0N | 0N | none |
| 0N | 1 | none |
| 10 | 0N | none |
| 10 | 1 | B |
| 10 | 10 | A |

Reconnects can therefore be disabled for all processes, or for a particular set of processes, by setting their priorities to null. Important processes should be given a high priority: lower-priority processes will reconnect to them, but not vice versa.
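The decision rules can be checked against the scenario table with a small q function. This is a hypothetical restatement of the documented rules for illustration (`initiator` is an invented name, and A is assumed to have the higher messaging ID, as in the table), not the code Stream runs.

```q
/ hypothetical restatement of the .dm.i.retryPriority rules (not KX Stream internals)
/ pa, pb: priorities of processes A and B; ida, idb: their messaging IDs
/ returns `A or `B for the process that reconnects, or a null symbol if neither does
initiator:{[pa;pb;ida;idb]
  if[null[pa] or null pb; :`];   / a null priority on either side disables the reconnect
  if[pa<pb; :`A];                / otherwise the lower priority reconnects
  if[pb<pa; :`B];
  $[ida>idb; `A; `B]}            / equal priorities: the higher messaging ID reconnects

/ the scenarios from the table, with A holding the higher messaging ID (2 vs 1)
initiator'[0N 0N 10 10 10;0N 1 0N 1 10;2;1]   / null, null, null, `B, `A
```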

Connection from client to MS

If the connection from a client to the Messaging server (MS) is dropped, the hopen timeout and retry interval can be configured using an override of INSTANCE_CONFIG:

Enhanced Instance Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| .dm.i.reconnFreq | integer | 10000 | Interval between reconnect attempts (milliseconds) from client to MS when the connection is dropped |
| .dm.i.openTimeout | long | 250 | hopen connection timeout (milliseconds) from client to MS when the connection is dropped |

Connection between MS processes

For connections between MS processes in a server cluster, the hopen timeout and retry interval can be configured using an override of INSTANCE_CONFIG:

Enhanced Instance Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| .rpl.reconnFreq | integer | 10000 | Interval between reconnect attempts (milliseconds) between MS processes in a cluster |
| .rpl.openTimeout | integer | 200 (Windows), 1000 (Linux) | hopen connection timeout (milliseconds) between MS processes in a cluster |

Table callbacks

Subscribers can register function callbacks against a table. Each table can have one or more functions, which are executed whenever an update for that table is received.

The standard model is to:

  • set up the table-to-function callback mappings on instance startup
  • apply the callbacks from the upd or realTimeMsgFunct function whenever a new message is received

This allows multiple instances to share one generic upd function but specify different callbacks per process.

An example of the API calls is provided below.

 callback:{[t;x] 0N!(t;x) };
 trade:([] time:.z.p; sym:`A`B; price:100 200.; size:1000 1000)

 // add a callback
 .dm.addCallback[`trade; `callback];

 // execute callbacks
 .dm.applyCallbacks[`trade; trade];
 /=> (`trade;+`time`sym`price`size!(2018.08.06D05:42:44.188299000 2018.08.06D05:42:44.188299000;`A`B;100 200f;1000 1000))

 // remove callback
 .dm.removeCallback[`trade; `callback];
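The same pattern can be modelled without Stream to show how one generic `upd` dispatches to per-table callbacks. Everything here is hypothetical: `cbs`, `getCbs`, `addCb`, `removeCb`, `applyCbs` and this `upd` are illustrative stand-ins for .dm.addCallback, .dm.removeCallback and .dm.applyCallbacks, not their implementation.

```q
/ hypothetical registry: table name -> symbol list of callback function names
cbs:()!()
getCbs:{[t] $[t in key cbs; cbs t; `symbol$()]}
addCb:{[t;f] `cbs set cbs,(enlist t)!enlist distinct getCbs[t],f;}
removeCb:{[t;f] `cbs set cbs,(enlist t)!enlist getCbs[t] except f;}

/ call every callback registered for table t with the table name and the new rows
applyCbs:{[t;x] {[t;x;f] (value f)[t;x]}[t;x] each getCbs t;}

/ one generic upd shared by all processes; only the registered callbacks differ
upd:{[t;x] t insert x; applyCbs[t;x];}

/ example: count trade rows as they arrive
trade:([] sym:`symbol$(); price:`float$())
seen:0
countRows:{[t;x] `seen set seen+count x}
addCb[`trade;`countRows]
upd[`trade;([] sym:`A`B; price:100 200f)]   / seen is now 2
```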

Performance tuning

The Messaging server template contains two parameters that can help improve messaging performance.

useUnixDomainSockets
Enables the Unix Domain Socket functionality of kdb+. If publishers and subscribers reside on the same server and this option is enabled, they will communicate via domain sockets. This can show considerable efficiency gains over standard TCP/IP.
broadcastPublish
Improves performance of publishers if multiple subscriptions exist for the same topic. Instead of the publisher serializing the same message for each duplicate subscription, it will serialize only once and broadcast-publish to multiple handles. Under the covers it maps to the -25! operator in kdb+.

Both modes are disabled by default.
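For reference, these are the plain kdb+ mechanisms the two options build on. The sketch is illustrative only: port 5010, the function names and the (`upd;table;data) message shape are assumptions, and the Messaging server template enables these behaviors through its own parameters rather than through code like this.

```q
/ Unix domain socket: a local connection opened with the unix:// prefix (port is hypothetical)
/ h:hopen `:unix://5010

/ without broadcast: the message is serialized again for every subscriber handle
pubEach:{[hs;t;x] {neg[x] y}[;(`upd;t;x)] each hs;}

/ with broadcast: -25! serializes the message once and sends it asynchronously to all handles
pubBroadcast:{[hs;t;x] -25!(hs;(`upd;t;x));}
```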

Event callbacks

Callbacks can be set up for various messaging events. This can be accomplished via the INSTANCE_CONFIG parameter and overrides.

See Enhanced Instance Configuration under the Developers section of the KX Control documentation for details of how to set up an INSTANCE_CONFIG override for a process.

| Callback | Event | Parameter | Type |
|---|---|---|---|
| .dm.cb.connect | When a connection is made to another process | remoteId | integer |
| | | dict (keys host, port, tls, UDS) | dictionary |
| | | dict[`host] | symbol |
| | | dict[`port] | integer |
| | | dict[`tls] | symbol |
| | | dict[`UDS] | boolean |
| .dm.cb.addSub (RTE) | When a subscription is added to a publisher | subscriberId | integer |
| | | dict (keys topic, channel, broadcast) | dictionary |
| | | dict[`topic] | mixed list |
| | | dict[`channel] | symbol list |
| | | dict[`broadcast] | boolean |
| .dm.cb.addSub (TP) | When a subscription is added to a publisher | subscriberId | integer |
| | | dict (keys topic, channel, broadcast, logFile, numMsgs, schema, logStreamer, instance, logFileH, numMsgsH, intradayLogFiles, intradayTables) | dictionary |
| | | dict[`topic] | mixed list |
| | | dict[`channel] | symbol list |
| | | dict[`broadcast] | boolean |
| | | dict[`logFile] | symbol |
| | | dict[`numMsgs] | long |
| | | dict[`schema] | mixed list |
| | | dict[`logStreamer] | symbol |
| | | dict[`instance] | symbol |
| | | dict[`logFileH] | symbol |
| | | dict[`numMsgsH] | long |
| | | dict[`intradayLogFiles] | symbol list |
| | | dict[`intradayTables] | symbol list |
| .dm.cb.connected | When a connection to another process is confirmed | remoteId | integer |
| .dm.cb.deleteSub | When a subscription is removed from a publisher | subscriberId | integer |
| | | dict (keys topic, channel) | dictionary |
| | | dict[`topic] | string |
| | | dict[`channel] | symbol |
| .dm.cb.disconnect | When a connection is removed because it dropped | remoteId | integer |
| .dm.cb.receiveConn | When a connection is received from another process | remoteId | integer |
| | | dict (keys host, port, tls, UDS) | dictionary |
| | | dict[`host] | integer |
| | | dict[`port] | integer |
| | | dict[`tls] | symbol |
| | | dict[`UDS] | boolean |
| .dm.cb.receiveReconn | When a reconnection is confirmed on the process being reconnected to | remoteId | integer |
| .dm.cb.reconnect | When a reconnection is registered on the process making the reconnection | remoteId | integer |
| | | dict (keys host, port, tls, UDS) | dictionary |
| | | dict[`host] | integer |
| | | dict[`port] | integer |
| | | dict[`tls] | symbol |
| | | dict[`UDS] | boolean |
| .dm.cb.subConfirm (TP) | When a successful subscription is confirmed | publisherId | integer |
| | | dict (keys topic, channel, broadcast) | dictionary |
| | | dict[`topic] | mixed list |
| | | dict[`channel] | symbol list |
| | | dict[`broadcast] | boolean |
| .dm.cb.subConfirm (RTE) | When a successful subscription is confirmed | publisherId | integer |
| | | dict (keys topic, channel, broadcast, logFile, numMsgs, schema, logStreamer, instance, logFileH, numMsgsH, intradayLogFiles, intradayTables) | dictionary |
| | | dict[`topic] | mixed list |
| | | dict[`channel] | symbol list |
| | | dict[`broadcast] | boolean |
| | | dict[`logFile] | symbol |
| | | dict[`numMsgs] | long |
| | | dict[`schema] | mixed list |
| | | dict[`logStreamer] | symbol |
| | | dict[`instance] | symbol |
| | | dict[`logFileH] | symbol |
| | | dict[`numMsgsH] | long |
| | | dict[`intradayLogFiles] | symbol list |
| | | dict[`intradayTables] | symbol list |
| .dm.cb.subDeleted | When confirmation that a subscription has been deleted is received | publisherId | integer |
| | | dict (keys topic, channel) | dictionary |
| | | dict[`topic] | string |
| | | dict[`channel] | symbol |
| .dm.cb.connectionFailed | When a connection or reconnection attempt to another process fails | remoteId | integer |
| | | dict (keys host, port, tls, UDS) | dictionary |
| | | dict[`host] | symbol |
| | | dict[`port] | integer |
| | | dict[`tls] | symbol |
| | | dict[`UDS] | boolean |
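As an illustration of the documented signatures, the handlers below keep a dictionary of live connections keyed by remoteId, using the host from the connect callback's dict. The handler names and the `live` dictionary are hypothetical; attaching a handler to .dm.cb.connect or .dm.cb.disconnect through an INSTANCE_CONFIG override is not shown here.

```q
/ hypothetical handlers matching the documented parameters (not KX Stream internals)
/ live connections: remoteId -> host
live:(`int$())!`symbol$()

/ for .dm.cb.connect: remoteId (integer) and dict with keys host, port, tls, UDS
onConnect:{[remoteId;d]
  `live set live,(enlist remoteId)!enlist d`host;
  -1 "connected to ",string[d`host],":",string[d`port],$[d`UDS;" via UDS";""];
  }

/ for .dm.cb.disconnect: remoteId only
onDisconnect:{[remoteId] `live set enlist[remoteId] _ live;}

onConnect[5i;`host`port`tls`UDS!(`localhost;5010i;`;0b)]   / prints "connected to localhost:5010"
```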