Readers

Readers are a specialized type of operator that allows users to feed data from external data sources into a streaming pipeline. Readers are presented under the .qsp.read namespace. Each reader has its own custom start, stop, setup, and teardown functions to handle different streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using .qsp.push.
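
As a sketch of how a reader's data moves through the flow: operators that run asynchronously must forward data downstream explicitly. The example below is a minimal sketch, assuming .qsp.apply passes its function the operator, metadata, and data, and that such a function is responsible for calling .qsp.push itself.

 // Push incoming data downstream unchanged from an asynchronous apply operator
 .qsp.run
   .qsp.read.fromCallback[`publish]
   .qsp.apply[{[op; md; data] .qsp.push[op; md; data]}]
   .qsp.write.toConsole[]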

Some readers, such as .qsp.read.fromCallback and .qsp.read.fromKafka, implement a stream partitioning interface. When using these readers, partitions are distributed over multiple Workers, orchestrated by the Controller. See scaling for more information.

.qsp.read.fromCallback

The callback reader defines the given function in q, in the global namespace, so that data passed to the function (locally or over IPC) enters the pipeline data flow.

Signatures:

    .qsp.read.fromCallback[name]

       - Symbol function name to define
    .qsp.read.fromCallback[opts]

       - `opts.callback`   - Symbol function name to define
       - `opts.partitions` - List of partition identifiers to distribute over available Workers

Example: Single worker callback

 // This reader creates a monadic callback function that can be executed to push data into a pipeline.
 // When the function `publish` is executed on an SP Worker with input data, that data is forwarded
 // to subsequent operators in the pipeline.
 .qsp.run
   .qsp.read.fromCallback[`publish]
   .qsp.write.toConsole[]

 // Since the callback is defined in the global scope, data can be passed directly within the same
 // process. This could be used to define `upd` handlers in kdb+ tick, or to wrap other data acquisition
 // methods such as `.Q.fs`.

 publish ([] val: 100?10f )           // Call directly from the same process
 upd: enlist[`trade]!enlist publish   // Define kdb+ tick upd handler
 .Q.fs[publish] `:myfile.json         // Stream the file 'myfile.json' of '\n'-delimited JSON records
                                      // through the pipeline
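
 // To make the .Q.fs line concrete: each chunk .Q.fs delivers is a list of lines, so
 // newline-delimited JSON can be parsed before publishing. A hedged sketch, reusing the
 // hypothetical 'myfile.json' above:
 .Q.fs[{publish .j.k each x}] `:myfile.json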

Example: Multi-worker callback

 // Creates a partitioned callback reader that can be distributed over multiple SP
 // Workers. This defines a callback function called `publish` and assigns each of the
 // 10 partitions to one of the available Workers.
 .qsp.run
   .qsp.read.fromCallback[`callback`partitions!(`publish; til 10)]
   .qsp.write.toConsole[]

.qsp.read.fromExpr

The expression reader evaluates the given expression or function, entering the resulting data into the pipeline.

Signatures:

    .qsp.read.fromExpr[expr]

       - `expr` - String expression to evaluate
    .qsp.read.fromExpr[fn]

       - `fn` - Niladic synchronous function to call

Example: From expression

 // This reader enters the table "refData" into the pipeline.

 refData: ([] sensorID: 100?`8; sensorCode: 100?100)

 .qsp.run
   .qsp.read.fromExpr["refData"]
   .qsp.write.toConsole[]

Example: From function

 // This reader queries a separate process for data.

 getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }

 .qsp.run
   .qsp.read.fromExpr[getSensor]
   .qsp.write.toConsole[]
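
Because the function is called synchronously, a failed IPC call would signal out of the reader. A hedged variant wraps the one-shot query in error trapping; the empty-list fallback is an illustrative assumption, not a requirement of the API.

 // Log and return an empty result on failure instead of signalling
 getSensor: { @[{`:dap:4000 x}; "select from trace where date = .z.d"; {[e] -1 "query failed: ",e; ()}] }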

.qsp.read.fromFile

The file reader reads the given file, entering the file contents (bytes or characters) into the pipeline.

Signatures:

    .qsp.read.fromFile[path]

       - `path` - String file path (e.g., "/path/to/file.csv")
    .qsp.read.fromFile[opts]

       - `opts.path` - String file path
       - `opts.mode` - One of `text or `binary (default: `text)

Example: Text data from file

 .qsp.run
   .qsp.read.fromFile["numbers.txt"]
   .qsp.write.toConsole[]

Example: Binary data from file

 .qsp.run
   .qsp.read.fromFile[`path`mode!("numbers.bin"; `binary)]
   .qsp.write.toConsole[]
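
Text read from a file is often parsed as it enters the pipeline. The sketch below is an assumption rather than documented behavior: it supposes a hypothetical comma-delimited 'trades.csv' with a header row, and that text mode delivers the file's lines as a list of strings (a single character vector would first need to be split on "\n").

 .qsp.run
   .qsp.read.fromFile["trades.csv"]
   .qsp.map[{("PSF"; enlist ",") 0: x}]   // Parse header + rows into a table of time, sym, price
   .qsp.write.toConsole[]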

.qsp.read.fromKafka

The Kafka reader consumes data from a Kafka topic. A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.

Workers that join the stream will start reading from the beginning of the stream unless an explicit offset is provided. To start reading from the end of the stream instead, the special symbol `end can be used as the offset value. See the 'Custom Offsets' example below for more information.

Signatures:

    .qsp.read.fromKafka[topic; brokers]

       - `topic`   - Symbol topic name to subscribe to
       - `brokers` - String or list of strings of broker host:port information
    .qsp.read.fromKafka[opts]

       - `opts.topic`     - Symbol topic name to subscribe to
       - `opts.brokers`   - String or list of strings of broker host:port information (default: "localhost:9092")
       - `opts.retries`   - Number of retry attempts for Kafka API calls before failing (default: 10)
       - `opts.retryWait` - Amount of time to wait between retry attempts for Kafka API calls (default: 0D00:00:01)
       - `opts.offset`    - A dictionary of partitions to offsets to start reading from.
                            See <https://code.kx.com/q/interfaces/kafka/reference/#kfkassignoffsets>
       - `opts.<option>`  - All available Kafka consumer configuration options are supported as
                            first-class options in the API. This includes options such as `socket.timeout.ms`,
                            `fetch.message.max.bytes`, etc. For the full list of options, see
                            https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Note that
                            `group.id`, `metadata.broker.list` and `consumer.id` are reserved values and
                            are maintained directly by the Kafka reader.

Example: Localhost Broker

 // Read topic 'numbers' with a default broker at 'localhost:9092'.
 // This topic is generating random numbers between 0 and 10
 .qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]

 /=> 2021.03.08D19:19:08.171805000 | 4
 /=> 2021.03.08D19:19:08.171915000 | 7
 /=> 2021.03.08D19:19:08.172081000 | 3
 /=> ..

Example: Multiple Brokers

 // Read topic 'numbers' from multiple brokers. This topic is generating
 // random numbers between 0 and 10
 .qsp.run
      .qsp.read.fromKafka[`topic`brokers!(`numbers; ("localhost:1234"; "localhost:1235"))]
      .qsp.write.toConsole[]

 /=> 2021.03.08D19:19:08.171805000 | 4
 /=> 2021.03.08D19:19:08.171915000 | 7
 /=> 2021.03.08D19:19:08.172081000 | 3
 /=> ..

Example: Advanced Configuration

 // Reads topic 'numbers' from the default broker, setting custom values for advanced
 // consumer configuration. For more information about the available configuration values,
 // visit: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 .qsp.run
      .qsp.read.fromKafka[(!) . flip (
          (`topic                     ; `numbers);
          (`socket.timeout.ms         ; 60000);       // Wait 1 minute for socket timeout
          (`fetch.message.max.bytes   ; 4*1024*1024); // Allow 4MiB max message size
          (`fetch.wait.max.ms         ; 500);         // Wait up to 500ms for a fetch cycle
          (`queued.max.message.chunks ; 100))]        // Allow up to 100 queued message chunks
      .qsp.write.toConsole[]

 /=> 2021.03.08D19:19:08.171805000 | 4
 /=> 2021.03.08D19:19:08.171915000 | 7
 /=> 2021.03.08D19:19:08.172081000 | 3
 /=> ..

Example: Custom Offsets

 // Reads topic 'numbers' from the end of the data stream. This will skip any old messages
 // in the stream and start reading from the point that a given worker joins the stream. Note
 // that the default offset for a given stream is to read from the beginning of the stream.
 .qsp.run
      .qsp.read.fromKafka[(!) . flip (
          (`topic  ; `numbers);
          (`offset ; `end))]      // The 'offset' field can be set to a single value to
                                  // apply to all partitions. Alternatively it can be set to
                                  // a dictionary of partitions and offsets. Offsets can either
                                  // be the special symbols `start or `end or a literal offset
                                  // index as a number. Example: 0 1i!(`start; 10)
      .qsp.write.toConsole[]

 /=> 2021.03.08D19:19:08.171805000 | 8
 /=> 2021.03.08D19:19:08.171915000 | 1
 /=> 2021.03.08D19:19:08.172081000 | 9
 /=> ..
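
Building on the comment above, offsets can also be given per partition as a dictionary. This sketch assumes topic 'numbers' has two partitions:

 // Partition 0 replays from the beginning; partition 1 starts at offset 10
 .qsp.run
      .qsp.read.fromKafka[`topic`offset!(`numbers; 0 1i!(`start; 10))]
      .qsp.write.toConsole[]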

.qsp.read.fromRT

The RT reader reads data using a KX Insights Reliable Transport client conforming to the standard RT API. See the configuration section for details about how to inject a customized RT library into the Stream Processor.

By default, the RT messages will be read from the start of the RT log. See the examples below for customizing the start position.

Signatures:

    .qsp.read.fromRT[topic]

       - `topic`     - String topic name
    .qsp.read.fromRT[opts]

       - `opts.topic`     - String topic name
       - `opts.index`     - Position in the stream to replay from as a long (default: 0)

Example: Subscribe to a TP at a known location

      .qsp.read.fromRT[":tp:5000"]

Example: Subscribe to a TP at a known location without replaying the stream

      .qsp.read.fromRT[`topic`index!(":tp:5000"; 0N)]

Example: Subscribe to a TP endpoint mapped to a topic name with the env variable RT_SUB_SENSOR

      .qsp.read.fromRT["sensor"]