Readers

Readers are a specialized type of operator that lets users feed data from external data sources into a streaming pipeline. Each reader has its own start, stop, setup and teardown functions to handle the corresponding streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using .qsp.push.

Some readers, such as .qsp.read.fromCallback and .qsp.read.fromKafka, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by the Controller.

Scaling

.qsp.read.fromCallback

Define callback in the q global namespace

.qsp.read.fromCallback[callback]
.qsp.read.fromCallback[callback; .qsp.use enlist[`partitions]!enlist partitions]

Parameters:

name type description default
callback symbol The name of the function to define. Required

options:

name type description default
partitions vector A list of partition identifiers to distribute over available Workers. ()
key symbol or generic null (::) Name of the field which contains the key of the published event, or generic null for unkeyed data. Only works for dictionary events. (::)

For all common arguments, refer to configuring operators

This operator defines callback as a function in the q global namespace, causing data passed to the function (locally or over IPC) to enter the pipeline data flow.

Single worker callback:

// This reader creates a monadic callback function that can be executed to push
// data into a pipeline. When the function `publish` is executed on an SP
// Worker with input data, that data is forwarded to subsequent operators in
// the pipeline.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[]
// Since the callback is defined in the global scope, data can be passed
// directly within the same program. This could be used to define `upd`
// handlers in kdb+ tick, or to wrap other data-acquisition methods such as `.Q.fs`.

publish ([] val: 100?10f )           // Call directly from the same process
upd: enlist[`trade]!enlist publish   // Define kdb+ tick upd handler
.Q.fs[publish] `:myfile.json         // Stream the file 'myfile.json' of
                                     // '\n'-delimited JSON records
                                     // through the pipeline

Multi-worker callback:

// Creates a partitioned callback reader that can be distributed over multiple
// SP Workers. This creates a callback called `publish` for each partition
// and assigns one of the 10 partitions to each callback.
.qsp.run
  .qsp.read.fromCallback[.qsp.use `callback`partitions!(`publish; til 10)]
  .qsp.write.toConsole[]
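
Keyed callback:

The key option from the table above is not covered by the earlier examples. The following is a minimal sketch, assuming dictionary events that carry their key in a field named sym; the field name and the sample event are illustrative only.

```q
// Hypothetical example: treat the `sym field of each event as its key.
// The key option only works for dictionary events.
.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use enlist[`key]!enlist `sym]
  .qsp.write.toConsole[]

// Publish a single dictionary event keyed on `sym
publish `sym`price!(`AAPL; 100.5)
```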

.qsp.read.fromDatabase

Reads data from an Insights database

.qsp.read.fromDatabase[query]
.qsp.read.fromDatabase[query; .qsp.use (!) . flip (
    (`table      ; table);
    (`startTS    ; startTS);
    (`endTS      ; endTS);
    (`filter     ; filter);
    (`groupBy    ; groupBy);
    (`agg        ; agg);
    (`fill       ; fill);
    (`temporality; temporality);
    (`sortCols   ; sortCols);
    (`retries    ; retries);
    (`retryWait  ; retryWait))]

Parameters:

name type description default
query string qSQL query to execute on the database. Required

options:

name type description default
table symbol Name of table to retrieve data from. `
startTS timestamp Inclusive start time of period of interest. 0Np
endTS timestamp Exclusive end time of period of interest. 0Np
filter list List of triadic lists of the form (function;column name;parameter). ()
groupBy symbol[] List of columns to group aggregation result by. ()
agg symbol[] List of aggregation triples, or a list of columns to select. Aggregation example: (`c1`avg`price;`c2`sum`size). Basic select example: `price`size. ()
fill symbol How to handle nulls in the data. Supported values are zero and forward. The zero option fills numeric types with zeroes. The forward option fills nulls with previous, non-null entry. `zero
temporality symbol Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot which takes a continuous range of the data, and slice which grabs data between the time values of startTS and endTS parameters, for each date within the start and end timestamp. `snapshot
sortCols symbol[] Columns to sort result data on. ()
retries long Number of times to retry the connection to the database. 5
retryWait timespan Time to wait between database connection attempts. 0D00:00:02

For all common arguments, refer to configuring operators

This operator issues a query either using qSQL or a functional 'get data' query. If a single string parameter is provided, a qSQL query is issued. Otherwise, the configuration dictionary is used, and table, startTS and endTS are considered required fields.

Database Host

In a Kubernetes deployment, the database reader will use the deployment release name to locate the query service gateway. When using a Docker configuration or to override the host address of the database, set $KXI_SP_DATABASE_HOST. The hostname should be the full URL of the Service Gateway including the port to connect to.

qSQL query:

.qsp.run
  .qsp.read.fromDatabase["select from stocks"]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..

Get data API:

.qsp.run
  .qsp.read.fromDatabase[.qsp.use `table`startTS`endTS!(`stocks; .z.p-1D; .z.p)]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
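
Get data API with additional options:

The other options in the table above can be combined with the required table, startTS and endTS fields. This sketch assumes the same stocks table as the previous examples and a sym column to sort on; the option values are taken from the supported values listed above and are only meant as an illustration.

```q
// Forward-fill nulls, take the same time-of-day slice for each date in
// the range, and sort the result on the (assumed) sym column.
.qsp.run
  .qsp.read.fromDatabase[.qsp.use (!) . flip (
    (`table      ; `stocks);
    (`startTS    ; .z.p-1D);
    (`endTS      ; .z.p);
    (`fill       ; `forward);
    (`temporality; `slice);
    (`sortCols   ; enlist `sym))]
  .qsp.write.toConsole[]
```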

Enrich data with stream

stocks: .qsp.read.fromDatabase["select from stocks"];

// Reads from a stream and filters on a table called 'trades', windows
// the data into 5 second intervals, then enriches the data with a join
// on sym. Finally the data is written to a table called 'avgPrice'.
.qsp.run
  .qsp.read.fromStream[`trades]
  .qsp.window.tumbling[0D00:00:05; `time]
  .qsp.merge[stocks; { x lj `sym xkey y }]  // Join stocks to trades on sym
  .qsp.map[{ select avg price by sym from x }]
  .qsp.write.toStream[`avgPrice]

An example output of this pipeline, using a toConsole writer, might look as follows.

                             | sym  price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP  737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA  191.2175
..

.qsp.read.fromExpr

Evaluate expression or function into the pipeline

.qsp.read.fromExpr[expr]

Parameters:

name type description default
expr string or function A q expression, or in q only, a nullary synchronous function. Required

For all common arguments, refer to configuring operators

This operator evaluates the expression or function, and incorporates the resulting data into the pipeline.

Enter the table refData into the pipeline:

refData: ([] sensorID: 100?`8; sensorCode: 100?100)

.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]

Query a separate process for data:

getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }

.qsp.run
  .qsp.read.fromExpr[getSensor]
  .qsp.write.toConsole[]

.qsp.read.fromFile

Read file contents into pipeline

.qsp.read.fromFile[path]
.qsp.read.fromFile[path; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`chunking ; chunking);
    (`chunkSize; chunkSize))]

Parameters:

name type description default
path string A filepath. Required

options:

name type description default
mode symbol Either binary or text. binary
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"

For all common arguments, refer to configuring operators

This operator reads a file, pushing its contents (bytes or characters) to the pipeline.

File mode

Setting the file mode to binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line.
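
The difference between the two modes can be pictured with plain q file primitives. This is only an analogy, assuming binary mode behaves like read1 and text mode like read0; it does not use the reader itself, and modes_demo.txt is a scratch file name chosen for the illustration.

```q
// Write three numbers to a scratch file, one per line
`:modes_demo.txt 0: string 10 20 30;

// Comparable to binary mode: the whole file as a single byte vector
bytes: read1 `:modes_demo.txt;

// Comparable to text mode: a list of strings, one per line
lines: read0 `:modes_demo.txt;

// Bytes must be cast to characters and split on newlines before parsing;
// the empty item left by the trailing newline is dropped
parts: "\n" vs "c"$bytes;
fromBytes: "J"$parts where 0 < count each parts;

// Lines can be parsed directly
fromLines: "J"$lines;
```

Both fromBytes and fromLines hold the same three longs; the binary route simply needs the extra cast and split.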

File chunking

Chunking a file splits the file into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target file and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Reading a file:

`:numbers.txt 0: string 100?1000;

.qsp.run
  .qsp.read.fromFile["numbers.txt"]
  .qsp.map[{"J"$"\n" vs "c"$x}]  // Cast bytes to chars, split on newlines, parse
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Reading text data by line:

`:numbers.txt 0: string 100?1000;

.qsp.run
  .qsp.read.fromFile["numbers.txt"; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading binary data:

`:numbers.bin 1: 100?0x0;

.qsp.run
  .qsp.read.fromFile["numbers.bin"]
  .qsp.write.toVariable[`output]

output
0xebf68daee782ca3f0156512c444a35e1dfe4bc471594..
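
Reading a file in chunks:

Chunking can be requested explicitly instead of being left to auto. A minimal sketch, assuming a local binary file named large.bin (a hypothetical name) and using only the chunking and chunkSize options from the table above; the 65536-byte chunk size is an arbitrary value for illustration.

```q
// Force chunked reads of a (hypothetical) binary file.
// chunkSize is given here as a long, i.e. a number of bytes.
.qsp.run
  .qsp.read.fromFile["large.bin"; .qsp.use `chunking`chunkSize!(`enabled; 65536)]
  .qsp.write.toConsole[]
```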

.qsp.read.fromGoogleStorage

Reads an object from Google Cloud Storage

.qsp.read.fromGoogleStorage[path]
.qsp.read.fromGoogleStorage[path; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`tenant   ; tenant);
    (`chunking ; chunking);
    (`chunkSize; chunkSize))]

Parameters:

name type description default
path string or symbol The path of an object to read from Google Cloud Storage. Required

options:

name type description default
mode symbol Either binary or text. binary
tenant string The authentication tenant. ""
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"

For all common arguments, refer to configuring operators

This operator reads an object from a Google Cloud Storage bucket.

Object mode

Setting the object mode to binary will read content as a byte vector. When reading an object as text, data will be read as strings and be split on newlines. Each string vector represents a single line.

Object chunking

Chunking an object splits the object into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target object and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Authentication

The Google Cloud Storage reader uses kurl for credential discovery. See credential discovery in the GCP tab for more information. When running on Google provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To set up authentication using an environment variable, set GOOGLE_STORAGE_TOKEN to the output of running gcloud auth print-access-token. To install the Google SDK, refer to these instructions.

For Docker-based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    environment:
      GOOGLE_STORAGE_TOKEN: "123abc"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "gs-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { GOOGLE_STORAGE_TOKEN: "123abc" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret.

kubectl create secret generic gscreds --from-literal=token="$(gcloud auth print-access-token)"

Next, add the secret name to the Google Cloud Storage reader configuration.

.qsp.read.fromGoogleStorage[`:gs://bucket/hello; .qsp.use``credentials!``gscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "gs-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["gscreds"] }
    }' | jq -asR .)"

Examples

Processing a Google Cloud Storage object:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt]
  .qsp.map[{"J"$"\n" vs "c"$x}]  // Cast bytes to chars, split on newlines, parse
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Reading a text object by line:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]

2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading with a custom tenant:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/object; .qsp.use ``tenant!(""; "custom")]
  .qsp.write.toVariable[`output]

.qsp.read.fromHTTP

Requests data from an HTTP(S) endpoint

.qsp.read.fromHTTP[url]
.qsp.read.fromHTTP[url; method]
.qsp.read.fromHTTP[url; method; .qsp.use (!) . flip (
    (`body            ; body);
    (`header          ; header);
    (`onResponse      ; onResponse);
    (`followRedirects ; followRedirects);
    (`maxRedirects    ; maxRedirects);
    (`maxRetryAttempts; maxRetryAttempts);
    (`timeout         ; timeout);
    (`tenant          ; tenant);
    (`insecure        ; insecure);
    (`binary          ; binary);
    (`sync            ; sync);
    (`rejectErrors    ; rejectErrors)
    )]

Parameters:

name type description default
url string or symbol The URL to send a request to. Required
method string or symbol The HTTP method for the HTTP request (e.g. GET, POST). GET

options:

name type description default
body string The payload of the HTTP request. ""
header dict A map of header fields to send as part of the request. ()!()
onResponse function After a response, allows the response to be preprocessed or to trigger another request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline. ::
followRedirects boolean If set, any redirects will automatically be followed up to the maximum number of redirects. 1b
maxRedirects long The maximum number of redirects to follow before reporting an error. 5
maxRetryAttempts long The maximum number of times to retry a request that fails due to a timeout. 10
timeout long The duration in milliseconds to wait for a request to be completed before reporting an error. 0W
tenant symbol The request tenant to use for providing request authentication details. ""
insecure boolean Indicates if unverified server SSL/TLS certificates should be trusted. 0b
binary boolean Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. 0b
sync boolean Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. 0b
rejectErrors boolean Non-successful response codes will generate an error and stop the pipeline. 1b

For all common arguments, refer to configuring operators

Return data from a GET request:

.qsp.run
  .qsp.read.fromHTTP["https://example.com"]
  .qsp.write.toVariable[`output]

Read paged data:

URL: "https://example.com"
nextPage: {[res] $[0 < count page: res[2]"S"$"next-page"; URL,"?page=",page; ::]}

.qsp.run
 .qsp.read.fromHTTP[URL,"?page=0"; .qsp.use``onResponse!(::;nextPage)]
 .qsp.write.toVariable[`output]
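
Send a POST request with a body and headers:

The body, header and timeout options can be combined with an explicit method. The sketch below assumes a hypothetical JSON endpoint at https://example.com/query and assumes that header names and values are given as strings; neither assumption is confirmed by the tables above.

```q
// Hypothetical JSON payload, serialized with the built-in .j.j
payload: .j.j `sym`limit!(`AAPL; 10)

.qsp.run
  .qsp.read.fromHTTP["https://example.com/query"; "POST"; .qsp.use (!) . flip (
    (`body   ; payload);
    (`header ; enlist["Content-Type"]!enlist "application/json");
    (`timeout; 5000))]   // Report an error if no response within 5000ms
  .qsp.write.toVariable[`output]
```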

.qsp.read.fromKafka

Consume data from a Kafka topic

.qsp.read.fromKafka[topic]
.qsp.read.fromKafka[topic; brokers]
.qsp.read.fromKafka[topic; brokers; .qsp.use (!) . flip (
    (`retries      ; retries);
    (`retryWait    ; retryWait);
    (`pollLimit    ; pollLimit);
    (`offset       ; offset);
    (`registry     ; registry);
    (`includeFields; includeFields);
    (`options      ; options))]

Parameters:

name type description default
topic symbol The name of a topic. Required
brokers string or string[] One or more brokers identified as host:port pairs. "localhost:9092"

options:

name type description default
retries long Max retry attempts for Kafka API calls. 10
retryWait timespan Time to wait between retry attempts. 0D00:00:02
pollLimit long Maximum number of records to process in a single poll loop. 1000
offset dictionary Partitions to offsets to start reading from. (`int$())!()
registry string Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload decoding. ""
includeFields boolean Set to true for Kafka Schema Registry messages to include the field name when decoding Protocol Buffer schemas. 1b
options dictionary Options to be passed through to Kafka. ()!()

For all common arguments, refer to configuring operators

This operator acts as a Kafka consumer, consuming data from a Kafka topic and pushing it to the pipeline.

A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.

Workers that join the stream will start reading from the end of the stream unless an explicit offset is provided. To start by reading from the beginning of the stream, a special start symbol can be used as the offset value, as in the Custom Offsets example below.

Besides the above keys, all available Kafka consumer configuration options are supported as options in the API, and can be passed in using the options dictionary. This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.

Reserved keys

group.id, metadata.broker.list and consumer.id are reserved values and are maintained directly by the Kafka reader.
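
What happens when a reserved key is supplied is not stated here. One defensive approach, sketched below in plain q, is to drop the reserved keys from a consumer options dictionary before passing it to the reader. The names reserved, opts and safeOpts and the sample values are hypothetical.

```q
// Keys that the Kafka reader maintains itself
reserved: `group.id`metadata.broker.list`consumer.id;

// Hypothetical user-supplied consumer options, one of which is reserved
opts: `socket.timeout.ms`group.id!(60000; "my-group");

// Drop any reserved keys, keeping everything else unchanged
safeOpts: reserved _ opts;
```

The resulting safeOpts dictionary can then be supplied as the options entry of the reader configuration.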

Maximum Poll Limit

The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the configuration option pollLimit will change this value. This limit is global so if multiple readers set this value, only one will be used.
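
A lower poll limit can be set together with the retry options when the reader is created. This is a minimal sketch using the numbers topic from the examples in this section; the values 500, 5 and 5 seconds are arbitrary illustrations rather than recommendations.

```q
// Process at most 500 messages per poll cycle, and retry failed Kafka
// API calls up to 5 times, waiting 5 seconds between attempts.
.qsp.run
  .qsp.read.fromKafka[`numbers; "localhost:9092"; .qsp.use `pollLimit`retries`retryWait!(500; 5; 0D00:00:05)]
  .qsp.write.toConsole[]
```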

Running with TLS

It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on the configuration.

Localhost broker:

// Read topic 'numbers' with a default broker at 'localhost:9092'.
// This topic is generating random numbers between 0 and 10
.qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Multiple brokers:

// Read topic 'numbers' from multiple brokers. This topic is generating
// random numbers between 0 and 10
.qsp.run
  .qsp.read.fromKafka[.qsp.use `topic`brokers!(`numbers;("localhost:1234";"localhost:1235"))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Advanced configuration:

// Reads topic 'numbers' with a default broker setting custom values for advanced
// consumer configuration.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`socket.timeout.ms         ; 60000);       // Wait 1 minute for socket timeout
        (`fetch.message.max.bytes   ; 4*1024*1024); // Allow 4MiB max message size
        (`fetch.wait.max.ms         ; 500);         // Wait up to 500ms for fetch cycle
        (`queued.max.message.chunks ; 100))))]      // Allow up to 100 queued msg chunks
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Custom offsets:

// Reads topic 'numbers' from the beginning of the data stream. This replays
// any old messages in the stream before consuming new ones. Note that the
// default offset for a given stream is to read from the end of the stream,
// that is, from the point that a given worker joins it.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`offset ; `start))]   // The 'offset' field can be set to a single value
                            // to apply to all partitions. Alternatively it can
                            // be set to a dictionary of partitions and
                            // offsets. Offsets can either be the special
                            // symbols `start or `end or a literal offset index
                            // as a number. Example: 0 1i!(`start; 10)
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 8
2021.03.08D19:19:08.171915000 | 1
2021.03.08D19:19:08.172081000 | 9
..
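
Per-partition offsets:

The dictionary form mentioned in the comment above can be written out as follows. This sketch assumes the numbers topic has partitions 0 and 1; partition 0 is read from its start and partition 1 from the literal offset 10.

```q
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`offset ; 0 1i!(`start; 10)))]   // partition -> offset
  .qsp.write.toConsole[]
```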

TLS authentication:

For more information on TLS configuration, see either the microservice example or platform example depending on the desired deployment configuration.

// Reads topic 'numbers' from a broker configured at `kafka:9092` using
// custom TLS/SSL certificates.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`brokers; "kafka:9092");
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`ssl.secret       ; "certs");
        (`ssl.key.password ; "iamsecure"))))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Kafka Schema Registry:

.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic;`numbers);
    (`brokers;"localhost:9092");
    (`registry;"http://localhost:8081"))]
  .qsp.write.toConsole[];

.qsp.read.fromAzureStorage

Reads an object from Microsoft Azure Storage

.qsp.read.fromAzureStorage[path]
.qsp.read.fromAzureStorage[path; account]
.qsp.read.fromAzureStorage[path; account; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`tenant   ; tenant);
    (`chunking ; chunking);
    (`chunkSize; chunkSize))]

Parameters:

name type description default
path string or symbol The path of an object to read from Microsoft Azure Storage. Required
account string or symbol The name of the Microsoft Azure Storage account to read an object from $AZURE_STORAGE_ACCOUNT

options:

name type description default
mode symbol Either binary or text. binary
tenant string The authentication tenant. ""
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"

For all common arguments, refer to configuring operators

This operator reads an object from a Microsoft Azure Blob Storage bucket.

Object mode

Setting the object mode to binary will read content as a byte vector. When reading an object as text, data will be read as strings and be split on newlines. Each string vector represents a single line.

Object chunking

Chunking an object splits the object into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target object and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Authentication

The Microsoft Azure Storage reader uses kurl for credential discovery. See credential discovery in the Azure tab for more information. When running on Azure provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To set up authentication using environment variables, set AZURE_STORAGE_ACCOUNT to the name of the storage account to read from and AZURE_STORAGE_SHARED_KEY to one of the keys for that account. To use shared keys, refer to these instructions.

For Docker-based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    environment:
      AZURE_STORAGE_ACCOUNT: "123abc"
      AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "ms-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            AZURE_STORAGE_ACCOUNT: "123abc",
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
        }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret.

kubectl create secret generic mscreds --from-literal=token="aAabBbcCcdDd111222333"

Next, add the secret name and the account name to the Azure Storage reader configuration.

.qsp.read.fromAzureStorage[`:ms://bucket/hello; "abc123"; .qsp.use``credentials!``mscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "ms-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["mscreds"] }
    }' | jq -asR .)"

Examples

Processing a Microsoft Azure Storage object:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount]
  .qsp.map[{"J"$"\n" vs "c"$x}]  // Cast bytes to chars, split on newlines, parse
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Reading a text object by line:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]

2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading with a custom tenant:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/object; .qsp.use `account`tenant!("myaccount"; "custom")]
  .qsp.write.toVariable[`output]

.qsp.read.fromPostgres

Execute query on a PostgreSQL database

.qsp.read.fromPostgres[query]
.qsp.read.fromPostgres[query; database]
.qsp.read.fromPostgres[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]

Parameters:

name type description default
query string Query to execute on the Postgres database. Required
database string Name of the database to connect to. $KXI_SP_POSTGRES_DATABASE

options:

name type description default
server string Address of the database to connect to. $KXI_SP_POSTGRES_SERVER
port string Port of database. $KXI_SP_POSTGRES_PORT
username string Username to authenticate with. $KXI_SP_POSTGRES_USERNAME
password string Password to authenticate with. $KXI_SP_POSTGRES_PASSWORD

For all common arguments, refer to configuring operators

This operator executes a query on a PostgreSQL database and pushes the result into the pipeline.

After the query has completed processing, the Postgres reader will signal a 'finish' command which will tear down the pipeline if there are no other pending requests.

Reading from a table 'stocks':

.qsp.run
  .qsp.read.fromPostgres["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
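
The options listed above can be supplied through .qsp.use when the environment variable defaults are not suitable. The following is a minimal sketch, not a definitive configuration: the database name, server address, username and the PGPASSWORD environment variable are hypothetical, and the option key for the port is assumed to be `port`, matching the $KXI_SP_POSTGRES_PORT default.

```q
// Sketch only: every connection detail below is a hypothetical placeholder.
// The password is read from an environment variable (name assumed) so that
// it does not appear in the pipeline spec.
.qsp.run
  .qsp.read.fromPostgres["select id, sym from stocks"; "finance"; .qsp.use (!) . flip (
    (`server  ; "postgres.example.internal");
    (`port    ; "5432");
    (`username; "reader");
    (`password; getenv `PGPASSWORD))]
  .qsp.write.toConsole[]
```

Any option left out of the dictionary should fall back to the corresponding environment variable default listed in the table.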

.qsp.read.fromStream

Read data using a KX Insights Stream

.qsp.read.fromStream[table]
.qsp.read.fromStream[table; stream]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`index]!enlist index]

Parameters:

name type description default
table symbol or string A table name. Required
stream symbol or string The inbound stream $RT_SUB_TOPIC

options:

name type description default
index long The position in the stream to replay from. 0

For all common arguments, refer to configuring operators

This operator reads data using KX Insights Reliable Transport Streams.
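
As a rough illustration of the signatures above, the sketch below subscribes to one table on a stream and replays from the start. The table name `trade` and the stream name `mystream` are hypothetical placeholders, not names defined by the product.

```q
// Sketch only: `trade and `mystream are assumed names.
// index 0 asks the reader to replay from the first position in the stream.
.qsp.run
  .qsp.read.fromStream[`trade; `mystream; .qsp.use enlist[`index]!enlist 0]
  .qsp.write.toConsole[]
```

Omitting the stream argument should fall back to the $RT_SUB_TOPIC default.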

.qsp.read.fromAmazonS3

Reads a file from AWS S3

.qsp.read.fromAmazonS3[path]
.qsp.read.fromAmazonS3[path; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`tenant   ; tenant);
    (`region   ; region);
    (`chunking ; chunking);
    (`chunkSize; chunkSize))]

Parameters:

name type description default
path symbol The path of an object to read from S3. Required

options:

name type description default
mode symbol Either binary or text. binary
tenant symbol The authentication tenant. `
region string The AWS region to authenticate against. us-east-1
chunking string Either enabled, disabled or auto. auto
chunkSize long or string The size of each chunk to read when chunking is enabled, given either as a number of bytes (long) or as a string with a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"

For all common arguments, refer to configuring operators

This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.

File mode

Setting the file mode to binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line.
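
The difference between the two modes can be simulated in plain q without an S3 connection. This is only a sketch of the data shapes described above: the content in `raw` is made up, and the assumption is that binary mode delivers a byte vector while text mode delivers one string per line.

```q
// Hypothetical object content: three numbers separated by newlines
raw: "556\n465\n63"

// Binary mode (assumed shape): the content arrives as a byte vector
bytes: "x"$raw
// Cast back to chars, split on newlines, then parse each line as a long
"J"$"\n" vs "c"$bytes    // 556 465 63

// Text mode (assumed shape): the content arrives already split into lines
lines: "\n" vs raw
"J"$lines                // 556 465 63
```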

File chunking

Chunking a file splits the file into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader determines the size of the target file and reads it in chunks if it is sufficiently large (more than a few megabytes).
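
Chunking can also be requested explicitly instead of relying on auto. The sketch below assumes the option types given in the options table (chunking as a string, chunkSize as a string with a size suffix); the object path and the "5MB" value are hypothetical.

```q
// Sketch only: force chunked reads of a text object in 5MB batches.
// The object path and the chunk size are assumed values.
.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/large.txt; .qsp.use `mode`chunking`chunkSize!(`text; "enabled"; "5MB")]
  .qsp.write.toConsole[]
```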

Authentication

The S3 reader uses kurl for credential discovery. See credential discovery in the AWS tab for more information.

Environment variable authentication:

To set up authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For Docker-based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    environment:
      AWS_ACCESS_KEY_ID: "abcd"
      AWS_SECRET_ACCESS_KEY: "iamasecret"

Docker credentials file for authentication:

To load a custom configuration file into a Docker deployment, mount the credentials file directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration directory.

version: "3.3"
services:
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    volumes:
      - $HOME/.aws/:/config/awscreds
    environment:
      KXI_SP_CONFIG_PATH: "/config"

Next, add the name of the mounted credentials directory to the S3 reader configuration:

.qsp.read.fromAmazonS3[`:s3://bucket/hello; .qsp.use``credentials!``awscreds]

Now deploying the pipeline will read the credentials from the AWS credentials file. Note that the credentials file name must be credentials.

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "s3-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using an AWS credentials file.

kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds

Next, add the secret name to the S3 reader configuration:

.qsp.read.fromAmazonS3[`:s3://bucket/hello; .qsp.use``credentials!``awscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "s3-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["awscreds"] }
    }' | jq -asR .)"

Examples

Processing an AWS S3 object:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Reading a text object by line:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; .qsp.use``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]

2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading with a custom region and tenant:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/object; .qsp.use `tenant`region!("custom"; "us-west-1")]
  .qsp.write.toVariable[`output]

.qsp.read.fromSQLServer

Execute a query on an SQL Server database

.qsp.read.fromSQLServer[query]
.qsp.read.fromSQLServer[query; database]
.qsp.read.fromSQLServer[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]

Parameters:

name type description default
query string Query to execute on the SQL Server database. Required
database string Name of the database to connect to. $KXI_SP_SQLSERVER_DATABASE

options:

name type description default
server string Address of the database. $KXI_SP_SQLSERVER_SERVER
port string Port of the database. $KXI_SP_SQLSERVER_PORT
username string Username to authenticate with. $KXI_SP_SQLSERVER_USERNAME
password string Password to authenticate with. $KXI_SP_SQLSERVER_PASSWORD

For all common arguments, refer to configuring operators

This operator executes a query on a Microsoft SQL Server database and pushes the result into the pipeline.

After the query has completed processing, the SQL Server reader signals a 'finish' command, which tears down the pipeline if there are no other pending requests.

Reading from a table 'stocks':

.qsp.run
  .qsp.read.fromSQLServer["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
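
Because the reader finishes once the query completes, the result can also be captured for later inspection rather than printed. The sketch below is an assumption-laden illustration: the database name and the MSSQL_USER and MSSQL_PASSWORD environment variables are hypothetical, and the server and port are left to their environment variable defaults.

```q
// Sketch only: credentials come from assumed environment variables and the
// query result is written to the global variable `output`.
.qsp.run
  .qsp.read.fromSQLServer["select sym, market from stocks"; "finance";
    .qsp.use `username`password!(getenv `MSSQL_USER; getenv `MSSQL_PASSWORD)]
  .qsp.write.toVariable[`output]
```

Once the pipeline has processed the query, `output` should hold the rows that were pushed through the pipeline.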