Readers
.qsp.read

reader | description
---|---
fromAmazonS3 | read data from Amazon Web Services S3 buckets
fromAzureStorage | read data from Microsoft Azure Blob Storage
fromCallback | define a callback in the q global namespace
fromDatabase | query an Insights database
fromExpr | evaluate an expression or function into the pipeline
fromFile | read file contents into the pipeline
fromGoogleStorage | read data from Google Cloud Storage
fromHTTP | request data from an HTTP(S) endpoint
fromKafka | consume data from a Kafka topic
fromPostgres | execute a query against a PostgreSQL database
fromSQLServer | execute a query against a SQL Server database
fromStream | read data using a KX Insights stream
Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline.
Each reader has its own start, stop, setup, and teardown functions to handle different streaming lifecycle events.
Readers are assumed to be asynchronous and push data into a pipeline using .qsp.push.
Some readers, such as .qsp.read.fromCallback and .qsp.read.fromKafka, implement a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by the Controller.
.qsp.read.fromCallback
Define callback in the q global namespace
.qsp.read.fromCallback[callback]
.qsp.read.fromCallback[callback; .qsp.use enlist[`partitions]!enlist partitions]
Parameters:
name | type | description | default |
---|---|---|---|
callback | symbol | The name of the function to define. | Required |
options:
name | type | description | default |
---|---|---|---|
partitions | vector | A list of partition identifiers to distribute over available Workers. | () |
key | symbol or generic null (::) | Name of the field which contains the key of the published event, or generic null for unkeyed data. Only works for dictionary events. | (::) |
For all common arguments, refer to configuring operators
This operator defines callback
as a function in the q global namespace, causing data passed
to the function (locally or over IPC) to enter the pipeline data flow.
Single worker callback:
// This reader creates a monadic callback function that can be executed to push
// data into a pipeline. When the function `publish` is executed on an SP
// Worker with input data, that data is forwarded to subsequent operators in
// the pipeline.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[]
// Since the callback is defined in the global scope, data can be passed
// directly within the same program. This could be used to define `upd`
// handlers in kdb+ tick, or to wrap other data-acquisition methods such as `.Q.fs`.
publish ([] val: 100?10f ) // Call directly from the same process
upd: enlist[`trade]!enlist publish // Define kdb+ tick upd handler
.Q.fs[publish] `:myfile.json // Stream the file 'myfile.json' of
// '\n'-delimited JSON records
// through the pipeline
Multi-worker callback:
// Creates a partitioned callback reader that can be distributed over multiple
// SP Workers. A callback named `publish` is created for each partition, and
// each Worker is assigned a subset of the 10 partitions.
.qsp.run
.qsp.read.fromCallback[.qsp.use `callback`partitions!(`publish; til 10)]
.qsp.write.toConsole[]
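Where events are dictionaries, the key option can name the field that carries the event key. The sketch below is illustrative rather than from the original examples; the sym field is an assumption.
Keyed callback events:
// A hedged sketch of the key option described above: events are dictionaries,
// and the hypothetical field `sym carries the key of each published event.
.qsp.run
  .qsp.read.fromCallback[.qsp.use `callback`key!(`publish; `sym)]
  .qsp.write.toConsole[]

publish `sym`price!(`AAPL; 123.4) // Push a single keyed event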
.qsp.read.fromDatabase
Reads data from an Insights database
.qsp.read.fromDatabase[query]
.qsp.read.fromDatabase[query; .qsp.use (!) . flip (
    (`table      ; table);
    (`startTS    ; startTS);
    (`endTS      ; endTS);
    (`filter     ; filter);
    (`groupBy    ; groupBy);
    (`agg        ; agg);
    (`fill       ; fill);
    (`temporality; temporality);
    (`sortCols   ; sortCols);
    (`retries    ; retries);
    (`retryWait  ; retryWait))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | qSQL query to execute on the database. | Required |
options:
name | type | description | default |
---|---|---|---|
table | symbol | Name of table to retrieve data from. | ` |
startTS | timestamp | Inclusive start time of period of interest. | 0Np |
endTS | timestamp | Exclusive end time of period of interest. | 0Np |
filter | list | List of triadic lists of the form (function;column name;parameter). | () |
groupBy | symbol[] | List of columns to group aggregation result by. | () |
agg | symbol[] | List of aggregation triples or columns to select, e.g. aggregation example: (`c1`avg`price;`c2`sum`size), basic select example: `price`size. | () |
fill | symbol | How to handle nulls in the data. Supported values are zero and forward. The zero option fills numeric types with zeroes; the forward option fills nulls with the previous non-null entry. | `zero |
temporality | symbol | Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot, which takes a continuous range of the data, and slice, which takes data between the time values of the startTS and endTS parameters for each date within the start and end timestamps. | `snapshot |
sortCols | symbol[] | Columns to sort result data on. | () |
retries | long | Number of times to retry the connection to the database. | 5 |
retryWait | timespan | Time to wait between database connection attempts. | 0D00:00:02 |
For all common arguments, refer to configuring operators
This operator issues a query either using qSQL or a functional 'get data' query. If a single
string parameter is provided, a qSQL query is issued. Otherwise, the configuration dictionary is
used, and table
, startTS
and endTS
are considered required fields.
Database Host
In a Kubernetes deployment, the database reader will use the deployment release name to
locate the query service gateway. When using a Docker configuration or to override the
host address of the database, set $KXI_SP_DATABASE_HOST
. The hostname should be the
full URL of the Service Gateway including the port to connect to.
qSQL query:
.qsp.run
.qsp.read.fromDatabase["select from stocks"]
.qsp.write.toConsole[]
| date id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
Get data API:
.qsp.run
.qsp.read.fromDatabase[.qsp.use `table`startTS`endTS!(`stocks; .z.p-1D; .z.p)]
.qsp.write.toConsole[]
| date id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
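The remaining options can be combined in the same configuration dictionary. The sketch below is illustrative rather than from the original examples; the trades table and its sym, price and size columns are assumptions.
Get data with filtering and aggregation:
// A hedged sketch combining the filter, groupBy and agg options described
// above; the 'trades' table and its columns are hypothetical.
.qsp.run
  .qsp.read.fromDatabase[.qsp.use (!) . flip (
    (`table  ; `trades);
    (`startTS; .z.p-1D);
    (`endTS  ; .z.p);
    (`filter ; enlist (`=; `sym; `AAPL));                    // Keep only AAPL records
    (`groupBy; enlist `sym);                                 // Group the result by sym
    (`agg    ; (`avgPrice`avg`price; `totalSize`sum`size)))] // Aggregate price and size
  .qsp.write.toConsole[]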
Enrich data with a stream:
stocks: .qsp.read.fromDatabase["select from stocks"];
// Reads from a stream and filters on a table called 'trades', windows
// the data into 5 second intervals, then enriches the data with a join
// on time. Finally the data is written to a table called 'avgPrice'.
.qsp.run
.qsp.read.fromStream[`trades]
.qsp.window.tumbling[0D00:00:05; `time]
.qsp.merge[stocks; { x lj `sym xkey y }] // Join stocks to trades on sym
.qsp.map[{ select avg price by sym from x }]
.qsp.write.toStream[`avgPrice]
An example output of this pipeline using a toConsole
writer might be as follows.
| sym price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP 737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA 191.2175
..
.qsp.read.fromExpr
Evaluate expression or function into the pipeline
.qsp.read.fromExpr[expr]
Parameters:
name | type | description | default |
---|---|---|---|
expr | string or function | A q expression or, in q pipelines only, a nullary synchronous function. | Required |
For all common arguments, refer to configuring operators
This operator evaluates the expression or function, and incorporates the resulting data into the pipeline.
Enter the table refData
into the pipeline:
refData: ([] sensorID: 100?`8; sensorCode: 100?100)
.qsp.run
.qsp.read.fromExpr["refData"]
.qsp.write.toConsole[]
Query a separate process for data:
getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }
.qsp.run
.qsp.read.fromExpr[getSensor]
.qsp.write.toConsole[]
.qsp.read.fromFile
Read file contents into pipeline
.qsp.read.fromFile[path]
.qsp.read.fromFile[path; .qsp.use (!) . flip (
(`mode ; mode);
(`chunking ; chunking);
(`chunkSize; chunkSize))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string | A filepath. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text. | binary |
chunking | symbol | Either enabled, disabled or auto. | auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
For all common arguments, refer to configuring operators
This operator reads a file, pushing its contents (bytes or characters) to the pipeline.
File mode
Setting the file mode to binary will read content as a byte vector. When reading
a file as text, data will be read as strings split on newlines; each string
represents a single line.
File Chunking
Chunking a file splits the file into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target file and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
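Chunked reads can also be forced explicitly. A minimal sketch, assuming a local file big.csv and a chunk size given in bytes:
Reading a file in explicit chunks:
// Force chunked reads of 65536 bytes per batch; the file name is hypothetical.
.qsp.run
  .qsp.read.fromFile["big.csv"; .qsp.use `chunking`chunkSize!(`enabled; 65536)]
  .qsp.write.toConsole[]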
Reading a file:
`:numbers.txt 0: string 100?1000;
.qsp.run
.qsp.read.fromFile["numbers.txt"]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading text data by line:
`:numbers.txt 0: string 100?1000;
.qsp.run
.qsp.read.fromFile["numbers.txt"; ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading binary data:
`:numbers.bin 1: 100?0x0;
.qsp.run
.qsp.read.fromFile["numbers.bin"]
.qsp.write.toVariable[`output]
output
0xebf68daee782ca3f0156512c444a35e1dfe4bc471594..
.qsp.read.fromGoogleStorage
Reads an object from Google Cloud Storage
.qsp.read.fromGoogleStorage[path]
.qsp.read.fromGoogleStorage[path; .qsp.use (!) . flip (
(`mode ; mode);
(`tenant ; tenant);
(`chunking ; chunking);
(`chunkSize; chunkSize))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string or symbol | The path of an object to read from Google Cloud Storage. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text. | binary |
tenant | string | The authentication tenant. | "" |
chunking | symbol | Either enabled, disabled or auto. | auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
For all common arguments, refer to configuring operators
This operator reads an object from a Google Cloud Storage bucket.
Object mode
Setting the object mode to binary will read content as a byte vector. When reading
an object as text, data will be read as strings split on newlines; each string
represents a single line.
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target object and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Authentication
The Google Cloud Storage reader uses kurl
for credential discovery.
See credential discovery
in the GCP
tab for more information. When running on Google provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To set up authentication using an environment variable, set GOOGLE_STORAGE_TOKEN
with the output of
running gcloud auth print-access-token
. To install the Google SDK, refer to
these instructions.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.1.0
environment:
GOOGLE_STORAGE_TOKEN: "123abc"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "gs-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { GOOGLE_STORAGE_TOKEN: "123abc" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret.
kubectl create secret generic --from-literal token=$(gcloud auth print-access-token) gscreds
Next, add the secret name to the Google Cloud Storage reader configuration.
.qsp.read.fromGoogleStorage[`:gs://bucket/hello; .qsp.use ``credentials!``gscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "gs-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["gscreds"] }
}' | jq -asR .)"
Examples
Processing a Google Cloud Storage object:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading a text object by line:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading with a custom tenant:
.qsp.run
.qsp.read.fromGoogleStorage[`:gs://bucket/object; .qsp.use ``tenant!(""; "custom")]
.qsp.write.toVariable[`output]
.qsp.read.fromHTTP
Requests data from an HTTP(S) endpoint
.qsp.read.fromHTTP[url]
.qsp.read.fromHTTP[url; method]
.qsp.read.fromHTTP[url; method; .qsp.use (!) . flip (
(`body ; body);
(`header ; header);
(`onResponse ; onResponse);
(`followRedirects ; followRedirects);
(`maxRedirects ; maxRedirects);
(`maxRetryAttempts; maxRetryAttempts);
(`timeout ; timeout);
(`tenant ; tenant);
(`insecure ; insecure);
(`binary ; binary);
(`sync ; sync);
(`rejectErrors ; rejectErrors)
)]
Parameters:
name | type | description | default |
---|---|---|---|
url | string or symbol | The URL to send a request to. | Required |
method | string or symbol | The HTTP method for the HTTP request (ex. GET, POST, etc.). | GET |
options:
name | type | description | default |
---|---|---|---|
body | string | The payload of the HTTP request. | "" |
header | dict | A map of header fields to send as part of the request. | ()!() |
onResponse | function | After a response, allows the response to be preprocessed or to trigger another request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline. | :: |
followRedirects | boolean | If set, any redirects will automatically be followed up to the maximum number of redirects. | 1b |
maxRedirects | long | The maximum number of redirects to follow before reporting an error. | 5 |
maxRetryAttempts | long | The maximum number of times to retry a request that fails due to a timeout. | 10 |
timeout | long | The duration in milliseconds to wait for a request to be completed before reporting an error. | 0W |
tenant | symbol | The request tenant to use for providing request authentication details. | "" |
insecure | boolean | Indicates if unverified server SSL/TLS certificates should be trusted. | 0b |
binary | boolean | Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. | 0b |
sync | boolean | Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. | 0b |
rejectErrors | boolean | Non-successful response codes will generate an error and stop the pipeline. | 1b |
For all common arguments, refer to configuring operators
Return data from a GET request:
.qsp.run
.qsp.read.fromHTTP["https://example.com"]
.qsp.write.toVariable[`output]
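A POST request can attach a payload and headers using the body and header options. The sketch below is illustrative; the endpoint and payload are hypothetical.
Send a POST request with a JSON body:
// A hedged sketch of a POST request with a JSON body and a custom header.
.qsp.run
  .qsp.read.fromHTTP["https://example.com/api"; `POST; .qsp.use (!) . flip (
    (`body  ; .j.j `name`value!("temperature"; 42f));                // JSON-encode the payload
    (`header; (enlist `$"Content-Type")!enlist "application/json"))] // Mark payload as JSON
  .qsp.write.toVariable[`output]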
Read paged data:
URL: "https://example.com"
nextPage: {[res] $[0 < count page: res[2]"S"$"next-page"; URL,"?page=",page; ::]}
.qsp.run
.qsp.read.fromHTTP[URL,"?page=0"; .qsp.use ``onResponse!(::;nextPage)]
.qsp.write.toVariable[`output]
.qsp.read.fromKafka
Consume data from a Kafka topic
.qsp.read.fromKafka[topic]
.qsp.read.fromKafka[topic; brokers]
.qsp.read.fromKafka[topic; brokers; .qsp.use (!) . flip (
(`retries ; retries);
(`retryWait ; retryWait);
(`pollLimit ; pollLimit);
(`offset ; offset);
(`registry ; registry);
(`includeFields; includeFields);
(`options ; options))]
Parameters:
name | type | description | default |
---|---|---|---|
topic | symbol | The name of a topic. | Required |
brokers | string or string[] | One or more brokers identified as host:port pairs. | "localhost:9092" |
options:
name | type | description | default |
---|---|---|---|
retries | long | Max retry attempts for Kafka API calls. | 10 |
retryWait | timespan | Time to wait between retry attempts. | 0D00:00:02 |
pollLimit | long | Maximum number of records to process in a single poll loop. | 1000 |
offset | dictionary | A mapping of partitions to the offsets to start reading from. | (`int$())!() |
registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload decoding. | "" |
includeFields | boolean | Set to true for Kafka Schema Registry messages to include the field name when decoding Protocol Buffer schemas. | 1b |
options | dictionary | Options to be passed through to Kafka. | ()!() |
For all common arguments, refer to configuring operators
This operator acts as a Kafka consumer, consuming data from a Kafka topic and pushing it to the pipeline.
A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.
Workers that join the stream will start reading from the end of the stream unless an
explicit offset is provided. To start by reading from the beginning of the stream, a special start
symbol can be used as the offset value, as in the Custom Offsets example below.
Besides the above keys, all available Kafka consumer configuration options are supported as options in the API and can be passed in using the options dictionary. This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.
Reserved keys
group.id, metadata.broker.list and consumer.id are reserved values and are maintained directly by the Kafka reader.
Maximum Poll Limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is
imposed on the number of messages to read in a single cycle to avoid locking the
process in a read loop only serving Kafka messages. By default this limit is set to
1000 messages. Setting the configuration option pollLimit
will change this value.
This limit is global so if multiple readers set this value, only one will be used.
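For example, a high-throughput topic might raise the limit. A minimal sketch using the pollLimit option above:
// Allow up to 5000 messages to be processed per poll cycle.
.qsp.run
  .qsp.read.fromKafka[.qsp.use `topic`pollLimit!(`numbers; 5000)]
  .qsp.write.toConsole[]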
Running with TLS
It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on the configuration.
Localhost broker:
// Read topic 'numbers' with a default broker at 'localhost:9092'.
// This topic is generating random numbers between 0 and 10
.qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Multiple brokers:
// Read topic 'numbers' from multiple brokers. This topic is generating
// random numbers between 0 and 10
.qsp.run
.qsp.read.fromKafka[.qsp.use `topic`brokers!(`numbers;("localhost:1234";"localhost:1235"))]
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Advanced configuration:
// Reads topic 'numbers' with a default broker setting custom values for advanced
// consumer configuration.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic ; `numbers);
(`options; (!) . flip (
(`socket.timeout.ms ; 60000); // Wait 1 minute for socket timeout
(`fetch.message.max.bytes ; 4*1024*1024); // Allow 4MiB max message size
(`fetch.wait.max.ms ; 500); // Wait up to 500ms for fetch cycle
(`queued.max.message.chunks ; 100))))] // Allow up to 100 queued msg chunks
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Custom offsets:
// Reads topic 'numbers' from the beginning of the data stream. This will
// replay any old messages in the stream before reading new messages as they
// arrive. Note that by default a worker joining the stream reads from the
// end of the stream.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic ; `numbers);
(`offset ; `start))] // The 'offset' field can be set to a single value
// to apply to all partitions. Alternatively it can
// be set to a dictionary of partitions and
// offsets. Offsets can either be the special
// symbols `start or `end or a literal offset index
// as a number. Example: 0 1i!(`start; 10)
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 8
2021.03.08D19:19:08.171915000 | 1
2021.03.08D19:19:08.172081000 | 9
..
TLS authentication:
For more information on TLS configuration, see either the microservice example or platform example depending on the desired deployment configuration.
// Reads topic 'numbers' from a broker configured at `kafka:9092` using
// custom TLS/SSL certificates.
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`brokers; "kafka:9092");
(`topic ; `numbers);
(`options; (!) . flip (
(`ssl.secret ; "certs");
(`ssl.key.password ; "iamsecure"))))]
.qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Kafka Schema Registry:
.qsp.run
.qsp.read.fromKafka[.qsp.use (!) . flip (
(`topic;`numbers);
(`brokers;"localhost:9092");
(`registry;"http://localhost:8081"))]
.qsp.write.toConsole[];
.qsp.read.fromAzureStorage
Reads an object from Microsoft Azure Storage
.qsp.read.fromAzureStorage[path]
.qsp.read.fromAzureStorage[path; account]
.qsp.read.fromAzureStorage[path; account; .qsp.use (!) . flip (
(`mode ; mode);
(`tenant ; tenant);
(`chunking ; chunking);
(`chunkSize; chunkSize))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string or symbol | The path of an object to read from Microsoft Azure Storage. | Required |
account | string or symbol | The name of the Microsoft Azure Storage account to read an object from. | $AZURE_STORAGE_ACCOUNT |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text. | binary |
tenant | string | The authentication tenant. | "" |
chunking | symbol | Either enabled, disabled or auto. | auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
For all common arguments, refer to configuring operators
This operator reads an object from a Microsoft Azure Blob Storage container.
Object mode
Setting the object mode to binary will read content as a byte vector. When reading
an object as text, data will be read as strings split on newlines; each string
represents a single line.
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target object and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Authentication
The Microsoft Azure Storage reader uses kurl
for credential discovery.
See credential discovery
in the Azure
tab for more information. When running on Azure provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To set up authentication using environment variables, set AZURE_STORAGE_ACCOUNT
to the name
of the storage account to read from and AZURE_STORAGE_SHARED_KEY
to one of the keys for
that account. To use shared keys, refer to these instructions.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.1.0
environment:
AZURE_STORAGE_ACCOUNT: "123abc"
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "ms-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : {
AZURE_STORAGE_ACCOUNT: "123abc",
AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
}
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using a generic secret.
kubectl create secret generic --from-literal token="aAabBbcCcdDd111222333" mscreds
Next, add the secret name and the account name to the Azure Storage reader configuration.
.qsp.read.fromAzureStorage[`:ms://bucket/hello; "abc123"; .qsp.use ``credentials!``mscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "ms-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["mscreds"] }
}' | jq -asR .)"
Examples
Processing a Microsoft Azure Storage object:
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading a text object by line:
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading with a custom tenant:
.qsp.run
.qsp.read.fromAzureStorage[`:ms://bucket/object; .qsp.use `account`tenant!("myaccount"; "custom")]
.qsp.write.toVariable[`output]
.qsp.read.fromPostgres
Execute a query on a PostgreSQL database
.qsp.read.fromPostgres[query]
.qsp.read.fromPostgres[query; database]
.qsp.read.fromPostgres[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | Query to execute on the Postgres database. | Required |
database | string | Name of the database to connect to. | $KXI_SP_POSTGRES_DATABASE |
options:
name | type | description | default |
---|---|---|---|
server | string | Address of the database to connect to. | $KXI_SP_POSTGRES_SERVER |
port | string | Port of the database. | $KXI_SP_POSTGRES_PORT |
username | string | Username to authenticate with. | $KXI_SP_POSTGRES_USERNAME |
password | string | Password to authenticate with. | $KXI_SP_POSTGRES_PASSWORD |
For all common arguments, refer to configuring operators
This operator executes a query on a PostgreSQL database and pushes the result into the pipeline.
After the query has completed processing, the Postgres reader will signal a 'finish' command, which tears down the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
.qsp.read.fromPostgres["select * from stocks"]
.qsp.write.toConsole[]
| id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..
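Connection details can also be supplied inline instead of through the $KXI_SP_POSTGRES_* environment variables. A sketch with placeholder values:
Explicit connection options:
// A hedged sketch passing explicit connection options; all values here
// are placeholders.
.qsp.run
  .qsp.read.fromPostgres["select * from stocks"; "marketdata"; .qsp.use (!) . flip (
    (`server  ; "postgres.example.com");
    (`port    ; "5432");
    (`username; "kxuser");
    (`password; "iamsecure"))]
  .qsp.write.toConsole[]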
.qsp.read.fromStream
Read data using a KX Insights Stream
.qsp.read.fromStream[table]
.qsp.read.fromStream[table; stream]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`index]!enlist index]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or string | A table name. | Required |
stream | symbol or string | The inbound stream. | $RT_SUB_TOPIC |
options:
name | type | description | default |
---|---|---|---|
index | long | The position in the stream to replay from. | 0 |
For all common arguments, refer to configuring operators
This operator reads data using KX Insights Reliable Transport Streams.
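For example, to read the trades table from the default inbound stream (a minimal sketch; the table name is illustrative and the stream is taken from $RT_SUB_TOPIC):
.qsp.run
  .qsp.read.fromStream[`trades]
  .qsp.write.toConsole[]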
.qsp.read.fromAmazonS3
Reads an object from AWS S3
.qsp.read.fromAmazonS3[path]
.qsp.read.fromAmazonS3[path; .qsp.use (!) . flip (
(`mode ; mode);
(`tenant ; tenant);
(`region ; region);
(`chunking ; chunking);
(`chunkSize; chunkSize))]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol | The path of an object to read from S3. | Required |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | Either binary or text. | binary |
tenant | symbol | The authentication tenant. | ` |
region | string | The AWS region to authenticate against. | us-east-1 |
chunking | symbol | Either enabled, disabled or auto. | auto |
chunkSize | long or string | The size in bytes of chunks to read when chunking is enabled or the number of bytes to read using a size suffix. Sizes that are specified as strings are always assumed to be bytes. | "1MB" |
For all common arguments, refer to configuring operators
This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.
File mode
Setting the file mode to binary will read content as a byte vector. When reading
a file as text, data will be read as strings split on newlines; each string
represents a single line.
File chunking
Chunking a file splits the file into smaller batches and streams the batches through
the pipeline. If chunking
is set to auto
, the reader will determine the size of
the target file and if it is sufficiently large (more than a few megabytes) it will
be read in chunks.
Authentication
The S3 reader uses kurl
for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To set up authentication using environment variables, set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.1.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH
to point to the configuration
directory.
version: "3.3"
services:
worker:
image: registry.dl.kx.com/kxi-sp-worker:1.1.0
volumes:
- $HOME/.aws/:/config/awscreds
environment:
KXI_SP_CONFIG_PATH: "/config"
Next, add the name of the mounted credentials directory to the S3 reader configuration.
.qsp.read.fromAmazonS3[`:s3://bucket/hello; .qsp.use ``credentials!``awscreds]
Now deploying the pipeline will read the credentials from the AWS credentials file. Note
that the credentials file name must be credentials
.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds
Next, add the secret name to the S3 reader configuration.
.qsp.read.fromAmazonS3[`:s3://bucket/hello; .qsp.use ``credentials!``awscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-reader",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["awscreds"] }
}' | jq -asR .)"
Examples
Processing an AWS S3 object:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt]
.qsp.map[{"J"$"\n" vs x}]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading a text object by line:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; .qsp.use ``mode!``text]
.qsp.map["J"$]
.qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading with a custom region and tenant:
.qsp.run
.qsp.read.fromAmazonS3[`:s3://bucket/object; .qsp.use `tenant`region!("custom"; "us-west-1")]
.qsp.write.toVariable[`output]
.qsp.read.fromSQLServer
Execute a query on a SQL Server database
.qsp.read.fromSQLServer[query]
.qsp.read.fromSQLServer[query; database]
.qsp.read.fromSQLServer[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]
Parameters:
name | type | description | default |
---|---|---|---|
query | string | Query to execute on the SQL Server database. | Required |
database | string | Name of the database to connect to. | $KXI_SP_SQLSERVER_DATABASE |
options:
name | type | description | default |
---|---|---|---|
server | string | Address of the database. | $KXI_SP_SQLSERVER_SERVER |
port | string | Port of the database. | $KXI_SP_SQLSERVER_PORT |
username | string | Username to authenticate with. | $KXI_SP_SQLSERVER_USERNAME |
password | string | Password to authenticate with. | $KXI_SP_SQLSERVER_PASSWORD |
For all common arguments, refer to configuring operators
This operator executes a query on a Microsoft SQL Server database and pushes the result into the pipeline.
After the query has completed processing, the SQL Server reader will signal a 'finish' command, which tears down the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
.qsp.read.fromSQLServer["select * from stocks"]
.qsp.write.toConsole[]
| id sym market name cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp." "$229.56M"
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc." "$350.57M"
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment Trust" "$785.28M"
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation" "$1.06B"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc." "$12.88B"
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp." "n/a"
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc." "$446.33M"
..