Writers
.qsp.write

writer | description |
---|---|
toAmazonS3 | write to an object in Amazon S3 |
toConsole | write to the console |
toDatabase | write data to a kdb Insights Database |
toKafka | publish data on a Kafka topic |
toKDB (Beta) | write data to an on-disk partitioned table |
toProcess | write data to a kdb+ process |
toStream | write data using a kdb Insights stream |
toVariable | write to a local variable |
Writers are a specialized type of operator that allow users to push data from a streaming pipeline to different external data sources.
.qsp.write.toConsole
Write to the console
.qsp.write.toConsole[]
.qsp.write.toConsole[prefix]
.qsp.write.toConsole[prefix; .qsp.use (!) . flip (
(`split ; split);
(`timestamp; timestamp);
(`qlog ; qlog))]
Parameters:
name | type | description | default |
---|---|---|---|
prefix | string | A prefix for output messages. | "" |

options:

name | type | description | default |
---|---|---|---|
split | boolean | Controls how vectors are printed (see below). | 0b |
timestamp | symbol | Either `local, `utc, an empty symbol for no timestamp, or `. to use the UTC time when qlog is false and no timestamp when it is true. | `. |
qlog | boolean | Prints all console logs to a QLog stream. | 0b |
For all common arguments, refer to configuring operators
This operator adds a console writer to the current stream.
A console writer outputs content to standard out on the current process prefixed with the event timestamp.
Splitting output:

By default, all vectors (lists of the same type) are printed on a single line. By setting the split option to 1b, vectors will be printed on separate lines. Lists of mixed types are always printed on separate lines.
Examples:
Basic writer:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[];
// Callback takes lists of messages, so enlist the single message
publish enlist "hi!"
2021.03.08D19:19:08.171805000 | "hi!"
Write with prefix:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole["INFO: "];
publish enlist "hi!"
INFO: 2021.03.08D19:19:08.171805000 | "hi!"
Write with split vectors:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[.qsp.use ``split!(::;1b)];
publish enlist 1 2 3 4
2021.09.30D14:01:50.359012847 | 1
2021.09.30D14:01:50.359012847 | 2
2021.09.30D14:01:50.359012847 | 3
2021.09.30D14:01:50.359012847 | 4
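Write without a timestamp (a minimal sketch based on the timestamp option described above, where an empty symbol suppresses the timestamp prefix):
// Suppress the timestamp prefix on console output
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[.qsp.use ``timestamp!(::;`)];
publish enlist "hi!"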
Write output to QLog format
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[.qsp.use ``qlog!(::;1b)];
publish ("hello"; "world")
publish "==="
publish ([] date: .z.p+til 2; open: 2?100f; high: 2?100f; low:2?100f; close: 2?100f)
2021-10-04 11:35:33.189 [console_f08427d1] INFO "hello"
2021-10-04 11:35:33.189 [console_f08427d1] INFO "world"
2021-10-04 11:35:33.189 [console_f08427d1] INFO "==="
2021-10-04 11:36:12.193 [console_f08427d1] INFO date open high low close
2021-10-04 11:36:12.193 [console_f08427d1] INFO -----------------------------------------------------------------
2021-10-04 11:36:12.194 [console_f08427d1] INFO 2021.10.04D11:36:12.193211900 74.84699 20.68577 85.28164 46.82838
2021-10-04 11:36:12.194 [console_f08427d1] INFO 2021.10.04D11:36:12.193211901 97.91903 36.07874 69.44221 4.251175
.qsp.write.toDatabase
Write data to a kdb Insights Database
.qsp.write.toDatabase[table; assembly]
.qsp.write.toDatabase[table; assembly; .qsp.use (!) . flip (
(`timeout ; timeout);
(`deduplicate; deduplicate);
(`directWrite; directWrite);
(`mountName ; mountName))]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol | A table name. Note that at present, if using directWrite, the table name must be a symbol. | Required |
assembly | symbol or string | The assembly name. | Required |

options:

name | type | description | default |
---|---|---|---|
timeout | int | Timeout value. | 0Ni (no timeout) |
deduplicate | boolean | Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value is forced to false, potentially resulting in duplicate data after failure recovery events. | 1b |
directWrite | boolean | Bypass the RDB and IDB and write historical data directly to the HDB. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system uses significantly fewer resources, but the data is not available for query until the entire ingest is complete. | 0b |
mountName | symbol | Name of the mount. | hdb |
For all common arguments, refer to configuring operators
This operator writes data using kdb Insights Reliable Transport streams, except when doing a direct write.
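For pipelines that cannot guarantee deterministic output, deduplication can be disabled (a minimal sketch; the table and assembly names are illustrative):
// Disable dropping of duplicate messages on the outbound stream
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toDatabase[`quotes; `assembly; .qsp.use ``deduplicate!(::;0b)];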
Direct write configuration

When using directWrite, at present the table name specified must be a symbol value. This requirement will be removed in a future release.

Also when using directWrite, you must set the environment variable KXI_SP_DIRECT_WRITE_ASSEMBLY to point to the assembly you want to write to. If you want to set the mountName to something other than hdb, you must set KXI_SP_DIRECT_WRITE_MOUNT_NAME to the desired mount name. These variables are required for assembly deployments and REST-based deployments; they are not needed for UI-based deployments. This requirement will be removed in a future release.

Note that these environment variables must be set using the pipeline environment variable config, not in the pipeline spec's code.
Pipeline create request body example:
{
"name": "myPipeline",
"type": "spec",
"config": {
"content": ".qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toDatabase[`myTable; `myAssembly; .qsp.use``directWrite!(::; 1b)]"
},
"env": {
"KXI_SP_DIRECT_WRITE_ASSEMBLY": "myAssembly"
}
}
Pipeline assembly configuration example:
...
spec:
elements:
pipelines:
myPipeline:
spec: |-
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toDatabase[`myTable; `myAssembly; .qsp.use``directWrite!(::;1b)]
env:
- name: "KXI_SP_DIRECT_WRITE_ASSEMBLY"
value: "myAssembly"
...
Basic example publishing streaming data:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toDatabase[`quotes; `assembly];
quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test; 99.9; 100i; 100.1; 200i)
publish quotes
Using directWrite for historical data:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toDatabase[`quotes; `assembly; .qsp.use enlist[`directWrite]!enlist 1b];
quotesOld: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)
quotesNew: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test2; 99.9; 100i; 100.1; 200i)
where quotesOld is:
time ticker bid bidSize ask askSize
---------------------------------------------------------------
2022.12.20D14:32:18.949795000 test 99.9 100 100.1 200
and quotesNew is:
time ticker bid bidSize ask askSize
---------------------------------------------------------------
2022.12.21D14:37:35.971644000 test2 99.9 100 100.1 200
publish quotesOld // directly write old data to HDB bypass RDB & IDB
publish quotesNew // write new data using a stream
Direct write purviews
When using direct write, data that is older than the purview of the HDB will go directly to disk but will not be available for query until the pipeline completes. All other data will be streamed to the database and be ready for query immediately.
.qsp.write.toKafka
Publish data on a Kafka topic
.qsp.write.toKafka[topic]
.qsp.write.toKafka[topic; brokers]
.qsp.write.toKafka[topic; brokers; .qsp.use (!) . flip (
(`retries ; retries);
(`retryWait ; retryWait);
(`topicConfig ; topicConfig);
(`registry ; registry);
(`subject ; subject);
(`autoRegister; autoRegister);
(`schemaType ; schemaType))]
Parameters:
name | type | description | default |
---|---|---|---|
topic | symbol | A Kafka topic name to publish on. | ` |
brokers | string or string[] | One or more brokers as host:port pairs. | "localhost:9092" |

options:

name | type | description | default |
---|---|---|---|
retries | long | Max retry attempts for Kafka API calls. | 10 |
retryWait | timespan | Period to wait between retry attempts. | 0D00:00:02 |
topicConfig | dictionary | A dictionary of Kafka topic configuration options (see below). | ()!() |
registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding. | "" |
subject | string | A Kafka subject to read schemas from and publish schemas to. If none is provided, a default subject derived from the topic name is used. | "" |
autoRegister | boolean | Controls whether or not to generate and publish schemas automatically. | 0b |
schemaType | string | Schema type to generate, one of "JSON", "PROTOBUF", or "AVRO". | "JSON" |
For all common arguments, refer to configuring operators
This operator acts as a Kafka producer, pushing data to a Kafka topic.
A Kafka producer will publish data to a Kafka broker which can then be consumed by any downstream listeners. All data published to Kafka must be encoded as either strings or serialized as bytes. If data reaches the Kafka publish point that is not encoded, it is converted to q IPC serialization representation. Data can also be linked to Kafka Schema Registry to use automatic type conversion.
AVRO is not supported
Encoding messages with AVRO using the Kafka Schema Registry integration is currently not supported.
Kafka topic configuration options

All the Kafka topic configuration options are supported by providing a config dictionary as the argument for topicConfig. This includes properties such as request.timeout.ms, queuing.strategy, socket.timeout.ms, connection.max.idle.ms, etc. Note that metadata.broker.list and producer.id are reserved values and are maintained directly by the Kafka writer.
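Passing topic configuration via topicConfig (a minimal sketch; the property values shown and the use of symbol keys are assumptions, not prescribed settings):
// Supply Kafka topic configuration options as a dictionary under topicConfig
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[`words; "localhost:9092"; .qsp.use enlist[`topicConfig]!enlist `request.timeout.ms`socket.timeout.ms!(60000;60000)];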
Localhost broker:
// Write data from a function called 'publish' and write to a Kafka topic
// 'words' with a default broker at 'localhost:9092'.
.qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[`words]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"
Multiple brokers:
// Write data from a function called 'publish' and write to a Kafka topic
// 'words' to multiple Kafka brokers.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[`words; ("localhost:1234"; "localhost:1235")]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"
Advanced configuration:
// Writes data to a topic 'words' with a default broker setting custom values
// for advanced producer configuration.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`socket.timeout.ms ; 60000); // Wait 1 minute for socket timeout
(`socket.send.buffer.bytes ; 4*1024*1024); // Buffer up to 4MiB max msg size
(`max.in.flight ; 100000))]; // Allow up to 100k inflight msgs
// Publish 'Hello' and 'World' as a single batch publish
publish ("Hello"; "World")
Kafka Schema Registry with existing schema:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081"))];
Kafka Schema Registry with generated schema:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081");
(`autoRegister ; 1b);
(`schemaType ; "JSON"))];
Kafka Schema Registry with custom subject:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081");
(`subject ; "custom-subject"))];
Writing to explicit partitions
Future versions of the Kafka writer will allow writing to explicit partitions/keys.
However, the current Kafka writer will write to unassigned partitions without using an explicit key.
.qsp.write.toKDB
(Beta Feature) Writes data to an on-disk partitioned table

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
.qsp.write.toKDB[path;prtnCol;table]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol | The database path. | Required |
prtnCol | symbol | The name of the partition column. | Required |
table | symbol | The name of the table. | Required |
For all common arguments, refer to configuring operators
This operator writes to an on-disk partitioned table.
Writing keyed tables
The plugin supports keyed tables but writes the data to disk as an unkeyed table.
Stream data into a table:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKDB[`db;`date;`myTable];
publish ([] date: 2022.01.01 2022.01.02 2022.01.03 2022.01.04 2022.01.05; open: 1 2 3 4 5; close: 6 7 8 9 10);
.qsp.write.toProcess
Write data to a kdb+ process
.qsp.write.toProcess[handle]
.qsp.write.toProcess[.qsp.use (!) . flip (
(`mode ; mode);
(`target ; target);
(`async ; async);
(`queueLength; queueLength);
(`queueSize ; queueSize);
(`spread ; spread);
(`handle ; handle);
(`retries ; retries);
(`retryWait ; retryWait))]
Parameters:
name | type | description | default |
---|---|---|---|
handle | symbol | Handle of the destination process to write to. | ` |

options:

name | type | description | default |
---|---|---|---|
mode | symbol | Either `function (call) or `table (upsert). | `function |
target | symbol | Target function name or table name. | "" |
async | boolean | Whether writing should be async. | 1b |
queueLength | long | Max async message queue length before flush. | 0Wj |
queueSize | long | Max number of bytes to buffer before flushing. | 1MB |
spread | boolean | Treat the pipeline data as a list of arguments in function mode (cannot be set when params is also set). | 0b |
params | symbol[] | List of any parameters that should appear before the message data in function mode. | () |
retries | long | Max number of retry attempts. | 5 |
retryWait | timespan | Wait time between connection retry attempts. | 1 second |
For all common arguments, refer to configuring operators
This operator writes data to another kdb+ process using IPC.
During a retry loop, on connection loss and on startup, all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.
Publish to table:
q -p 1234
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`table`output]
q)publish ([] til 2)
q)`::1234 `output
x
-
0
1
Publish to function:
q -p 1234 <<< "upd: {x set y}"
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{ (`table; x) }]
.qsp.write.toProcess[.qsp.use `handle`target`spread!(`::1234; `upd; 1b)]
q)publish ([] til 2)
q)`::1234 `table
x
-
0
1
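Publish to a function with fixed leading parameters (a minimal sketch; the upd function and the trades table name are assumptions, relying on the params option described above to prepend arguments before the message data):
q -p 1234 <<< "upd: {x set y}"
// params prepends `trades, so the remote process receives upd[`trades; msg]
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toProcess[.qsp.use `handle`target`params!(`::1234; `upd; enlist `trades)]
q)publish ([] til 2)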
Re-connection to output process:
# Quit the process after some time to force a reconnect
q -p 1234
// Set number of retries to 2 and wait time between retries to 2 seconds
.qsp.run .qsp.read.fromCallback[`publish]
.qsp.write.toProcess[.qsp.use (!) . flip (
(`handle ; `::1234);
(`mode ; `table);
(`target ; `destTable);
(`retries ; 2);
(`retryWait; 0D00:00:02))]
WARN [] SPWRITE Connection lost to IPC output process, attempting to reconnect, writer=ipc-91f585df
INFO [] CONN Closing connection, h=9
INFO [] SPWRITE Connection re-established, handle=::1234
.qsp.write.toStream
Write data using a KX Reliable Transport stream
.qsp.write.toStream[table]
.qsp.write.toStream[table; stream]
.qsp.write.toStream[table; stream; .qsp.use (!) . flip (
(`prefix ; prefix);
(`deduplicate; deduplicate))]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or string | A table name. | Required |
stream | symbol or string | The outbound stream. | $RT_PUB_TOPIC |

options:

name | type | description | default |
---|---|---|---|
prefix | string | Stream prefix for URL mapping. | $RT_TOPIC_PREFIX |
deduplicate | boolean | Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value is forced to false, potentially resulting in duplicate data after failure recovery events. | 1b |
For all common arguments, refer to configuring operators
This operator writes data using kdb Insights Reliable Transport streams.
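Basic example publishing to a stream (a minimal sketch; the quotes table and the stream name are illustrative, and the stream argument falls back to $RT_PUB_TOPIC when omitted):
// Write quote records to the named kdb Insights stream
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toStream[`quotes; "mystream"];
quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test; 99.9; 100i; 100.1; 200i)
publish quotes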
.qsp.write.toAmazonS3
Writes data to an object in an Amazon S3 bucket
.qsp.write.toAmazonS3[path]
.qsp.write.toAmazonS3[path; isComplete]
.qsp.write.toAmazonS3[path; isComplete; .qsp.use (!) . flip (
(`tenant ; tenant);
(`region ; region);
(`credentials ; credentials);
(`onTeardown ; onTeardown))]
Parameters:

name | type | description | default |
---|---|---|---|
path | string or symbol or function | The path of the object to write to in S3. A function can also be provided that dynamically sets the file path for a given batch of data. | Required |
isComplete | function | A binary function that accepts a metadata dictionary as its first argument and data as its second. The return is a boolean indicating whether the upload of the current file should be marked as completed. This causes any further writes to the given file path to fail. By default, writes finish when the first non-partial batch is received for a given path. | :: |

options:

name | type | description | default |
---|---|---|---|
tenant | symbol | The authentication tenant. | ` |
region | string | The AWS region to authenticate against. | us-east-1 |
credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" |
onTeardown | symbol | The behavior for any uploads still in flight at teardown. By default, all in-flight uploads are left pending (none). This is useful for pipelines that need to resume an upload between teardowns (for example, an upgrade). Alternatively, you can choose to either abort any pending uploads or complete any pending uploads. | none |
For all common arguments, refer to configuring operators
This operator writes data to an object in an Amazon Web Services S3 bucket as either text or binary.
Teardown Behavior

By default, teardown will leave pending uploads queued so that when the pipeline is restarted the pending uploads are resumed. This behavior is useful for allowing pipelines to stop and restart for regularly scheduled maintenance (i.e. version upgrades). While an upload remains in a pending state, it consumes resources in Amazon S3 as a partial file upload. By default, Amazon will expunge these partial uploads after 7 days.

If this behavior is undesired, the teardown behavior can be modified to be either abort or complete. If marked as abort, any pending uploads will be terminated. If the upload is resumed, any already processed data will be lost. If marked as complete, any pending uploads will be pushed to S3. However, if the pipeline were to resume, it would start overwriting the previous object with a new object. It is recommended to only use the complete behavior when a path function is provided that ensures a new file path is used when resuming.
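Overriding the default resume behavior (a minimal sketch; the bucket path and callback name are illustrative, the onTeardown value is assumed to be passed as a symbol, and :: keeps the default isComplete behavior):
// Abort any in-flight uploads when the pipeline is torn down
.qsp.run
.qsp.read.fromCallback[`pub]
.qsp.write.toAmazonS3[`:s3://mybucket/file.txt; ::; .qsp.use ``onTeardown!(::;`abort)]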
Authentication

The S3 writer uses kurl for credential discovery. See credential discovery in the AWS tab for more information.
Environment variable authentication:

To set up authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.4.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:

To load a custom configuration file into a Docker deployment, mount the credentials file directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration directory.
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.4.0
volumes:
- $HOME/.aws/:/config/awscreds
environment:
KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the S3 writer configuration:
.qsp.write.toAmazonS3[`:s3://bucket/hello; .qsp.use``credentials!``awscreds]
Now deploying the pipeline will read the credentials from the AWS credentials file. Note that the credentials file name must be credentials.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-writer",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds
Next, add the secret name to the S3 writer configuration:
.qsp.write.toAmazonS3[`:s3://bucket/hello; .qsp.use``credentials!``awscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-writer",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["awscreds"] }
}' | jq -asR .)"
Examples
Uploading a local file:
Permissions

When running this example inside a kxi-sp-worker image, you must first run system "cd /tmp" to avoid encountering permissions errors.
t:([] date: .z.d; name: 1000?10?`3; id: 1000?0ng; units: 1000?100f; price: 1000?1000f);
`:data.csv 0: csv 0: t;
.qsp.run
.qsp.read.fromFile[`:data.csv]
.qsp.write.toAmazonS3[`:s3://mybucket/file.csv]
Uploading a stream:
// The second argument to `write.toAmazonS3` indicates that data should not be uploaded
// until an empty batch is received in the stream.
.qsp.run
.qsp.read.fromCallback[`pub]
.qsp.write.toAmazonS3[`:s3://mybucket/file.txt; {[md;data] 0 ~ count data}]
pub "Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
pub "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
pub "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
// At this point, the chunks have been buffered and are waiting to be converted to
// an S3 object. Flush the file and write it to S3.
pub ()
.qsp.write.toVariable
Writes data to a variable in the local process
.qsp.write.toVariable[variable]
.qsp.write.toVariable[variable; mode]
Parameters:
name | type | description | default |
---|---|---|---|
variable | symbol | The name of the variable to write the output to. | Required |
mode | symbol | The writing behavior. One of: append (appends the stream output to the variable), overwrite (sets the variable to the last output of the pipeline), or upsert (upserts table data to the output variable). | append |
For all common arguments, refer to configuring operators
This operator writes to a local variable, either overwriting, appending, or upserting to the variable depending on the selected mode.
Append joins data

When using append as the write mode, values will be joined, even if they do not have matching types. If tables are being used in the stream, the result will also be a table. If a matching schema is desired, consider using upsert mode to force a type error for table schema mismatches.
Join data in a single output:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output]
publish 1 2 3
publish "abc"
output
1
2
3
"abc"
Capture an output value:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output; `overwrite];
publish 1 2 3
publish 4 5 6
output
4 5 6
Stream data into a table:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output; `upsert];
publish ([] x: 1 2 3; y: "abc");
publish ([] x: 4 5; y: "de");
output
x y
---
1 a
2 b
3 c
4 d
5 e