
Ingesting to a kdb Insights Database

One of the core use cases for the Stream Processor is ingesting data into the kdb Insights Database. This is implemented using the kdb Insights Database write operator, which operates in two modes:

  • Streaming - where data is streamed to the database and stored in-memory. This is primarily used for real-time data.
  • Direct write - where data is written directly to the database's disk and ingested in batches. This is primarily used for larger datasets and backfill operations as it reduces resource overhead.

This page primarily deals with the Direct write mode.

Database writer versions

The Database Writer has multiple versions, but this document focuses on v2 because it provides significantly improved functionality and user experience compared to v1. The complete differences between the two versions are outlined below.

v2 q interface

To explicitly specify the v2 Database Writer in a q code pipeline, use the following syntax:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.v2.write.toDatabase[`quotes; `taq];

For details, see .qsp.v2.write.toDatabase. Note that, at the moment, the default .qsp.write.toDatabase is v1 of the operator. You can explicitly choose v1 by using .qsp.v1.write.toDatabase.

v2 Python interface

To explicitly specify the v2 Database Writer in a Python code pipeline, use the following syntax:

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_database(table='trade', database='taq', api_version=2))

For details, see kxi.sp.write.to_database. Note that, at the moment, the default when api_version is not set is v1 of the operator. You can explicitly choose v1 by setting api_version to 1.

kdb Insights Enterprise

The v2 database writer is available in the kdb Insights Enterprise Web Interface from a version dropdown. See the UI docs.

Triggering writedowns

With the v2 writer, it may be necessary to manually trigger database writedowns, depending on your pipeline specification. If your pipeline reaches a FINISHED status, it automatically triggers a writedown to the database. This is known as a bounded pipeline and covers situations where there is a finite amount of data to process, for example, a pipeline with one or more S3 readers reading a fixed list of files. The pipeline completes once it has processed all of the data and then triggers the database ingest.

In other situations, your pipeline runs indefinitely and never reaches a FINISHED state. This is known as an unbounded pipeline, and consequently it does not automatically trigger a database ingest. Examples of unbounded pipelines are ones with readers like the kdb Insights Stream reader, the Kafka reader, or a file reader (S3, Google, Azure) with watching enabled. In these cases, the pipeline needs to be instructed when to trigger the writedown.

There are two methods to do this: calling the .qsp.triggerWrite API from q code, or calling the trigger write REST API. Both are shown in the examples below.

v1 support

The v1 database writer operator does not support unbounded pipelines out of the box, so the following examples focus on v2.

Example 1: File watching in q

See .qsp.read.fromAmazonS3 for how to configure watching and set credentials, and .qsp.triggerWrite for more details on that API.

.qsp.run
    .qsp.read.fromAmazonS3["s3://bucket/trades*.txt"; .qsp.use `region`mode`watch!(`$"us-east-2";`text;1b)]
    .qsp.decode.csv[([]file:(); time:`timestamp$();ticker:`symbol$();price:`float$();size:`long$())]
    .qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use enlist[`directWrite]!enlist 1b]

// Global code. This will schedule .qsp.triggerWrite
// to run at 00:02 every day.
.tm.add[`EOD; (`.qsp.triggerWrite; (::)); 1D; 0D00:02 + .tm.nextt 1D]
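The timer arithmetic above runs .qsp.triggerWrite at 00:02 every day. As a rough illustration of that scheduling calculation only (plain Python, not part of the SP or .tm API; the function name is ours), a sketch:

```python
from datetime import datetime, timedelta, timezone

def next_daily_run(now, hour=0, minute=2):
    """Return the next occurrence of hour:minute strictly after `now`,
    mirroring the q expression 0D00:02 + .tm.nextt 1D."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # already past today's slot; use tomorrow
    return candidate

now = datetime(2024, 5, 1, 23, 30, tzinfo=timezone.utc)
print(next_daily_run(now))  # 2024-05-02 00:02:00+00:00
```

The .tm timer then fires once per day (period 1D) starting at that computed offset, so the writedown happens shortly after each end of day.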

Note .qsp.triggerWrite is a generic interface for triggering writers. In the future, other operators may implement the interface it triggers, though at present only .qsp.v2.write.toDatabase does. If you want to be certain you only trigger the write on specific operators, you can instead provide .qsp.triggerWrite with a symbol list of operator IDs. Recall that you can explicitly set an operator ID by naming it:

// Explicitly name the v2 DB Writer operator to be myDBWriter:
.qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use `name`directWrite!(`myDBWriter;1b)];

// Now only trigger that operator:
.qsp.triggerWrite enlist `myDBWriter

Example 2: File watching in Python

See kxi.sp.read.from_amazon_s3 for how to correctly configure for watching and setting credentials.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_amazon_s3('s3://bucket/trades*.txt', mode='text', region='us-east-2', watch=True)
        | sp.decode.csv({
               'file': 'string',
               'time': 'timestamp',
               'ticker': 'symbol',
               'price': 'float',
               'size': 'int',
           })
        | sp.write.to_database(table='trade', target='db-one-sm:10001', directWrite=True, api_version=2, name='myPythonDBWriter'))

We can now trigger the write using the REST API.

curl -X POST https://insights.kx.com/streamprocessor/pipeline/{pipeline-id}/admin/triggerwrite \
  --header "Authorization: Bearer $INSIGHTS_TOKEN"

Or specifically just trigger the database writer with:

curl -X POST https://insights.kx.com/streamprocessor/pipeline/{pipeline-id}/admin/triggerwrite?opIDs=myPythonDBWriter \
  --header "Authorization: Bearer $INSIGHTS_TOKEN"
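As an illustrative alternative to curl, the same request can be built with Python's standard library. Only the endpoint path comes from the examples above; the helper name and argument handling are our own assumptions:

```python
# Build (without sending) the trigger-write request shown in the curl
# examples. Endpoint path per the docs; everything else is illustrative.
from urllib.parse import urlencode
from urllib.request import Request

def trigger_write_request(base_url, pipeline_id, token, op_ids=None):
    url = f"{base_url}/streamprocessor/pipeline/{pipeline_id}/admin/triggerwrite"
    if op_ids:  # restrict the trigger to specific operator IDs
        url += "?" + urlencode({"opIDs": ",".join(op_ids)})
    return Request(url, method="POST",
                   headers={"Authorization": f"Bearer {token}"})

req = trigger_write_request("https://insights.kx.com", "my-pipe", "TOKEN",
                            op_ids=["myPythonDBWriter"])
print(req.full_url)
```

Sending the request (for example with urllib.request.urlopen or the requests library) is then a one-liner against a live deployment.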

Differences between v1 and v2

  1. v1 of the Database Writer takes into account the purview of the database. Data within the current purview is treated as realtime and streamed to the database, whereas older data gets written directly. In v2 all data is written directly.
  2. In v1 of the Database Writer, overwrite defaults to true. Therefore, if data already exists in the database for a particular date before the pipeline runs, it is overwritten with the new data. In v2, overwrite defaults to false, so unless you override this, new data is merged with existing data by default.
  3. v1 is only supported in kdb Insights Enterprise whereas v2 also supports Insights SDK in Docker or Kubernetes.
  4. Since it is limited to Insights Enterprise, v1 requires the assembly parameter, while v2 requires either the database parameter or the target parameter.
  5. v1 could not generally be used with an unbounded reader. A workaround did exist using the statusTable, however this would not work with all unbounded readers, like the file watcher. v2 can be used with unbounded readers, and direct write sessions are triggered using the trigger write API.
  6. The mountName, statusTable and timeout parameters are all removed as options in v2.

How to stream realtime data

If you prefer the behavior of v1, where data within the current purview is written to the RDB, you can reproduce it using a split and a stream writer. The pipeline below splits the stream and filters each branch on the purview.

// Dynamically define the purview.
purview: {"p"$.z.d}

input: .qsp.run
    .qsp.read.fromAmazonS3["s3://bucket/trades*.txt"; .qsp.use `region`mode`watch!(`$"us-east-2";`text;1b)]
    .qsp.decode.csv[([]file:(); time:`timestamp$();ticker:`symbol$();price:`float$();size:`long$())]

realTimePipe: input
    .qsp.filter[{exec time>=purview[] from x}; .qsp.use ``dropEmptyBatches!11b]
    .qsp.write.toStream[`trade; `stream]

historicalPipe: input
    .qsp.filter[{exec time<purview[] from x}; .qsp.use ``dropEmptyBatches!11b]
    .qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use enlist[`directWrite]!enlist 1b]


.qsp.run (realTimePipe; historicalPipe)
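To make the split concrete, here is an illustrative pandas sketch of the same purview filter. Column names follow the CSV schema above; the helper function and sample data are our own, not part of the SP API:

```python
# Rows at or after the purview (midnight of the current date) would go to
# the real-time stream writer; older rows go to direct write.
import pandas as pd

def split_by_purview(df, purview):
    realtime = df[df["time"] >= purview]
    historical = df[df["time"] < purview]
    return realtime, historical

purview = pd.Timestamp("2024-05-02")  # q: "p"$.z.d (midnight, current date)
df = pd.DataFrame({
    "time": pd.to_datetime(["2024-05-01 10:00", "2024-05-02 09:30"]),
    "ticker": ["AAPL", "MSFT"],
    "price": [180.0, 410.5],
    "size": [100, 200],
})
rt, hist = split_by_purview(df, purview)
print(len(rt), len(hist))  # 1 1
```

In the actual pipeline, the two .qsp.filter branches above perform exactly this partition on each incoming batch.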

Database vs target

The v2 Database Writer removes the assembly parameter and replaces it with the database parameter. This is designed to work with kdb Insights Enterprise, where deploying a package named db-one creates a database service running at db-one-sm:10001. The target parameter is designed to work with kdb Insights SDK, where the same packaging pattern may not be followed; in this case, you provide the full database connection string. So the following are equivalent:

Database parameter example:

.qsp.v2.write.toDatabase[`trade; `$"db-one"; .qsp.use `name`directWrite!(`myDBWriter;1b)];

Target parameter example:

.qsp.v2.write.toDatabase[`trade; .qsp.use `name`directWrite`target!(`myDBWriter;1b; `$"db-one-sm:10001")];