Messaging
Messaging is a publish-subscribe framework bundled with KX Stream. It runs in a server-client model with a messaging server process acting as the discovery service for processes that consume or publish data.
Processes register their topics and the server matches them with corresponding publisher or subscriber processes. Subscription topics are specified in JSON with a table and optional column filters. Both publishers and subscribers can specify filters. Helper APIs for generating JSON are described in the Template API guide.
Two Messaging server instances are bundled with Stream and can be run from the DS_launch_MS_{A|B} workflows. Most of the Stream process templates contain a messagingServer instance parameter, so to enable a process for messaging, set this to the DS_MESSAGING_SERVER:DS config parameter, which contains the names of the two Stream instances. On startup, processes will then register with the Messaging servers and be available to publish and subscribe.
Example topics
Topic | Description |
---|---|
dxQuote | Subscribe to or publish all messages in the dxQuote table. No filtering is applied at the publisher. |
{"dxQuote":{"sym":"EURUSD"}} | Subscribe to the dxQuote table where sym=`EURUSD |
{"dxQuote":{"sym":"EURUSD","src":"FD"}} | Subscribe to the dxQuote table with filters on multiple columns: sym=`EURUSD, src=`FD |
In addition to matching on topics, Messaging has the concept of a channel. This is a symbol tag that provides an extra layer to distinguish between the same topics. For example, if two tickerplants were running hot-hot, the subscriber would want to receive the data only once. This would be done by specifying two different channels. When a publisher and subscriber have the same channel and their topics overlap, the Messaging server will match them and initiate handshakes.
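The matching rule described above can be sketched in a few lines. This is a minimal Python model and not the Messaging server implementation: the function names, the channel names and the (table, filters) representation of a topic are assumptions made for illustration, and a column with no filter is taken to mean "all values".

```python
def as_set(value):
    """Normalise a single filter value or a list of values to a set."""
    return set(value) if isinstance(value, list) else {value}

def topics_overlap(pub, sub):
    """True if two (table, filters) topics could share at least some data."""
    pub_table, pub_filters = pub
    sub_table, sub_filters = sub
    if pub_table != sub_table:
        return False
    # Only columns filtered on both sides can rule out an overlap.
    for col in pub_filters.keys() & sub_filters.keys():
        if not (as_set(pub_filters[col]) & as_set(sub_filters[col])):
            return False
    return True

def is_match(pub_channel, pub_topic, sub_channel, sub_topic):
    """Hypothetical rule: same channel and overlapping topics."""
    return pub_channel == sub_channel and topics_overlap(pub_topic, sub_topic)

# Two hot-hot tickerplants publish the same topic on different channels.
tp_a = ("chanA", ("dxQuote", {}))
tp_b = ("chanB", ("dxQuote", {}))
subscriber = ("chanA", ("dxQuote", {"sym": "EURUSD"}))

print(is_match(*tp_a, *subscriber))  # True
print(is_match(*tp_b, *subscriber))  # False
```

Because the subscriber names only one channel, it is matched with one of the two tickerplants and receives the data once.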
Subscription topics
When two processes have been matched and handshakes completed, subscription state is initiated on the publisher process. Whenever that process calls the publisher APIs, the data being published is checked against the current list of subscriptions. The APIs all take a table name and data as parameters. They first find the subscriptions for the same table, then apply any subscription filters to the data and publish the results.
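The publish path described above can be modelled as follows. This is an illustrative Python sketch under stated assumptions, not the publisher API: the publish function, the subscriber names and the dxTrade table are hypothetical, filters are limited to a single value per column, and the sketch assumes nothing is sent when no rows pass a filter.

```python
def publish(table, rows, subscriptions):
    """Return a list of (subscriber, rows_to_send) for one publish call."""
    deliveries = []
    for subscriber, sub_table, filters in subscriptions:
        if sub_table != table:  # find subscriptions for the same table first
            continue
        # Then apply the subscription's column filters to the data.
        matched = [r for r in rows
                   if all(r[col] == val for col, val in filters.items())]
        if matched:  # assumption: nothing is sent when no rows pass
            deliveries.append((subscriber, matched))
    return deliveries

subscriptions = [
    ("rte1", "dxQuote", {}),                 # no filter
    ("rte2", "dxQuote", {"sym": "EURUSD"}),  # single column filter
    ("rte3", "dxTrade", {}),                 # different table
]
rows = [{"sym": "EURUSD", "src": "FD"}, {"sym": "GBPUSD", "src": "FD"}]

for subscriber, batch in publish("dxQuote", rows, subscriptions):
    print(subscriber, len(batch))
# rte1 2
# rte2 1
```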
There are multiple filtered topic modes that define how the publisher sends the data. The following sections describe them in detail. The APIs for each mode are detailed in the Template API Guide.
Segmented
In segmented mode, when multiple filter values are applied on the same column, the topic is split into separate subscriptions on the publisher. So three column values would result in three separate topics. If the publisher sent data with mixed column values, they would be filtered and published in separate batches. This is used when the subscriber wants to do bulk operations on one value at a time.
Multiple filters on one column:
{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":"FD"}}
Results in:
dxQuote where sym=`EURUSD, src=`FD
dxQuote where sym=`GBPUSD, src=`FD
Multiple filters on more than one column:
{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":["FD","Kx"]}}
Results in:
dxQuote where sym=`EURUSD, src=`FD
dxQuote where sym=`GBPUSD, src=`FD
dxQuote where sym=`EURUSD, src=`Kx
dxQuote where sym=`GBPUSD, src=`Kx
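The split shown above is the Cartesian product of the filter values on each column. A minimal Python sketch of that expansion follows; the segment function is a hypothetical helper for illustration, not part of the Messaging API, and the order of the resulting topics is not significant here.

```python
from itertools import product

def segment(topic):
    """Split a topic into one subscription per combination of filter values."""
    (table, filters), = topic.items()
    columns = list(filters)
    # Treat a single value as a one-element list.
    values = [v if isinstance(v, list) else [v] for v in filters.values()]
    return [(table, dict(zip(columns, combo))) for combo in product(*values)]

topic = {"dxQuote": {"sym": ["EURUSD", "GBPUSD"], "src": ["FD", "Kx"]}}
for table, flt in segment(topic):
    print(table, flt)
```

Two values on each of two columns give four separate subscriptions, matching the four results listed above.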
Bulk
In bulk mode, when multiple filter values are applied to the same column, the topic is not split. In this mode, subscribers will receive data filtered as specified but with all messages in one batch per topic.
Multiple filters on one column:
{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":"FD"}}
Results in:
dxQuote where sym in `EURUSD`GBPUSD, src=`FD
Multiple filters on more than one column:
{"dxQuote":{"sym":["EURUSD","GBPUSD"],"src":["FD","Kx"]}}
Results in:
dxQuote where sym in `EURUSD`GBPUSD, src in `FD`Kx
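The effect of a bulk filter can be modelled as a single pass over the data that keeps every row whose value is in the allowed set for each filtered column. The Python sketch below is illustrative only: bulk_filter and the sample rows are assumptions, not the publisher implementation.

```python
def bulk_filter(rows, filters):
    """Return, as a single batch, the rows that pass every column filter."""
    allowed = {col: set(v) if isinstance(v, list) else {v}
               for col, v in filters.items()}
    return [r for r in rows
            if all(r[col] in vals for col, vals in allowed.items())]

rows = [
    {"sym": "EURUSD", "src": "FD", "bid": 1.10},
    {"sym": "GBPUSD", "src": "Kx", "bid": 1.27},
    {"sym": "USDJPY", "src": "FD", "bid": 151.2},
]
batch = bulk_filter(rows, {"sym": ["EURUSD", "GBPUSD"], "src": ["FD", "Kx"]})
print([r["sym"] for r in batch])  # ['EURUSD', 'GBPUSD']
```

Both matching rows arrive together in one batch, whereas segmented mode would deliver them separately per sym and src combination.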
Sharded
This mode is supported for subscriptions only and is used when subscribers want to shard (split) data by a single column across multiple processes. For instance, for load balancing, one process might take responsibility for stocks beginning A-K and a second for L-Z. Note that the complete list of instruments might not be known ahead of time and could change, so maintaining the list and dynamically updating subscriptions could be tricky to implement.
Instead, Messaging allows the developer to specify a shard column and regex condition to implement the split.
See code.kx.com Cookbook/Regular expressions for the pattern syntax.
This can be combined with other column filters, which are treated as bulk.
Shard filter only:
{"dxQuote":{"sym":[".q.like","[a-hA-H]*"]}}
Results in:
dxQuote where sym like "[a-hA-H]*"
Shard and multiple column filter:
{"dxQuote":{"sym":[".q.like","[a-hA-H]*"],"src":["FD","KX"]}}
Results in:
dxQuote where sym like "[a-hA-H]*", src in `FD`KX
Filters are currently supported only for symbol columns.
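A shard filter combined with a bulk column filter can be modelled as below. This is a hedged Python sketch: the passes function is hypothetical, and Python's fnmatchcase is used as a stand-in for q's like. The two agree for the simple pattern used here but are not identical in general.

```python
from fnmatch import fnmatchcase

def passes(row, filters):
    """Apply shard (pattern) filters and plain value filters to one row."""
    for col, cond in filters.items():
        if isinstance(cond, list) and cond[:1] == [".q.like"]:
            # Shard filter: the second element is the pattern.
            if not fnmatchcase(row[col], cond[1]):
                return False
        else:
            # Other column filters are treated as bulk (membership test).
            allowed = cond if isinstance(cond, list) else [cond]
            if row[col] not in allowed:
                return False
    return True

filters = {"sym": [".q.like", "[a-hA-H]*"], "src": ["FD", "KX"]}
print(passes({"sym": "EURUSD", "src": "FD"}, filters))  # True
print(passes({"sym": "USDJPY", "src": "FD"}, filters))  # False
print(passes({"sym": "AUDUSD", "src": "XX"}, filters))  # False
```

A second process subscribing with a pattern for the remaining first letters would pick up the rest of the instruments without either process knowing the full list in advance.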
Topic hierarchy
The table below shows how a subscription is matched for each combination of publisher and subscriber topic type from the previous section. The vertical axis is the publisher type and the horizontal axis is the subscriber type.
Subscribers have two other modes that aren’t supported for publishers:
- sharding is supported only for subscribers
- subscribers can also send a blank topic (empty string) to subscribe for all tables with no filters
Publisher / Subscriber | blank | no filter | segmented | bulk | shard |
---|---|---|---|---|---|
No filter | Match with no filter | Match with no filter | Segmented match of all syms | Bulk match of all syms | Shard match of all syms |
Segmented | Segmented match of all syms | Segmented match of all syms | Segmented match of overlapping syms | Segmented match of overlapping syms | Segmented match of overlapping syms |
Bulk | Bulk match of all syms | Bulk match of all syms | Segmented match of overlapping syms | Bulk match of overlapping syms | Bulk match of overlapping syms |
Combinations of publisher types (vertical) and subscriber types (horizontal)
Reconnections
Connection between components
When the initial connection to a server component is being established, no timeout is applied by default. If the component is busy, the process hangs until it becomes available to connect. The initial connection can be configured with one of three modes.
- off - initial connection made with no timeout
- timeout - initial connection made with a timeout according to .dm.i.initRetryTimeout. The callback .dm.cb.connectionFailed is invoked on failure.
- retry - makes the initial connection with a timeout and retries on a timer if this fails. If it still fails after the maximum number of retries, .dm.cb.connectionFailed is invoked.
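The three modes can be compared with a small model. This Python sketch is an assumption-laden illustration and not the framework's code: initial_connect, the connect callable and the on_failed hook are hypothetical, the retry timer is omitted, and it assumes that retry mode makes one initial attempt followed by up to the maximum number of retries.

```python
def initial_connect(connect, mode, timeout_ms=30000, retries=10, on_failed=None):
    """Model the off, timeout and retry modes.

    `connect(timeout_ms)` is a caller-supplied function returning True on
    success; a timeout of None means "wait indefinitely".
    """
    if mode == "off":
        return connect(None)  # no timeout: may block until the peer is free
    if mode not in ("timeout", "retry"):
        raise ValueError("unknown mode: %r" % (mode,))
    attempts = 1 + (retries if mode == "retry" else 0)
    for _ in range(attempts):
        if connect(timeout_ms):
            return True
        # A real process would wait for the retry timer here.
    if on_failed is not None:
        on_failed()  # plays the role of the connection-failed callback
    return False

attempts_seen = []

def flaky_connect(timeout_ms):
    """Stand-in for a connection attempt that succeeds on the third try."""
    attempts_seen.append(timeout_ms)
    return len(attempts_seen) >= 3

ok = initial_connect(flaky_connect, "retry", timeout_ms=500, retries=5)
print(ok, len(attempts_seen))  # True 3
```

In timeout mode the same flaky peer would fail after the first attempt and trigger the failure hook, which is the trade-off between the two modes.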
When a connection to another component drops, by default one of the processes will try to reconnect on a timer. The reconnect behavior is configurable via enhanced instance parameters. The table of parameters below details what options are available.
parameter | type | default | description |
---|---|---|---|
.dm.i.retryPriority | int | 0 | Prioritise which end should perform the reconnection |
.dm.i.retryInterval | int | 10000 | Interval between reconnect attempts (milliseconds) |
.dm.i.retryTimeout | int | 500 | hopen connection timeout (milliseconds) |
.dm.i.retries | int | 10 | Maximum number of reconnect attempts |
.dm.i.initRetryTimeout | long | 30000 | Timeout for the initial connection |
.dm.i.initRetryMode | symbol | off | Mode for the initial connection: off, timeout or retry |
.dm.i.retryPriority
This instance parameter dictates the behavior when a messaging connection is dropped. The process that reconnects is determined as follows:
- the process with the lower priority reconnects
- if the priorities are equal, the process with the higher messaging ID reconnects
- if either process has a null priority (0N), neither process will reconnect
The table below shows scenarios with .dm.i.retryPriority values for processes A and B and the resulting reconnection initiator (assume A has the higher messaging ID).
A | B | Initiator |
---|---|---|
0N | 0N | - |
0N | 1 | - |
10 | 0N | - |
10 | 1 | B |
10 | 10 | A |
Reconnects can therefore be disabled for all processes, or for a certain set of them, by setting their priorities to null. Important processes should be given a high priority: lower-priority processes will reconnect to them but not vice versa.
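The priority rules can be expressed as a short function and checked against the scenario table above. This is a Python sketch for illustration only: reconnect_initiator and the tuple layout are hypothetical, None stands in for the q null 0N, and the messaging IDs 2 and 1 are arbitrary values chosen so that A has the higher ID.

```python
def reconnect_initiator(a, b):
    """Return the name of the process that reconnects, or None.

    Each argument is (name, priority, messaging_id); a priority of None
    models a null priority (0N).
    """
    if a[1] is None or b[1] is None:
        return None  # a null priority disables reconnection
    if a[1] != b[1]:
        return min(a, b, key=lambda p: p[1])[0]  # lower priority reconnects
    return max(a, b, key=lambda p: p[2])[0]      # tie: higher messaging ID

scenarios = [(None, None), (None, 1), (10, None), (10, 1), (10, 10)]
for prio_a, prio_b in scenarios:
    print(prio_a, prio_b, reconnect_initiator(("A", prio_a, 2), ("B", prio_b, 1)))
# None None None
# None 1 None
# 10 None None
# 10 1 B
# 10 10 A
```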
Connection from client to MS
If the connection from a client to the MS is dropped, the retry hopen timeout and repeat interval can be configured using an override of INSTANCE_CONFIG. See Enhanced Instance Configuration.
parameter | type | default | description |
---|---|---|---|
.dm.i.reconnFreq | integer | 10000 | Interval between reconnect attempts (milliseconds) from client to MS when connection dropped |
.dm.i.openTimeout | long | 250 | hopen connection timeout (milliseconds) from client to MS when connection dropped |
Connection between MS processes
For connections between processes in a server cluster, the hopen timeout and repeat interval can be configured using an override of INSTANCE_CONFIG. See Enhanced Instance Configuration.
parameter | type | default | description |
---|---|---|---|
.rpl.reconnFreq | integer | 10000 | Interval between reconnect attempts (milliseconds) between MS processes in a cluster |
.rpl.openTimeout | integer | 200 (Windows) / 1000 (Linux) | hopen connection timeout (milliseconds) between MS processes in a cluster |
Table callbacks
The ability to specify function callbacks against a table is available in subscribers. Each table can have one or more functions to be executed whenever an update is received.
The standard model is to:
- set up the table-to-function callbacks on instance startup
- apply the callbacks in the upd or realTimeMsgFunct function whenever a new message is received
This allows multiple instances to share one generic upd function but specify different callbacks per process.
An example of the API calls is provided below.
callback:{[t;x] 0N!(t;x) };
trade:([] time:.z.p; sym:`A`B; price:100 200.; size:1000 1000)
// add a callback
.dm.addCallback[`trade; `callback];
// execute callbacks
.dm.applyCallbacks[`trade; trade];
/=> (`trade;+`time`sym`price`size!(2018.08.06D05:42:44.188299000 2018.08.06D05:42:44.188299000;`A`B;100 200f;1000 1000))
// remove callback
.dm.removeCallback[`trade; `callback];
Performance tuning
The Messaging server template contains two parameters that can help improve messaging performance.
useUnixDomainSockets
- Enables the Unix Domain Socket functionality of kdb+. If publishers and subscribers reside on the same server and this option is enabled, they will communicate via domain sockets. This can show considerable efficiency gains over standard TCP/IP.
broadcastPublish
- Improves performance of publishers if multiple subscriptions exist for the same topic. Instead of the publisher serializing the same message for each duplicate subscription, it will serialize only once and broadcast-publish to multiple handles. Under the covers it maps to the -25! operator in kdb+.
Both modes are disabled by default.
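The saving from broadcastPublish comes from serializing a message once instead of once per subscription. The Python sketch below only models that cost difference and does not use kdb+ IPC: both functions, the handle numbers and the use of pickle as the serializer are assumptions for illustration.

```python
import pickle

def publish_per_subscription(message, handles):
    """Serialize the message separately for every handle."""
    sends = [(h, pickle.dumps(message)) for h in handles]
    return sends, len(handles)  # (what is sent, number of serializations)

def publish_broadcast(message, handles):
    """Serialize the message once and reuse the bytes for every handle."""
    payload = pickle.dumps(message)
    sends = [(h, payload) for h in handles]
    return sends, 1

message = {"table": "dxQuote", "sym": ["EURUSD"], "bid": [1.1]}
handles = [5, 6, 7]  # hypothetical connection handles

_, cost_each = publish_per_subscription(message, handles)
sends, cost_once = publish_broadcast(message, handles)
print(cost_each, cost_once)  # 3 1
```

The benefit grows with the number of duplicate subscriptions and with message size, since the serialization work no longer scales with the number of handles.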
Event callbacks
Callbacks can be set up for various messaging events. This can be accomplished via the INSTANCE_CONFIG parameter and overrides.
See Enhanced Instance Configuration under the Developers section of the KX Control document for details of how to set up an INSTANCE_CONFIG override for a process.
callback | event | parameter | type |
---|---|---|---|
.dm.cb.connect | When a connection is made to another process | remoteId | integer |
.dm.cb.connect | | dict (keys host, port, tls, UDS) | dictionary |
.dm.cb.connect | | dict[`host] | symbol |
.dm.cb.connect | | dict[`port] | integer |
.dm.cb.connect | | dict[`tls] | symbol |
.dm.cb.connect | | dict[`UDS] | boolean |
.dm.cb.addSub (RTE) | When a subscription is added to a publisher | subscriberId | integer |
.dm.cb.addSub (RTE) | | dict (keys topic, channel, broadcast) | dictionary |
.dm.cb.addSub (RTE) | | dict[`topic] | mixed list |
.dm.cb.addSub (RTE) | | dict[`channel] | symbol list |
.dm.cb.addSub (RTE) | | dict[`broadcast] | boolean |
.dm.cb.addSub (TP) | When a subscription is added to a publisher | subscriberId | integer |
.dm.cb.addSub (TP) | | dict (keys topic, channel, broadcast, logFile, numMsgs, schema, logStreamer, instance, logFileH, numMsgsH, intradayLogFiles, intradayTables) | dictionary |
.dm.cb.addSub (TP) | | dict[`topic] | mixed list |
.dm.cb.addSub (TP) | | dict[`channel] | symbol list |
.dm.cb.addSub (TP) | | dict[`broadcast] | boolean |
.dm.cb.addSub (TP) | | dict[`logFile] | symbol |
.dm.cb.addSub (TP) | | dict[`numMsgs] | long |
.dm.cb.addSub (TP) | | dict[`schema] | mixed list |
.dm.cb.addSub (TP) | | dict[`logStreamer] | symbol |
.dm.cb.addSub (TP) | | dict[`instance] | symbol |
.dm.cb.addSub (TP) | | dict[`logFileH] | symbol |
.dm.cb.addSub (TP) | | dict[`numMsgsH] | long |
.dm.cb.addSub (TP) | | dict[`intradayLogFiles] | symbol list |
.dm.cb.addSub (TP) | | dict[`intradayTables] | symbol list |
.dm.cb.connected | When connection to another process is confirmed | remoteId | integer |
.dm.cb.deleteSub | When a subscription is removed from a publisher | subscriberId | integer |
.dm.cb.deleteSub | | dict (keys topic, channel) | dictionary |
.dm.cb.deleteSub | | dict[`topic] | string |
.dm.cb.deleteSub | | dict[`channel] | symbol |
.dm.cb.disconnect | When a connection is removed due to dropped connection | remoteId | integer |
.dm.cb.receiveConn | When a connection is received from another process | remoteId | integer |
.dm.cb.receiveConn | | dict (keys host, port, tls, UDS) | dictionary |
.dm.cb.receiveConn | | dict[`host] | integer |
.dm.cb.receiveConn | | dict[`port] | integer |
.dm.cb.receiveConn | | dict[`tls] | symbol |
.dm.cb.receiveConn | | dict[`UDS] | boolean |
.dm.cb.receiveReconn | When a reconnection is confirmed on a process being reconnected to | remoteId | integer |
.dm.cb.reconnect | When a reconnection is registered on a process making the reconnection | remoteId | integer |
.dm.cb.reconnect | | dict (keys host, port, tls, UDS) | dictionary |
.dm.cb.reconnect | | dict[`host] | integer |
.dm.cb.reconnect | | dict[`port] | integer |
.dm.cb.reconnect | | dict[`tls] | symbol |
.dm.cb.reconnect | | dict[`UDS] | boolean |
.dm.cb.subConfirm (TP) | When a successful subscription is confirmed | publisherId | integer |
.dm.cb.subConfirm (TP) | | dict (keys topic, channel, broadcast) | dictionary |
.dm.cb.subConfirm (TP) | | dict[`topic] | mixed list |
.dm.cb.subConfirm (TP) | | dict[`channel] | symbol list |
.dm.cb.subConfirm (TP) | | dict[`broadcast] | boolean |
.dm.cb.subConfirm (RTE) | When a successful subscription is confirmed | publisherId | integer |
.dm.cb.subConfirm (RTE) | | dict (keys topic, channel, broadcast, logFile, numMsgs, schema, logStreamer, instance, logFileH, numMsgsH, intradayLogFiles, intradayTables) | dictionary |
.dm.cb.subConfirm (RTE) | | dict[`topic] | mixed list |
.dm.cb.subConfirm (RTE) | | dict[`channel] | symbol list |
.dm.cb.subConfirm (RTE) | | dict[`broadcast] | boolean |
.dm.cb.subConfirm (RTE) | | dict[`logFile] | symbol |
.dm.cb.subConfirm (RTE) | | dict[`numMsgs] | long |
.dm.cb.subConfirm (RTE) | | dict[`schema] | mixed list |
.dm.cb.subConfirm (RTE) | | dict[`logStreamer] | symbol |
.dm.cb.subConfirm (RTE) | | dict[`instance] | symbol |
.dm.cb.subConfirm (RTE) | | dict[`logFileH] | symbol |
.dm.cb.subConfirm (RTE) | | dict[`numMsgsH] | long |
.dm.cb.subConfirm (RTE) | | dict[`intradayLogFiles] | symbol list |
.dm.cb.subConfirm (RTE) | | dict[`intradayTables] | symbol list |
.dm.cb.subDeleted | When confirmation that a subscription has been deleted is received | publisherId | integer |
.dm.cb.subDeleted | | dict (keys topic, channel) | dictionary |
.dm.cb.subDeleted | | dict[`topic] | string |
.dm.cb.subDeleted | | dict[`channel] | symbol |
.dm.cb.connectionFailed | When an unsuccessful connection or reconnection attempt is made to another process | remoteId | integer |
.dm.cb.connectionFailed | | dict (keys host, port, tls, UDS) | dictionary |
.dm.cb.connectionFailed | | dict[`host] | symbol |
.dm.cb.connectionFailed | | dict[`port] | integer |
.dm.cb.connectionFailed | | dict[`tls] | symbol |
.dm.cb.connectionFailed | | dict[`UDS] | boolean |