Stream configuration
Streams move and sequence data and messages between components within kdb Insights. kdb Insights includes Reliable Transport (RT) as the primary stream bus. Custom streams can also be used, but they must comply with the RT interface.
Configuration
Within an assembly configuration, the bus section consists of a dictionary of stream entries. The names internal and external are suggested for the streams used for communication within the assembly and for communication with the outside world (perhaps other assemblies), respectively. Assemblies may contain further entries for user-defined purposes.
When running kdb Insights microservices, the bus field is required and lists the nodes to connect to.
bus:
  internal:
    protocol: rt
    topic: internal
Each bus entry provides:
name | type | required | description |
---|---|---|---|
protocol | string | Yes | The protocol of the messaging system, one of rt, tp, or custom. See Protocols below for details. |
topic | string | No | The name of this stream's topic. For RT streams, this is the name of the assembly joined to the topic name with a dash. |
nodes | string[] | No | Connection strings giving the host and port of each node to connect to when subscribing to the bus. |
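As a minimal sketch of how these fields combine, the fragment below defines two streams under the suggested names internal and external. The topic values are illustrative assumptions, not names taken from a real deployment.

```yaml
bus:
  # Stream for communication within the assembly
  internal:
    protocol: rt
    topic: internal
  # Stream for communication with the outside world (illustrative topic name)
  external:
    protocol: rt
    topic: external
```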
Protocols
A bus configuration can use one of three protocols: rt, tp, or custom.
rt Protocol
For streams using RT, you must deploy the RT microservice. In a microservice deployment of RT, the combination of the topic and nodes must resolve to the hostname of the RT deployment.
tp Protocol
The Ticker Plant protocol is compatible with tick.q. The nodes parameter determines the address to connect to; only the first address in the list is used.
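A bus entry using the tp protocol might look like the sketch below. The host name and port are hypothetical, and the host:port form of the connection string is an assumption based on the nodes description above; substitute the address of your own tickerplant.

```yaml
bus:
  internal:
    protocol: tp
    nodes:
      # Hypothetical address; for tp, only the first entry in the list is used
      - tickerplant-host:5010
```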
custom
Protocol
The custom protocol can be used to provide a custom stream protocol. To connect to a custom stream, you must load a package that implements the bus interface as detailed below. The package must be loaded into all included DAPs and the Storage Manager.
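For illustration, a bus entry selecting the custom protocol could be sketched as follows. The topic name is hypothetical, and the loaded package is assumed to interpret it as it sees fit.

```yaml
bus:
  external:
    protocol: custom
    # Hypothetical topic name, interpreted by the package that implements the bus interface
    topic: orders
```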
Interface:
The bus interface must implement .kxbus.custom.pub and .kxbus.custom.sub with the following signatures.
// @overview
// Returns a publisher function for publishing data into the stream. The function
// accepts a single `topic` parameter as provided by the `bus` configuration.
// The returned function must accept one argument, the message payload to publish.
//
// @param config {dict} The bus configuration from the assembly.
// @return {fn(any)}
.kxbus.custom.pub:{[config]
    // Your implementation here ...
    }
// @overview
// Subscribes to a topic from an offset index with a callback function. The offset index
// is used to replay messages after a recovery event. Messages that are sent to the subscriber
// must be sequential and in the same order when performing a replay. The callback function
// must be passed two arguments: a tuple of a table name and data, and the current offset
// index.
//
// @param config {dict} The bus configuration from the assembly. It includes the following additional elements:
//     pos {long} the replay position, or null to resume from the end of the stream
//     callback {symbol} the name of the callback function
// @return {null}
.kxbus.custom.sub:{[config]
    // Your implementation here ...
    }