Stream configuration

Streams move and sequence data and messages between components within kdb Insights. kdb Insights includes Reliable Transport (RT) as the primary stream bus. Custom streams can also be used, but they must comply with the RT interface.

Configuration

Within an assembly configuration, the bus section is a dictionary of stream entries. The names internal and external are suggested for the stream used for communication within the assembly and the stream used for communication with the outside world (for example, other assemblies), respectively. Assemblies may contain further entries for user-defined purposes.

When running kdb Insights microservices, the bus field is required and lists the nodes to connect to.

bus:
  internal:
    protocol: rt
    topic: internal

Each bus entry provides:

| name | type | required | description |
| --- | --- | --- | --- |
| protocol | string | Yes | The protocol of the messaging system, one of rt, tp or custom. See protocols below for details. |
| topic | string | No | The name of this stream's topic. For RT streams, this is the name of the assembly concatenated to the topic name with a dash. |
| nodes | string[] | No | Connection strings to the host and port of the node to connect to for subscribing to the bus. |
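
As an illustration of a bus section with more than one entry, the sketch below defines two streams using the suggested names internal and external. The topic values are placeholders chosen for this example, and the optional nodes field is left out; adjust both to match your own deployment.

```yaml
bus:
  # Stream used for communication within the assembly
  internal:
    protocol: rt
    topic: internal
  # Stream used for communication with the outside world
  # (topic name is a placeholder for this example)
  external:
    protocol: rt
    topic: external
```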

Protocols

A bus configuration can use one of three protocols: rt, tp, or custom.

rt Protocol

For streams using RT, you must deploy the RT microservice. In a microservice deployment of RT, the combination of the topic and nodes must resolve to the hostname of the RT deployment.

tp Protocol

The tp protocol is a tickerplant protocol compatible with tick.q. The nodes parameter determines the address to connect to; if the list contains more than one address, only the first is used.
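
A minimal sketch of a tp stream entry might look like the following. The entry name stream and both addresses are hypothetical, and the host:port form of each connection string is an assumption based on the description of the nodes field. The second address is included only to show that it would be ignored.

```yaml
bus:
  stream:                    # hypothetical entry name
    protocol: tp
    nodes:
      - "tp-primary:5010"    # hypothetical address; the only one used
      - "tp-secondary:5010"  # hypothetical address; ignored by the tp protocol
```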

custom Protocol

The custom protocol can be used to provide a custom stream protocol. To connect to a custom stream, you must load a package that implements the bus interface as detailed below. The package must be loaded into all included DAPs and the Storage Manager.
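
For illustration, a bus entry selecting the custom protocol could be sketched as follows. The topic value and node address are hypothetical, and whether a given custom implementation reads topic or nodes at all depends on the package you load; the only field this page requires is protocol.

```yaml
bus:
  internal:
    protocol: custom
    # The fields below are assumptions for this example; a custom package
    # receives the bus configuration and decides which fields it uses.
    topic: internal
    nodes: ["custom-bus:9000"]   # hypothetical host and port
```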

Interface:

The bus interface must implement .kxbus.custom.pub and .kxbus.custom.sub with the following signatures.

Furthermore, if .kxbus.custom.sub is executed, .kxbus.triggerReplayEnd[] must be called at some point to indicate that any replay has finished. It must be called even if the custom bus does not provide replay functionality, because DAPs will not mark themselves as ready until this function has been called.

// @overview
// Returns a publisher function for publishing data into the stream. The function
// accepts a single `topic` parameter as provided by the `bus` configuration.
// The returned function must accept one argument, the message payload to publish.
//
// @param config {dict} The bus configuration from the assembly.
// @return {fn(any)}
.kxbus.custom.pub: {[config]
  // Your implementation here ...
  }

// @overview
// Subscribes to a topic from an offset index with a callback function. The offset index
// is used to replay messages after a recovery event. Messages that are sent to the subscriber
// must be sequential and in the same order when performing a replay. The callback function
// must be passed two arguments, a tuple of the table name and data, and the current offset
// index.
//
// @param config {dict} The bus configuration from the assembly. It includes the following additional elements:
//                      pos      {any|long} the replay position, or (::) to resume from the beginning of the stream
//                      callback {symbol}   the name of the callback function
// @return {null}
.kxbus.custom.sub:{[config]
  // Your implementation here ...
  .kxbus.triggerReplayEnd[];
  }