Skip to content

Writers

Writers are a specialized type of operator that allow users to push data from a streaming pipeline to different external data sources.

Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.

.qsp.write.toConsole

Write to the console

.qsp.write.toConsole[]
.qsp.write.toConsole[prefix]
.qsp.write.toConsole[prefix; .qsp.use (!) . flip (
    (`split    ; split);
    (`timestamp; timestamp);
    (`qlog     ; qlog))]

Parameters:

name type description default
prefix string A prefix for output messages. ""

options:

name type description default
split boolean Controls how vectors are printed (see below). 0b
timestamp symbol Either local,utc, empty for no timestamp, or . to use the UTC time when qlog is false, and no timestamp when it is true. `.
qlog boolean Prints all console logs to a QLog stream. 0b

For all common arguments, refer to configuring operators

This operator adds a console writer to the current stream.

A console writer outputs content to standard out on the current process prefixed with the event timestamp.

Splitting output:

By default, all vectors (lists of the same type) are printed on a single line. By setting the split option to 1b, vectors will be printed on separate lines. Lists of mixed types are always printed on separate lines.

Examples:

Basic writer:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[];
// Callback takes lists of messages, so enlist the single message
publish enlist "hi!"
2021.03.08D19:19:08.171805000 | "hi!"

Write with prefix:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole["INFO: "];

publish enlist "hi!"
INFO: 2021.03.08D19:19:08.171805000 | "hi!"

Write with split vectors:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[.qsp.use ``split!(::;1b)];

publish enlist 1 2 3 4
2021.09.30D14:01:50.359012847 | 1
2021.09.30D14:01:50.359012847 | 2
2021.09.30D14:01:50.359012847 | 3
2021.09.30D14:01:50.359012847 | 4

Write output to QLog format

.qsp.run
  .qsp.read.fromCalback[`publish]
  .qsp.write.toConsole[.qsp.use ``qlog!(::;1b)]

publish ("hello"; "world")
publish "==="
publish ([] date: .z.p+til 2; open: 2?100f; high: 2?100f; low:2?100f; close: 2?100f)
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "hello"
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "world"
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "==="
2021-10-04 11:36:12.193 [console_f08427d1] INFO  date                          open     high     low      close
2021-10-04 11:36:12.193 [console_f08427d1] INFO  -----------------------------------------------------------------
2021-10-04 11:36:12.194 [console_f08427d1] INFO  2021.10.04D11:36:12.193211900 74.84699 20.68577 85.28164 46.82838
2021-10-04 11:36:12.194 [console_f08427d1] INFO  2021.10.04D11:36:12.193211901 97.91903 36.07874 69.44221 4.251175

.qsp.write.toDatabase

Write data to a KX Insights Database

.qsp.write.toDatabase[table; assembly]
.qsp.write.toDatabase[table; assembly; .qsp.use (!) . flip (
    (`timeout    ; timeout);
    (`deduplicate; deduplicate))]

Parameters:

name type description default
table symbol or string A table name. Required
assembly symbol or string The assembly name Required

options:

name type description default
timeout int Timeout value 0Ni (no timeout)
deduplicate boolean If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recover events. 1b

For all common arguments, refer to configuring operators

This operator writes data using KX Insights Reliable Transport streams.

.qsp.write.toKafka

Publish data on a Kafka topic

.qsp.write.toKafka[topic]
.qsp.write.toKafka[topic; brokers]
.qsp.write.toKafka[topic; brokers; .qsp.use (!) . flip (
    (`retries     ; retries);
    (`retryWait   ; retryWait);
    (`topicConfig ; topicConfig);
    (`registry    ; registry);
    (`subject     ; subject);
    (`autoRegister; autoRegister);
    (`schemaType  ; schemaType))]

Parameters:

name type description default
topic symbol A Kafka topic name to publish on. `
broker string or string[] One or more brokers as host:port. "localhost:9092"

options:

name type description default
retries long Max retry attempts for Kafka API calls. 10
retryWait timespan Period to wait between retry attempts. 0D00:00:02
topicConfig dictionary A dictionary of Kafka topic configuration options (see below). ()!()
registry string Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding. ""
subject string A Kafka subject to read schemas from and publish schemas to. If none is provided, uses "-value". ""
autoRegister boolean Controls whether or not to generate and publish schemas automatically. 0b
schemaType string Schema type to generate, one of: "JSON", "PROTOBUF", "AVRO" "JSON"

For all common arguments, refer to configuring operators

This operator acts as a Kafka producer, pushing data to a Kafka topic.

A Kafka producer will publish data to a Kafka broker which can then be consumed by any downstream listeners. All data published to Kafka must be encoded as either strings or serialized as bytes. If data reaches the Kafka publish point that is not encoded, it is converted to q IPC serialization representation. Data can also be linked to Kafka Schema Registry to use automatic type conversion.

AVRO is not supported

Encoding messages with AVRO using the Kafka Schema Registry integration is currently not supported.

Kafka topic configuration options

All the Kafka topic configuration options are supported by providing a config dictionary as the argument for topicConfig.

This includes properties such as request.timeout.ms, queuing.strategy, socket.timeout.ms, connection.max.idle.ms, etc.

Note that metadata.broker.list and producer.id are reserved values and are maintained directly by the Kafka writer.

Localhost broker:

// Write data from a function called 'publish' and write to a Kafka topic
// 'words' with a default broker at 'localhost:9092'.
.qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[`words]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"

Multiple brokers:

// Write data from a function called 'publish' and write to a Kafka topic
// 'words' to multiple Kafka brokers.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[`words; ("localhost:1234"; "localhost:1235")]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"

Advanced configuration:

// Writes data to a topic 'words' with a default broker setting custom values
// for advanced producer configuration.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic                     ; `words);
    (`socket.timeout.ms         ; 60000);        // Wait 1 minute for socket timeout
    (`socket.send.buffer.bytes  ; 4*1024*1024);  // Buffer up to 4MiB max msg size
    (`max.in.flight             ; 100000))];     // Allow up to 100k inflight msgs
// Publish 'Hello' and 'World' as a single batch publish
publish ("Hello"; "World")

Kafka Schema Registry with existing schema:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic    ; `words);
    (`brokers  ; "localhost:9092");
    (`registry ; "http://localhost:8081"))];

Kafka Schema Registry with generated schema:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic        ; `words);
    (`brokers      ; "localhost:9092");
    (`registry     ; "http://localhost:8081");
    (`autoRegister ; 1b);
    (`schemaType   ; "JSON"))];

Kafka Schema Registry with custom subject:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic    ; `words);
    (`brokers  ; "localhost:9092");
    (`registry ; "http://localhost:8081");
    (`subject  ; "custom-subject"))];

Writing to explicit partitions

Future versions of the Kafka writer will allow writing to explicit partitions/keys.

However, the current Kafka writer will write to unassigned partitions without using an explicit key.

.qsp.write.toProcess

Write data to a kdb+ process

.qsp.write.toProcess[handle]
.qsp.write.toProcess[.qsp.use (!) . flip (
    (`mode       ; mode);
    (`target     ; target);
    (`async      ; async);
    (`queueLength; queueLength);
    (`queueSize  ; queueSize);
    (`spread     ; spread);
    (`params     ; params);
    (`handle     ; handle);
    (`retries    ; retries);
    (`retryWait  ; retryWait))]

Parameters:

name type description default
handle symbol Handle of destination process to write to. `

options:

name type description default
mode symbol `function (call) or `table (upsert) (symbol). `function
target symbol Target function name or table name. ""
async boolean Whether writing should be async. 1b
queueLength long Max async message queue length before flush. 0Wj
queueSize long Max number of bytes to buffer before flushing. 1MB
spread boolean Treat the pipeline data as a list of arguments in function mode (can not be set with params also set). 0b
params symbol[] List of any parameters that should appear before the message data in function mode. ()
retries long Max number of retry attempts. 5
retryWait timespan Wait time between connection retry attempts. 1 second

For all common arguments, refer to configuring operators

This operator writes data to another kdb+ process using IPC.

During a retry loop, on connection loss and on startup, all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.

Publish to table:

q -p 1234
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`table`output]
q)publish ([] til 2)
q)`::1234 `output
x
-
0
1

Publish to function:

q -p 1234 <<< "upd: {x set y}"
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{ (`table; x) }]
  .qsp.write.toProcess[.qsp.use `handle`target`spread!(`::1234; `upd; 1b)]
q)publish ([] til 2)
q)`::1234 `data
x
-
0
1

Re-connection to output process:

# Quit the process after some time to force a reconnect
q -p 1234
// Set number of retries to 2 and wait time between retries to 2 seconds
.qsp.run  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use (!) . flip (
    (`handle   ; `::1234);
    (`mode     ; `table);
    (`target   ; `destTable);
    (`retries  ; 2);
    (`retryWait; 0D00:00:02) ]
WARN  [] SPWRITE Connection lost to IPC output process, attempting to reconnect, writer=ipc-91f585df
INFO  [] CONN Closing connection, h=9
INFO  [] SPWRITE Connection re-established, handle=::1234

.qsp.write.toStream

Write data using a KX Reliable Transport client

.qsp.write.toStream[table]
.qsp.write.toStream[table; stream]
.qsp.write.toStream[table; stream; .qsp.use (!) . flip (
    (`prefix     ; prefix);
    (`deduplicate; deduplicate))]

Parameters:

name type description default
table symbol or string A table name. Required
stream symbol or string The inbound stream $RT_PUB_TOPIC

options:

name type description default
prefix string Stream prefix for URL mapping $RT_TOPIC_PREFIX
deduplicate boolean If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recover events. 1b

For all common arguments, refer to configuring operators

This operator writes data using KX Insights Reliable Transport streams.

.qsp.write.toVariable

Writes data to a variable in the local process

.qsp.write.toVariable[variable]
.qsp.write.toVariable[variable; mode]

Parameters:

name type description default
variable symbol The name of the variable to write the output to. Required
mode symbol The writing behavior. Mode can be one of the following options:
- append outputs the stream into a variable
- overwrite sets the variable to the last output of the pipeline
- upsert performs an upsert on table data to the output variable.
append

For all common arguments, refer to configuring operators

This operator writes to a local variable, either overwriting, appending, or upserting to the variable depending on the selected mode.

Append joins data

When using append as the write mode, values will be joined, even if they do not have matching types. If tables are being used in the stream, the result will also be a table. If a matching schema is desired, consider using upsert mode to force a type error for table schema mismatches.

Join data in a single output:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output]

publish 1 2 3
publish "abc"
output
1
2
3
"abc"

Capture an output value:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output; `overwrite];

publish 1 2 3
publish 4 5 6
output
4 5 6

Stream data into a table:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output; `upsert];

publish ([] x: 1 2 3; y: "abc");
publish ([] x: 4 5; y: "de");
output
x y
---
1 a
2 b
3 c
4 d
5 e