
Writers

Writers are a specialized type of operator that allow users to push data from a streaming pipeline to external destinations. Writers are available under the .qsp.write namespace. Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.

.qsp.write.toConsole

Adds a console writer to the current stream. A console writer prints each message to the standard output of the current process, prefixed with the event timestamp and, if given, a custom string prefix.

Signatures:

    .qsp.write.toConsole[]
    .qsp.write.toConsole[prefix]

        - `prefix` - String prefix for output messages

Example: Basic writer

 .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.write.toConsole[];

 // Callback takes *lists* of messages, so enlist the single message
 publish enlist "hi!"
 /=> 2021.03.08D19:19:08.171805000 | "hi!"

Example: Write with prefix

 .qsp.run
     .qsp.read.fromCallback[`publish]
     .qsp.write.toConsole["INFO: "];

 publish enlist "hi!"
 /=> INFO: 2021.03.08D19:19:08.171805000 | "hi!"
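The output lines above follow a simple layout: the optional prefix, the event timestamp, ` | `, then the q representation of the message. The plain-q sketch below reproduces that layout. `consoleLine` is a hypothetical helper, not part of the Stream Processor API, and using `-3!` (string representation) to render the message is an assumption about how values are printed.

```q
/ Hypothetical helper (not part of the SP API): build one console output line
/ as prefix, event timestamp, " | ", then the string representation of the message
consoleLine:{[prefix;ts;msg] prefix,string[ts]," | ",-3!msg}

consoleLine["INFO: ";2021.03.08D19:19:08.171805000;"hi!"]
/ "INFO: 2021.03.08D19:19:08.171805000 | \"hi!\""
```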

.qsp.write.toKafka

Publishes data to a Kafka topic. The writer acts as a Kafka producer, publishing data to a Kafka broker from which any downstream consumers can read it. All data published to Kafka must be either strings or bytes. Any other data that reaches the writer is first serialized using q IPC serialization.
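The encoding rule above can be illustrated in plain q by checking the message type: strings (type `10h`) and byte vectors (type `4h`) pass through, and anything else goes through q IPC serialization with `-8!`. `encodeForKafka` is a hypothetical helper for illustration; it is not the writer's implementation, and the exact type checks the writer performs are an assumption.

```q
/ Hypothetical helper (illustration only, not the writer's implementation):
/ strings (type 10h) and byte vectors (type 4h) pass through unchanged,
/ anything else is serialized with q IPC serialization (-8!)
encodeForKafka:{[msg] $[type[msg] in 4 10h; msg; -8!msg]}

encodeForKafka "Hello"              / unchanged string
type encodeForKafka ([] x:til 2)    / 4h: the table became a serialized byte vector
-9!encodeForKafka ([] x:til 2)      / -9! deserializes back to the original table
```

A consumer written in q could recover non-string payloads with `-9!`, while string payloads arrive as plain text.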

Signatures:

     .qsp.write.toKafka[topic; brokers]

       - `topic`   - Symbolic topic name to publish on
       - `brokers` - Broker address as a "host:port" string, or a list of such strings
     .qsp.write.toKafka[opts]

       - `opts.topic`       - Symbolic topic name to publish on
       - `opts.brokers`     - String or list of strings of brokers (default: "localhost:9092")
       - `opts.retries`     - Number of retry attempts for Kafka API calls before failing (default: 10)
       - `opts.retryWait`   - Amount of time to wait between retry attempts for Kafka API calls (default: 0D00:00:01)
       - `opts.topicConfig` - Kafka topic configuration properties, such as `request.timeout.ms` and
                              `queuing.strategy`. For the full list of topic options, see
                              https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties.
       - `opts.<option>`    - Any Kafka producer configuration option is supported as a first-class
                              option in the API, for example `socket.timeout.ms` and
                              `connection.max.idle.ms`. For the full list of options, see
                              https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Note that
                              `metadata.broker.list` and `producer.id` are reserved values and
                              are maintained directly by the Kafka writer.
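As an example of the `opts` form, the dictionary below combines documented keys, with illustrative values for two brokers, `retries` and `retryWait`. The dictionary itself is plain q; passing it to `.qsp.write.toKafka` needs a running Stream Processor, so that line is left as a comment.

```q
/ Illustrative values only: two local brokers, 3 retries, 5 seconds between retries
opts:`topic`brokers`retries`retryWait!(`words;("localhost:9092";"localhost:9093");3;0D00:00:05)

/ Inside a Stream Processor pipeline:
/ .qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[opts]
```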

Note

The current Kafka writer publishes without an explicit key and leaves the partition unassigned, so the producer chooses the partition. Future versions of the Kafka writer will allow writing to explicit partitions and keys.

Example: Localhost Broker

 // Read data from a callback function called 'publish' and write it to the Kafka
 // topic 'words' on the default broker at 'localhost:9092'.
 .qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[`words]

 // Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
 publish "Hello"
 publish "World"

Example: Multiple Brokers

 // Read data from a callback function called 'publish' and write it to the Kafka
 // topic 'words' on multiple Kafka brokers.
 .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.write.toKafka[`words; ("localhost:1234"; "localhost:1235")]

 // Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
 publish "Hello"
 publish "World"

Example: Advanced Configuration

 // Write data to the topic 'words' on the default broker, setting custom values for advanced
 // producer configuration. For more information about the available configuration values,
 // see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.write.toKafka[(!) . flip (
          (`topic                     ; `words);
          (`socket.timeout.ms         ; 60000);        // Wait up to 1 minute for network requests
          (`socket.send.buffer.bytes  ; 4*1024*1024);  // Use a 4MiB socket send buffer
          (`max.in.flight             ; 100000))];     // Allow up to 100k in-flight requests per broker connection

 // Publish 'Hello' and 'World' in a single batch
 publish ("Hello"; "World")

.qsp.write.toProcess

Writes data to a given kdb+ process.

Signature:

    .qsp.write.toProcess[opts]

        - `opts.mode`   - One of `function (call) or `table (upsert)
        - `opts.target` - Target function name or table name
        - `opts.async`  - Boolean to indicate whether writing should be async (default: 1b)
        - `opts.params` - List of any parameters that should appear *before* the message data
                             in `function mode
        - `opts.handle` - Handle of the destination process to write data to. This handle
                             should follow the structure of a q IPC handle as described at
                             https://code.kx.com/q/ref/hopen/
        - `opts.retries` - The number of retry attempts to connect to the output process, on
                             connection loss and on startup. During this retry loop, all processing
                             on the Worker will be halted until the connection is re-established
                             or the max number of retries is reached. (default: 5)
        - `opts.retryWait` - Wait time between connection retry attempts, on startup or on
                             connection loss. (default: 0D00:00:01)
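The two modes can be illustrated in plain q by the message each corresponds to when evaluated on the destination: `table` mode as an upsert of the data into the target table, and `function` mode as a call to the target with `params` placed before the data. `ipcMessage` is a hypothetical helper for illustration only; it is not how the writer is implemented. The sketch evaluates the messages locally with `value`, which is how a q process evaluates an incoming IPC message by default.

```q
/ Hypothetical helper (illustration only): the message each mode corresponds to
ipcMessage:{[mode;target;params;data]
  $[mode~`table;    (upsert;target;data);
    mode~`function; (enlist target),params,enlist data;
    '"unknown mode: ",string mode]}

/ table mode: upsert into the target table
output:([] x:`long$());
value ipcMessage[`table;`output;();([] x:til 2)];           / output now holds rows 0 1

/ function mode: params come before the data, so this calls upd[`data;table]
upd:{x set y};
value ipcMessage[`function;`upd;enlist `data;([] x:til 2)]; / defines data as the table
```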

Example: Publish to table

  / Start the target process separately first, for example in another terminal: q -p 1234

  .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.write.toProcess[`handle`mode`target!`::1234`table`output];

  publish ([] til 2);
  `::1234 `output
  /=> x
  /=> -
  /=> 0
  /=> 1

Example: Publish to function

  / Start the target process separately first, for example in another terminal: q -p 1234
  `::1234 (set; `upd; {x set y});

  .qsp.run
      .qsp.read.fromCallback[`publish]
      .qsp.write.toProcess[`handle`target`params!(`::1234; `upd; enlist `data)];

  publish ([] til 2);
  `::1234 `data
  /=> x
  /=> -
  /=> 0
  /=> 1

Example: Re-connection to output process


  / Start the target process separately first, for example in another terminal: q -p 1234
  log.info "Setting timer to close connection from output process in 10 seconds";
  restart:{last[key .z.W]"hclose .z.w"}
 .tm.add1shot[`restart; (`restart; ::); 0D00:00:10];

 / Set number of retries to 2 and wait time between retries to 2 seconds
 .qsp.run  .qsp.read.fromCallback[`src; `publish]
      .qsp.write.toProcess[(!) . flip (
          (`handle   ; `::1234);
          (`mode     ; `table);
          (`target   ; `destTable);
          (`retries  ; 2);
          (`retryWait; 0D00:00:02))];

 /=>  WARN  [] SPWRITE Connection lost to IPC output process, attempting to reconnect, writer=ipc-91f585df
 /=>  INFO  [] CONN Closing connection, h=9
 /=>  INFO  [] SPWRITE Connection re-established, handle=::1234
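The retry behaviour controlled by `retries` and `retryWait` can be sketched in plain q as a blocking loop around `hopen`. This is an illustration under stated assumptions, not the writer's implementation: `connectWithRetry` and `pause` are hypothetical helpers, the first attempt is not counted as a retry, and `pause` busy-waits to keep the sketch portable. Like the writer, the loop blocks all other processing until it connects or gives up.

```q
/ Hypothetical helpers (illustration only, not the SP implementation)
/ pause: block the process for a timespan by busy-waiting
pause:{[d] stop:.z.p+d; while[.z.p<stop;]}

/ connectWithRetry: one initial attempt plus up to `retries` further attempts,
/ waiting `wait` between attempts; returns an open handle or signals an error
connectWithRetry:{[addr;retries;wait]
  attempt:0;
  while[attempt<=retries;
    h:@[hopen;addr;{0Ni}];
    if[not null h; :h];
    attempt+:1;
    if[attempt<=retries; pause wait]];
  '"connect failed: ",string addr}

/ 2 retries, 2 seconds apart, matching the pipeline settings above:
/ h:connectWithRetry[`::1234;2;0D00:00:02]
```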

.qsp.write.toRT

Writes data using a KX Reliable Transport client conforming to the standard RT API. See the configuration section for details about how to inject a customized RT library into the Stream Processor.

Signature:

    .qsp.write.toRT[topic]

        - `topic` - String topic name

Example: Publish to a TP at a known location

      ...
      .qsp.write.toRT[":tp:5000"]

Example: Publish to the TP endpoint mapped to the topic name "sensor" by the environment variable RT_PUB_SENSOR

      ...
      .qsp.write.toRT["sensor"]
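The two examples above interpret the topic string differently. The plain-q sketch below shows one plausible resolution rule; it is an assumption, not the RT client's documented logic. A topic beginning with `:` is treated as an explicit endpoint, and any other topic is looked up in an environment variable named `RT_PUB_` followed by the upper-cased topic, matching the RT_PUB_SENSOR example. `resolveRTTopic` is hypothetical.

```q
/ Hypothetical helper (illustration only): resolve an RT topic string to an endpoint
resolveRTTopic:{[topic]
  if[":"~first topic; :topic];                     / explicit endpoint such as ":tp:5000"
  ep:getenv `$"RT_PUB_",upper topic;               / e.g. "sensor" -> RT_PUB_SENSOR
  if[0=count ep; '"RT_PUB_",upper[topic]," is not set"];
  ep}

`RT_PUB_SENSOR setenv ":tp:5000";
resolveRTTopic "sensor"    / ":tp:5000"
```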