Skip to content

Writers

Writers are a specialized type of operator that allow users to push data from a streaming pipeline to different external data sources.

.qsp.write.toConsole

Write to the console

.qsp.write.toConsole[]
.qsp.write.toConsole[prefix]
.qsp.write.toConsole[prefix; .qsp.use (!) . flip (
    (`split    ; split);
    (`timestamp; timestamp);
    (`qlog     ; qlog))]

Parameters:

name type description default
prefix string A prefix for output messages. ""

options:

name type description default
split boolean Controls how vectors are printed (see below). 0b
timestamp symbol Either local,utc, ` for no timestamp, or . to use the UTC time when qlog is false, and no timestamp when it is true. `.
qlog boolean Prints all console logs to a QLog stream. 0b

For all common arguments, refer to configuring operators

This operator adds a console writer to the current stream.

A console writer outputs content to standard out on the current process prefixed with the event timestamp.

Splitting output:

By default, all vectors (lists of the same type) are printed on a single line. By setting the split option to 1b, vectors will be printed on separate lines. Lists of mixed types are always printed on separate lines.

Examples:

Basic writer:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[];
// Callback takes lists of messages, so enlist the single message
publish enlist "hi!"
2021.03.08D19:19:08.171805000 | "hi!"

Write with prefix:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole["INFO: "];

publish enlist "hi!"
INFO: 2021.03.08D19:19:08.171805000 | "hi!"

Write with split vectors:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[.qsp.use ``split!(::;1b)];

publish enlist 1 2 3 4
2021.09.30D14:01:50.359012847 | 1
2021.09.30D14:01:50.359012847 | 2
2021.09.30D14:01:50.359012847 | 3
2021.09.30D14:01:50.359012847 | 4

Write output to QLog format

.qsp.run
  .qsp.read.fromCalback[`publish]
  .qsp.write.toConsole[.qsp.use ``qlog!(::;1b)]

publish ("hello"; "world")
publish "==="
publish ([] date: .z.p+til 2; open: 2?100f; high: 2?100f; low:2?100f; close: 2?100f)
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "hello"
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "world"
2021-10-04 11:35:33.189 [console_f08427d1] INFO  "==="
2021-10-04 11:36:12.193 [console_f08427d1] INFO  date                          open     high     low      close
2021-10-04 11:36:12.193 [console_f08427d1] INFO  -----------------------------------------------------------------
2021-10-04 11:36:12.194 [console_f08427d1] INFO  2021.10.04D11:36:12.193211900 74.84699 20.68577 85.28164 46.82838
2021-10-04 11:36:12.194 [console_f08427d1] INFO  2021.10.04D11:36:12.193211901 97.91903 36.07874 69.44221 4.251175

.qsp.write.toDatabase

Write data to a kdb Insights Database

.qsp.write.toDatabase[table; assembly]
.qsp.write.toDatabase[table; assembly; .qsp.use (!) . flip (
    (`retries    ; retries);
    (`retryWait  ; retryWait);
    (`timeout    ; timeout);
    (`deduplicate; deduplicate);
    (`directWrite; directWrite);
    (`overwrite  ; overwrite);
    (`retries    ; retries);
    (`retryWait  ; retryWait);
    (`mountName  ; mountName);
    (`statusTable; statusTable))]

Parameters:

name type description default
table symbol A table name. Note that at present if using directWrite table name must be a symbol. Required
assembly symbol or string The assembly name Required, unless the pipeline has been deployed as part of an assembly, in which case the pipeline will default to writing to that assembly.

options:

name type description default
retries long Number of times to retry the connection to the storage manager. 60
retryWait timespan Time to wait between storage manager connection attempts. 0D00:00:03
timeout int Timeout value 0Ni (no timeout)
deduplicate boolean If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recover events. 1b
directWrite boolean Bypass RDB & IDB and write to HDB directly for historical data. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete. At present direct write is only intended for writing to partitioned tables. Attempts to write other table types will error. 0b
overwrite boolean When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data. 1b
mountName symbol Name of mount. hdb
statusTable symbol If using direct write with an unbounded stream, this is the name of a table to store session statuses. `

For all common arguments, refer to configuring operators

This operator writes data using kdb Insights Reliable Transport streams, except when doing a direct write.

Automatic Assembly Discovery

If the assembly parameter is not set, and the pipeline is deployed as part of an assembly, the assembly will be automatically set to the assembly containing the pipeline.

kdb Insights Enterprise only

The .qsp.write.toDatabase writer operates only within kdb Insights Enterprise deployments of the Stream Processor at this time. This functionality does not operate within the kdb Insights Stream Processor Microservice. It is dependant on the ability to query deployed assemblies within Kubernetes.

directWrite should not to be used with a reader using the watch option

When using the directWrite option it is not possible to pair this with a reader using the watch option since directWrite relies on a definitive finish point which is not guaranteed when using a watcher.

Basic example publishing streaming data:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toDatabase[`quotes; `assembly];

quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test; 99.9; 100i; 100.1; 200i)
publish quotes

Using directWrite for historical data:

quotesOld: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)
quotesNew: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test2; 99.9; 100i; 100.1; 200i)
quotes: quotesOld, quotesNew;

.qsp.run
  .qsp.read.fromExpr["quotes"]
  .qsp.write.toDatabase[`quotes; `assembly; .qsp.use enlist[`directWrite]!enlist 1b];

 // quotesOld will be written to hdb directly bypassing RDB & IDB.
 // quotesNew will be treated as stream data as it will be newer than the HDB purview.

Direct write purviews

When using direct write, data that is older than the purview of the HDB will go directly to disk but will not be available for query until the pipeline completes. All other data will be streamed to the database and be ready for query immediately.

Direct write compatibility

The direct write functionality is intended for batch processing only. As such when using direct write it should be paired with a reader operator with a finish point. Examples would include - read from file, read from cloud storage, read from database or read from expression. Reading from continuous streaming sources is not supported. Some reader operators support partitioning and will automatically scale up the number of workers. Using this with direct write results in undefined behavior. Therefore set the maxWorkers setting to 1 to ensure no scaling.

Direct write overwrite

When using direct write, overwrite is enabled by default. This is to preserve the old behavior of having each date partition in the ingest data overwrite the existing date partition. In a future release, this will be disabled by default making merge the default option. If you intend to continue to use the overwrite behavior, it is recommended that you set it explicitly in your pipeline configuration.

statusTable

statusTable is required when using direct write in an unbounded stream.

When using direct write in an unbounded stream, data which has been received but not yet written down is organized into a session. Triggering a writedown will write an entire session to the database.

If batches come in with no kx_DWSessionID field in the metadata, they will all be staged as a single session, with an automatically generated session ID, prefixed with "kx_default_", that gets updated after each writedown.

If batches come in with the kx_DWSessionID field in the metadata set to a symbol, they will be staged under this session ID and writedown messages must include the kx_DWSessionID field in the metadata, indicating which session ID to write down.

Without a finish event to trigger the writedown, writedowns must be triggered manually by sending a message with the kx_DWWritedown field in its metadata set to 1b. Warnings: - When kx_DWWritedown is present, the data for that batch is ignored. - session IDs cannot be reused. Reusing session ID will result in those sessions being discarded. - This symbol must only contain characters valid in a linux file name. - The overwrite option must be false when using statusTable. - keyed streams are unsupported and will be treated as unkeyed streams.

The status of completed sessions will be written to the status table, which must be defined, and must have the schema ([] sessionID: string; isComplete: boolean). Once the direct write is complete, the status table will be updated to include that session ID, with isComplete set to 1b.

The statusTable field can only be used with direct write.

.qsp.write.toKafka

Publish data on a Kafka topic

.qsp.write.toKafka[topic]
.qsp.write.toKafka[topic; brokers]
.qsp.write.toKafka[topic; brokers; .qsp.use (!) . flip (
    (`retries     ; retries);
    (`retryWait   ; retryWait);
    (`topicConfig ; topicConfig);
    (`registry    ; registry);
    (`subject     ; subject);
    (`autoRegister; autoRegister);
    (`schemaType  ; schemaType))]

Parameters:

name type description default
topic symbol A Kafka topic name to publish on. `
brokers string or string[] One or more brokers as host:port. "localhost:9092"

options:

name type description default
retries long Max retry attempts for Kafka API calls. 10
retryWait timespan Period to wait between retry attempts. 0D00:00:02
topicConfig dictionary A dictionary of Kafka topic configuration options (see below). ()!()
registry string Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding. ""
subject string A Kafka subject to read schemas from and publish schemas to. If none is provided, uses "-value". ""
autoRegister boolean Controls whether or not to generate and publish schemas automatically. 0b
schemaType string Schema type to generate, one of: "JSON", "PROTOBUF", "AVRO" "JSON"

For all common arguments, refer to configuring operators

This operator acts as a Kafka producer, pushing data to a Kafka topic.

A Kafka producer will publish data to a Kafka broker which can then be consumed by any downstream listeners. All data published to Kafka must be encoded as either strings or serialized as bytes. If data reaches the Kafka publish point that is not encoded, it is converted to q IPC serialization representation. Data can also be linked to Kafka Schema Registry to use automatic type conversion.

AVRO is not supported

Encoding messages with AVRO using the Kafka Schema Registry integration is currently not supported.

Kafka topic configuration options

All the Kafka topic configuration options are supported by providing a config dictionary as the argument for topicConfig.

This includes properties such as request.timeout.ms, queuing.strategy, socket.timeout.ms, connection.max.idle.ms, etc.

Reserved keys

metadata.broker.list and producer.id are reserved values and are maintained directly by the Kafka writer. The Kafka writer will error if these options are used.

Localhost broker:

// Write data from a function called 'publish' and write to a Kafka topic
// 'words' with a default broker at 'localhost:9092'.
.qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[`words]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"

Multiple brokers:

// Write data from a function called 'publish' and write to a Kafka topic
// 'words' to multiple Kafka brokers.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[`words; ("localhost:1234"; "localhost:1235")]
// Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"

Advanced configuration:

// Writes data to a topic 'words' with a default broker setting custom values
// for advanced producer configuration.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic                     ; `words);
    (`socket.timeout.ms         ; 60000);        // Wait 1 minute for socket timeout
    (`socket.send.buffer.bytes  ; 4*1024*1024);  // Buffer up to 4MiB max msg size
    (`max.in.flight             ; 100000))];     // Allow up to 100k inflight msgs
// Publish 'Hello' and 'World' as a single batch publish
publish ("Hello"; "World")

Kafka Schema Registry with existing schema:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic    ; `words);
    (`brokers  ; "localhost:9092");
    (`registry ; "http://localhost:8081"))];

Kafka Schema Registry with generated schema:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic        ; `words);
    (`brokers      ; "localhost:9092");
    (`registry     ; "http://localhost:8081");
    (`autoRegister ; 1b);
    (`schemaType   ; "JSON"))];

Kafka Schema Registry with custom subject:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKafka[.qsp.use (!) . flip (
    (`topic    ; `words);
    (`brokers  ; "localhost:9092");
    (`registry ; "http://localhost:8081");
    (`subject  ; "custom-subject"))];

Writing to explicit partitions

Future versions of the Kafka writer will allow writing to explicit partitions/keys.

However, the current Kafka writer will write to unassigned partitions without using an explicit key.

.qsp.write.toKDB

(Beta Feature) Writes data to an on-disk partition table

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

.qsp.write.toKDB[path;prtnCol;table]

Parameters:

name type description default
path symbol The database path. Required
prtnCol symbol The name of the partition column. Required
table symbol The name of the table. Required

For all common arguments, refer to configuring operators

This operator writes an on-disk partition table.

Writing keyed columns

Plugin supports keyed columns but writes to disk as an unkeyed table.

Stream data into a table:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKDB[`db;`date;`myTable];

publish ([] date: 2022.01.01 2022.01.02 2022.01.03 2022.01.04 2022.01.05; open: 1 2 3 4 5; close: 6 7 8 9 10);

.qsp.write.toProcess

Write data to a kdb+ process

.qsp.write.toProcess[handle]
.qsp.write.toProcess[.qsp.use (!) . flip (
    (`mode       ; mode);
    (`target     ; target);
    (`async      ; async);
    (`queueLength; queueLength);
    (`queueSize  ; queueSize);
    (`spread     ; spread);
    (`handle     ; handle);
    (`retries    ; retries);
    (`retryWait  ; retryWait))]

Parameters:

name type description default
handle symbol Handle of destination process to write to. `

options:

name type description default
mode symbol `function (call) or `table (upsert) (symbol). `function
target symbol Target function name or table name. ""
async boolean Whether writing should be async. 1b
queueLength long Max async message queue length before flush. 0Wj
queueSize long Max number of bytes to buffer before flushing. 1MB
spread boolean Treat the pipeline data as a list of arguments in function mode (can not be set with params also set). 0b
params symbol[] List of any parameters that should appear before the message data in function mode. ()
retries long Number of times to retry the connection to the process. 60
retryWait timespan Time to wait between process connection attempts. 0D00:00:03

For all common arguments, refer to configuring operators

This operator writes data to another kdb+ process using IPC.

During a retry loop, on connection loss and on startup, all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.

Publish to table:

q -p 1234
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`table`output]
q)publish ([] til 2)
q)`::1234 `output
x
-
0
1

Publish to function:

q -p 1234 <<< "upd: {x set y}"
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{ (`table; x) }]
  .qsp.write.toProcess[.qsp.use `handle`target`spread!(`::1234; `upd; 1b)]
q)publish ([] til 2)
q)`::1234 `data
x
-
0
1

Re-connection to output process:

# Quit the process after some time to force a reconnect
q -p 1234
// Set number of retries to 2 and wait time between retries to 2 seconds
.qsp.run  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use (!) . flip (
    (`handle   ; `::1234);
    (`mode     ; `table);
    (`target   ; `destTable);
    (`retries  ; 2);
    (`retryWait; 0D00:00:02) ]
WARN  [] SPWRITE Connection lost to IPC output process, attempting to reconnect, writer=ipc-91f585df
INFO  [] CONN Closing connection, h=9
INFO  [] SPWRITE Connection re-established, handle=::1234

.qsp.write.toStream

Write data using a KX Reliable Transport stream

.qsp.write.toStream[table]
.qsp.write.toStream[table; stream]
.qsp.write.toStream[table; stream; .qsp.use (!) . flip (
    (`prefix     ; prefix);
    (`deduplicate; deduplicate))]

Parameters:

name type description default
table symbol or string A table name. Required
stream symbol or string The outbound stream $RT_PUB_TOPIC

options:

name type description default
prefix string Stream prefix for URL mapping $RT_TOPIC_PREFIX
deduplicate boolean If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recover events. 1b

For all common arguments, refer to configuring operators

This operator writes data using kdb Insights Reliable Transport streams.

Note

Writing to a kdb Insights Reliable Transport stream facilitates two principal use cases, chaining of pipelines and the ability to write to a database. If explicitly intending to write to a database however it is strongly suggested that you use .qsp.write.toDatabase as this includes functionality for the validation of incoming schemas.

.qsp.write.toAmazonS3

Writes data to an object in an Amazon S3 bucket

.qsp.write.toAmazonS3[path; isComplete; region]
.qsp.write.toAmazonS3[path; isComplete; region; .qsp.use (!) . flip (
    (`tenant       ; tenant);
    (`credentials  ; credentials);
    (`onTeardown   ; onTeardown))]
Parameters:

name type description default
path string or symbol or function The path of an object to read from S3. A function can also be provide that can dynamically set the file path for a given batch of data. Required
isComplete :: or function A binary function that accepts a metadata dictionary as its first argument and data as its second. The return is a boolean to indicate if the upload of the current file should be marked as completed. This will cause any further writes to the given file path to fail. Writes will finish when the first non-partial batch is received for a given path. Required
region string The AWS region to authenticate against. us-east-1

options:

name type description default
tenant string The authentication tenant
domain string The name of your domain (private config) ""
credentials string Name of a credentials secret to mount. See the authentication section below for details ""
onTeardown symbol For an in-flight uploads at the time of upload, this indicates the behavior. By default, all in-flight uploads will be left pending (none). This is useful for pipelines that need to resume an upload between teardowns (ex. an upgrade). Alternatively, you can either chose to abort any pending uploads or complete any pending uploads none

For all common arguments, refer to configuring operators

This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.

Teardown Behavior

By default, teardown will leave pending uploads queued so that when the pipeline is restarted the pending uploads are resumed. This behaviour is useful for allowing pipelines to stop and restart for regularly scheduled maintenance (i.e. version upgrades). When an upload remains in a pending state, it will consume resources in Amazon S3 as a partial file upload. By default, Amazon will expunge these partial uploads after 7 days.

If this behavior is undesired, the teardown behavior can be modified to be either abort or complete. If marked as abort, any pending uploads will be terminated. If the upload is resumed, any already processed data will be lost. If marked as complete, any pending uploads will be pushed to S3. However, if the pipeline was to resume, it will start overwriting the previous object with a new object. It is recommended to only use the complete behavior when a path function is provided that will ensure a new file path is used when resuming.

Authentication

The S3 writer uses kurl for credential discovery. See credential discovery in the AWS tab for more information.

Environment variable authentication:

To setup authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.9.0
    environment:
      AWS_ACCESS_KEY_ID: "abcd"
      AWS_SECRET_ACCESS_KEY: "iamasecret"

Docker credentials file for authentication:

To load a custom configuration file into a Docker deployment, mount the credentials file directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration directory.

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.9.0
    volumes:
      - $HOME/.aws/:/config/awscreds
    environment:
      KXI_SP_CONFIG_PATH: "/config"

Next, add the secret name to the S3 reader configuration

.qsp.write.toAmazonS3[`:s3://bucket/hello; ::; "us-east-1"; .qsp.use``credentials!``awscreds]

Now deploying the pipeline will read the credentials from the AWS credentials file. Note that the credentials file name must be credentials.

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "s3-writer",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds

Note that the secret must be formatted so that the variable names are lower case and there is a space either side of the equals sign like so:

[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234

Next, add the secret name to the S3 reader configuration

.qsp.write.toAmazonS3[`:s3://bucket/hello; ::; "us-west-1"; .qsp.use``credentials!``awscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "s3-writer",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["awscreds"] }
    }' | jq -asR .)"

Examples

Uploading a local file:

Permissions

When running this example inside a kxi-sp-worker image, you must first run system "cd /tmp" to avoid encountering permissions errors.

t:([] date: .z.d; name: 1000?10?`3; id: 1000?0ng; units: 1000?100f; price: 1000?1000f);
`:data.csv 0: csv 0: t;

.qsp.run
    .qsp.read.fromFile[`:data.csv]
    .qsp.write.toAmazonS3[`:s3://mybucket/file.csv; ::; "us-west-1"]

Uploading a stream:

// The second argument to `write.toAmazonS3` indicates that data should not be uploaded
// until an empty batch is received in the stream.
.qsp.run
    .qsp.read.fromCallback[`pub]
    .qsp.write.toAmazonS3[`:s3://mybucket/file.txt; ::; "us-west-1"; {[md;data] 0 ~ count data}]

pub "Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
pub "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
pub "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."

// At this point, the chunks have been buffered and are waiting to be converted to
// an S3 object. Flush the file and write it to S3.
pub ()

.qsp.write.toSubscriber

Writes data to a subscriber

.qsp.write.toSubscriber[table; keyCols]

Parameters:

name type description default
table symbol Name of the subscription. Used to subscribe to data emitted from this node. Required
keyCol symbol or symbol [] A key column, or list of key columns Required

For all common arguments, refer to configuring operators

This operator publishes data over QIPC to a subscriber.

Publish to subscriber:

q -p 1234
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toSubscriber[`data; `keyCol]

For more information on using the Subscriber writer in an end-to-end deployment, see either the kdb Insights Microservices example or the Enterprise example

.qsp.write.toVariable

Writes data to a variable in the local process

.qsp.write.toVariable[variable]
.qsp.write.toVariable[variable; mode]

Parameters:

name type description default
variable symbol The name of the variable to write the output to. Required
mode symbol The writing behavior. Mode can be one of the following options:
- append outputs the stream into a variable
- overwrite sets the variable to the last output of the pipeline
- upsert performs an upsert on table data to the output variable.
append

For all common arguments, refer to configuring operators

This operator writes to a local variable, either overwriting, appending, or upserting to the variable depending on the selected mode.

Append joins data

When using append as the write mode, values will be joined, even if they do not have matching types. If tables are being used in the stream, the result will also be a table. If a matching schema is desired, consider using upsert mode to force a type error for table schema mismatches.

Join data in a single output:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output]

publish 1 2 3
publish "abc"
output
1
2
3
"abc"

Capture an output value:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output; `overwrite];

publish 1 2 3
publish 4 5 6
output
4 5 6

Stream data into a table:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`output; `upsert];

publish ([] x: 1 2 3; y: "abc");
publish ([] x: 4 5; y: "de");
output
x y
---
1 a
2 b
3 c
4 d
5 e