Messaging
This module contains functions for interfacing with the pub/sub framework of Control. Components register as sources of or subscribers to streams of data. Messaging-enabled processes will register their interested topics with a set of Messaging Servers. These are discovery processes that match publishers to subscribers for a given topic. Once the source and subscribers are matched, a set of APIs is provided is provided for the publisher to push data between them.
For more information, please see the KX Stream documentation.
Subscription Modes
APIs for each of the subscription modes.
Segmented
- .dm.regsub
- .dm.regsubc
- .dm.regsrc
- .dm.regsrcc
Bulk
- .dm.regsubbulk
- .dm.regsrcbulk
Sharded
- .dm.regsubshard
.dm.bdc.disable
Disables broadcast messaging for the current process. If broadcast publishing is enabled at the messaging server level, all processes will use it by default. This provides an override to disable it if required.
Example:
.dm.bdc.disable[]
.dm.setRegHost
Sets the host to use for messaging. Should be called prior to registration with the messaging server.
The API should be called as part of an instruction run on startup. In order to run the it before registration, the instruction
should be added to the .al.preStartupInstructions
parameter of the INSTANCE_CONFIG
override.
Parameter:
Name | Type | Description |
---|---|---|
host | symbol | Registration host |
Example:
.dm.setRegHost[`host]
.dm.addCallback
Adds a callback function against a table. Function must have two parameters;
- table name (symbol)
- data.(table)
Parameters:
Name | Type | Description |
---|---|---|
table | symbol | Table name |
function | symbol | Function name |
Example:
.dm.addCallback[`dxCPUStats; `updCPU]
.dm.applyCallbacks
Applies any function callbacks for incoming topic and data
Parameters:
Name | Type | Description |
---|---|---|
table | symbol | Table name |
data | table | Message data |
Example:
updCPU:{[t;x] 0N!(t;x)}
data:([] time:.z.p; sym:`server1`server2; cpu:20 40f);
.dm.addCallback[`dxCPUStats; `updCPU]
.dm.applyCallbacks[`dxCPUStats; data]
.dm.removeCallback
Removes a callback function against a table.
Parameters:
Name | Type | Description |
---|---|---|
table | symbol | Table name |
function | symbol | Function name |
Example:
.dm.removeCallback[`dxCPUStats; `updCPU]
.dm.pubnoreply
Publishes a table name and data to subscribers except calling handle (handle=.z.w
).
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pubnoreply[`trade; data]
.dm.pub
Publishes a table name and data to all subscribers. Will apply any column or filtering for each subscription.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pub[`trade; data]
.dm.pubc
Publishes a table name and data to all subscribers on a given channel. Will apply any column or filtering for each subscription.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pubc[`channel; `trade; data]
.dm.pubdata
Publishes a table name and data to subscribers. See .dm.pub for full details. Published messsage has the format
(`.u.upd; `table; (col1...; col2..; col3...))
Publishing data in the list format results in a smaller message and performance gains can be seen in some circumstances. However filtering will only be applied at the table level.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | list | List of lists of column data |
Example:
data:(2#.z.p; `$("EUR/USD";"EUR/GBP"); `DB`UBS);
.dm.pubdata[`trade; data]
.dm.pubdatac
Publishes a table name and data to subscribers on a given channel. See .dm.pub for full details. Published messsage has the format
(`.u.upd; `table; (col1...; col2..; col3...))
Publishing data in the list format results in a smaller message and performance gains can be seen in some circumstances. However filtering will only be applied at the table level.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | list | List of lists of column data |
Example:
data:(2#.z.p; `$("EUR/USD";"EUR/GBP"); `DB`UBS);
.dm.pubdata[`channel; `trade; data]
.dm.pubflush
Publishes a table name and data to subscribers. Will flush each handle after publishing to ensure the data is sent immediately.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pubflush[`trade; data]
.dm.pubflushc
Publishes a table name and data to subscribers on a given channel. Will flush each handle after publishing to ensure the data is sent immediately.
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pubflushc[`channel; `trade; data]
.dm.pubmult
Publishes multiple tables downstream in a single message. Messages have the format
q
(`updM; `table1`table2`table3; ( data1 ; data2 ; data3 ))
Filtering is only applied at the table level
Parameters:
Name | Type | Description |
---|---|---|
t | symbol[] | Table names |
x | table[] | List of table data |
Example:
t1:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
t2:flip `time`sym`cond!(.z.p; `$("EUR/USD";"EUR/GBP"); "AD");
.dm.pubmult[`trade`quote; (t1; t2)]
.dm.pubmultc
Publishes multiple tables downstream in a single message to a given channel. Messages have the format;
q
(`updM; `table1`table2`table3; ( data1 ; data2 ; data3 ))
Filtering is only applied at the table level
Parameters:
Name | Type | Description |
---|---|---|
t | symbol[] | Table names |
x | table[] | List of table data |
Example:
t1:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
t2:flip `time`sym`cond!(.z.p; `$("EUR/USD";"EUR/GBP"); "AD");
.dm.pubmultc[`channel; `trade`quote; (t1; t2)]
.dm.pubnoreplyc
Publishes a table name and data to subscribers on a given channel except calling handle (handle=.z.w
).
Parameters:
Name | Type | Description |
---|---|---|
t | symbol | Table name |
x | table | Data to be published |
See Also: .dm.pubnoreply
Example:
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pubnoreplyc[`channel; `trade; data]
.dm.buildSubscriptionString
Creates a JSON messaging topic from a table and filter dictionary
Parameters:
Name | Type | Description |
---|---|---|
table | symbol | Table name |
filter | dict | Dictionary of column names and filters |
Example:
.dm.buildSubscriptionString[`trade; `sym`src!(`$"EUR/USD"; `UBS`DB)]
/=> "{\"trade\":{\"sym\":\"EUR/USD\",\"src\":[\"UBS\",\"DB\"]}}"
.dm.regsrc
Register as a publisher of a topic with a blank channel
Parameter:
Name | Type | Description |
---|---|---|
t | string | Topic |
Example: Publisher of trade
.dm.regsrc["trade"]
.dm.regsrcbulk
Registers as a bulk publisher of a channel and topic
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | string | Topic |
Example: Register bulk sub for trade table
syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
.dm.regsrcbulk[`tickerplant; topic]
.dm.regsrcc
Register as a topic source on a specific channel
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | string | Topic |
Example: Publisher of trade on tickerplant channel
.dm.regsrcc[`tickerplant; "trade"]
.dm.regsub
Register a subscription to a topic with a blank channel
Parameter:
Name | Type | Description |
---|---|---|
t | string | Topic |
Example: Subscriber to trade
.dm.regsub["trade"]
.dm.regsubbulk
Registers bulk subscription of a channel and topic
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | string | Topic |
Example: Register bulk sub for trade table and three symbols
syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
.dm.regsubbulk[`tickerplant; topic]
.dm.regsubc
Register as a subscriber to a topic on a specific channel
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | string | Topic |
Example: Subscriber to trade on tickerplant channel
.dm.regsubc[`tickerplant; "trade"]
.dm.regsubshard
Registers as a sharded subscriber for a channel and topic. The topic acts like a bulk subscriber but with an additional sharding filter.
Hence the function signature is slightly different. It takes the usual elements of channel, table and filters but with an additional two params: column and filter.
These indicate what column to perform the sharding on and what the condition is.
An example shard would be sym like "[a-g]*"
. This would subscribe to a table for records where the sym
column begins with characters a-g
.
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | symbol | Table |
col | symbol | Shard column |
shard | string | Shard topic |
filt | dict | Dictionary of additional filters |
Example: Shard on sym
.dm.regsubshard[`tickerplant; `trade; `sym; "[a-g]*"; ()!()]
Example: Shard with additional filter
.dm.regsubshard[`tickerplant; `trade; `sym; "[a-g]*"; enlist[`src]!enlist `UBS`DB]
.dm.unsub
Unsubscribe from topic
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Unused parameter |
t | string | Topic |
Example: Unsubscribe from trade topic
.dm.unsub[`; "trade"]
.dm.unsubbulk
Unsubscribe from a bulk channel and topic
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel |
t | symbol | Topic |
Example:
syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
.dm.unsubbulk[`tickerplant; topic]
.dm.unsubc
Unsubscribe from topic and channel
Parameters:
Name | Type | Description |
---|---|---|
c | symbol | Channel name |
t | string | Topic |
Example: Unsubscribe from topic and channel
.dm.unsubc[`tickerplant; "trade"]
Callbacks
This section details the APIs for adding functional callbacks on subscribed topics. It includes APIs to add and trigger callbacks for specified tables.
Publishing
The APIs in this module related to publishing data. They are called on a publisher process and send data downstream to any subscribed processes.
The publisher will have a list of topics and associated process handles. The topics consist of table names and column filters.
Each API (except .dm.pubdata
) takes a name and table of data. The table will be filtered for each topic so each downstream process
only receives the data it's interested in. The downstream consumer will receive a message in the form
(`upd; `table; (data))
.dm.pubdata
is slightly different. Instead of the data being a table, it's a list of lists. It doesn't
perform any filtering on the received data and publishes to .u.upd instead of upd. This is used for publishing to tickerplants where throughput is the priority.
(col1[]; col2[]; ...; colN[])
Multi-publish
The ability to publish multiple tables in a single message is available via .dm.pubmult
. This means the publisher only has to
serialize and send a single message instead of one per table. The message format is below.
(`updM; `table1`table2; ( table1 ; table2 ))
Row level filtering is not supported for multi-publish
API sets
There are five sets of publish APIs. In each set, the main API will publish to all channels and the other (with the trailing c suffix) will publish to a specific channel.
data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pub[`trade; data]
.dm.pubc[`channel; `trade; data]
Null channel publish
When using the per-channel publish APIs, the behavior when using a null channel can be unexpected. See here for more information.
Registration
This section details the APIs for managing messaging topics. It includes APIs to build topics and register as a publisher or subscriber with the messaging servers.