Messaging

This module contains functions for interfacing with the pub/sub framework of Control. Components register as sources of, or subscribers to, streams of data. Messaging-enabled processes register their topics of interest with a set of Messaging Servers. These are discovery processes that match publishers to subscribers for a given topic. Once a source and its subscribers are matched, a set of APIs is provided for the publisher to push data to them.

For more information, please see the KX Stream documentation.

Subscription Modes

APIs for each of the subscription modes.

Segmented

  • .dm.regsub
  • .dm.regsubc
  • .dm.regsrc
  • .dm.regsrcc

Bulk

  • .dm.regsubbulk
  • .dm.regsrcbulk

Sharded

  • .dm.regsubshard

.dm.bdc.disable

Disables broadcast messaging for the current process. If broadcast publishing is enabled at the messaging server level, all processes will use it by default. This provides an override to disable it if required.

Example:

 .dm.bdc.disable[]

.dm.setRegHost

Sets the host to use for messaging. It should be called prior to registration with the messaging server, as part of an instruction run on startup. To run it before registration, add the instruction to the .al.preStartupInstructions parameter of the INSTANCE_CONFIG override.

Parameter:

Name Type Description
host symbol Registration host

Example:

 .dm.setRegHost[`host]

.dm.addCallback

Adds a callback function against a table. The function must take two parameters:

  • table name (symbol)
  • data (table)

Parameters:

Name Type Description
table symbol Table name
function symbol Function name

Example:

 .dm.addCallback[`dxCPUStats; `updCPU]

.dm.applyCallbacks

Applies any registered callback functions for the incoming topic and data.

Parameters:

Name Type Description
table symbol Table name
data table Message data

Example:

 updCPU:{[t;x] 0N!(t;x)}
 data:([] time:.z.p; sym:`server1`server2; cpu:20 40f);
 .dm.addCallback[`dxCPUStats; `updCPU]
 .dm.applyCallbacks[`dxCPUStats; data]

.dm.removeCallback

Removes a callback function against a table.

Parameters:

Name Type Description
table symbol Table name
function symbol Function name

Example:

 .dm.removeCallback[`dxCPUStats; `updCPU]

.dm.pubnoreply

Publishes a table name and data to all subscribers except the calling handle (the handle held in .z.w).

Parameters:

Name Type Description
t symbol Table name
x table Data to be published

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pubnoreply[`trade; data]

.dm.pub

Publishes a table name and data to all subscribers. Applies the column filters of each subscription before sending.

Parameters:

Name Type Description
t symbol Table name
x table Data to be published

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pub[`trade; data]

.dm.pubc

Publishes a table name and data to all subscribers on a given channel. Applies the column filters of each subscription before sending.

Parameters:

Name Type Description
c symbol Channel name
t symbol Table name
x table Data to be published

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pubc[`channel; `trade; data]

.dm.pubdata

Publishes a table name and data to subscribers. See .dm.pub for full details. The published message has the format:

(`.u.upd; `table; (col1...; col2...; col3...))

Publishing data in the list format results in a smaller message, and performance gains can be seen in some circumstances. However, filtering is applied only at the table level.

Parameters:

Name Type Description
t symbol Table name
x list List of lists of column data

Example:

 data:(2#.z.p; `$("EUR/USD";"EUR/GBP"); `DB`UBS);
 .dm.pubdata[`trade; data]

.dm.pubdatac

Publishes a table name and data to subscribers on a given channel. See .dm.pub for full details. The published message has the format:

(`.u.upd; `table; (col1...; col2...; col3...))

Publishing data in the list format results in a smaller message, and performance gains can be seen in some circumstances. However, filtering is applied only at the table level.

Parameters:

Name Type Description
c symbol Channel name
t symbol Table name
x list List of lists of column data

Example:

 data:(2#.z.p; `$("EUR/USD";"EUR/GBP"); `DB`UBS);
 .dm.pubdatac[`channel; `trade; data]

.dm.pubflush

Publishes a table name and data to subscribers. Will flush each handle after publishing to ensure the data is sent immediately.

Parameters:

Name Type Description
t symbol Table name
x table Data to be published

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pubflush[`trade; data]

.dm.pubflushc

Publishes a table name and data to subscribers on a given channel. Will flush each handle after publishing to ensure the data is sent immediately.

Parameters:

Name Type Description
c symbol Channel name
t symbol Table name
x table Data to be published

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pubflushc[`channel; `trade; data]

.dm.pubmult

Publishes multiple tables downstream in a single message. Messages have the format:

(`updM; `table1`table2`table3; ( data1 ; data2 ; data3 ))

Filtering is only applied at the table level.

Parameters:

Name Type Description
t symbol[] Table names
x table[] List of table data

Example:

 t1:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 t2:flip `time`sym`cond!(.z.p; `$("EUR/USD";"EUR/GBP"); "AD");
 .dm.pubmult[`trade`quote; (t1; t2)]

.dm.pubmultc

Publishes multiple tables downstream in a single message to a given channel. Messages have the format:

(`updM; `table1`table2`table3; ( data1 ; data2 ; data3 ))

Filtering is only applied at the table level.

Parameters:

Name Type Description
c symbol Channel name
t symbol[] Table names
x table[] List of table data

Example:

 t1:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 t2:flip `time`sym`cond!(.z.p; `$("EUR/USD";"EUR/GBP"); "AD");
 .dm.pubmultc[`channel; `trade`quote; (t1; t2)]

.dm.pubnoreplyc

Publishes a table name and data to subscribers on a given channel, except the calling handle (the handle held in .z.w).

Parameters:

Name Type Description
c symbol Channel name
t symbol Table name
x table Data to be published

See Also: .dm.pubnoreply

Example:

 data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
 .dm.pubnoreplyc[`channel; `trade; data]

.dm.buildSubscriptionString

Creates a JSON messaging topic from a table name and a filter dictionary.

Parameters:

Name Type Description
table symbol Table name
filter dict Dictionary of column names and filters

Example:

 .dm.buildSubscriptionString[`trade; `sym`src!(`$"EUR/USD"; `UBS`DB)]
 /=> "{\"trade\":{\"sym\":\"EUR/USD\",\"src\":[\"UBS\",\"DB\"]}}"

.dm.regsrc

Register as a publisher of a topic with a blank channel

Parameter:

Name Type Description
t string Topic

Example: Publisher of trade

 .dm.regsrc["trade"]

.dm.regsrcbulk

Registers as a bulk publisher of a channel and topic

Parameters:

Name Type Description
c symbol Channel name
t string Topic

Example: Register as a bulk source for the trade table and three symbols

 syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
 topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
 .dm.regsrcbulk[`tickerplant; topic]

.dm.regsrcc

Register as a topic source on a specific channel

Parameters:

Name Type Description
c symbol Channel name
t string Topic

Example: Publisher of trade on tickerplant channel

 .dm.regsrcc[`tickerplant; "trade"]

.dm.regsub

Register a subscription to a topic with a blank channel

Parameter:

Name Type Description
t string Topic

Example: Subscriber to trade

 .dm.regsub["trade"]

.dm.regsubbulk

Registers bulk subscription of a channel and topic

Parameters:

Name Type Description
c symbol Channel name
t string Topic

Example: Register bulk sub for trade table and three symbols

 syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
 topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
 .dm.regsubbulk[`tickerplant; topic]

.dm.regsubc

Register as a subscriber to a topic on a specific channel

Parameters:

Name Type Description
c symbol Channel name
t string Topic

Example: Subscriber to trade on tickerplant channel

 .dm.regsubc[`tickerplant; "trade"]

.dm.regsubshard

Registers as a sharded subscriber for a channel and topic. A sharded subscription acts like a bulk subscription with an additional sharding filter, so the function signature is slightly different. It takes the usual channel, table and filters, plus two additional parameters: the shard column and the shard condition. These indicate which column to shard on and what condition to apply. An example shard would be sym like "[a-g]*", which subscribes to a table for records where the sym column begins with a character in the range a-g.

Parameters:

Name Type Description
c symbol Channel name
t symbol Table
col symbol Shard column
shard string Shard topic
filt dict Dictionary of additional filters

Example: Shard on sym

 .dm.regsubshard[`tickerplant; `trade; `sym; "[a-g]*"; ()!()]

Example: Shard with additional filter

 .dm.regsubshard[`tickerplant; `trade; `sym; "[a-g]*"; enlist[`src]!enlist `UBS`DB]
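
To make the shard condition concrete, the sketch below applies the same pattern locally with a plain qSQL query. The trade table and its values are hypothetical sample data, and the query only illustrates which rows the condition sym like "[a-g]*" selects; it does not show how the messaging servers evaluate a shard.

```q
/ Hypothetical sample data, for illustration only
trade:([] sym:`aapl`dell`goog`ibm`msft; px:1 2 3 4 5f)

/ Keep only rows whose sym begins with a character in the range a-g
shard:select from trade where sym like "[a-g]*"

/ Symbols that fall inside the shard
exec sym from shard
```

A second subscriber registered with a complementary pattern such as "[h-z]*" would cover the remaining symbols in this sample.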

.dm.unsub

Unsubscribe from topic

Parameters:

Name Type Description
c symbol Unused parameter
t string Topic

Example: Unsubscribe from trade topic

 .dm.unsub[`; "trade"]

.dm.unsubbulk

Unsubscribe from a bulk channel and topic

Parameters:

Name Type Description
c symbol Channel
t string Topic

Example:

 syms:`$("EUR/USD";"USD/JPY";"GBP/USD");
 topic:.dm.buildSubscriptionString[`trade; enlist[`sym]!enlist[syms]]
 .dm.unsubbulk[`tickerplant; topic]

.dm.unsubc

Unsubscribe from topic and channel

Parameters:

Name Type Description
c symbol Channel name
t string Topic

Example: Unsubscribe from topic and channel

 .dm.unsubc[`tickerplant; "trade"]

Callbacks

This section details the APIs for adding functional callbacks on subscribed topics. It includes APIs to add and trigger callbacks for specified tables.

Publishing

The APIs in this section relate to publishing data. They are called on a publisher process and send data downstream to any subscribed processes. The publisher holds a list of topics and their associated process handles. Topics consist of table names and column filters. Each API (except .dm.pubdata) takes a table name and a table of data. The table is filtered for each topic so that each downstream process receives only the data it's interested in. The downstream consumer receives a message in the form

(`upd; `table; (data))
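
As a rough illustration of the consumer side, the sketch below defines a hypothetical upd handler that appends incoming rows to a local table, then evaluates a message of the above shape by hand. The trade schema and the handler body are assumptions made for this example; in a real subscriber the handler is whatever the process defines, and the message arrives over IPC rather than through a local call to value.

```q
/ Assumed local schema for the subscribed table
trade:([] time:`timestamp$(); sym:`symbol$(); src:`symbol$())

/ Hypothetical handler: t is the table name, x is the table of new rows
upd:{[t;x] t insert x}

/ Build a message in the (`upd; `table; data) shape and evaluate it locally
data:([] time:2#.z.p; sym:`$("EUR/USD";"EUR/GBP"); src:`DB`DB)
msg:(`upd; `trade; data)
value msg

/ Number of rows now held in the local table
count trade
```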

.dm.pubdata is slightly different. Instead of a table, the data is a list of column lists in the form shown below. It doesn't filter the rows or columns of the data, and it publishes to .u.upd instead of upd. This is used for publishing to tickerplants, where throughput is the priority.

(col1[]; col2[]; ...; colN[])
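
If the data starts out as a table, one way to get it into this column-list shape is with value and flip. This is a minimal sketch using a hypothetical two-row table; it shows only the conversion, not the publish call itself.

```q
/ A small table in the shape used by the .dm.pub examples
t:([] time:2#.z.p; sym:`$("EUR/USD";"EUR/GBP"); src:`DB`UBS)

/ flip gives the column dictionary; value keeps just the column vectors
data:value flip t

/ Rebuilding from the column names and vectors recovers the original table
t~flip cols[t]!data
```

The column names are not part of the list, so the receiving process has to know the column order of the target table.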

Multi-publish

The ability to publish multiple tables in a single message is available via .dm.pubmult. This means the publisher only has to serialize and send a single message instead of one per table. The message format is below.

(`updM; `table1`table2; ( table1 ; table2 ))

Row-level filtering is not supported for multi-publish.
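
On the receiving side, a process needs an updM function to handle this message. The sketch below is one possible handler, assuming the process already has a per-table upd function and simply wants to apply it to each table in turn. The schemas, upd and updM bodies are all hypothetical, and the message is evaluated locally rather than received over IPC.

```q
/ Assumed local schemas for the two subscribed tables
trade:([] time:`timestamp$(); sym:`symbol$(); src:`symbol$())
quote:([] time:`timestamp$(); sym:`symbol$(); cond:`char$())

/ Hypothetical per-table handler: append the new rows
upd:{[t;x] t insert x}

/ Hypothetical multi-table handler: apply upd pairwise to names and tables
updM:{[ts;xs] upd'[ts;xs]}

/ Build a message in the (`updM; names; tables) shape and evaluate it locally
t1:([] time:2#.z.p; sym:`$("EUR/USD";"EUR/GBP"); src:`DB`DB)
t2:([] time:2#.z.p; sym:`$("EUR/USD";"EUR/GBP"); cond:"AD")
value (`updM; `trade`quote; (t1; t2))

/ Row counts of the two local tables after the update
count each (trade; quote)
```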

API sets

There are five sets of publish APIs. In each set, the main API will publish to all channels and the other (with the trailing c suffix) will publish to a specific channel.

data:flip `time`sym`src!(.z.p; `$("EUR/USD";"EUR/GBP"); `DB);
.dm.pub[`trade; data]
.dm.pubc[`channel; `trade; data]

Null channel publish

When using the per-channel publish APIs, the behavior when using a null channel can be unexpected. See here for more information.

Registration

This section details the APIs for managing messaging topics. It includes APIs to build topics and register as a publisher or subscriber with the messaging servers.