kdb Insights Streams in Pipelines

This page explains how to use kdb Insights Streams (Reliable Transport) within pipelines.

Overview

In kdb Insights, Streams are the deployment form of Reliable Transport (RT) clusters and provide highly available, high‑performance messaging.

A stream is an instance of RT (typically a 3‑node Raft cluster for high availability, or a single node where fault tolerance is needed but HA is not). It reliably accepts published messages and makes them available to subscribers.

A Stream Processor pipeline interacts with kdb Insights Streams in two ways:

  • As a subscriber (via a stream reader): the pipeline consumes messages from an RT stream (optionally filtered by topic/table).
  • As a publisher (via a stream writer): the pipeline publishes messages into an RT stream (again optionally labelled to support downstream filtering).

Writing to a kdb Insights Stream is explicitly positioned for two main use cases: chaining pipelines (pipeline-to-pipeline via a stream) and enabling database ingestion flows.

If your explicit goal is persisting to a database, the Stream Processor documentation recommends using the Database Writer (rather than writing to a stream and relying on downstream ingestion) because the Database Writer includes schema validation capabilities.

The following example shows how data flows from a feedhandler into a kdb Insights Database through a pipeline that uses Streams:

  1. A Feedhandler publishes data to a Stream
  2. The pipeline subscribes to the Stream
  3. The pipeline transforms the data into a format that can be written to the database
  4. The pipeline publishes the transformed data downstream, to be stored in the database or processed by other subscribers
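The four steps above might be sketched in q as a single pipeline. This is a hypothetical sketch: the stream, package, and topic names are placeholders, and the `.qsp.v2.write.toStream` call is assumed to mirror the v2 reader shown later on this page, so confirm the exact signature against the Stream Processor API reference.

```q
/ Sketch of the flow above: subscribe, transform, republish.
/ All names are placeholders; the writer's argument order is assumed
/ to mirror the v2 reader (topic; stream; options).
.qsp.run
  .qsp.v2.read.fromStream[(::); "ingestPackage-dataStream";
    .qsp.use enlist[`prefix]!enlist "rt-"]
  .qsp.map[{x}]                      / replace with the real transform step
  .qsp.v2.write.toStream[`trade; "ingestPackage-dbStream";
    .qsp.use enlist[`prefix]!enlist "rt-"]
```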

Note

In this example the pipeline acts as a publisher and a subscriber.

Any integration between a Stream and a pipeline requires the Stream name to be configured. Additionally, if using topic filtering as a:

  • Publisher - The pipeline must also set a topic for the data it publishes, facilitating topic filtering performed by any downstream subscribers.
  • Subscriber - The pipeline must also set a topic to filter incoming data.

Refer to Constructing the stream parameter and Topic Filtering for more information.

Version 2 benefits

We strongly recommend using Version 2 of the stream reader and writer.

The reader benefits are:

  • Improved performance compared with v1
  • Server‑side topic filtering (rather than client‑side filtering)
  • Lower CPU and disk consumption in the pipeline runtime, because stream logs are not replicated to the pipeline

These points are particularly relevant for production deployments where pipelines read from high‑volume RT streams and you want filtering to occur upstream, rather than paying for local replication + discard.

The writer benefits are:

  • Improved support for multiple stream writers
  • More robust failure and recovery behavior, particularly when multiple writers publish into the same stream

Stream reader configuration

A stream reader consumes data from an RT stream and can optionally apply a filter. You must specify the stream the pipeline will subscribe to and you may also provide a topic to filter the data. In the v1 reader, this is called the table name.

When a pipeline starts for the first time, you can choose to replay the stream from the start or begin consuming from the end. On subsequent restarts, the reader resumes from the last checkpointed position rather than replaying from the beginning.

The table below consolidates the common reader parameters you need to set. The exact names might differ across the q, Python, and UI interfaces.

| Parameter | Description | Default / typical value |
| --- | --- | --- |
| stream | Selects which RT stream to subscribe to. | Required |
| topics | Filters incoming messages to only those labeled with a listed topic; leave blank to consume all messages. Called table in v1. | No filter (all topics) |
| position | Where to start consuming: start (replay from the beginning) or end (only new data). | start |
| prefix | Used where your stream's hostname has a prefix. | Deployment‑specific; refer to Constructing the stream parameter |
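As an illustration of the reader parameters above, a v2 reader subscribing to two topics and replaying from the start might look like the following. Passing position via `.qsp.use` is an assumption based on the table, so the exact option name may differ between the q, Python, and UI interfaces.

```q
/ Subscribe to two topics on a prefixed stream, replaying from the start.
/ The position option name is assumed from the parameter table above.
.qsp.v2.read.fromStream[`topic1`topic2; "ingestPackage-dataStream";
  .qsp.use `prefix`position!("rt-";`start)]
```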

For UI-based pipelines that use the v1 reader, the stream parameter is not configured within the reader itself. Instead, it is defined in the Assembly Integration section of the Settings panel.

Stream writer configuration

Database ingestion

The stream writer publishes directly to an RT stream. This supports pipeline chaining and can be placed upstream of database ingestion. However, if the goal is to persist data to a database, you should use the Database Writer instead, as it provides schema validation and handles database ingestion properly.

The table below consolidates the common writer parameters you need to set. The exact names might differ across the q, Python, and UI interfaces.

| Parameter | Description | Default / typical value |
| --- | --- | --- |
| stream | Selects the RT stream to publish into. | Required |
| topic | Labels outgoing messages with this topic. Called table in v1. | Optional |
| prefix | Used where your stream's hostname has a prefix. | Deployment‑specific; refer to Constructing the stream parameter |
| deduplicate | Ensures duplicates created during failure recovery do not appear downstream; requires deterministic processing. If $KXI_ALLOW_NONDETERMINISM is set, deduplicate should be turned off, potentially allowing duplicates. | true |
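Assuming the v2 writer accepts options analogously to the v2 reader shown elsewhere on this page, the parameters above might be combined as follows. The argument order and option names here are assumptions to confirm against the writer API reference.

```q
/ Publish messages labelled `trade into a prefixed stream, keeping
/ deduplication at its default of enabled (1b). Argument order is assumed
/ to mirror the v2 reader (topic; stream; options).
.qsp.v2.write.toStream[`trade; "ingestPackage-dataStream";
  .qsp.use `prefix`deduplicate!("rt-";1b)]
```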

When configuring pipelines in the web interface with the v1 writer, the stream parameter is not defined in the writer configuration itself. Instead, it is set in the Assembly Integration section of the Settings panel.

Constructing the stream parameter

To publish or subscribe to data successfully, the pipeline must specify the stream using the stream parameter. This is then used in conjunction with the topic prefix to determine the endpoint that the pipeline will connect to.

The topic prefix is a standalone parameter that controls the hostname/DNS prefix used to locate RT sequencers. The pipeline and deployment type both dictate the default value and behavior, and the naming convention depends on how and where the Stream is deployed.

The stream and prefix parameters can differ depending on whether kdb Insights Enterprise or kdb Insights SDK is being used. To demonstrate this, we use the default topic prefix "rt-", the Stream name "dataStream", and the package name "ingestPackage".

  • In kdb Insights Enterprise - the stream parameter should combine the Stream name with the name of the package the Stream was defined in. For example: ingestPackage-dataStream
  • This is then used in conjunction with the default prefix value of "rt-" to determine the fully qualified stream endpoint: rt-ingestPackage-dataStream.
  • Pipelines created using the web interface set the prefix automatically. Pipelines written in q or Python need it set explicitly, i.e.:

    .qsp.v2.read.fromStream[(::); "ingestPackage-dataStream"; .qsp.use enlist[`prefix]!enlist "rt-"]
    

  • In kdb Insights SDK - the name can vary depending on how the Stream is deployed. You should set the stream parameter to match the name, for example: dataStream

  • If the Stream is not prefixed, you must ensure the prefix parameter is not set. The default prefix value is empty, and the fully qualified endpoint remains dataStream. If the stream is prefixed, set the prefix value accordingly.
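For an unprefixed SDK deployment, the Enterprise reader call above reduces to the bare stream name. This sketch assumes the options argument can be omitted when no prefix is required:

```q
/ kdb Insights SDK, unprefixed Stream: the endpoint is simply "dataStream".
.qsp.v2.read.fromStream[(::); "dataStream"]
```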

Subscription topic filtering

Topic filtering is the process of receiving a certain subsection of data within a Stream, defined by message topic. This can be useful in many scenarios that call for only specific data to be processed, and would otherwise require manual filtering as a part of pipeline logic. If the topic parameter is not supplied, the default behaviour is to subscribe to all data in a Stream.

Note

Topic filtering is also known as table filtering, and for the purposes of this document, the terms 'topic' and 'table' are interchangeable.

There are some differences in how topic filtering works between Version 1 and Version 2. Version 1 of the Stream Reader performs topic filtering 'client-side'. If you populate the topic parameter, the pipeline subscribes to all data and then filters based on the provided value as it receives the data. This approach increases CPU and disk overhead because the pipeline must discard unwanted data. Building on the examples above:

```q
.qsp.read.fromStream[`topic1; "ingestPackage-dataStream"; .qsp.use enlist[`prefix]!enlist "rt-"]
```

The Version 2 Stream Reader uses Stream APIs to configure the Stream and performs topic filtering 'server-side', meaning data is filtered in the Stream rather than in the pipeline. You can also specify more than one topic to subscribe to. The pipeline then receives only data published under those topics, without any filtering overhead. Additionally, v2 does not replicate any stream log files locally, further reducing CPU and disk overhead. Again building on the above examples:

```q
.qsp.v2.read.fromStream[`topic1`topic2; "ingestPackage-dataStream"; .qsp.use enlist[`prefix]!enlist "rt-"]
```

When publishing, the topic parameter labels outgoing messages so that downstream subscribers can filter on them.

Further reading