Readers
.qsp.read fromAmazonS3 reads data from Amazon Web Services S3 buckets fromAzureStorage reads data from Microsoft Azure Blob Storage fromCallback define callback in the q global namespace fromDatabase reads data from an Insights database fromExpr evaluate expression or function into the pipeline fromFile read file contents into pipeline fromGoogleStorage reads data from Amazon Web Services S3 buckets fromHTTP requests data from an HTTP(S) endpoint fromKafka consume data from a Kafka topic fromMQTT subscribe to an MQTT topic fromParquet produce table data from local or S3/Azure/GCS stored Parquet files fromPostgres execute a query against a PostgreSQL database fromSQLServer execute a query against a SQL Server database fromStream read data using a kdb Insights stream fromUpload read data supplied through an HTTP endpoint
Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline.
Each reader has its own start
, stop
, setup
and teardown
functions to handle different streaming lifecycle events.
Readers are assumed to be asynchronous and push data into a pipeline using .qsp.push
.
Some readers, such as .qsp.read.fromCallback
and .qsp.read.fromKafka
, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by the Controller.
.qsp.read.fromCallback
Receives data from a callback from the local process or over IPC
.qsp.read.fromCallback[callback]
.qsp.read.fromCallback[callback; .qsp.use (!) . flip (
(`partitions; partitions);
(`key ; key);
(`replay ; replay))]
Parameters:
name | type | description | default |
---|---|---|---|
callback | symbol | The name of the function to define. | Required |
options:
name | type | description | default |
---|---|---|---|
partitions | vector | A list of partition identifiers to distribute over available Workers. | () |
key | symbol or generic null (::) | Name of the field which contains the key of the published event, or generic null for unkeyed data. Only works for dictionary events. | (::) |
replay | boolean | (Beta feature) If true, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline. | 0b |
For all common arguments, refer to configuring operators
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES
to true
.
This operator defines callback
as a function in the q global namespace, causing data passed
to the function (locally or over IPC) to enter the pipeline data flow.
If using replay, you must set $KXI_SP_EVENT_JOURNAL
to "true", and set $KXI_SP_JOURNAL_DIR
to
the directory where you'd like the event journals to be stored. Event journals are files
containing the messages published to the callback reader. Messages are replayed from the journals
on recovery. If $KXI_SP_JOURNAL_DIR
is not set, it defaults to being
$KXI_SP_CHECKPOINT_DIR/journals
.
Note that enabling replay adversely affects performance, and is disabled by default for this reason.
Single worker callback:
// This reader creates a monadic callback function that can be executed to push
// data into a pipeline. When the function `publish` is executed on an SP
// Worker with input data, that data is forwarded to subsequent operators in
// the pipeline.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[]
// Since the callback is defined in the global scope, data can be passed
// directly within the same program. This could be used to define `upd`
// handlers in kdb+ tick, or to wrap other data-acquisition methods such as `.Q.fs`.
publish ([] val: 100?10f ) // Call directly from the same process
upd: enlist[`trade]!enlist publish // Define kdb+ tick upd handler
.Q.fs[publish] `:myfile.json // Stream the file 'myfile.json' of
// '\n'-delimited JSON records
// through the pipeline
Multi-worker callback:
// Creates a partitioned callback reader that can be distributed over multiple
// SP Workers. This is creating a callback for each partition called `publish`
// and assign one of 10 partitions to that callback.
.qsp.run
.qsp.read.fromCallback[.qsp.use `callback`partitions!(`publish; til 10)]
.qsp.write.toConsole[]
Callback with message replay:
`KXI_SP_JOURNAL_DIR setenv "/tmp/journals";
.qsp.run
.qsp.read.fromCallback[`publish; .qsp.use ``replay!(::;1b)]
.qsp.write.toConsole[];
publish 0
publish 1
2023.02.08D17:18:21.593351490 | 0
2023.02.08D17:18:22.106658246 | 1
2023-02-08 17:19:41.285 [] INFO SP Finished setup
2023-02-08 17:19:41.285 [] INFO SP Starting readers...
2023-02-08 17:19:41.285 [] INFO SPREAD Performing Callback reader replay, id=callback_publish
2023.02.08D17:19:41.285481189 | 0
2023.02.08D17:19:41.285516250 | 1
2023-02-08 17:19:41.285 [] INFO SP Readers started
.qsp.read.fromDatabase
Reads data from a kdb Insights Database
.qsp.read.fromDatabase[query]
.qsp.read.fromDatabase[query; .qsp.use (!) . flip (
(`table ; table);
(`startTS ; startTS);
(`endTS ; endTS);
(`filter ; filter);
(`groupBy ; groupBy);
(`agg ; agg);
(`fill ; fill);
(`temporality ; temporality);
(`sortCols ; sortCols);
(`labels ; labels);
(`retries ; retries);
(`retryWait ; retryWait))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | SQL query to execute on the database. | Required |
options:
name | type | description | default |
---|---|---|---|
table | symbol | Name of table to retrieve data from. | ` |
startTS | timestamp | Inclusive start time of period of interest. | -0Wp |
endTS | timestamp | Exclusive end time of period of interest. | 0Wp |
filter | list | List of triadic lists of the form (function;column name;parameter). | () |
groupBy | symbol[] | List of columns to group aggregation result by. | () |
agg | symbol[] | List of triples of aggregations or columns to select. e.g. Aggregation dict example:(`c1`avg`price;`c2`sum`size) , basic select example `price`size . |
() |
fill | symbol | How to handle nulls in the data. Supported values are zero and forward . The zero option fills numeric types with zeroes. The forward option fills nulls with previous, non-null entry. |
`zero |
temporality | symbol | Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot which takes a continuous range of the data, and slice which grabs data between the time values of startTS and endTS parameters, for each date within the start and end timestamp. |
`snapshot |
sortCols | symbol[] | Columns to sort result data on (ascending). | () |
labels | dict | Map of label selectors for specifying Assemblies. | ()!() |
retries | long | Number of times to retry the connection to the database. | 60 |
retryWait | timespan | Time to wait between database connection attempts. | 0D00:00:03 |
For all common arguments, refer to configuring operators
This operator issues a query either using SQL or a functional 'get data' query. If a single
string parameter is provided, a SQL query is issued. Otherwise, the configuration dictionary is
used, and table
, startTS
and endTS
are considered required fields.
Database Host
In a Kubernetes deployment, the database reader will use the deployment release name to
locate the query service gateway. When using a Docker configuration or to override the
host address of the database, set $KXI_SP_DATABASE_HOST
. The hostname should be the
full URL of the Service Gateway including the port to connect to.
SQL query:
.qsp.run
.qsp.read.fromDatabase["SELECT * FROM stocks"]
.qsp.write.toConsole[]
| date id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
Get data API:
.qsp.run
.qsp.read.fromDatabase[.qsp.use `table`startTS`endTS!(`stocks; .z.p-1D; .z.p)]
.qsp.write.toConsole[]
| date id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
Enrich data with stream
stocks: .qsp.read.fromDatabase["select * from stocks"];
// Reads from a stream and filters on a table called 'trades', windows
// the data into 5 second intervals, then enriches the data with a join
// on time. Finally the data is written to a table called 'avgPrice'.
.qsp.run
.qsp.read.fromStream[`trades]
.qsp.window.tumbling[0D00:00:05; `time]
.qsp.merge[stocks; { x lj `sym xkey y }] // Join stocks to trades on sym
.qsp.map[{ select avg price by sym from x }]
.qsp.write.toStream[`avgPrice]
toConsole
writer might be as follows.
| sym price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP 737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA 191.2175
..
.qsp.read.fromExpr
Evaluate expression or function into the pipeline
.qsp.read.fromExpr[expr]
Parameters:
name | type | description | default |
---|---|---|---|
expr | string or function | A q expression, or in q only, a nullary synchronous function. | Required |
For all common arguments, refer to configuring operators
This operator evaluates the expression or function, and incorporates the resulting data into the pipeline.
Enter the table refData
into the pipeline:
refData: ([] sensorID: 100?`8; sensorCode: 100?100)
.qsp.run
.qsp.read.fromExpr["refData"]
.qsp.write.toConsole[]
Query a separate process for data:
getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }
.qsp.run
.qsp.read.fromExpr[getSensor]
.qsp.write.toConsole[]
.qsp.read.fromFile
Read file contents into pipeline
.qsp.read.fromFile[path]
.qsp.read.fromFile[path; .qsp.use (!) . flip (
(`mode ; mode);
(`chunking ; chunking);
(`chunkSize; chunkSize);
(`watch ; watch))]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol or symbol[] or string or string[] | A filepath or list of file paths. Glob patterns are supported. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text . |
binary |
chunking | symbol | Either enabled , disabled or auto . |
auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
watch | boolean or dictionary | Either 0b or 1b or a dictionary with keys method and frequency . The only current method is timer . The default frequency is 0D00:00:05 . If watch set to 1b use the watch defaults. |
0b |
For all common arguments, refer to configuring operators
This operator reads one or more files, pushing their contents (bytes or characters) to the pipeline.
File mode
Setting the file mode to binary
will read content as a byte vector. When reading
file(s) as text
, data will be read as strings and be split on newlines. Each string
represents a single line.
File Chunking
Chunking a file splits the file into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target file and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
File watching
Setting the file watch to 1b
will enable file watching with default settings. Usually
this will be paired with a glob pattern for the file path or list of glob patterns.
The reader will continually watch for new files matching the file path(s) provided using
the watch method. Partitioning is not possible when using file watching. Note that by using
watching the pipeline will continue watching until there is manual intervention to finish
the pipeline. Also note, at present the file watcher will do nothing other than print a
warning if a file is modified. In particular for files that are appended to, the new data
will not be processed.
File watching should not to be used with the directWrite
option in .qsp.write.toDatabase
You should not set file watch to 1b
when using the directWrite
in .qsp.write.toDatabase
since directWrite
relies on a definitive finish point which is not guaranteed when using the file watcher.
Progress API
Used to subscribe to events relating to the progress of file reading. There are 4 possible
event types described below. We use .qsp.subscribe
to subscribe
to an eventType
with a handler
- a unary function taking an event
as argument.
file events:
event type | description | data dictionary keys |
---|---|---|
file.found | file path(s) have been found | paths |
file.start | started to process path | path, size |
file.progress | update on progress processing path | path, totalBytes, bytesRead |
file.end | finished reading path | path, size |
data values
name | q type | description |
---|---|---|
paths | string[] | list of file paths to be read |
path | string | file path |
size | long | size of file |
totalBytes | long | total bytes to be read |
bytesRead | long | bytes read so far |
Reading a file:
`:numbers.txt 0: string 100?1000;
.qsp.run
.qsp.read.fromFile["numbers.txt"]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading multiple files:
`:numbersOne.txt 0: string til 5;
`:numbersTwo.txt 0: string 5 + til 5;
.qsp.run
.qsp.read.fromFile[("numbersOne.txt";"numbersTwo.txt"); .qsp.use``mode!``text]
.qsp.map[{[md; data] (md; "c"$data)}; .qsp.use ``params!(::; `metadata`data)]
.qsp.write.toConsole[]
2022.10.19D16:57:37.385577001 | `offset`key!(10;`numbersOne.txt)
2022.10.19D16:57:37.385577001 | (,"0";,"1";,"2";,"3";,"4")
2022.10.19D16:57:37.393146238 | `offset`key!(10;`numbersTwo.txt)
2022.10.19D16:57:37.393146238 | (,"5";,"6";,"7";,"8";,"9")
Reading text data by line:
`:numbers.txt 0: string 100?1000;
.qsp.run
.qsp.read.fromFile["numbers.txt"; ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading binary data:
`:numbers.bin 1: 100?0x0;
.qsp.run
.qsp.read.fromFile["numbers.bin"]
.qsp.write.toVariable[`output]
output
0xebf68daee782ca3f0156512c444a35e1dfe4bc471594..
File Watching:
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
.qsp.read.fromFile["/tmp/number*.txt"; .qsp.use `mode`watch!(`text;1b)]
.qsp.write.toConsole[]
2024.08.19D16:42:59.888120571 | ,"0"
2024.08.19D16:42:59.888120571 | ,"1"
2024.08.19D16:42:59.888120571 | ,"2"
2024.08.19D16:42:59.888120571 | ,"3"
2024.08.19D16:42:59.888120571 | ,"4"
`:/tmp/numberOnes.txt 0: string 1 1 1;
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"
Subscribing to file events:
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
.qsp.read.fromFile[`:/tmp/numbers.txt]
.qsp.write.toVariable[`test.output];
q).file.events
eventType eventTime origin data
-------------------------------------------------------------------------------------------------------------------------
file.found 2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start 2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
.qsp.read.fromGoogleStorage
Reads an object from Google Cloud Storage
.qsp.read.fromGoogleStorage[path]
.qsp.read.fromGoogleStorage[path; .qsp.use (!) . flip (
(`project ; project);
(`mode ; mode);
(`tenant ; tenant);
(`chunking ; chunking);
(`chunkSize ; chunkSize);
(`credentials; credentials);
(`watch ; watch))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string or string[] or symbol or symbol[] | The Cloud Storage URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported. | Required |
options:
name | type | description | default |
---|---|---|---|
project | string | The Google Cloud Storage project ID. | "" |
mode | symbol | Either binary or text . |
binary |
tenant | string | The authentication tenant. | "" |
chunking | symbol | Either enabled , disabled or auto . |
auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" |
watch | boolean or dictionary | Either 0b or 1b or a dictionary with keys method and frequency . The only current method is timer . The default frequency is 0D00:00:05 . If watch set to 1b use the watch defaults. |
0b |
For all common arguments, refer to configuring operators
This operator reads an object from a Google Cloud Storage bucket.
Object mode
Setting the object mode to binary
will read content as a byte vector. When reading
an object as text
, data will be read as strings and be split on newlines. Each string
vector represents a single line.
Partitioning
The pipeline will automatically scale the number of workers to the minimum of the number
of file paths and the maxWorkers
in the pipeline settings. The workers will read a
disjoint union of the paths. You can disable partitioning by setting maxWorkers
to 1. Note that partitioning is not currently possible when there is more than one reader.
File watching
Setting the file watch to 1b
will enable file watching with default settings. Usually
this will be paired with a glob pattern for the file path or list of glob patterns.
The reader will continually watch for new files matching the file path(s) provided using
the watch method. Partitioning is not possible when using file watching. Note that by using
watching the pipeline will continue watching until there is manual intervention to finish
the pipeline. Also note, at present the file watcher will do nothing other than print a
warning if a file is modified. In particular for files that are appended to, the new data
will not be processed.
File watching should not to be used with the directWrite
option in .qsp.write.toDatabase
You should not set file watch to 1b
when using the directWrite
in .qsp.write.toDatabase
since directWrite
relies on a definitive finish point which is not guaranteed when using the file watcher.
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target object and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Operator can only read from one tenant
When reading in multiple files there must only be one tenant used to access all file paths passed into the operator.
Required Permissions
.qsp.read.fromGoogleStorage
requires the ability to list and retrieve objects on the defined Google
bucket and as such requires the credentials provided by the user to have access to the
storage.objects.list
, storage.objects.get
and storage.buckets.get
IAM permission,
see here
for more information.
Authentication
The Google Cloud Storage reader uses kurl
for credential discovery.
See credential discovery
in the GCP
tab for more information. When running on Google provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To setup authentication using an environment variable, set GOOGLE_STORAGE_TOKEN
with the output of
running gcloud auth print-access-token
. To install the Google SDK, refer to
these instructions.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
GOOGLE_STORAGE_TOKEN: "123abc"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "gs-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { GOOGLE_STORAGE_TOKEN: "123abc" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-literal token=$(gcloud auth print-access-token) gscreds
Next, add the secret name to the Google Cloud Reader reader configuration
.qsp.read.fromGoogleStorage[`:gs://bucket/hello; .qsp.use``credentials!``gscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "gs-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["gscreds"] }
}' | jq -asR .)"
Examples
Processing an Google Cloud Storage object:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple Google Cloud Storage objects:
.qsp.run
.qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt)]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..
Processing multiple Google Cloud Storage objects using glob pattern matching:
.qsp.run
.qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..
File Watching:
.qsp.run
.qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"; .qsp.use ``watch!(`;1b)]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 80
2023.06.15D10:49:20.352682229 | 91
2023.06.15D10:49:20.352682229 | 642
2023.06.15D10:49:20.352682229 | 485
2023.06.15D10:49:20.352682229 | 32
If next we upload numbersThree.txt
to the bucket we see:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt; ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
.qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt);
``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 405
2021.09.10D10:28:37.177672179 | 215
2021.09.10D10:28:37.177672179 | 564
2021.09.10D10:28:37.177672179 | 50
2021.09.10D10:28:37.177672179 | 765
..
Reading with a custom tenant:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/object; .qsp.use ``tenant!(""; "custom")]
.qsp.write.toVariable[`output]
.qsp.read.fromHTTP
Requests data from an HTTP(S) endpoint
.qsp.read.fromHTTP[url]
.qsp.read.fromHTTP[url; method]
.qsp.read.fromHTTP[url; method; .qsp.use (!) . flip (
(`body ; body);
(`header ; header);
(`onResponse ; onResponse);
(`followRedirects ; followRedirects);
(`maxRedirects ; maxRedirects);
(`maxRetryAttempts; maxRetryAttempts);
(`timeout ; timeout);
(`tenant ; tenant);
(`insecure ; insecure);
(`binary ; binary);
(`sync ; sync);
(`rejectErrors ; rejectErrors)
)]
Parameters:
name | type | description | default |
---|---|---|---|
url | string or symbol | The URL to send a request to. | Required |
method | string or symbol | The HTTP method for the HTTP request (ex. GET, POST, etc.). | GET |
options:
name | type | description | default |
---|---|---|---|
body | string | The payload of the HTTP request. | "" |
header | dict | A map of header fields to send as part of the request. | ()!() |
onResponse | function | After a response, allows the response to be preprocessed or to trigger another request. This function receives the result of the last request as a list where the first element is the return code, the second is the body and the third is the headers of the request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to :: , no data is pushed into the pipeline. |
:: |
followRedirects | boolean | If set, any redirects will automatically be followed up to the maximum number of redirects. | 1b |
maxRedirects | long | The maximum number of redirects to follow before reporting an error. | 5 |
maxRetryAttempts | long | The maximum number of times to retry a request that fails due to a timeout. | 10 |
timeout | long | The duration in milliseconds to wait for a request to be completed before reporting an error. A timeout of 0 is unlimited |
5000 |
tenant | symbol | The request tenant to use for providing request authentication details. | "" |
insecure | boolean | Indicates if unverified server SSL/TLS certificates should be trusted. | 0b |
binary | boolean | Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. | 0b |
sync | boolean | Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. | 0b |
rejectErrors | boolean | Non-successful response codes will generate an error and stop the pipeline. | 1b |
For all common arguments, refer to configuring operators
Nondeterminism
This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.
Return data from a GET request:
.qsp.run
.qsp.read.fromHTTP["https://example.com"]
.qsp.write.toVariable[`output]
Read paged data:
// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"
// @overview
// An 'onResponse' handler that pages through the results of a web API.
// In this example, the response header (`res[2]`) contains a "next-page" key
// with the index of the next page to read. If the key is present, it returns
// a new URL for the HTTP reader to get data from. This will repeat the request
// with the next page until all the data has been read and there are no more pages.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {string|null} A string return is a URL for the HTTP reader to issue a
// a new request to.
nextPage: {[res] $[0 < count page: res[2]"S"$"next-page"; URL,"?page=",page; ::]}
.qsp.run
.qsp.read.fromHTTP[URL,"?page=0"; .qsp.use``onResponse!(::;nextPage)]
.qsp.write.toVariable[`output]
Return data from header:
// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"
// @overview
// An onResponse handler that extracts information from the response handler and
// overwrites the data that is pushed into the pipeline to be this value.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {dict (response: dict)}
onResponse: {[res] enlist[`response]!enlist res 2 }
.qsp.run
.qsp.read.fromHTTP[URL; .qsp.use``onResponse!(::;onResponse)]
.qsp.write.toVariable[`output]
.qsp.read.fromKafka
Consume data from a Kafka topic
.qsp.read.fromKafka[topic]
.qsp.read.fromKafka[topic; brokers]
.qsp.read.fromKafka[topic; brokers; .qsp.use (!) . flip (
(`retries ; retries);
(`retryWait ; retryWait);
(`pollLimit ; pollLimit);
(`offset ; offset);
(`registry ; registry);
(`asList ; asList);
(`options ; options))]
Parameters:
name | type | description | default |
---|---|---|---|
topic | symbol or string | The name of a topic. | Required |
brokers | string or string[] or symbol or symbol[] | One or more brokers identified as host:port pairs. | "localhost:9092" |
options:
name | type | description | default |
---|---|---|---|
retries | long | Max retry attempts for Kafka API calls. | 10 |
retryWait | timespan | Time to wait between retry attempts. | 0D00:00:02 |
pollLimit | long | Maximum number of records to process in a single poll loop. | 1000 |
offset | dictionary | Partitions to offsets to start reading from. | (`int$())!() |
registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload decoding. | "" |
asList | boolean | Set to true for Kafka Schema Registry messages to omit field names when decoding Protocol Buffer schemas, and instead return only the list of values. | 0b |
options | dictionary | Options to be passed through to Kafka. | ()!() |
For all common arguments, refer to configuring operators
This operator acts as a Kafka consumer, consuming data from a Kafka topic and pushing it to the pipeline.
A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.
Workers that join the stream will start reading from the end of the stream unless an
explicit offset is provided. To start by reading from the beginning of the stream, a special start
symbol can be used as the offset value, as in the Custom Offsets example below.
Beside the above keys, all available Kafka consumer
configuration options
are supported using the options
dictionary.
This includes properties such as socket.timeout.ms
, fetch.message.max.bytes
, etc.
Reserved keys
group.id
, metadata.broker.list
, and consumer.id
are reserved values and are maintained
directly by the Kafka reader. The Kafka reader will error if these options are used.
Maximum Poll Limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is
imposed on the number of messages to read in a single cycle to avoid locking the
process in a read loop only serving Kafka messages. By default this limit is set to
1000 messages. Setting the configuration option pollLimit
will change this value.
This limit is global so if multiple readers set this value, only one will be used.
Running with TLS
It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on your deployment type.
Offset Committing and Group Permissions
.qsp.read.fromKafka
will automatically commit offsets when reading from a kafka broker.
This is essential for exactly once semantics and fault tolerance.
The error "Local: Waiting for coordinator" will occur when attempting to commit
offsets if you do not have sufficient group permissions. The broker should be configured so that
the user has group Describe permissions. Since .qsp.read.fromKafka
will generate a
random group ID, ensure the user is permissioned for all group names on the broker.
Localhost broker:
// Read topic 'numbers' with a default broker at 'localhost:9092'.
// This topic is generating random numbers between 0 and 10
.qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Multiple brokers:
// Read topic 'numbers' from multiple brokers. This topic is generating
// random numbers between 0 and 10
.qsp.run
.qsp.read.fromKafka[.qsp.use `topic`brokers!(`numbers;("localhost:1234";"localhost:1235"))]
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Advanced configuration:
// Reads topic 'numbers' with a default broker setting custom values for advanced
// consumer configuration.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic ; `numbers);
(`options; (!) . flip (
(`socket.timeout.ms ; 60000); // Wait 1 minute for socket timeout
(`fetch.message.max.bytes ; 4*1024*1024); // Allow 4MiB max message size
(`fetch.wait.max.ms ; 500))))] // Wait up to 500ms for fetch cycle
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Custom offsets:
// Reads topic 'numbers' from the end of the data stream. This will skip any
// old messages in the stream and start reading from the point that a given
// worker joins the stream. Note that the default offset for a given stream is
// to read from the beginning of the stream.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic ; `numbers);
(`offset ; `start))] // The 'offset' field can be set to a single value
// to apply to all partitions. Alternatively it can
// be set to a dictionary of partitions and
// offsets. Offsets can either be the special
// symbols `start or `end or a literal offset index
// as a number. Example: 0 1i!(`start; 10)
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 8
2021.03.08D19:19:08.171915000 | 1
2021.03.08D19:19:08.172081000 | 9
..
TLS authentication:
For more information on TLS configuration, see either the microservice example or platform example depending on the desired deployment configuration.
// Reads topic 'numbers' from a broker configured at `kafka:9092` using
// custom TLS/SSL certificates.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers; "kafka:9092");
(`topic ; `numbers);
(`options; (!) . flip (
(`ssl.secret ; "certs");
(`ssl.key.password ; "iamsecure"))))]
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Kafka Schema Registry:
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic;`numbers);
(`brokers;"localhost:9092");
(`registry;"http://localhost:8081"))]
.qsp.write.toConsole[];
.qsp.read.fromMQTT
Subscribe to an MQTT topic
.qsp.read.fromMQTT[topic;broker]
.qsp.read.fromMQTT[topic;broker; .qsp.use (!) . flip (
(`username; username);
(`password; password))]
Parameters:
name | type | description | default |
---|---|---|---|
topic | string or symbol | The MQTT topic to subscribe to. | Required |
broker | string or symbol | URI of MQTT broker in form of protocol://host:port . protocol must be one of "tcp" or "ssl". SSL options such as sslCert are ignored when protocol is "tcp". |
Required |
options:
name | type | description | default |
---|---|---|---|
username | string or symbol | Username for the MQTT broker connection. | "" |
password | string or symbol | Password for the MQTT broker connection. | "" |
sslCert | string or symbol | Path of SSL/TLS certificate PEM file. | "" |
sslKey | string or symbol | Path of SSL/TLS key PEM file. | "" |
sslKeyPassword | string or symbol | Password of private key file if encrypted. | "" |
sslCAFile | string or symbol | Path of SSL/TLS public certificate chain. | "" |
sslCAPath | string or symbol | Path to directory of trusted certificates for SSL connection. | "" |
sslSecret | string or symbol | Name of Kubernetes secret to read SSL configuration from. | "" |
For all common arguments, refer to configuring operators
This operator subscribes to an MQTT broker, pushing messages published to that topic to the pipeline.
Quality of Service
This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.
Determinism
This reader is non-deterministic, as only quality of service 0 (at most once messaging) has been implemented. As such, the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.
Subscribe to topic "readings":
.qsp.run
.qsp.read.fromMQTT["readings"; "tcp://localhost:1883"]
.qsp.write.toConsole[];
$ mosquitto_pub -t readings -m "Hello, World!"
2022.07.21D14:51:45.023073601 | "Hello, World!"
Subscribe to a topic "readings" using SSL
.qsp.run
.qsp.read.fromMQTT[.qsp.use (!) . flip (
(`topic ; "readings");
(`broker ; "ssl://localhost:1883");
(`sslCert ; "/ssl/cert.pem");
(`sslKey ; "/ssl/key.pem");
(`sslKeyPassword; "iamsecure");
(`sslCAFile ; "/ssl/ca.pem"))];
.qsp.write.toConsole[]
.qsp.read.fromAzureStorage
Reads an object from Microsoft Azure Storage
.qsp.read.fromAzureStorage[path]
.qsp.read.fromAzureStorage[path; account]
.qsp.read.fromAzureStorage[path; account; .qsp.use (!) . flip (
(`mode ; mode);
(`tenant ; tenant);
(`chunking ; chunking);
(`chunkSize; chunkSize);
(`watch ; watch))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string or string[] or symbol or symbol[] | The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an ms:// URL*, not an https:// URL. Glob patterns are supported. |
Required |
account | string or symbol | The name of the Microsoft Azure Storage account to read an object from. | $AZURE_STORAGE_ACCOUNT |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text . |
binary |
tenant | string | The authentication tenant. | "" |
chunking | symbol | Either enabled , disabled or auto . |
auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" |
watch | boolean or dictionary | Either 0b or 1b or a dictionary with keys method and frequency . The only current method is timer . The default frequency is 0D00:00:05 . If watch set to 1b use the watch defaults. |
0b |
For all common arguments, refer to configuring operators
ms://
URLs
This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv
,
so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv
would be written as .qsp.read.fromAzureStorage["ms://myContainerName/path/to/file.csv"; "myStorageAccount"]
This operator reads an object from a Microsoft Azure Blob Storage bucket.
Object mode
Setting the object mode to binary
will read content as a byte vector. When reading
an object as text
, data will be read as strings and be split on newlines. Each string
vector represents a single line.
Partitioning
The pipeline will automatically scale the number of workers to the minimum of the number
of file paths and the maxWorkers
in the pipeline settings. The workers will read a
disjoint union of the paths. You can disable partitioning by setting maxWorkers
to 1. Note that partitioning is not currently possible when there is more than one reader.
File watching
Setting the file watch to 1b
will enable file watching with default settings. Usually
this will be paired with a glob pattern for the file path or list of glob patterns.
The reader will continually watch for new files matching the file path(s) provided using
the watch method. Partitioning is not possible when using file watching. Note that by using
watching the pipeline will continue watching until there is manual intervention to finish
the pipeline. Also note, at present the file watcher will do nothing other than print a
warning if a file is modified. In particular for files that are appended to, the new data
will not be processed.
File watching should not to be used with the directWrite
option in .qsp.write.toDatabase
You should not set file watch to 1b
when using the directWrite
in .qsp.write.toDatabase
since directWrite
relies on a definitive finish point which is not guaranteed when using the file watcher.
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target object and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Operator can only read from one account, domain and tenant
When reading in multiple files there must only be one account used to access all file paths passed into the operator. The same goes for domain and tenant.
Required Permissions
.qsp.read.fromAzure
requires the ability to list and retrieve objects on the defined Azure blob storage
buckets and as such requires the credentials provided by the user to have access to the
following operations List Blobs
and Get Blobs
follow the supplied links for more information.
Authentication
The Microsoft Azure Storage reader uses kurl
for credential discovery.
See credential discovery
in the Azure
tab for more information. When running on Azure provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To setup authentication using environment variables, set AZURE_STORAGE_ACCOUNT
to the name
of the storage account to read from and AZURE_STORAGE_SHARED_KEY
to one of the keys for
that account. To use shared keys, refer to
these instructions
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
AZURE_STORAGE_ACCOUNT: "123abc"
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "ms-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : {
AZURE_STORAGE_ACCOUNT: "123abc",
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
}
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-literal token="aAabBbcCcdDd111222333" mscreds
Next, add the secret name and the account name to the Azure Storage Reader reader configuration.
.qsp.read.fromAzureStorage[`:ms://bucket/hello; "abc123"; .qsp.use``credentials!``mscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "ms-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["mscreds"] }
}' | jq -asR .)"
Examples
Processing an Microsoft Azure Storage object:
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple Microsoft Azure Storage objects:
.qsp.run
.qsp.read.fromAzureStorage[(`:ms://bucket/fileOne.txt;`:ms://bucket/fileTwo.txt); `myaccount]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..
Processing multiple Microsoft Azure Storage objects using glob pattern matching:
.qsp.run
.qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..
File Watching:
.qsp.run
.qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount; .qsp.use ``watch!(`;1b)]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 32
2023.06.15D10:49:20.352682229 | 62
2023.06.15D10:49:20.352682229 | 567
2023.06.15D10:49:20.352682229 | 20
2023.06.15D10:49:20.352682229 | 13
If next we upload fileThree.txt
to the bucket we see:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
.qsp.read.fromAzureStorage[(`:ms://bucket/numbersOne.txt;`:ms://bucket/numbersTwo.txt);
`myaccount; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 324
2021.09.10D10:28:37.177672179 | 214
2021.09.10D10:28:37.177672179 | 877
2021.09.10D10:28:37.177672179 | 96
2021.09.10D10:28:37.177672179 | 102
..
Reading with a custom tenant:
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/object; .qsp.use `account`tenant!("myaccount"; "custom")]
.qsp.write.toVariable[`output]
.qsp.read.fromParquet
The parquet file reader reads the given parquet file, entering the parquet file contents (table or mixed list of arrays) into the pipeline.
Performance and Memory Efficiency
The defaults are generally tuned towards good performance. The plugin may not handle parquet files which are larger than the total amount of memory. To handle such files, you can partition them into separate tabular datasets to be efficiently processed by the arrowkdb APIs for partitioning and metadata.
.qsp.read.fromParquet[path]
.qsp.read.fromParquet[path; .qsp.use (!) . flip (
(`mode ; mode);
(`metadata ; metadata);
(`storage ; storage);
(`region ; region);
(`certificates ; certificates);
(`credentials ; credentials);
(`connection ; connection);
(`project ; project);
(`domain ; domain);
(`tenant ; tenant);
(`watch ; watch))]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol or symbol[] or string or string[] | A file path or list of file paths of parquet file(s). Glob patterns are supported. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either table or lists . |
table |
metadata | dictionary | Metadata keys with their types to be applied. | (::) |
storage | string | Temporary storage of downloaded parquet files. | /tmp |
region | string | The AWS region to authenticate against. | us-east-1 |
certificates | string | Location of default trust store of SSL certificates. | "/opt/kx/ca-bundle.crt" |
credentials | string | Name of a credentials secret to mount. | "" |
connection | string | The Azure Storage account connection string. | "" |
project | string | The Google Cloud Storage project ID. | "" |
domain | string | The name of your domain. | "" |
tenant | symbol | The authentication tenant. | ` |
watch | boolean or dictionary | Either 0b or 1b or a dictionary with keys method and frequency . The only current method is timer . The default frequency is 0D00:00:05 . If watch set to 1b use the watch defaults. |
0b |
For all common arguments, refer to configuring operators
This operator decodes a Parquet file into either a kdb+ table or mixed list of arrays.
Parquet mode
Setting the parquet mode to table
will read content as a native KDB table.
When reading a parquet file as lists
, data will be read as list of arrays.
Each array represents a single column.
Parquet metadata
Metadata maps tag values taken as symbols or strings to KDB datatypes named at https://code.kx.com/q/basics/datatypes/. Valid examples of such metadata might be: `year`month!`int`short or ("year";"month")!("int";"short").
Partitioning
The pipeline will automatically scale the number of workers to the minimum
of the number of file paths and the maxWorkers
in the pipeline settings.
The workers will read a disjoint union of the paths. You can disable
partitioning by setting maxWorkers
to 1. Note that partitioning is not
currently possible when there is more than one reader.
Parquet file watching
Setting parquet file watching to 1b
will enable parquet file watching with
default settings. Usually this will be paired with a glob pattern for the
parquet file path or list of glob patterns. The reader will continually
watch for new parquet files matching the parquet file path(s) provided
using the watch method. Partitioning is not possible when using file
watching. Note that by using watching the pipeline will continue watching
until there is manual intervention to finish the pipeline. Also note, at
present the parquet file watcher will do nothing other than print a warning
if a file is modified. In particular for parquet files that are appended to,
the new data will not be processed.
File watching should not to be used with the directWrite
option in .qsp.write.toDatabase
You should not set file watch to 1b
when using the directWrite
in .qsp.write.toDatabase
since directWrite
relies on a definitive finish point which is not guaranteed when using the file watcher.
Parquet chunking
Chunking is not possible with the current reader. Since parquet files include a file footer which is mandatory with each file, a file cannot be parsed by simply partitioning the file. To partition parquet files, go to parquet.apache.org and follow the guide to split parquet files.
Operator can only watch from one domain and tenant
When reading in multiple files there must only be one domain and tenant used to access all file paths passed into the operator.
Support of Apache Arrow compression algorithms
This reader currently only supports Parquet data stored in Snappy compression format.
Authentication of Parquet reader
Docker authentication:
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
AZURE_STORAGE_ACCOUNT: "123abc"
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
GOOGLE_STORAGE_TOKEN: "123abc"
GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
Kubernetes secret authentication:
To setup authentication in Kubernetes cluster combine credentials of your cloud providers into a single generic secret of its corresponding namespace.
kubectl create secret generic \
--from-file credentials=$HOME/.aws/credentials \
--from-literal account="123abc" \
--from-literal token="aAabBbcCcdDd111222333" \
--from-literal connection="<REDACTED>" \
--from-literal access_token="123abc" \
--from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
pqtcreds
AWS Authentication
The Parquet file watcher uses kurl
for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To setup authentication using environment variables, set AWS_ACCESS_KEY_ID
and
AWS_SECRET_ACCESS_KEY
.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH
to point to the configuration
directory.
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
volumes:
- $HOME/.aws/:/config/pqtcreds
environment:
KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the Parquet reader configuration
.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
Now deploying the pipeline will read the credentials from the AWS credentials file. Note
that the credentials file name must be credentials
.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials pqtcreds
Note that the secret must be formatted so that the variable names are lower case and there is a space either side of the equals sign like so:
[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234
Next, add the secret name to the Parquet reader configuration
.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["pqtcreds"] }
}' | jq -asR .)"
Azure Authentication
The Parquet Microsoft Azure Storage watcher uses kurl
for credential discovery.
See credential discovery
in the Azure
tab for more information. When running on Azure provisioned cloud
infrastructure, the credential discovery will automatically use the credentials of
the user that launched the instance.
Environment variable authentication:
To setup authentication using environment variables,
refer to Configure Azure Storage connection strings
and set AZURE_STORAGE_CONNECTION_STRING
to download Parquet files from the storage.
For Parquet file watcher,
set AZURE_STORAGE_ACCOUNT
to the name of the storage account to watch for
and AZURE_STORAGE_SHARED_KEY
to one of the keys for that account.
To use shared keys, refer to these instructions
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
AZURE_STORAGE_ACCOUNT: "123abc"
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : {
AZURE_STORAGE_ACCOUNT: "123abc",
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333",
AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
}
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic \
--from-literal account="123abc" \
--from-literal token="aAabBbcCcdDd111222333" \
--from-literal connection="<REDACTED>" pqtcreds
Next, add the secret name and the account name to the Parquet Azure Storage Reader configuration.
.qsp.read.fromParquet["ms://bucket/*.parquet"; .qsp.use``credentials`connection`watch!(`;`pqtcreds;"<REDACTED>";1b)]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["pqtcreds"] }
}' | jq -asR .)"
GCS Authentication
The Google Cloud Storage watcher uses kurl
for credential discovery.
See credential discovery
in the GCP
tab for more information. When running on Google provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To setup authentication using an environment variable,
refer to How Application Default Credentials works
and set GOOGLE_APPLICATION_CREDENTIALS
to download Parquet files from the storage.
For Parquet file watcher, set GOOGLE_STORAGE_TOKEN
with
the output of running gcloud auth print-access-token
.
To install the Google SDK, refer to
these instructions.
By default, Google Cloud API tokens expire after 1 hour
However, you can request a longer expiration time of up to 12 hours by specifying the "expires_in" parameter when you authenticate.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
GOOGLE_STORAGE_TOKEN: "123abc"
GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : {
GOOGLE_STORAGE_TOKEN: "123abc",
GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
}
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic \
--from-literal access_token=$(gcloud auth print-access-token) \
--from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
pqtcreds
Next, add the secret name to the Google Cloud Reader reader configuration
.qsp.read.fromParquet["gs://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "parquet-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["pqtcreds"] }
}' | jq -asR .)"
Examples
These examples assume you have Parquet files stored locally or in object storage buckets.
Decode a Parquet file into a kdb+ table:
/ -- Reading to a kdb+ table
input: "/tmp/table.parquet"
.qsp.run
.qsp.read.fromParquet[input]
.qsp.write.toVariable[`output]
output
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Decode a Parquet file into a mixed list of arrays:
/ -- Reading the Parquet file to a list of arrays
input: "/tmp/table.parquet"
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``mode!``lists];
.qsp.write.toVariable[`output]
output
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading a Parquet file from AWS S3 registry:
input: "s3://bucket/table.parquet"
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``region!(`;"us-east-1")]
.qsp.write.toVariable[`output]
output
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet file watching from AWS S3 registry:
input: "s3://bucket/retention/interval*.parquet"
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``region`watch!(`;"us-east-1";1b)]
.qsp.write.toVariable[`output]
output
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
If next we upload intervalJul.parquet
to the bucket we see:
tm sym px qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90
2023.07.19D19:00:00.000000000 "BIGB" 110.0 100
Decode a Parquet Microsoft Azure Storage file:
/ -- Reading to a kdb+ table
input: "ms://bucket/table.parquet"
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``connection!(`;"<REDACTED>")]
.qsp.write.toVariable[`output]
output
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet watching of tabular dataset from Azure:
input: "ms://bucket/retention/year=2023/month=*/interval*.parquet"
metadata: `year`month!`int`short
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``connection`metadata`watch!(`;"<REDACTED";metadata;1b)]
.qsp.write.toVariable[`output]
output
tm sym px qty year month
----------------------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10 2023 6
2023.06.09D11:00:00.000000000 "HSI" 102.8 20 2023 6
2023.06.09D12:00:00.000000000 "DIA" 103.7 30 2023 6
2023.06.09D13:00:00.000000000 "SPY" 104.6 40 2023 6
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50 2023 6
If next we upload intervalJul.parquet
to the bucket we see:
tm sym px qty year month
---------------------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60 2023 7
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70 2023 7
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80 2023 7
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90 2023 7
2023.07.19D19:00:00.000000000 "BIGB" 110 100 2023 7
Decode a Parquet Google Cloud Storage file:
/ -- Reading to a kdb+ table
input: "gs://bucket/table.parquet"
.qsp.run
.qsp.read.fromParquet[input; .qsp.use``project!(`;"<REDACTED>")]
.qsp.write.toVariable[`output]
output
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
.qsp.read.fromPostgres
Execute a query on a PostgreSQL database
.qsp.read.fromPostgres[query]
.qsp.read.fromPostgres[query; database]
.qsp.read.fromPostgres[query; database; .qsp.use (!) . flip (
(`server ; server);
(`port ; port);
(`username; username);
(`password; password))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | Query to execute on the Postgres database. | Required |
database | string | Name of the database to connect to. | $KXI_SP_POSTGRES_DATABASE |
options:
name | type | description | default |
---|---|---|---|
server | string | Address of the database to connect to. | $KXI_SP_POSTGRES_SERVER |
port | string | Port of database. | $KXI_SP_POSTGRES_PORT |
username | string | Username to authenticate with. | $KXI_SP_POSTGRES_USERNAME |
password | string | Password to authenticate with. | $KXI_SP_POSTGRES_PASSWORD |
For all common arguments, refer to configuring operators
This operator executes a query on a PostgreSQL database and pushes the result into the pipeline.
After the query has completed processing, the Postgres reader will signal a 'finish' command which will teardown the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
.qsp.read.fromPostgres["select * from stocks"]
.qsp.write.toConsole[]
| id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
.qsp.read.fromStream
Read data using a kdb Insights Stream
.qsp.read.fromStream[table]
.qsp.read.fromStream[table; stream]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`position]!enlist position]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`index]!enlist index]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or string | A table name. | A table name to filter incoming messages by. Leave blank to receive all messages. |
stream | symbol or string | The inbound stream | $RT_SUB_TOPIC |
options:
name | type | description | default |
---|---|---|---|
position | symbol or string | The position in the stream to replay from. Options are start or end of the stream. |
start |
index (DEPRECATED) | long | The integer position in the stream to replay from. If both index and position are specified the position parameter will take priority. |
0 |
For all common arguments, refer to configuring operators
This operator reads data using kdb Insights Reliable Transport Streams.
.qsp.read.fromAmazonS3
Reads an object from AWS S3
.qsp.read.fromAmazonS3[path; region]
.qsp.read.fromAmazonS3[path; region; .qsp.use (!) . flip (
(`mode ; mode);
(`tenant ; tenant);
(`chunking ; chunking);
(`chunkSize ; chunkSize);
(`credentials; credentials);
(`watch ; watch))]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol or symbol[] or string or string[] | The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported. | Required |
region | string | The AWS region to authenticate against. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text . |
binary |
tenant | string | The authentication tenant. | ` |
region | string | The AWS region to authenticate against. | us-east-1 |
chunking | symbol | Either enabled , disabled or auto . |
auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" |
domain | string | The name of your domain. | "" |
watch | boolean or dictionary | Either 0b or 1b or a dictionary with keys method and frequency . The only current method is timer . The default frequency is 0D00:00:05 . If watch set to 1b use the watch defaults. |
0b |
For all common arguments, refer to configuring operators
This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.
File mode
Setting the file mode to binary
will read content as a byte vector. When reading
a file as text
, data will be read as strings and be split on newlines. Each string
vector represents a single line.
Partitioning
The pipeline will automatically scale the number of workers to the minimum of the number
of file paths and the maxWorkers
in the pipeline settings. The workers will read a
disjoint union of the paths. You can disable partitioning by setting maxWorkers
to 1. Note that partitioning is not currently possible when there is more than one reader.
File watching
Setting the file watch to 1b
will enable file watching with default settings. Usually
this will be paired with a glob pattern for the file path or list of glob patterns.
The reader will continually watch for new files matching the file path(s) provided using
the watch method. Partitioning is not possible when using file watching. Note that by using
watching the pipeline will continue watching until there is manual intervention to finish
the pipeline. Also note, at present the file watcher will do nothing other than print a
warning if a file is modified. In particular for files that are appended to, the new data
will not be processed.
File watching should not to be used with the directWrite
option in .qsp.write.toDatabase
You should not set file watch to 1b
when using the directWrite
in .qsp.write.toDatabase
since directWrite
relies on a definitive finish point which is not guaranteed when using the file watcher.
File chunking
Chunking a file splits the file into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target file and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Operator can only read from one domain and tenant
When reading in multiple files there must only be one domain and tenant used to access all file paths passed into the operator.
Required Permissions
.qsp.read.fromAmazonS3
requires the ability to list and retrieve objects on the defined S3
bucket and as such requires the credentials provided by the user to have access to the
s3:ListBucket
, s3:GetObject
and s3:ListObjects
ACLs, see
here
for more information.
Authentication
The S3 reader uses kurl
for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To setup authentication using environment variables, set AWS_ACCESS_KEY_ID
and
AWS_SECRET_ACCESS_KEY
.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH
to point to the configuration
directory.
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.11.0
volumes:
- $HOME/.aws/:/config/awscreds
environment:
KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the S3 reader configuration
.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]
Now deploying the pipeline will read the credentials from the AWS credentials file. Note
that the credentials file name must be credentials
.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds
Note that the secret must be formatted so that the variable names are lower case and there is a space either side of the equals sign like so:
[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234
Next, add the secret name to the S3 reader configuration
.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["awscreds"] }
}' | jq -asR .)"
Examples
Processing an AWS S3 object:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple AWS S3 objects:
.qsp.run
.qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1";]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
Processing multiple AWS S3 objects using glob pattern matching:
.qsp.run
.qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1";]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
File Watching:
.qsp.run
.qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1"; .qsp.use ``watch!(`;1b)]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 435
2023.06.15D10:49:20.352682229 | 365
2023.06.15D10:49:20.352682229 | 452
2023.06.15D10:49:20.352682229 | 44
2023.06.15D10:49:20.352682229 | 42
If next we upload numbersThree.txt
to the bucket we see:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
.qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1";
.qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
Reading with a custom region and tenant:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/object; "us-west-1" ;(enlist `tenant)!enlist "custom"]
.qsp.write.toVariable[`output]
.qsp.read.fromSQLServer
Execute a query on an SQL Server database
.qsp.read.fromSQLServer[query]
.qsp.read.fromSQLServer[query; database]
.qsp.read.fromSQLServer[query; database; .qsp.use (!) . flip (
(`server ; server);
(`port ; port);
(`username; username);
(`password; password))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | Query to execute on the SQL Server database. | Required |
database | string | Name of the database to connect to. | $KXI_SP_SQLSERVER_DATABASE |
options:
name | type | description | default |
---|---|---|---|
server | string | Address of the database. | $KXI_SP_SQLSERVER_SERVER |
port | string | Port of database. | $KXI_SP_SQLSERVER_PORT |
username | string | Username to authenticate with. | $KXI_SP_SQLSERVER_USERNAME |
password | string | Password to authenticate with. | $KXI_SP_SQLSERVER_PASSWORD |
For all common arguments, refer to configuring operators
This operator executes a query on a Microsoft SQL Server database and pushes the result into the pipeline.
After the query has completed processing, the SQL Server reader will signal a 'finish' command which will teardown the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
.qsp.read.fromSQLServer["select * from stocks"]
.qsp.write.toConsole[]
| id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
.qsp.read.fromUpload
Reads data supplied through an HTTP endpoint
.qsp.read.fromUpload[uploadName]
Parameters:
name | type | description | default |
---|---|---|---|
uploadName | symbol or string | Unique name for this fromUpload operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form: https://insights.kx.com/streamprocessor/upload/{uploadName} . The uploadName is passed through in the operator metadata. | Required |
Fully Deterministic
The upload reader is fully deterministic by default. It will use event journaling to record
the raw data it is provided. In a recovery scenario it will replay the journal (from the latest
checkpoint) and return to the state it was in prior to crashing. Since all read events are journalled
it will require enough disk space to write that data. To turn this feature off set the environment variable
KXI_SP_EVENT_JOURNAL
to false.
Limits and HTTP chunking
The upload limit is set to 10MB
. The reader does not currently support HTTP chunking, therefore
the user is responsible for manually chunking their files and making multiple upload requests
if they wish to upload larger files.
No High Availability
Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.
kdb Insights Enterprise only
The .qsp.read.fromUpload
reader operates only within kdb Insights Enterprise deployments of
the Stream Processor at this time. This functionality does not operate within the kdb Insights
Stream Processor Microservice.
Optional HTTP parameters
The optional HTTP parameters are table
and finish
.
finish
is used to finish the operator and subsequently the pipeline. This can be useful
if we pair the upload reader with a direct write.
The table
parameter is unused by the upload operator, however if provided it will be passed
through in the metadata and could be used by a node downstream e.g. a map node.
The template of the HTTP request to upload data is: (See the Open API Spec for full details).
curl -X POST https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true \
--header "Content-Type: application/octet-stream" \
--header "Authorization: Bearer $INSIGHTS_TOKEN" \
--data-binary "@my/file.csv"
The @
symbol in "@my/file.csv" is required to signify the string resolves to a file rather than
a literal.
Example:
Start pipeline using the fromUpload reader.
.qsp.run
.qsp.read.fromUpload["myUniqueName"]
.qsp.decode.csv[.qsp.use``header!``always]
.qsp.write.toConsole[]
Send HTTP request to the coordinator to upload your file data.
>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
--data-binary "@quotes.csv"
Worker logs:
| time ticker bid bidSize ask askSize
-----------------------------| ----------------------------------------------------------------------------------
2023.07.20D15:37:31.641574003| "2023-01-09D06:03:54.726000000" "AAPL" "58.79616" "890" "5.538282" "100"
2023.07.20D15:37:31.641574003| "2023-01-09D09:05:49.815000000" "AAPL" "12.70558" "790" "47.71454" "990"
2023.07.20D15:37:31.641574003| "2023-01-10D09:24:01.982000000" "AAPL" "4.636787" "620" "50.61855" "480"
2023.07.20D15:37:31.641574003| "2023-01-10D03:58:45.586000000" "MSFT" "81.40808" "970" "23.05427" "500"
2023.07.20D15:37:31.641574003| "2023-01-10D03:16:20.405000000" "FB" "44.11634" "560" "74.11803" "330"
2023.07.20D15:37:31.641574003| "2023-01-08D09:43:50.155000000" "KX" "78.12387" "650" "26.48796" "40"
2023.07.20D15:37:31.641574003| "2023-01-09D10:55:05.862000000" "AAPL" "71.60091" "840" "66.77192" "130"
2023.07.20D15:37:31.641574003| "2023-01-09D09:03:34.867000000" "KX" "37.10555" "130" "69.64077" "940"
2023.07.20D15:37:31.641574003| "2023-01-08D07:59:00.020000000" "TSLA" "67.64414" "500" "43.57184" "160"
Use the finish parameter to finish the reader after data has been processed. If not provided it defaults to false.
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
--data-binary "@quotes.csv"
Additionally we can finish the operator without sending any data.
>> curl -X POST https://insights.kx.com/streamprocessor/myUniqueName?finish=true \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
Note that the only Content-Type the HTTP requests can use are application/octet-stream
(the default).
Attempts to use any other Content-Type in the request will fail.
If for example you wish to upload a JSON file upload use content type application/octet-stream
(as opposed to application/json
) and use a JSON decoder in the pipeline.