Skip to content

Readers

Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline. Each reader has its own start, stop, setup and teardown functions to handle different streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using .qsp.push.

Some readers, such as .qsp.read.fromCallback and .qsp.read.fromKafka, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by the Controller.

Scaling

.qsp.read.fromCallback

Receives data from a callback from the local process or over IPC

.qsp.read.fromCallback[callback]
.qsp.read.fromCallback[callback; .qsp.use (!) . flip (
    (`partitions; partitions);
    (`key       ; key);
    (`replay    ; replay))]

Parameters:

name type description default
callback symbol The name of the function to define. Required

options:

name type description default
partitions vector A list of partition identifiers to distribute over available Workers. ()
key symbol or generic null (::) Name of the field which contains the key of the published event, or generic null for unkeyed data. Only works for dictionary events. (::)
replay boolean (Beta feature) If true, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline. 0b

For all common arguments, refer to configuring operators

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

This operator defines callback as a function in the q global namespace, causing data passed to the function (locally or over IPC) to enter the pipeline data flow.

If using replay, you must set $KXI_SP_EVENT_JOURNAL to "true", and set $KXI_SP_JOURNAL_DIR to the directory where you'd like the event journals to be stored. Event journals are files containing the messages published to the callback reader. Messages are replayed from the journals on recovery. If $KXI_SP_JOURNAL_DIR is not set, it defaults to being $KXI_SP_CHECKPOINT_DIR/journals.

Note that enabling replay adversely affects performance, and is disabled by default for this reason.

Single worker callback:

// This reader creates a monadic callback function that can be executed to push
// data into a pipeline. When the function `publish` is executed on an SP
// Worker with input data, that data is forwarded to subsequent operators in
// the pipeline.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[]
// Since the callback is defined in the global scope, data can be passed
// directly within the same program. This could be used to define `upd`
// handlers in kdb+ tick, or to wrap other data-acquisition methods such as `.Q.fs`.

publish ([] val: 100?10f )           // Call directly from the same process
upd: enlist[`trade]!enlist publish   // Define kdb+ tick upd handler
.Q.fs[publish] `:myfile.json         // Stream the file 'myfile.json' of
                                     // '\n'-delimited JSON records
                                     // through the pipeline

Multi-worker callback:

// Creates a partitioned callback reader that can be distributed over multiple
// SP Workers. This is creating a callback for each partition called `publish`
// and assign one of 10 partitions to that callback.
.qsp.run
  .qsp.read.fromCallback[.qsp.use `callback`partitions!(`publish; til 10)]
  .qsp.write.toConsole[]

Callback with message replay:

`KXI_SP_JOURNAL_DIR setenv "/tmp/journals";

.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use ``replay!(::;1b)]
  .qsp.write.toConsole[];
publish 0
publish 1
2023.02.08D17:18:21.593351490 | 0
2023.02.08D17:18:22.106658246 | 1
If you restart the pipeline now to trigger recovery, you will see the messages replayed.
2023-02-08 17:19:41.285 [] INFO  SP Finished setup
2023-02-08 17:19:41.285 [] INFO  SP Starting readers...
2023-02-08 17:19:41.285 [] INFO  SPREAD Performing Callback reader replay, id=callback_publish
2023.02.08D17:19:41.285481189 | 0
2023.02.08D17:19:41.285516250 | 1
2023-02-08 17:19:41.285 [] INFO  SP Readers started

.qsp.read.fromDatabase

Reads data from a kdb Insights Database

.qsp.read.fromDatabase[query]
.qsp.read.fromDatabase[query; .qsp.use (!) . flip (
    (`table           ; table);
    (`startTS         ; startTS);
    (`endTS           ; endTS);
    (`filter          ; filter);
    (`groupBy         ; groupBy);
    (`agg             ; agg);
    (`fill            ; fill);
    (`temporality     ; temporality);
    (`sortCols        ; sortCols);
    (`labels          ; labels);
    (`retries         ; retries);
    (`retryWait       ; retryWait))]

Parameters:

name type description default
query string SQL query to execute on the database. Required

options:

name type description default
table symbol Name of table to retrieve data from. `
startTS timestamp Inclusive start time of period of interest. -0Wp
endTS timestamp Exclusive end time of period of interest. 0Wp
filter list List of triadic lists of the form (function;column name;parameter). ()
groupBy symbol[] List of columns to group aggregation result by. ()
agg symbol[] List of triples of aggregations or columns to select. e.g. Aggregation dict example:(`c1`avg`price;`c2`sum`size), basic select example `price`size. ()
fill symbol How to handle nulls in the data. Supported values are zero and forward. The zero option fills numeric types with zeroes. The forward option fills nulls with previous, non-null entry. `zero
temporality symbol Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot which takes a continuous range of the data, and slice which grabs data between the time values of startTS and endTS parameters, for each date within the start and end timestamp. `snapshot
sortCols symbol[] Columns to sort result data on (ascending). ()
labels dict Map of label selectors for specifying Assemblies. ()!()
retries long Number of times to retry the connection to the database. 60
retryWait timespan Time to wait between database connection attempts. 0D00:00:03

For all common arguments, refer to configuring operators

This operator issues a query either using SQL or a functional 'get data' query. If a single string parameter is provided, a SQL query is issued. Otherwise, the configuration dictionary is used, and table, startTS and endTS are considered required fields.

Database Host

In a Kubernetes deployment, the database reader will use the deployment release name to locate the query service gateway. When using a Docker configuration or to override the host address of the database, set $KXI_SP_DATABASE_HOST. The hostname should be the full URL of the Service Gateway including the port to connect to.

SQL query:

.qsp.run
  .qsp.read.fromDatabase["SELECT * FROM stocks"]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..

Get data API:

.qsp.run
  .qsp.read.fromDatabase[.qsp.use `table`startTS`endTS!(`stocks; .z.p-1D; .z.p)]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..

Enrich data with stream

stocks: .qsp.read.fromDatabase["select * from stocks"];

// Reads from a stream and filters on a table called 'trades', windows
// the data into 5 second intervals, then enriches the data with a join
// on time. Finally the data is written to a table called 'avgPrice'.
.qsp.run
  .qsp.read.fromStream[`trades]
  .qsp.window.tumbling[0D00:00:05; `time]
  .qsp.merge[stocks; { x lj `sym xkey y }]  // Join stocks to trades on sym
  .qsp.map[{ select avg price by sym from x }]
  .qsp.write.toStream[`avgPrice]
An example output of this pipeline using a toConsole writer might be as follows.

                             | sym  price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP  737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA  191.2175
..

.qsp.read.fromExpr

Evaluate expression or function into the pipeline

.qsp.read.fromExpr[expr]

Parameters:

name type description default
expr string or function A q expression, or in q only, a nullary synchronous function. Required

For all common arguments, refer to configuring operators

This operator evaluates the expression or function, and incorporates the resulting data into the pipeline.

Enter the table refData into the pipeline:

refData: ([] sensorID: 100?`8; sensorCode: 100?100)

.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]

Query a separate process for data:

getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }

.qsp.run
  .qsp.read.fromExpr[getSensor]
  .qsp.write.toConsole[]

.qsp.read.fromFile

Read file contents into pipeline

.qsp.read.fromFile[path]
.qsp.read.fromFile[path; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`chunking ; chunking);
    (`chunkSize; chunkSize);
    (`watch    ; watch))]

Parameters:

name type description default
path symbol or symbol[] or string or string[] A filepath or list of file paths. Glob patterns are supported. Required

options:

name type description default
mode symbol Either binary or text. binary
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"
watch boolean or dictionary Either 0b or 1b or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch set to 1b use the watch defaults. 0b

For all common arguments, refer to configuring operators

This operator reads one or more files, pushing their contents (bytes or characters) to the pipeline.

File mode

Setting the file mode to binary will read content as a byte vector. When reading file(s) as text, data will be read as strings and be split on newlines. Each string represents a single line.

File Chunking

Chunking a file splits the file into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target file and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

File watching

Setting the file watch to 1b will enable file watching with default settings. Usually this will be paired with a glob pattern for the file path or list of glob patterns. The reader will continually watch for new files matching the file path(s) provided using the watch method. Partitioning is not possible when using file watching. Note that by using watching the pipeline will continue watching until there is manual intervention to finish the pipeline. Also note, at present the file watcher will do nothing other than print a warning if a file is modified. In particular for files that are appended to, the new data will not be processed.

File watching should not to be used with the directWrite option in .qsp.write.toDatabase

You should not set file watch to 1b when using the directWrite in .qsp.write.toDatabase since directWrite relies on a definitive finish point which is not guaranteed when using the file watcher.

Progress API

Used to subscribe to events relating to the progress of file reading. There are 4 possible event types described below. We use .qsp.subscribe to subscribe to an eventType with a handler - a unary function taking an event as argument.

file events:

event type description data dictionary keys
file.found file path(s) have been found paths
file.start started to process path path, size
file.progress update on progress processing path path, totalBytes, bytesRead
file.end finished reading path path, size

data values

name q type description
paths string[] list of file paths to be read
path string file path
size long size of file
totalBytes long total bytes to be read
bytesRead long bytes read so far

Reading a file:

`:numbers.txt 0: string 100?1000;

.qsp.run
  .qsp.read.fromFile["numbers.txt"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Reading multiple files:

`:numbersOne.txt 0: string til 5;
`:numbersTwo.txt 0: string 5 + til 5;

.qsp.run
  .qsp.read.fromFile[("numbersOne.txt";"numbersTwo.txt"); .qsp.use``mode!``text]
  .qsp.map[{[md; data] (md; "c"$data)}; .qsp.use ``params!(::; `metadata`data)]
  .qsp.write.toConsole[]
2022.10.19D16:57:37.385577001 | `offset`key!(10;`numbersOne.txt)
2022.10.19D16:57:37.385577001 | (,"0";,"1";,"2";,"3";,"4")
2022.10.19D16:57:37.393146238 | `offset`key!(10;`numbersTwo.txt)
2022.10.19D16:57:37.393146238 | (,"5";,"6";,"7";,"8";,"9")

Reading text data by line:

`:numbers.txt 0: string 100?1000;

.qsp.run
  .qsp.read.fromFile["numbers.txt"; ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading binary data:

`:numbers.bin 1: 100?0x0;

.qsp.run
  .qsp.read.fromFile["numbers.bin"]
  .qsp.write.toVariable[`output]

output
0xebf68daee782ca3f0156512c444a35e1dfe4bc471594..

File Watching:

`:/tmp/numbers.txt 0: string til 5;

.qsp.run
  .qsp.read.fromFile["/tmp/number*.txt"; .qsp.use `mode`watch!(`text;1b)]
  .qsp.write.toConsole[]
2024.08.19D16:42:59.888120571 | ,"0"
2024.08.19D16:42:59.888120571 | ,"1"
2024.08.19D16:42:59.888120571 | ,"2"
2024.08.19D16:42:59.888120571 | ,"3"
2024.08.19D16:42:59.888120571 | ,"4"
`:/tmp/numberOnes.txt 0: string 1 1 1;
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"

Subscribing to file events:

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;

.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];
q).file.events
eventType     eventTime                     origin                 data
-------------------------------------------------------------------------------------------------------------------------
file.found    2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start    2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)

.qsp.read.fromGoogleStorage

Reads an object from Google Cloud Storage

.qsp.read.fromGoogleStorage[path]
.qsp.read.fromGoogleStorage[path; .qsp.use (!) . flip (
    (`project    ; project);
    (`mode       ; mode);
    (`tenant     ; tenant);
    (`chunking   ; chunking);
    (`chunkSize  ; chunkSize);
    (`credentials; credentials);
    (`watch      ; watch))]

Parameters:

name type description default
path string or string[] or symbol or symbol[] The Cloud Storage URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported. Required

options:

name type description default
project string The Google Cloud Storage project ID. ""
mode symbol Either binary or text. binary
tenant string The authentication tenant. ""
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"
credentials string Name of a credentials secret to mount. See the authentication section below for details. ""
watch boolean or dictionary Either 0b or 1b or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch set to 1b use the watch defaults. 0b

For all common arguments, refer to configuring operators

This operator reads an object from a Google Cloud Storage bucket.

Object mode

Setting the object mode to binary will read content as a byte vector. When reading an object as text, data will be read as strings and be split on newlines. Each string vector represents a single line.

Partitioning

The pipeline will automatically scale the number of workers to the minimum of the number of file paths and the maxWorkers in the pipeline settings. The workers will read a disjoint union of the paths. You can disable partitioning by setting maxWorkers to 1. Note that partitioning is not currently possible when there is more than one reader.

File watching

Setting the file watch to 1b will enable file watching with default settings. Usually this will be paired with a glob pattern for the file path or list of glob patterns. The reader will continually watch for new files matching the file path(s) provided using the watch method. Partitioning is not possible when using file watching. Note that by using watching the pipeline will continue watching until there is manual intervention to finish the pipeline. Also note, at present the file watcher will do nothing other than print a warning if a file is modified. In particular for files that are appended to, the new data will not be processed.

File watching should not to be used with the directWrite option in .qsp.write.toDatabase

You should not set file watch to 1b when using the directWrite in .qsp.write.toDatabase since directWrite relies on a definitive finish point which is not guaranteed when using the file watcher.

Object chunking

Chunking an object splits the object into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target object and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Operator can only read from one tenant

When reading in multiple files there must only be one tenant used to access all file paths passed into the operator.

Required Permissions

.qsp.read.fromGoogleStorage requires the ability to list and retrieve objects on the defined Google bucket and as such requires the credentials provided by the user to have access to the storage.objects.list, storage.objects.get and storage.buckets.get IAM permission, see here for more information.

Authentication

The Google Cloud Storage reader uses kurl for credential discovery. See credential discovery in the GCP tab for more information. When running on Google provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To setup authentication using an environment variable, set GOOGLE_STORAGE_TOKEN with the output of running gcloud auth print-access-token. To install the Google SDK, refer to these instructions.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    environment:
      GOOGLE_STORAGE_TOKEN: "123abc"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "gs-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { GOOGLE_STORAGE_TOKEN: "123abc" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic --from-literal token=$(gcloud auth print-access-token) gscreds

Next, add the secret name to the Google Cloud Reader reader configuration

.qsp.read.fromGoogleStorage[`:gs://bucket/hello; .qsp.use``credentials!``gscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "gs-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["gscreds"] }
    }' | jq -asR .)"

Examples

Processing an Google Cloud Storage object:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Processing multiple Google Cloud Storage objects:

.qsp.run
  .qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..

Processing multiple Google Cloud Storage objects using glob pattern matching:

.qsp.run
  .qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..

File Watching:

.qsp.run
  .qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 80
2023.06.15D10:49:20.352682229 | 91
2023.06.15D10:49:20.352682229 | 642
2023.06.15D10:49:20.352682229 | 485
2023.06.15D10:49:20.352682229 | 32

If next we upload numbersThree.txt to the bucket we see:

2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858

Reading a text object by line:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt; ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading multiple text objects by line:

.qsp.run
  .qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt);
     ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 405
2021.09.10D10:28:37.177672179 | 215
2021.09.10D10:28:37.177672179 | 564
2021.09.10D10:28:37.177672179 | 50
2021.09.10D10:28:37.177672179 | 765
..

Reading with a custom tenant:

.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/object; .qsp.use ``tenant!(""; "custom")]
  .qsp.write.toVariable[`output]

.qsp.read.fromHTTP

Requests data from an HTTP(S) endpoint

.qsp.read.fromHTTP[url]
.qsp.read.fromHTTP[url; method]
.qsp.read.fromHTTP[url; method; .qsp.use (!) . flip (
    (`body            ; body);
    (`header          ; header);
    (`onResponse      ; onResponse);
    (`followRedirects ; followRedirects);
    (`maxRedirects    ; maxRedirects);
    (`maxRetryAttempts; maxRetryAttempts);
    (`timeout         ; timeout);
    (`tenant          ; tenant);
    (`insecure        ; insecure);
    (`binary          ; binary);
    (`sync            ; sync);
    (`rejectErrors    ; rejectErrors)
    )]

Parameters:

name type description default
url string or symbol The URL to send a request to. Required
method string or symbol The HTTP method for the HTTP request (ex. GET, POST, etc.). GET

options:

name type description default
body string The payload of the HTTP request. ""
header dict A map of header fields to send as part of the request. ()!()
onResponse function After a response, allows the response to be preprocessed or to trigger another request. This function receives the result of the last request as a list where the first element is the return code, the second is the body and the third is the headers of the request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline. ::
followRedirects boolean If set, any redirects will automatically be followed up to the maximum number of redirects. 1b
maxRedirects long The maximum number of redirects to follow before reporting an error. 5
maxRetryAttempts long The maximum number of times to retry a request that fails due to a timeout. 10
timeout long The duration in milliseconds to wait for a request to be completed before reporting an error. A timeout of 0 is unlimited 5000
tenant symbol The request tenant to use for providing request authentication details. ""
insecure boolean Indicates if unverified server SSL/TLS certificates should be trusted. 0b
binary boolean Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. 0b
sync boolean Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. 0b
rejectErrors boolean Non-successful response codes will generate an error and stop the pipeline. 1b

For all common arguments, refer to configuring operators

Nondeterminism

This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.

Return data from a GET request:

.qsp.run
  .qsp.read.fromHTTP["https://example.com"]
  .qsp.write.toVariable[`output]

Read paged data:

// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"

// @overview
// An 'onResponse' handler that pages through the results of a web API.
// In this example, the response header (`res[2]`) contains a "next-page" key
// with the index of the next page to read. If the key is present, it returns
// a new URL for the HTTP reader to get data from. This will repeat the request
// with the next page until all the data has been read and there are no more pages.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {string|null} A string return is a URL for the HTTP reader to issue a
//   a new request to.
nextPage: {[res] $[0 < count page: res[2]"S"$"next-page"; URL,"?page=",page; ::]}

.qsp.run
 .qsp.read.fromHTTP[URL,"?page=0"; .qsp.use``onResponse!(::;nextPage)]
 .qsp.write.toVariable[`output]

Return data from header:

// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"

// @overview
// An onResponse handler that extracts information from the response handler and
// overwrites the data that is pushed into the pipeline to be this value.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {dict (response: dict)}
onResponse: {[res] enlist[`response]!enlist res 2 }

.qsp.run
 .qsp.read.fromHTTP[URL; .qsp.use``onResponse!(::;onResponse)]
 .qsp.write.toVariable[`output]

.qsp.read.fromKafka

Consume data from a Kafka topic

.qsp.read.fromKafka[topic]
.qsp.read.fromKafka[topic; brokers]
.qsp.read.fromKafka[topic; brokers; .qsp.use (!) . flip (
    (`retries      ; retries);
    (`retryWait    ; retryWait);
    (`pollLimit    ; pollLimit);
    (`offset       ; offset);
    (`registry     ; registry);
    (`asList       ; asList);
    (`options      ; options))]

Parameters:

name type description default
topic symbol or string The name of a topic. Required
brokers string or string[] or symbol or symbol[] One or more brokers identified as host:port pairs. "localhost:9092"

options:

name type description default
retries long Max retry attempts for Kafka API calls. 10
retryWait timespan Time to wait between retry attempts. 0D00:00:02
pollLimit long Maximum number of records to process in a single poll loop. 1000
offset dictionary Partitions to offsets to start reading from. (`int$())!()
registry string Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload decoding. ""
asList boolean Set to true for Kafka Schema Registry messages to omit field names when decoding Protocol Buffer schemas, and instead return only the list of values. 0b
options dictionary Options to be passed through to Kafka. ()!()

For all common arguments, refer to configuring operators

This operator acts as a Kafka consumer, consuming data from a Kafka topic and pushing it to the pipeline.

A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.

Workers that join the stream will start reading from the end of the stream unless an explicit offset is provided. To start by reading from the beginning of the stream, a special start symbol can be used as the offset value, as in the Custom Offsets example below.

Beside the above keys, all available Kafka consumer configuration options are supported using the options dictionary. This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.

Reserved keys

group.id, metadata.broker.list, and consumer.id are reserved values and are maintained directly by the Kafka reader. The Kafka reader will error if these options are used.

Maximum Poll Limit

The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the configuration option pollLimit will change this value. This limit is global so if multiple readers set this value, only one will be used.

Running with TLS

It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on your deployment type.

Offset Committing and Group Permissions

.qsp.read.fromKafka will automatically commit offsets when reading from a kafka broker. This is essential for exactly once semantics and fault tolerance. The error "Local: Waiting for coordinator" will occur when attempting to commit offsets if you do not have sufficient group permissions. The broker should be configured so that the user has group Describe permissions. Since .qsp.read.fromKafka will generate a random group ID, ensure the user is permissioned for all group names on the broker.

Localhost broker:

// Read topic 'numbers' with a default broker at 'localhost:9092'.
// This topic is generating random numbers between 0 and 10
.qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Multiple brokers:

// Read topic 'numbers' from multiple brokers. This topic is generating
// random numbers between 0 and 10
.qsp.run
  .qsp.read.fromKafka[.qsp.use `topic`brokers!(`numbers;("localhost:1234";"localhost:1235"))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Advanced configuration:

// Reads topic 'numbers' with a default broker setting custom values for advanced
// consumer configuration.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`socket.timeout.ms         ; 60000);       // Wait 1 minute for socket timeout
        (`fetch.message.max.bytes   ; 4*1024*1024); // Allow 4MiB max message size
        (`fetch.wait.max.ms         ; 500))))]      // Wait up to 500ms for fetch cycle
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Custom offsets:

// Reads topic 'numbers' from the end of the data stream. This will skip any
// old messages in the stream and start reading from the point that a given
// worker joins the stream. Note that the default offset for a given stream is
// to read from the beginning of the stream.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`offset ; `start))]   // The 'offset' field can be set to a single value
                            // to apply to all partitions. Alternatively it can
                            // be set to a dictionary of partitions and
                            // offsets. Offsets can either be the special
                            // symbols `start or `end or a literal offset index
                            // as a number. Example: 0 1i!(`start; 10)
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 8
2021.03.08D19:19:08.171915000 | 1
2021.03.08D19:19:08.172081000 | 9
..

TLS authentication:

For more information on TLS configuration, see either the microservice example or platform example depending on the desired deployment configuration.

// Reads topic 'numbers' from a broker configured at `kafka:9092` using
// custom TLS/SSL certificates.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`brokers; "kafka:9092");
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`ssl.secret       ; "certs");
        (`ssl.key.password ; "iamsecure"))))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..

Kafka Schema Registry:

.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic;`numbers);
    (`brokers;"localhost:9092");
    (`registry;"http://localhost:8081"))]
  .qsp.write.toConsole[];

.qsp.read.fromMQTT

Subscribe to an MQTT topic

.qsp.read.fromMQTT[topic;broker]
.qsp.read.fromMQTT[topic;broker; .qsp.use (!) . flip (
    (`username; username);
    (`password; password))]

Parameters:

name type description default
topic string or symbol The MQTT topic to subscribe to. Required
broker string or symbol URI of MQTT broker in form of protocol://host:port. protocol must be one of "tcp" or "ssl". SSL options such as sslCert are ignored when protocol is "tcp". Required

options:

name type description default
username string or symbol Username for the MQTT broker connection. ""
password string or symbol Password for the MQTT broker connection. ""
sslCert string or symbol Path of SSL/TLS certificate PEM file. ""
sslKey string or symbol Path of SSL/TLS key PEM file. ""
sslKeyPassword string or symbol Password of private key file if encrypted. ""
sslCAFile string or symbol Path of SSL/TLS public certificate chain. ""
sslCAPath string or symbol Path to directory of trusted certificates for SSL connection. ""
sslSecret string or symbol Name of Kubernetes secret to read SSL configuration from. ""

For all common arguments, refer to configuring operators

This operator subscribes to an MQTT broker, pushing messages published to that topic to the pipeline.

Quality of Service

This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.

Determinism

This reader is non-deterministic, as only quality of service 0 (at most once messaging) has been implemented. As such, the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.

Subscribe to topic "readings":

.qsp.run
    .qsp.read.fromMQTT["readings"; "tcp://localhost:1883"]
    .qsp.write.toConsole[];
$ mosquitto_pub -t readings -m "Hello, World!"
2022.07.21D14:51:45.023073601 | "Hello, World!"

Subscribe to a topic "readings" using SSL

.qsp.run
     .qsp.read.fromMQTT[.qsp.use (!) . flip (
         (`topic         ; "readings");
         (`broker        ; "ssl://localhost:1883");
         (`sslCert       ; "/ssl/cert.pem");
         (`sslKey        ; "/ssl/key.pem");
         (`sslKeyPassword; "iamsecure");
         (`sslCAFile     ; "/ssl/ca.pem"))];
     .qsp.write.toConsole[]

.qsp.read.fromAzureStorage

Reads an object from Microsoft Azure Storage

.qsp.read.fromAzureStorage[path]
.qsp.read.fromAzureStorage[path; account]
.qsp.read.fromAzureStorage[path; account; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`tenant   ; tenant);
    (`chunking ; chunking);
    (`chunkSize; chunkSize);
    (`watch    ; watch))]

Parameters:

name type description default
path string or string[] or symbol or symbol[] The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an ms:// URL*, not an https:// URL. Glob patterns are supported. Required
account string or symbol The name of the Microsoft Azure Storage account to read an object from. $AZURE_STORAGE_ACCOUNT

options:

name type description default
mode symbol Either binary or text. binary
tenant string The authentication tenant. ""
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"
credentials string Name of a credentials secret to mount. See the authentication section below for details. ""
watch boolean or dictionary Either 0b or 1b or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch set to 1b use the watch defaults. 0b

For all common arguments, refer to configuring operators

ms:// URLs

This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv, so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv would be written as .qsp.read.fromAzureStorage["ms://myContainerName/path/to/file.csv"; "myStorageAccount"]

This operator reads an object from a Microsoft Azure Blob Storage bucket.

Object mode

Setting the object mode to binary will read content as a byte vector. When reading an object as text, data will be read as strings and be split on newlines. Each string vector represents a single line.

Partitioning

The pipeline will automatically scale the number of workers to the minimum of the number of file paths and the maxWorkers in the pipeline settings. The workers will read a disjoint union of the paths. You can disable partitioning by setting maxWorkers to 1. Note that partitioning is not currently possible when there is more than one reader.

File watching

Setting the file watch to 1b will enable file watching with default settings. Usually this will be paired with a glob pattern for the file path or list of glob patterns. The reader will continually watch for new files matching the file path(s) provided using the watch method. Partitioning is not possible when using file watching. Note that by using watching the pipeline will continue watching until there is manual intervention to finish the pipeline. Also note, at present the file watcher will do nothing other than print a warning if a file is modified. In particular for files that are appended to, the new data will not be processed.

File watching should not to be used with the directWrite option in .qsp.write.toDatabase

You should not set file watch to 1b when using the directWrite in .qsp.write.toDatabase since directWrite relies on a definitive finish point which is not guaranteed when using the file watcher.

Object chunking

Chunking an object splits the object into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target object and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Operator can only read from one account, domain and tenant

When reading in multiple files there must only be one account used to access all file paths passed into the operator. The same goes for domain and tenant.

Required Permissions

.qsp.read.fromAzure requires the ability to list and retrieve objects on the defined Azure blob storage buckets and as such requires the credentials provided by the user to have access to the following operations List Blobs and Get Blobs follow the supplied links for more information.

Authentication

The Microsoft Azure Storage reader uses kurl for credential discovery. See credential discovery in the Azure tab for more information. When running on Azure provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To setup authentication using environment variables, set AZURE_STORAGE_ACCOUNT to the name of the storage account to read from and AZURE_STORAGE_SHARED_KEY to one of the keys for that account. To use shared keys, refer to these instructions

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    environment:
      AZURE_STORAGE_ACCOUNT: "123abc"
      AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "ms-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            AZURE_STORAGE_ACCOUNT: "123abc",
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
        }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic --from-literal token="aAabBbcCcdDd111222333" mscreds

Next, add the secret name and the account name to the Azure Storage Reader reader configuration.

.qsp.read.fromAzureStorage[`:ms://bucket/hello; "abc123"; .qsp.use``credentials!``mscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "ms-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["mscreds"] }
    }' | jq -asR .)"

Examples

Processing an Microsoft Azure Storage object:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Processing multiple Microsoft Azure Storage objects:

.qsp.run
  .qsp.read.fromAzureStorage[(`:ms://bucket/fileOne.txt;`:ms://bucket/fileTwo.txt); `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..

Processing multiple Microsoft Azure Storage objects using glob pattern matching:

.qsp.run
  .qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..

File Watching:

.qsp.run
  .qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 32
2023.06.15D10:49:20.352682229 | 62
2023.06.15D10:49:20.352682229 | 567
2023.06.15D10:49:20.352682229 | 20
2023.06.15D10:49:20.352682229 | 13

If next we upload fileThree.txt to the bucket we see:

2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading multiple text objects by line:

.qsp.run
  .qsp.read.fromAzureStorage[(`:ms://bucket/numbersOne.txt;`:ms://bucket/numbersTwo.txt);
     `myaccount; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 324
2021.09.10D10:28:37.177672179 | 214
2021.09.10D10:28:37.177672179 | 877
2021.09.10D10:28:37.177672179 | 96
2021.09.10D10:28:37.177672179 | 102
..

Reading with a custom tenant:

.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/object; .qsp.use `account`tenant!("myaccount"; "custom")]
  .qsp.write.toVariable[`output]

.qsp.read.fromParquet

The parquet file reader reads the given parquet file, entering the parquet file contents (table or mixed list of arrays) into the pipeline.

Performance and Memory Efficiency

The defaults are generally tuned towards good performance. The plugin may not handle parquet files which are larger than the total amount of memory. To handle such files, you can partition them into separate tabular datasets to be efficiently processed by the arrowkdb APIs for partitioning and metadata.

.qsp.read.fromParquet[path]
.qsp.read.fromParquet[path; .qsp.use (!) . flip (
    (`mode         ; mode);
    (`metadata     ; metadata);
    (`storage      ; storage);
    (`region       ; region);
    (`certificates ; certificates);
    (`credentials  ; credentials);
    (`connection   ; connection);
    (`project      ; project);
    (`domain       ; domain);
    (`tenant       ; tenant);
    (`watch        ; watch))]

Parameters:

name type description default
path symbol or symbol[] or string or string[] A file path or list of file paths of parquet file(s). Glob patterns are supported. Required

options:

name type description default
mode symbol Either table or lists. table
metadata dictionary Metadata keys with their types to be applied. (::)
storage string Temporary storage of downloaded parquet files. /tmp
region string The AWS region to authenticate against. us-east-1
certificates string Location of default trust store of SSL certificates. "/opt/kx/ca-bundle.crt"
credentials string Name of a credentials secret to mount. ""
connection string The Azure Storage account connection string. ""
project string The Google Cloud Storage project ID. ""
domain string The name of your domain. ""
tenant symbol The authentication tenant. `
watch boolean or dictionary Either 0b or 1b or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch set to 1b use the watch defaults. 0b

For all common arguments, refer to configuring operators

This operator decodes a Parquet file into either a kdb+ table or mixed list of arrays.

Parquet mode

Setting the parquet mode to table will read content as a native KDB table. When reading a parquet file as lists, data will be read as list of arrays. Each array represents a single column.

Parquet metadata

Metadata maps tag values taken as symbols or strings to KDB datatypes named at https://code.kx.com/q/basics/datatypes/. Valid examples of such metadata might be: `year`month!`int`short or ("year";"month")!("int";"short").

Partitioning

The pipeline will automatically scale the number of workers to the minimum of the number of file paths and the maxWorkers in the pipeline settings. The workers will read a disjoint union of the paths. You can disable partitioning by setting maxWorkers to 1. Note that partitioning is not currently possible when there is more than one reader.

Parquet file watching

Setting parquet file watching to 1b will enable parquet file watching with default settings. Usually this will be paired with a glob pattern for the parquet file path or list of glob patterns. The reader will continually watch for new parquet files matching the parquet file path(s) provided using the watch method. Partitioning is not possible when using file watching. Note that by using watching the pipeline will continue watching until there is manual intervention to finish the pipeline. Also note, at present the parquet file watcher will do nothing other than print a warning if a file is modified. In particular for parquet files that are appended to, the new data will not be processed.

File watching should not to be used with the directWrite option in .qsp.write.toDatabase

You should not set file watch to 1b when using the directWrite in .qsp.write.toDatabase since directWrite relies on a definitive finish point which is not guaranteed when using the file watcher.

Parquet chunking

Chunking is not possible with the current reader. Since parquet files include a file footer which is mandatory with each file, a file cannot be parsed by simply partitioning the file. To partition parquet files, go to parquet.apache.org and follow the guide to split parquet files.

Operator can only watch from one domain and tenant

When reading in multiple files there must only be one domain and tenant used to access all file paths passed into the operator.

Support of Apache Arrow compression algorithms

This reader currently only supports Parquet data stored in Snappy compression format.

Authentication of Parquet reader

Docker authentication:

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.11.0
        environment:
            AWS_ACCESS_KEY_ID: "abcd"
            AWS_SECRET_ACCESS_KEY: "iamasecret"
            AZURE_STORAGE_ACCOUNT: "123abc"
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
            AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
            GOOGLE_STORAGE_TOKEN: "123abc"
            GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"

Kubernetes secret authentication:

To setup authentication in Kubernetes cluster combine credentials of your cloud providers into a single generic secret of its corresponding namespace.

kubectl create secret generic \
--from-file credentials=$HOME/.aws/credentials \
--from-literal account="123abc" \
--from-literal token="aAabBbcCcdDd111222333" \
--from-literal connection="<REDACTED>" \
--from-literal access_token="123abc" \
--from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
pqtcreds

AWS Authentication

The Parquet file watcher uses kurl for credential discovery. See credential discovery in the AWS tab for more information.

Environment variable authentication:

To setup authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.11.0
        environment:
            AWS_ACCESS_KEY_ID: "abcd"
            AWS_SECRET_ACCESS_KEY: "iamasecret"

Docker credentials file for authentication:

To load a custom configuration file into a Docker deployment, mount the credentials file directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration directory.

version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.11.0
        volumes:
            - $HOME/.aws/:/config/pqtcreds
        environment:
            KXI_SP_CONFIG_PATH: "/config"

Next, add the secret name to the Parquet reader configuration

.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]

Now deploying the pipeline will read the credentials from the AWS credentials file. Note that the credentials file name must be credentials.

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic --from-file credentials=$HOME/.aws/credentials pqtcreds

Note that the secret must be formatted so that the variable names are lower case and there is a space either side of the equals sign like so:

[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234

Next, add the secret name to the Parquet reader configuration

.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"

Azure Authentication

The Parquet Microsoft Azure Storage watcher uses kurl for credential discovery. See credential discovery in the Azure tab for more information. When running on Azure provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To setup authentication using environment variables, refer to Configure Azure Storage connection strings and set AZURE_STORAGE_CONNECTION_STRING to download Parquet files from the storage. For Parquet file watcher, set AZURE_STORAGE_ACCOUNT to the name of the storage account to watch for and AZURE_STORAGE_SHARED_KEY to one of the keys for that account. To use shared keys, refer to these instructions

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    environment:
      AZURE_STORAGE_ACCOUNT: "123abc"
      AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
      AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            AZURE_STORAGE_ACCOUNT: "123abc",
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333",
            AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
        }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic \
  --from-literal account="123abc" \
  --from-literal token="aAabBbcCcdDd111222333" \
  --from-literal connection="<REDACTED>" pqtcreds

Next, add the secret name and the account name to the Parquet Azure Storage Reader configuration.

.qsp.read.fromParquet["ms://bucket/*.parquet"; .qsp.use``credentials`connection`watch!(`;`pqtcreds;"<REDACTED>";1b)]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"

GCS Authentication

The Google Cloud Storage watcher uses kurl for credential discovery. See credential discovery in the GCP tab for more information. When running on Google provisioned cloud infrastructure, the credential discovery will automatically use the credentials of the user that launched the instance.

Environment variable authentication:

To setup authentication using an environment variable, refer to How Application Default Credentials works and set GOOGLE_APPLICATION_CREDENTIALS to download Parquet files from the storage. For Parquet file watcher, set GOOGLE_STORAGE_TOKEN with the output of running gcloud auth print-access-token. To install the Google SDK, refer to these instructions.

By default, Google Cloud API tokens expire after 1 hour

However, you can request a longer expiration time of up to 12 hours by specifying the "expires_in" parameter when you authenticate.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    environment:
      GOOGLE_STORAGE_TOKEN: "123abc"
      GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            GOOGLE_STORAGE_TOKEN: "123abc",
            GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
        }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic \
  --from-literal access_token=$(gcloud auth print-access-token) \
  --from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
  pqtcreds

Next, add the secret name to the Google Cloud Reader reader configuration

.qsp.read.fromParquet["gs://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"

Examples

These examples assume you have Parquet files stored locally or in object storage buckets.

Decode a Parquet file into a kdb+ table:

/ -- Reading to a kdb+ table
input: "/tmp/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Decode a Parquet file into a mixed list of arrays:

/ -- Reading the Parquet file to a list of arrays
input: "/tmp/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``mode!``lists];
    .qsp.write.toVariable[`output]
output
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Reading a Parquet file from AWS S3 registry:

input: "s3://bucket/table.parquet"

.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``region!(`;"us-east-1")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Parquet file watching from AWS S3 registry:

input: "s3://bucket/retention/interval*.parquet"

.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``region`watch!(`;"us-east-1";1b)]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

If next we upload intervalJul.parquet to the bucket we see:

tm                            sym      px    qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO"   106.4 60
2023.07.19D16:00:00.000000000 "OAIE"   107.3 70
2023.07.19D17:00:00.000000000 "RDIV"   108.2 80
2023.07.19D18:00:00.000000000 "KBWB"   109.1 90
2023.07.19D19:00:00.000000000 "BIGB"   110.0 100

Decode a Parquet Microsoft Azure Storage file:

/ -- Reading to a kdb+ table
input: "ms://bucket/table.parquet"

.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``connection!(`;"<REDACTED>")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Parquet watching of tabular dataset from Azure:

input: "ms://bucket/retention/year=2023/month=*/interval*.parquet"
metadata: `year`month!`int`short

.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``connection`metadata`watch!(`;"<REDACTED";metadata;1b)]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty year month
----------------------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10  2023 6
2023.06.09D11:00:00.000000000 "HSI"   102.8 20  2023 6
2023.06.09D12:00:00.000000000 "DIA"   103.7 30  2023 6
2023.06.09D13:00:00.000000000 "SPY"   104.6 40  2023 6
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50  2023 6

If next we upload intervalJul.parquet to the bucket we see:

tm                            sym    px    qty year month
---------------------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60  2023 7
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70  2023 7
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80  2023 7
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90  2023 7
2023.07.19D19:00:00.000000000 "BIGB" 110   100 2023 7

Decode a Parquet Google Cloud Storage file:

/ -- Reading to a kdb+ table
input: "gs://bucket/table.parquet"

.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``project!(`;"<REDACTED>")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

.qsp.read.fromPostgres

Execute a query on a PostgreSQL database

.qsp.read.fromPostgres[query]
.qsp.read.fromPostgres[query; database]
.qsp.read.fromPostgres[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]

Parameters:

name type description default
query string Query to execute on the Postgres database. Required
database string Name of the database to connect to. $KXI_SP_POSTGRES_DATABASE

options:

name type description default
server string Address of the database to connect to. $KXI_SP_POSTGRES_SERVER
port string Port of database. $KXI_SP_POSTGRES_PORT
username string Username to authenticate with. $KXI_SP_POSTGRES_USERNAME
password string Password to authenticate with. $KXI_SP_POSTGRES_PASSWORD

For all common arguments, refer to configuring operators

This operator executes a query on a PostgreSQL database and pushes the result into the pipeline.

After the query has completed processing, the Postgres reader will signal a 'finish' command which will teardown the pipeline if there are no other pending requests.

Reading from a table 'stocks':

.qsp.run
  .qsp.read.fromPostgres["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..

.qsp.read.fromStream

Read data using a kdb Insights Stream

.qsp.read.fromStream[table]
.qsp.read.fromStream[table; stream]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`position]!enlist position]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`index]!enlist index]

Parameters:

name type description default
table symbol or string A table name. A table name to filter incoming messages by. Leave blank to receive all messages.
stream symbol or string The inbound stream $RT_SUB_TOPIC

options:

name type description default
position symbol or string The position in the stream to replay from. Options are start or end of the stream. start
index (DEPRECATED) long The integer position in the stream to replay from. If both index and position are specified the position parameter will take priority. 0

For all common arguments, refer to configuring operators

This operator reads data using kdb Insights Reliable Transport Streams.

.qsp.read.fromAmazonS3

Reads an object from AWS S3

.qsp.read.fromAmazonS3[path; region]
.qsp.read.fromAmazonS3[path; region; .qsp.use (!) . flip (
    (`mode       ; mode);
    (`tenant     ; tenant);
    (`chunking   ; chunking);
    (`chunkSize  ; chunkSize);
    (`credentials; credentials);
    (`watch      ; watch))]

Parameters:

name type description default
path symbol or symbol[] or string or string[] The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported. Required
region string The AWS region to authenticate against. Required

options:

name type description default
mode symbol Either binary or text. binary
tenant string The authentication tenant. `
region string The AWS region to authenticate against. us-east-1
chunking symbol Either enabled, disabled or auto. auto
chunkSize long or string The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. "1MB"
credentials string Name of a credentials secret to mount. See the authentication section below for details. ""
domain string The name of your domain. ""
watch boolean or dictionary Either 0b or 1b or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch set to 1b use the watch defaults. 0b

For all common arguments, refer to configuring operators

This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.

File mode

Setting the file mode to binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line.

Partitioning

The pipeline will automatically scale the number of workers to the minimum of the number of file paths and the maxWorkers in the pipeline settings. The workers will read a disjoint union of the paths. You can disable partitioning by setting maxWorkers to 1. Note that partitioning is not currently possible when there is more than one reader.

File watching

Setting the file watch to 1b will enable file watching with default settings. Usually this will be paired with a glob pattern for the file path or list of glob patterns. The reader will continually watch for new files matching the file path(s) provided using the watch method. Partitioning is not possible when using file watching. Note that by using watching the pipeline will continue watching until there is manual intervention to finish the pipeline. Also note, at present the file watcher will do nothing other than print a warning if a file is modified. In particular for files that are appended to, the new data will not be processed.

File watching should not to be used with the directWrite option in .qsp.write.toDatabase

You should not set file watch to 1b when using the directWrite in .qsp.write.toDatabase since directWrite relies on a definitive finish point which is not guaranteed when using the file watcher.

File chunking

Chunking a file splits the file into smaller batches and streams the batches through the pipeline. If chunking is set to auto, the reader will determine the size of the target file and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

Operator can only read from one domain and tenant

When reading in multiple files there must only be one domain and tenant used to access all file paths passed into the operator.

Required Permissions

.qsp.read.fromAmazonS3 requires the ability to list and retrieve objects on the defined S3 bucket and as such requires the credentials provided by the user to have access to the s3:ListBucket, s3:GetObject and s3:ListObjects ACLs, see here for more information.

Authentication

The S3 reader uses kurl for credential discovery. See credential discovery in the AWS tab for more information.

Environment variable authentication:

To setup authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

For Docker based configurations, set the variables in the worker image configuration.

docker-compose:

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    environment:
      AWS_ACCESS_KEY_ID: "abcd"
      AWS_SECRET_ACCESS_KEY: "iamasecret"

Docker credentials file for authentication:

To load a custom configuration file into a Docker deployment, mount the credentials file directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration directory.

version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.11.0
    volumes:
      - $HOME/.aws/:/config/awscreds
    environment:
      KXI_SP_CONFIG_PATH: "/config"

Next, add the secret name to the S3 reader configuration

.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]

Now deploying the pipeline will read the credentials from the AWS credentials file. Note that the credentials file name must be credentials.

For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "s3-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"

Kubernetes secrets for authentication:

When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.

First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.

kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds

Note that the secret must be formatted so that the variable names are lower case and there is a space either side of the equals sign like so:

[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234

Next, add the secret name to the S3 reader configuration

.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]

Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "s3-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["awscreds"] }
    }' | jq -asR .)"

Examples

Processing an AWS S3 object:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..

Processing multiple AWS S3 objects:

.qsp.run
  .qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1";]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..

Processing multiple AWS S3 objects using glob pattern matching:

.qsp.run
  .qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1";]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..

File Watching:

.qsp.run
  .qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1"; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 435
2023.06.15D10:49:20.352682229 | 365
2023.06.15D10:49:20.352682229 | 452
2023.06.15D10:49:20.352682229 | 44
2023.06.15D10:49:20.352682229 | 42

If next we upload numbersThree.txt to the bucket we see:

2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858

Reading a text object by line:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..

Reading multiple text objects by line:

.qsp.run
  .qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1";
     .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..

Reading with a custom region and tenant:

.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/object; "us-west-1" ;(enlist `tenant)!enlist "custom"]
  .qsp.write.toVariable[`output]

.qsp.read.fromSQLServer

Execute a query on an SQL Server database

.qsp.read.fromSQLServer[query]
.qsp.read.fromSQLServer[query; database]
.qsp.read.fromSQLServer[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]

Parameters:

name type description default
query string Query to execute on the SQL Server database. Required
database string Name of the database to connect to. $KXI_SP_SQLSERVER_DATABASE

options:

name type description default
server string Address of the database. $KXI_SP_SQLSERVER_SERVER
port string Port of database. $KXI_SP_SQLSERVER_PORT
username string Username to authenticate with. $KXI_SP_SQLSERVER_USERNAME
password string Password to authenticate with. $KXI_SP_SQLSERVER_PASSWORD

For all common arguments, refer to configuring operators

This operator executes a query on a Microsoft SQL Server database and pushes the result into the pipeline.

After the query has completed processing, the SQL Server reader will signal a 'finish' command which will teardown the pipeline if there are no other pending requests.

Reading from a table 'stocks':

.qsp.run
  .qsp.read.fromSQLServer["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..

.qsp.read.fromUpload

Reads data supplied through an HTTP endpoint

.qsp.read.fromUpload[uploadName]

Parameters:

name type description default
uploadName symbol or string Unique name for this fromUpload operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form: https://insights.kx.com/streamprocessor/upload/{uploadName} . The uploadName is passed through in the operator metadata. Required

Fully Deterministic

The upload reader is fully deterministic by default. It will use event journaling to record the raw data it is provided. In a recovery scenario it will replay the journal (from the latest checkpoint) and return to the state it was in prior to crashing. Since all read events are journalled it will require enough disk space to write that data. To turn this feature off set the environment variable KXI_SP_EVENT_JOURNAL to false.

Limits and HTTP chunking

The upload limit is set to 10MB. The reader does not currently support HTTP chunking, therefore the user is responsible for manually chunking their files and making multiple upload requests if they wish to upload larger files.

No High Availability

Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.

kdb Insights Enterprise only

The .qsp.read.fromUpload reader operates only within kdb Insights Enterprise deployments of the Stream Processor at this time. This functionality does not operate within the kdb Insights Stream Processor Microservice.

Optional HTTP parameters

The optional HTTP parameters are table and finish. finish is used to finish the operator and subsequently the pipeline. This can be useful if we pair the upload reader with a direct write. The table parameter is unused by the upload operator, however if provided it will be passed through in the metadata and could be used by a node downstream e.g. a map node.

The template of the HTTP request to upload data is: (See the Open API Spec for full details).

curl -X POST https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true \
   --header "Content-Type: application/octet-stream" \
   --header "Authorization: Bearer $INSIGHTS_TOKEN" \
   --data-binary "@my/file.csv"

The @ symbol in "@my/file.csv" is required to signify the string resolves to a file rather than a literal.

Example:

Start pipeline using the fromUpload reader.

.qsp.run
  .qsp.read.fromUpload["myUniqueName"]
  .qsp.decode.csv[.qsp.use``header!``always]
  .qsp.write.toConsole[]

Send HTTP request to the coordinator to upload your file data.

>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
      --header "Authorization: Bearer $INSIGHTS_TOKEN"
      --data-binary "@quotes.csv"

Worker logs:

                             | time                            ticker bid           bidSize ask           askSize
-----------------------------| ----------------------------------------------------------------------------------
2023.07.20D15:37:31.641574003| "2023-01-09D06:03:54.726000000" "AAPL" "58.79616"    "890"   "5.538282"    "100"
2023.07.20D15:37:31.641574003| "2023-01-09D09:05:49.815000000" "AAPL" "12.70558"    "790"   "47.71454"    "990"
2023.07.20D15:37:31.641574003| "2023-01-10D09:24:01.982000000" "AAPL" "4.636787"    "620"   "50.61855"    "480"
2023.07.20D15:37:31.641574003| "2023-01-10D03:58:45.586000000" "MSFT" "81.40808"    "970"   "23.05427"    "500"
2023.07.20D15:37:31.641574003| "2023-01-10D03:16:20.405000000" "FB"   "44.11634"    "560"   "74.11803"    "330"
2023.07.20D15:37:31.641574003| "2023-01-08D09:43:50.155000000" "KX"   "78.12387"    "650"   "26.48796"    "40"
2023.07.20D15:37:31.641574003| "2023-01-09D10:55:05.862000000" "AAPL" "71.60091"    "840"   "66.77192"    "130"
2023.07.20D15:37:31.641574003| "2023-01-09D09:03:34.867000000" "KX"   "37.10555"    "130"   "69.64077"    "940"
2023.07.20D15:37:31.641574003| "2023-01-08D07:59:00.020000000" "TSLA" "67.64414"    "500"   "43.57184"    "160"

Use the finish parameter to finish the reader after data has been processed. If not provided it defaults to false.

>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true \
      --header "Authorization: Bearer $INSIGHTS_TOKEN"
      --data-binary "@quotes.csv"

Additionally we can finish the operator without sending any data.

>> curl -X POST https://insights.kx.com/streamprocessor/myUniqueName?finish=true \
      --header "Authorization: Bearer $INSIGHTS_TOKEN"

Note that the only Content-Type the HTTP requests can use are application/octet-stream (the default). Attempts to use any other Content-Type in the request will fail. If for example you wish to upload a JSON file upload use content type application/octet-stream (as opposed to application/json) and use a JSON decoder in the pipeline.