Ingesting to a kdb Insights Database
One of the core use-cases for the Stream Processor is to ingest data into the kdb Insights Database. This is implemented using the kdb Insights Database write operator. It operates in two modes:
- Streaming - where data is streamed to the database and stored in-memory. This is primarily used for real-time data.
- Direct write - where data is written directly to the database's disk and ingested in batches. This is primarily used for larger datasets and backfill operations as it reduces resource overhead.
This page primarily deals with the Direct write mode.
Database writer versions
The Database Writer has multiple versions, but this document focuses on v2, as it provides significantly improved functionality and user experience compared to v1. The complete differences between the two versions are outlined below.
v2 q interface
To specify explicitly the v2 Database Writer in a q code pipeline, use the following syntax:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.v2.write.toDatabase[`quotes; `taq];
For details, see .qsp.v2.write.toDatabase. Note that, at the moment, the default .qsp.write.toDatabase is v1 of the operator. You can explicitly choose v1 by using .qsp.v1.write.toDatabase.
v2 Python interface
To specify explicitly the v2 Database Writer in a Python code pipeline, use the following syntax:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_database(table='trade', database='taq', api_version=2))
For details, see kxi.sp.write.to_database. Note that, at the moment, if api_version is not set, the default is v1 of the operator. You can explicitly choose v1 by setting api_version to 1.
kdb Insights Enterprise
The v2 database writer is available in the kdb Insights Enterprise Web Interface from a version dropdown. See the UI docs.
Triggering writedowns
With the v2 writer, it may be necessary to manually trigger database writedowns, depending on your pipeline specification. If your pipeline reaches a FINISHED status, it automatically triggers a writedown to the database. This is known as a bounded pipeline and relates to situations where there is a finite amount of data to process, for example a pipeline with one or more S3 readers and a fixed list of files. The pipeline completes once it has processed all of the data and triggers the database ingest.
In other situations, your pipeline runs indefinitely and never reaches a FINISHED state. This is known as an unbounded pipeline and consequently it doesn't automatically trigger a database ingest. Examples of unbounded pipelines would be ones with readers like the kdb Insights Stream, Kafka, or a file reader (S3, Google, Azure) with watching enabled. In these cases, the pipeline needs to be instructed when to trigger the writedown.
There are two methods to do this:
- q and Python developer APIs
- REST API
POST pipeline/{id}/admin/triggerwrite
(kdb Insights Enterprise only)
v1 support
Unbounded pipelines are not supported out of the box by the v1 database writer operator, so the following examples focus on v2.
Example 1: File watching in q
See .qsp.read.fromAmazonS3 for how to correctly configure watching and set credentials. See .qsp.triggerWrite for more details on that API.
.qsp.run
.qsp.read.fromAmazonS3["s3://bucket/trades*.txt"; .qsp.use `region`mode`watch!(`$"us-east-2";`text;1b)]
.qsp.decode.csv[([]file:(); time:`timestamp$();ticker:`symbol$();price:`float$();size:`long$())]
.qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use enlist[`directWrite]!enlist 1b]
// Global code. This will schedule .qsp.triggerWrite
// to run at 00:02 every day.
.tm.add[`EOD; (`.qsp.triggerWrite; (::)); 1D; 0D00:02 + .tm.nextt 1D]
Note that .qsp.triggerWrite is a generic interface for triggering writers. In future, other operators may implement the interface, though at present only .qsp.v2.write.toDatabase does.
If you want to be certain you trigger the write only on a specific operator (or operators), you can instead provide .qsp.triggerWrite with a symbol list of operator IDs. Recall that you can explicitly set an operator ID by naming it:
// Explicitly name the v2 DB Writer operator to be myDBWriter:
.qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use `name`directWrite!(`myDBWriter;1b)];
// Now only trigger that operator:
.qsp.triggerWrite enlist `myDBWriter
Example 2: File watching in Python
See kxi.sp.read.from_amazon_s3 for how to correctly configure watching and set credentials.
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_amazon_s3('s3://bucket/trades*.txt', mode='text', region='us-east-2', watch=True)
| sp.decode.csv({
'file': 'string',
'time': 'timestamp',
'ticker': 'symbol',
'price': 'float',
'size': 'int',
})
| sp.write.to_database(table='trade', target='db-one-sm:10001', directWrite=True, api_version=2, name='myPythonDBWriter'))
We can now trigger the write using the REST API.
curl -X POST https://insights.kx.com/streamprocessor/pipeline/{pipeline-id}/admin/triggerwrite \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
Or specifically just trigger the database writer with:
curl -X POST "https://insights.kx.com/streamprocessor/pipeline/{pipeline-id}/admin/triggerwrite?opIDs=myPythonDBWriter" \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
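If you want to mirror the q example's daily 00:02 schedule from outside the pipeline, one option is a small script that sleeps until the next trigger time and then calls the REST endpoint shown above. This is a hedged sketch, not part of the product: the endpoint and bearer token come from the curl example, while the scheduling helper is our own.

```python
import datetime as dt
import time
import urllib.request

def seconds_until(hour, minute, now=None):
    """Seconds from `now` until the next occurrence of hour:minute."""
    now = now or dt.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += dt.timedelta(days=1)  # today's slot has passed; use tomorrow's
    return (target - now).total_seconds()

def schedule_daily_trigger(url, token):
    """Sleep until 00:02 each day, then POST the triggerwrite endpoint."""
    while True:
        time.sleep(seconds_until(0, 2))
        req = urllib.request.Request(
            url, method="POST",
            headers={"Authorization": f"Bearer {token}"})
        urllib.request.urlopen(req)  # fire the writedown trigger
```

In practice you would more likely use a Kubernetes CronJob or similar scheduler; the point is simply that unbounded pipelines need some external actor to call the trigger endpoint.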
Differences between v1 and v2
- v1 of the Database Writer takes into account the purview of the database: data within the current purview is treated as real-time and streamed to the database, whereas older data is written directly. In v2, all data is written directly.
- The default mode for v1 of the Database Writer is overwrite set to true. Therefore, if data already exists in the database for a particular date prior to the pipeline running, it is overwritten with the new data. The default setting in v2 for overwrite is false, so unless the user overrides this, incoming data merges with existing data by default.
- v1 is only supported in kdb Insights Enterprise, whereas v2 also supports the Insights SDK in Docker or Kubernetes.
- Since it is limited to Insights Enterprise, v1 requires the assembly parameter, while v2 requires either the database parameter or the target parameter.
- v1 could not generally be used with an unbounded reader. A workaround existed using the statusTable, however this did not work with all unbounded readers, such as the file watcher. v2 can be used with unbounded readers, and direct write sessions are triggered using the trigger write API.
- The mountName, statusTable, and timeout parameters are all removed as options in v2.
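The overwrite-versus-merge difference can be illustrated with a toy model of a date-partitioned store. This is a conceptual sketch only, not the writer's actual implementation: v1's default replaces an existing partition's rows, while v2's default appends to them.

```python
def write_partition(db, date, rows, overwrite):
    """Toy model of a date-partitioned store.
    overwrite=True  (v1 default): replace any existing rows for the date.
    overwrite=False (v2 default): merge with existing rows.
    """
    if overwrite:
        db[date] = list(rows)
    else:
        db.setdefault(date, []).extend(rows)

db = {"2024.01.02": ["existing"]}
write_partition(db, "2024.01.02", ["new"], overwrite=False)  # v2 default: merge
# db["2024.01.02"] -> ["existing", "new"]
write_partition(db, "2024.01.02", ["new"], overwrite=True)   # v1 default: replace
# db["2024.01.02"] -> ["new"]
```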
How to stream realtime data
If you prefer the v1 behavior, where data for the current purview is written to the RDB, you can achieve this using a split and a stream writer. The following pipeline splits the stream and filters each branch on the purview.
// Dynamically define the purview as the start of the current date.
purview: {"p"$.z.d}
// Shared source: read and decode once. Note .qsp.run is called only once,
// below, on the two complete branches.
input: .qsp.read.fromAmazonS3["s3://bucket/trades*.txt"; .qsp.use `region`mode`watch!(`$"us-east-2";`text;1b)]
.qsp.decode.csv[([]file:(); time:`timestamp$();ticker:`symbol$();price:`float$();size:`long$())]
// Data within the current purview is streamed to the database.
realTimePipe: input
.qsp.filter[{exec time>=purview[] from x}; .qsp.use enlist[`dropEmptyBatches]!enlist 1b]
.qsp.write.toStream[`trade; `stream]
// Older data is written directly.
historicalPipe: input
.qsp.filter[{exec time<purview[] from x}; .qsp.use enlist[`dropEmptyBatches]!enlist 1b]
.qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use enlist[`directWrite]!enlist 1b]
.qsp.run (realTimePipe; historicalPipe)
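The split predicate itself is simple enough to state in plain Python. The sketch below mirrors only the filtering logic of the q example (the time column and the purview definition come from the pipeline above; the helper names are our own, not part of the kxi.sp API).

```python
import datetime as dt

def purview():
    """Start of the current date, mirroring q's "p"$.z.d."""
    today = dt.date.today()
    return dt.datetime(today.year, today.month, today.day)

def split_batch(rows):
    """Partition a batch of dict records into (realtime, historical)
    on the time column, relative to the current purview."""
    cutoff = purview()
    realtime = [r for r in rows if r["time"] >= cutoff]
    historical = [r for r in rows if r["time"] < cutoff]
    return realtime, historical
```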
Database vs target
The v2 Database Writer removes the assembly parameter and replaces it with the database parameter. This is designed to work with Insights Enterprise, where deploying a package named db-one creates a database service running at db-one-sm:10001.
The target parameter is designed to work with the Insights SDK, where the same packaging pattern may not be followed. In this case you provide the full database connection string.
So the following are equivalent:
Database parameter example:
.qsp.v2.write.toDatabase[`trade; `$"db-one"; .qsp.use `name`directWrite!(`myDBWriter;1b)];
Target parameter example:
.qsp.v2.write.toDatabase[`trade; .qsp.use `name`directWrite`target!(`myDBWriter;1b; `$"db-one-sm:10001")];
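For deployments following the Enterprise packaging convention above, the target is derivable from the database name. The hypothetical helper below just makes the equivalence explicit; the -sm suffix and port 10001 come from the example above and may differ in your deployment.

```python
def default_target(database, port=10001):
    """Derive the database connection string from a package name,
    per the db-one -> db-one-sm:10001 convention shown above."""
    return f"{database}-sm:{port}"

default_target("db-one")  # 'db-one-sm:10001'
```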