Readers
This page presents methods to start data flow through a pipeline by using readers.
Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline.
Each reader has its own start, stop, setup and teardown functions to handle different streaming lifecycle events.
Readers are assumed to be asynchronous and push data into a pipeline using Push.
Some readers, such as the Callback and Kafka readers, implement a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by the Controller. See the notes on Scaling for more details.
Amazon S3
Reads files from Amazon S3 Storage.
.qsp.read.fromAmazonS3[path; region]
.qsp.read.fromAmazonS3[path; region; .qsp.use (!) . flip (
    (`mode       ; mode);
    (`tenant     ; tenant);
    (`domain     ; domain);
    (`chunking   ; chunking);
    (`chunkSize  ; chunkSize);
    (`credentials; credentials);
    (`watch      ; watch))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported. | Required | 
| region | string | The AWS region to authenticate against. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | Either binary or text. | binary | 
| tenant | string | The authentication tenant. | ` | 
| chunking | symbol | Either enabled, disabled or auto. | auto | 
| chunkSize | long or string | The size of chunks to read when chunking is enabled, given either as a number of bytes or as a string with a size suffix such as "1MB". Sizes specified as strings are always assumed to be in bytes. | "1MB" | 
| credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" | 
| domain | string | The name of your domain. | "" | 
| watch | boolean or dictionary | Either 0b, 1b or a dictionary with keys method and frequency. The only current method is timer, and the default frequency is 0D00:00:05. If watch is set to 1b, the watch defaults are used. | 0b | 
For all common arguments, refer to configuring operators.
This operator reads an object from an Amazon Web Services S3 bucket as either text or binary.
File mode
Setting the file mode to binary reads content as a byte vector. When reading a file as text,
data is read as strings split on newlines, with each string representing a single line.
Partitioning
The pipeline automatically scales the number of workers to the minimum of the number of
file paths and the maxWorkers setting of the pipeline. Each worker reads a disjoint subset
of the paths. You can disable partitioning by setting maxWorkers to 1. Note that
partitioning is not currently possible when the pipeline has more than one reader.
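As a rough illustration of the partitioning rule above, the following Python sketch computes the worker count as the minimum of the number of paths and maxWorkers, then deals the paths out into disjoint subsets. The round-robin assignment and the helper name `assign_paths` are assumptions for illustration; the pipeline's actual assignment policy is not described on this page.

```python
from typing import Dict, List


def assign_paths(paths: List[str], max_workers: int) -> Dict[int, List[str]]:
    """Deal paths into disjoint per-worker subsets (round-robin is an assumption)."""
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")
    # Worker count is the minimum of the number of paths and maxWorkers.
    n_workers = min(len(paths), max_workers)
    assignment: Dict[int, List[str]] = {w: [] for w in range(n_workers)}
    for i, path in enumerate(paths):
        # Each path goes to exactly one worker, so the subsets never overlap.
        assignment[i % n_workers].append(path)
    return assignment
```

With maxWorkers set to 1, every path lands on a single worker, which corresponds to disabling partitioning.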
File watching
Setting file watch to 1b enables file watching with the default settings. This is usually
paired with a glob pattern, or a list of glob patterns, for the file path. The reader
continually watches for new files matching the provided path(s) using the watch method.
Partitioning is not possible when file watching is enabled. Note that a watching pipeline
keeps watching until it is finished by manual intervention. Also note that, at present,
the file watcher only prints a warning if a file is modified; in particular, new data
appended to an existing file is not processed.
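The following Python sketch illustrates timer-based watching under stated assumptions: `resolve_watch` and `PollingWatcher` are hypothetical helpers, not part of the Stream Processor API. Each poll globs the pattern, returns only files it has not seen before, and merely warns when a known file's modification time changes, mirroring the behavior described above.

```python
import glob
import os
from datetime import timedelta
from typing import Dict, List, Optional, Union

# Defaults taken from the watch option: method "timer", frequency 5 seconds.
DEFAULT_WATCH = {"method": "timer", "frequency": timedelta(seconds=5)}


def resolve_watch(watch: Union[bool, dict]) -> Optional[dict]:
    """Normalize a watch option: False -> None, True -> defaults, dict -> merged."""
    if watch is False:
        return None
    if watch is True:
        return dict(DEFAULT_WATCH)
    resolved = {**DEFAULT_WATCH, **watch}
    if resolved["method"] != "timer":
        raise ValueError("the only supported method is 'timer'")
    return resolved


class PollingWatcher:
    """Hypothetical glob poller: new files are reported once; modified ones only warn."""

    def __init__(self, pattern: str):
        self.pattern = pattern
        self.seen: Dict[str, float] = {}  # path -> last observed mtime

    def poll(self) -> List[str]:
        new_files = []
        for path in sorted(glob.glob(self.pattern)):
            mtime = os.path.getmtime(path)
            if path not in self.seen:
                self.seen[path] = mtime
                new_files.append(path)
            elif mtime != self.seen[path]:
                # Appended or rewritten data is not re-read, only reported.
                print(f"warning: {path} was modified; new data will not be read")
                self.seen[path] = mtime
        return new_files
```

A driver loop would sleep for `frequency.total_seconds()` between calls to `poll()` and read each returned file; because it never exits on its own, it matches the need for manual intervention to finish the pipeline.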
File watching should not be used with the directWrite option in .qsp.write.toDatabase
You should not set file watch to 1b when using the directWrite option of the Database Writer, since directWrite relies on a definitive finish point, which is not guaranteed when using the file watcher.
This is not an issue with .qsp.v2.write.toDatabase.
File chunking
Chunking a file splits it into smaller batches and streams the batches through the
pipeline. If chunking is set to auto, the reader determines the size of the target file
and, if it is sufficiently large (more than a few megabytes), reads it in chunks.
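To make the chunking options concrete, this Python sketch parses a chunkSize value and decides whether a file of a given size would be chunked. The power-of-1024 unit table and the 4 MiB `AUTO_THRESHOLD` are assumptions (the text above only says "more than a few megabytes"), and the helper names are hypothetical.

```python
import re
from typing import Union

# Assumption: suffixes are powers of 1024; the reader may use other multiples.
UNITS = {"": 1, "B": 1, "K": 1024, "KB": 1024, "M": 1024**2, "MB": 1024**2,
         "G": 1024**3, "GB": 1024**3}
# Hypothetical cut-off standing in for "more than a few megabytes".
AUTO_THRESHOLD = 4 * 1024**2


def parse_chunk_size(size: Union[int, str]) -> int:
    """Convert a chunkSize value (an int or a string such as "1MB") to bytes."""
    if isinstance(size, int):
        return size
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B?)\s*", size.upper())
    if match is None:
        raise ValueError(f"unrecognised chunk size: {size!r}")
    number, unit = match.groups()
    return int(number) * UNITS[unit]


def should_chunk(file_size: int, chunking: str = "auto") -> bool:
    """Apply the enabled/disabled/auto rule to a file of file_size bytes."""
    if chunking == "enabled":
        return True
    if chunking == "disabled":
        return False
    if chunking == "auto":
        return file_size > AUTO_THRESHOLD
    raise ValueError("chunking must be 'enabled', 'disabled' or 'auto'")
```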
Operator can only read from one domain and tenant
When reading multiple files, a single domain and tenant must be used to access all file paths passed to the operator.
Examples
Processing an AWS S3 object:
.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple AWS S3 objects:
.qsp.run
  .qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
Processing multiple AWS S3 objects using glob pattern matching:
.qsp.run
  .qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
File Watching:
.qsp.run
  .qsp.read.fromAmazonS3["s3://bucket/numbers*.txt"; "us-east-1"; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 435
2023.06.15D10:49:20.352682229 | 365
2023.06.15D10:49:20.352682229 | 452
2023.06.15D10:49:20.352682229 | 44
2023.06.15D10:49:20.352682229 | 42
If we then upload numbersThree.txt to the bucket, we see:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:
.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/numbers.txt; "us-east-1"; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
  .qsp.read.fromAmazonS3[(`:s3://bucket/numbersOne.txt;`:s3://bucket/numbersTwo.txt); "us-east-1";
     .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 435
2021.09.10D10:28:37.177672179 | 365
2021.09.10D10:28:37.177672179 | 452
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 42
..
Reading with a custom region and tenant:
.qsp.run
  .qsp.read.fromAmazonS3[`:s3://bucket/object; "us-west-1"; .qsp.use (enlist `tenant)!enlist "custom"]
  .qsp.write.toVariable[`output]
Subscribing to file events:
An API is available for subscribing to file progress events and tracking the progress of file readers. Its documentation describes how to use it, with examples.
sp.read.from_amazon_s3(':s3://production-bucket/trades.csv', mode='text',region='us-east-2')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported. | Required | 
| region | string | The AWS region to authenticate against. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration. | binary | 
| tenant | string | The authorization tenant. | ` | 
| offset | long | DEPRECATED How many bytes into the file reading should begin. | 0 | 
| chunking | symbol | A FileChunking enum value, or its string equivalent. | auto | 
| chunk_size | long or string | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | "1MB" | 
| domain | string | A custom Amazon S3 domain. | "" | 
| credentials | string | The secret name for the Amazon S3 credentials. Refer to the authentication section below for more information. | "" | 
| watch | boolean or dictionary | Either True, False, or a dictionary containing method (a string) and frequency (a timedelta). The only current method is "timer", and the default frequency is timedelta(seconds=5). If watch is set to True, the watch defaults are used. | 0b | 
Returns:
A from_amazon_s3 reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_amazon_s3(':s3://mybucket/numbers1.txt', mode='text',region='us-east-2')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"
Required Permissions
This reader needs to list and retrieve objects in the specified S3 bucket, so the credentials
provided by the user must grant the s3:ListBucket, s3:GetObject and s3:ListObjects permissions.
Refer to the AWS documentation on S3 permissions for more information.
Authentication
The S3 reader uses kurl for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To set up authentication using environment variables, set AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY.
For Docker-based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    environment:
      AWS_ACCESS_KEY_ID: "abcd"
      AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration
directory.
version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    volumes:
      - $HOME/.aws/:/config/awscreds
    environment:
      KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the S3 reader configuration:
.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]
Deploying the pipeline now reads the credentials from the AWS credentials file. Note
that the credentials file must be named credentials.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "s3-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"
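For reference, this Python sketch builds the same request as the curl command above without sending it. It assumes the endpoint and field values shown in that example. The final `jq -asR .` step wraps the whole JSON document in a JSON string literal, so the sketch encodes the body twice to match; `build_create_request` is a hypothetical helper name.

```python
import json
import urllib.request
from typing import Any, Dict


def build_create_request(spec: str, extra: Dict[str, Any],
                         url: str = "http://localhost:5000/pipeline/create"
                         ) -> urllib.request.Request:
    """Build (but do not send) a pipeline/create POST like the curl example."""
    body = {
        "name": "s3-reader",
        "type": "spec",
        "config": {"content": spec},
        "settings": {"minWorkers": "1", "maxWorkers": "10"},
        **extra,  # e.g. {"env": {...}} or {"kubeConfig": {"secrets": ["awscreds"]}}
    }
    # `jq -asR .` turns the JSON document into an ASCII JSON string, so the
    # payload is JSON-encoded twice (json.dumps escapes non-ASCII by default).
    payload = json.dumps(json.dumps(body))
    return urllib.request.Request(url, data=payload.encode("ascii"), method="POST")
```

Passing the result to `urllib.request.urlopen` would send it. For the Kubernetes secrets variant, pass `{"kubeConfig": {"secrets": ["awscreds"]}}` as `extra` instead of the `env` dictionary.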
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds
Note that the secret must be formatted so that the variable names are lower case and there is a space on either side of the equals sign, as follows:
[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234
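If you generate the credentials file programmatically, Python's standard configparser module produces this layout: it lower-cases option names by default and, with space_around_delimiters=True (its default), writes a space on either side of the equals sign. The function below is a hypothetical helper, and the key values are the placeholders from the example above.

```python
import configparser
import io


def render_credentials(access_key_id: str, secret_access_key: str,
                       profile: str = "default") -> str:
    """Return AWS credentials-file text with lower-case keys and ' = ' separators."""
    # interpolation=None so that '%' characters in secrets are written verbatim.
    config = configparser.ConfigParser(interpolation=None)
    # ConfigParser lower-cases option names by default (optionxform = str.lower).
    config[profile] = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    }
    buffer = io.StringIO()
    config.write(buffer, space_around_delimiters=True)
    return buffer.getvalue()
```

Writing the returned text to a file named credentials gives a file suitable for the kubectl create secret command shown above.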
Next, add the secret name to the S3 reader configuration:
.qsp.read.fromAmazonS3[`:s3://bucket/hello; "us-east-1"; .qsp.use``credentials!``awscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "s3-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["awscreds"] }
    }' | jq -asR .)"
Callback
Reads data from a callback function from the local process or over IPC.
.qsp.read.fromCallback[callback]
.qsp.read.fromCallback[callback; .qsp.use (!) . flip (
    (`partitions; partitions);
    (`key       ; key);
    (`replay    ; replay);
    (`external  ; external))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| callback | symbol | The name of the function to define. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| partitions | vector | A list of partition identifiers to distribute over available Workers. | () | 
| key | symbol or generic null (::) | Name of the field which contains the key of the published event, or generic null for unkeyed data. Only works for dictionary events. | (::) | 
| replay | boolean | (Beta Feature) If true, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline. | 0b | 
| external | boolean | Allow users to execute the callback function via an IPC connection. | 0b | 
For all common arguments, refer to configuring operators.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true. See Beta Feature Usage Terms.
This operator defines callback as a function in the q global namespace, causing data passed
to the function (locally or over IPC) to enter the pipeline data flow.
If using replay, you must set $KXI_SP_EVENT_JOURNAL to "true", and set $KXI_SP_JOURNAL_DIR to
the directory where you'd like the event journals to be stored. Event journals are files
containing the messages published to the callback reader. Messages are replayed from the journals
on recovery. If $KXI_SP_JOURNAL_DIR is not set, it defaults to
$KXI_SP_CHECKPOINT_DIR/journals.
Note that enabling replay adversely affects performance, and is disabled by default for this reason.
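The replay mechanism can be pictured with the following Python sketch, which is a conceptual model rather than the Stream Processor's implementation. `journal_dir` applies the documented fallback from $KXI_SP_JOURNAL_DIR to $KXI_SP_CHECKPOINT_DIR/journals, and the hypothetical `EventJournal` class appends every published message with a sequence number and, on recovery, re-pushes only the messages recorded after the last checkpoint. The JSON-lines file format and the sequence numbering are assumptions.

```python
import json
import os
from typing import Any, Callable, Mapping


def journal_dir(env: Mapping[str, str] = os.environ) -> str:
    """Resolve the journal directory using the documented fallback."""
    explicit = env.get("KXI_SP_JOURNAL_DIR")
    if explicit:
        return explicit
    return os.path.join(env["KXI_SP_CHECKPOINT_DIR"], "journals")


class EventJournal:
    """Hypothetical append-only journal used to illustrate replay."""

    def __init__(self, path: str):
        self.path = path

    def append(self, seq: int, message: Any) -> None:
        # Record every published message; this extra write is one reason
        # journaling costs performance.
        with open(self.path, "a") as f:
            f.write(json.dumps({"seq": seq, "msg": message}) + "\n")

    def replay(self, last_checkpoint_seq: int, push: Callable[[Any], None]) -> int:
        """Re-push messages newer than the last checkpoint; return how many."""
        if not os.path.exists(self.path):
            return 0
        replayed = 0
        with open(self.path) as f:
            for line in f:
                record = json.loads(line)
                if record["seq"] > last_checkpoint_seq:
                    push(record["msg"])
                    replayed += 1
        return replayed
```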
Single worker callback:
// This reader creates a monadic callback function that can be executed to push
// data into a pipeline. When the function `publish` is executed on an SP
// Worker with input data, that data is forwarded to subsequent operators in
// the pipeline.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[]
// Since the callback is defined in the global scope, data can be passed
// directly within the same program. This could be used to define `upd`
// handlers in kdb+ tick, or to wrap other data-acquisition methods such as `.Q.fs`.
publish ([] val: 100?10f )           // Call directly from the same process
upd: enlist[`trade]!enlist publish   // Define kdb+ tick upd handler
.Q.fs[publish] `:myfile.json         // Stream the file 'myfile.json' of
                                     // '\n'-delimited JSON records
                                     // through the pipeline
Multi-worker callback:
// Creates a partitioned callback reader that can be distributed over multiple
// SP Workers. Each Worker defines a callback called `publish`, and the 10
// partitions are distributed across the Workers.
.qsp.run
  .qsp.read.fromCallback[.qsp.use `callback`partitions!(`publish; til 10)]
  .qsp.write.toConsole[]
Callback with message replay:
`KXI_SP_JOURNAL_DIR setenv "/tmp/journals";
.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use ``replay!(::;1b)]
  .qsp.write.toConsole[];
publish 0
publish 1
2023.02.08D17:18:21.593351490 | 0
2023.02.08D17:18:22.106658246 | 1
2023-02-08 17:19:41.285 [] INFO  SP Finished setup
2023-02-08 17:19:41.285 [] INFO  SP Starting readers...
2023-02-08 17:19:41.285 [] INFO  SPREAD Performing Callback reader replay, id=callback_publish
2023.02.08D17:19:41.285481189 | 0
2023.02.08D17:19:41.285516250 | 1
2023-02-08 17:19:41.285 [] INFO  SP Readers started
sp.read.from_callback('receivingFunction')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| callback | symbol | The name of the callback function that will be defined in the global q namespace. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| partitions | vector | A list of partition identifiers to distribute over available workers. | () | 
| key | symbol or generic null (::) | Name of the field which contains the key of the published event, or None for unkeyed data. | (::) | 
| replay | boolean | (Beta feature) If True, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline. | 0b | 
| external | boolean | Allow users to execute the callback function via an IPC connection. | 0b | 
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true. See Beta Feature Usage Terms.
Returns:
    A from_callback reader, which can be joined to other operators or pipelines.
Global Variable
This operator defines callback as a function in the q global namespace, causing data
passed to the function (locally or over IPC) to enter the pipeline data flow.
Using Replay
If using replay, you must set $KXI_SP_EVENT_JOURNAL to "true", and set
$KXI_SP_JOURNAL_DIR to the directory where you'd like the event journals to be
stored. Event journals are files containing the messages published to the callback
reader. Messages are replayed from the journals on recovery. If $KXI_SP_JOURNAL_DIR
is not set, it defaults to $KXI_SP_CHECKPOINT_DIR/journals.
Note that enabling replay adversely affects performance, and is disabled by default for this reason.
Python strings
PyKX automatically converts Python strings to q symbols, which can cause issues when your callback returns a string for use in a subsequent SP operation. If your data should be treated as a string, explicitly convert it using pykx.CharVector.
Examples:
Calling a callback multiple times, with a window to batch the data:
>>> import numpy as np
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
        | sp.map(lambda x: x * x)
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
Callback with message replay:
>>> import os
>>> from kxi import sp
>>> import pykx as kx
>>> os.makedirs('/tmp/journals')
>>> os.environ['KXI_SP_JOURNAL_DIR'] = '/tmp/journals'
>>> pipeline = (sp.read.from_callback('publish', replay=True)
        | sp.write.to_variable('out'))
>>> sp.run(pipeline)
>>> kx.q('publish', 0)
>>> kx.q('publish', 1)
>>> kx.q('out')
0 1
>>> sp.teardown()
>>> sp.run(pipeline)
>>> kx.q('out')
0 1
Callback with a string as argument:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
        | sp.decode.csv(schema={'name': 'string', 'id': 'int'})
        | sp.write.to_variable('out'))
>>> data = kx.CharVector("\n".join(
      ["name,id",
      "User1,1",
      "User2,2",
      "User3,3"]))
>>> kx.q('publish', data)
>>> kx.q('out')
name    id
----------
"User1" 1
"User2" 2
"User3" 3
Callback with a list of strings as argument, returning a list of symbols in q:
>>> import pykx as kx
>>> from kxi import sp
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_variable('out'))
>>> kx.q('publish', ["abc", "def"])
>>> kx.q('out')
`abc`def
Expression
Evaluates an expression or function and feeds the result into a pipeline.
.qsp.read.fromExpr[expr]
.qsp.read.fromExpr[expr; .qsp.use enlist[`trigger]!enlist trigger]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| expr | string or function | A q expression, or in q only, a nullary synchronous function. | Required | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
For all common arguments, refer to configuring operators.
This operator evaluates the expression or function, and incorporates the resulting data into the pipeline.
Enter the table refData into the pipeline:
refData: ([] sensorID: 100?`8; sensorCode: 100?100)
.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]
Query a separate process for data:
getSensor: { `:dap:4000 "select from trace where date = .z.d, sensorID = `id" }
.qsp.run
  .qsp.read.fromExpr[getSensor]
  .qsp.write.toConsole[]
sp.read.from_expr('([]time:enlist .z.p;sym:`AAA;price:1.23)')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| expr | string or function | An expression which produces data when evaluated. This can either be a string or a nullary Python function. | Required | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
Returns:
    A from_expr reader, which can be joined to other operators or pipelines.
Restricted Python Support
The expression reader supports two types of input:
- string: executed as a q statement
- Python function: executed in Python, returning an object which is converted to q
Only nullary Python functions (including lambdas) are supported; functions that take arguments are not.
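The dispatch described above can be sketched in plain Python. In this hypothetical `evaluate_expr` helper, `run_q` stands in for whatever evaluates a q string (an assumption for illustration), and `inspect.signature` is used to reject callables that declare any parameters.

```python
import inspect
from typing import Any, Callable, Union


def evaluate_expr(expr: Union[str, Callable[[], Any]],
                  run_q: Callable[[str], Any]) -> Any:
    """Hypothetical dispatch mirroring the two accepted input types."""
    if isinstance(expr, str):
        # Strings are treated as q statements and handed to the q evaluator.
        return run_q(expr)
    if callable(expr):
        # Only nullary callables are accepted; any declared parameter is rejected.
        if inspect.signature(expr).parameters:
            raise TypeError("only nullary functions are supported")
        return expr()
    raise TypeError("expr must be a string or a nullary function")
```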
Python strings
PyKX automatically converts Python strings to q symbols, which can cause issues when your callable expression returns a string for use in a subsequent SP operation. If your data should be treated as a string, explicitly convert it using pykx.CharVector.
Examples:
Read from a nullary Python function:
>>> from kxi import sp
>>> import pykx as kx
>>> def nullary_func():
        return range(0, 10)
>>> sp.run(sp.read.from_expr(nullary_func)
        | sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9
Execute a lambda and return a q string:
>>> import pykx as kx
>>> from kxi import sp
>>> data = kx.CharVector("\n".join(
    ["name,id,quantity",
    "a,1,4",
    "b,2,5",
    "c,3,6"]))
>>> sp.run(sp.read.from_expr(lambda: data)
        | sp.decode.csv({'name': 'string', 'id': 'int', 'quantity': 'int'})
        | sp.write.to_variable('out'))
>>> kx.q('out')
name id quantity
----------------
,"a" 1  4
,"b" 2  5
,"c" 3  6
Read from a q variable as a string:
>>> from kxi import sp
>>> sp.teardown()
>>> import pykx as kx
>>> kx.q('counter:10')
>>> sp.run(sp.read.from_expr('counter')
        | sp.write.to_variable('out'))
>>> kx.q('out')
,10
Read from a q expression as a string:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_expr('til 10')
        | sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9
Read from a q function as a string:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_expr('sumFunction: {[x; y] x + y} ; sumFunction[3; 5]')
    | sp.write.to_variable('out'))
>>> kx.q('out')
,8
File
Reads file contents into a pipeline.
.qsp.read.fromFile[path]
.qsp.read.fromFile[path; .qsp.use (!) . flip (
    (`mode     ; mode);
    (`chunking ; chunking);
    (`chunkSize; chunkSize);
    (`watch    ; watch))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | A filepath or list of file paths. Glob patterns are supported. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | Either binary or text. | binary | 
| offset | long | The number of bytes into the file at which reading should begin. | 0 | 
| chunking | symbol | Either enabled, disabled or auto. | auto | 
| chunkSize | long or string | The size of chunks to read when chunking is enabled, given either as a number of bytes or as a string with a size suffix such as "1MB". Sizes specified as strings are always assumed to be in bytes. | "1MB" | 
| watch | boolean or dictionary | Either 0b, 1b or a dictionary with keys method and frequency. The only current method is timer, and the default frequency is 0D00:00:05. If watch is set to 1b, the watch defaults are used. | 0b | 
For all common arguments, refer to configuring operators.
This operator reads one or more files, pushing their contents (bytes or characters) to the pipeline.
File mode
Setting the file mode to binary reads content as a byte vector. When reading file(s) as
text, data is read as strings split on newlines, with each string representing a single line.
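A minimal Python sketch of the two modes, assuming a local file and UTF-8 text: binary mode returns the raw bytes, while text mode splits on newlines. `read_file` is a hypothetical helper, not the reader's implementation.

```python
from typing import List, Union


def read_file(path: str, mode: str = "binary") -> Union[bytes, List[str]]:
    """Read a whole file as bytes (binary) or as a list of lines (text)."""
    with open(path, "rb") as f:
        content = f.read()
    if mode == "binary":
        return content
    if mode == "text":
        lines = content.decode("utf-8").split("\n")
        # A trailing newline would otherwise yield an empty final line.
        if lines[-1] == "":
            lines.pop()
        return lines
    raise ValueError("mode must be 'binary' or 'text'")
```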
File Chunking
Chunking a file splits it into smaller batches and streams the batches through the
pipeline. If chunking is set to auto, the reader determines the size of the target file
and, if it is sufficiently large (more than a few megabytes), reads it in chunks.
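The following Python sketch shows one way to stream a file in fixed-size batches, honoring an offset as described for the offset option. `read_chunks` and `read_lines_chunked` are hypothetical helpers; the second carries a partial trailing line into the next batch, which is an assumption about how a text-mode reader might avoid splitting lines across chunks.

```python
from typing import Iterator, List


def read_chunks(path: str, chunk_size: int, offset: int = 0) -> Iterator[bytes]:
    """Yield successive chunk_size-byte batches of a file, starting at offset."""
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk


def read_lines_chunked(path: str, chunk_size: int, offset: int = 0) -> Iterator[List[str]]:
    """Yield batches of complete lines, carrying any partial line forward."""
    remainder = b""
    for chunk in read_chunks(path, chunk_size, offset):
        # The last element after splitting is an incomplete line (or b"").
        *complete, remainder = (remainder + chunk).split(b"\n")
        if complete:
            yield [line.decode("utf-8") for line in complete]
    if remainder:
        yield [remainder.decode("utf-8")]
```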
File watching
Setting file watch to 1b enables file watching with the default settings. This is usually
paired with a glob pattern, or a list of glob patterns, for the file path. The reader
continually watches for new files matching the provided path(s) using the watch method.
Partitioning is not possible when file watching is enabled. Note that a watching pipeline
keeps watching until it is finished by manual intervention. Also note that, at present,
the file watcher only prints a warning if a file is modified; in particular, new data
appended to an existing file is not processed.
File watching should not be used with the directWrite option in .qsp.write.toDatabase
You should not set file watch to 1b when using the directWrite option of the Database Writer, since directWrite relies on a definitive finish point, which is not guaranteed when using the file watcher.
This is not an issue with .qsp.v2.write.toDatabase.
Reading a file:
`:numbers.txt 0: string 100?1000;
.qsp.run
  .qsp.read.fromFile["numbers.txt"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Reading multiple files:
`:numbersOne.txt 0: string til 5;
`:numbersTwo.txt 0: string 5 + til 5;
.qsp.run
  .qsp.read.fromFile[("numbersOne.txt";"numbersTwo.txt"); .qsp.use``mode!``text]
  .qsp.map[{[md; data] (md; "c"$data)}; .qsp.use ``params!(::; `metadata`data)]
  .qsp.write.toConsole[]
2022.10.19D16:57:37.385577001 | `offset`key!(10;`numbersOne.txt)
2022.10.19D16:57:37.385577001 | (,"0";,"1";,"2";,"3";,"4")
2022.10.19D16:57:37.393146238 | `offset`key!(10;`numbersTwo.txt)
2022.10.19D16:57:37.393146238 | (,"5";,"6";,"7";,"8";,"9")
Reading text data by line:
`:numbers.txt 0: string 100?1000;
.qsp.run
  .qsp.read.fromFile["numbers.txt"; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading binary data:
`:numbers.bin 1: 100?0x0;
.qsp.run
  .qsp.read.fromFile["numbers.bin"]
  .qsp.write.toVariable[`output]
output
0xebf68daee782ca3f0156512c444a35e1dfe4bc471594..
File Watching:
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
  .qsp.read.fromFile["/tmp/number*.txt"; .qsp.use `mode`watch!(`text;1b)]
  .qsp.write.toConsole[]
2024.08.19D16:42:59.888120571 | ,"0"
2024.08.19D16:42:59.888120571 | ,"1"
2024.08.19D16:42:59.888120571 | ,"2"
2024.08.19D16:42:59.888120571 | ,"3"
2024.08.19D16:42:59.888120571 | ,"4"
`:/tmp/numberOnes.txt 0: string 1 1 1;
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"
2024.08.19D16:45:05.122681552 | ,"1"
Subscribing to file events:
An API is available for subscribing to file progress events and tracking the progress of file readers. Its documentation describes how to use it, with examples.
sp.read.from_file('trades.csv')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | A filepath or list of file paths. Glob patterns are supported. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration. | binary | 
| offset | long | How many bytes into the file reading should begin. | 0 | 
| chunking | symbol | If/how the file should be split into chunks. | auto | 
| chunk_size | long or string | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | "1MB" | 
| watch | boolean or dictionary | Either True, False, or a dictionary containing method (a string) and frequency (a timedelta). The only current method is "timer", and the default frequency is timedelta(seconds=5). If watch is set to True, the watch defaults are used. | 0b | 
Returns:
A from_file reader, which can be joined to other operators or pipelines.
Examples:
Local file watching:
>>> from kxi import sp
>>> file = 'numbers*.txt'
>>> sp.run(sp.read.from_file(file, mode='text', watch=True)
        | sp.write.to_console(timestamp='none'))
,"0"
,"1"
,"2"
,"3"
,"4"
If we then create numbersThree.txt, we see:
,"5"
,"6"
,"7"
,"8"
,"9"
Google Cloud Storage
Reads files from Google Cloud Storage.
.qsp.read.fromGoogleStorage[path]
.qsp.read.fromGoogleStorage[path; .qsp.use (!) . flip (
    (`project    ; project);
    (`mode       ; mode);
    (`tenant     ; tenant);
    (`chunking   ; chunking);
    (`chunkSize  ; chunkSize);
    (`credentials; credentials);
    (`watch      ; watch))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | string or string[] or symbol or symbol[] | The Cloud Storage URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| project | string | The Google Cloud Storage project ID. | "" | 
| mode | symbol | Either binary or text. | binary | 
| tenant | string | The authentication tenant. | "" | 
| chunking | symbol | Either enabled, disabled or auto. | auto | 
| chunkSize | long or string | The size of chunks to read when chunking is enabled, given either as a number of bytes or as a string with a size suffix such as "1MB". Sizes specified as strings are always assumed to be in bytes. | "1MB" | 
| credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" | 
| watch | boolean or dictionary | Either 0b, 1b or a dictionary with keys method and frequency. The only current method is timer, and the default frequency is 0D00:00:05. If watch is set to 1b, the watch defaults are used. | 0b | 
For all common arguments, refer to configuring operators.
This operator reads an object from a Google Cloud Storage bucket.
Object mode
Setting the object mode to binary reads content as a byte vector. When reading an object
as text, data is read as strings split on newlines, with each string representing a single line.
Partitioning
The pipeline automatically scales the number of workers to the minimum of the number of
file paths and the maxWorkers setting of the pipeline. Each worker reads a disjoint subset
of the paths. You can disable partitioning by setting maxWorkers to 1. Note that
partitioning is not currently possible when the pipeline has more than one reader.
File watching
Setting watch to 1b enables file watching with default settings. This is usually paired with a glob pattern, or a list of glob patterns, for the file path. The reader continually watches for new files matching the provided file path(s) using the configured watch method. Partitioning is not possible when using file watching. Note that when watching is enabled, the pipeline keeps watching until there is manual intervention to finish it. Also note that, at present, the file watcher only prints a warning if a file is modified; in particular, new data appended to an existing file is not processed.
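A single tick of a timer-based watcher, as described above, might look like the sketch below. poll_once, the listing and seen dictionaries and the version tags are all hypothetical; the sketch only reproduces the documented behavior: new files matching the glob pattern(s) are read, and a modified file only produces a warning.

```python
import fnmatch
import warnings


def poll_once(listing, patterns, seen):
    """One timer tick of a hypothetical file watcher.

    listing:  object name -> version tag (for example an ETag) in the bucket now
    patterns: the glob pattern or patterns the reader was given
    seen:     object name -> version tag already handled; updated in place
    Returns the new matching objects that should be read, in name order.
    """
    if isinstance(patterns, str):
        patterns = [patterns]
    new = []
    for name in sorted(listing):
        if not any(fnmatch.fnmatchcase(name, p) for p in patterns):
            continue
        version = listing[name]
        if name not in seen:
            new.append(name)
        elif seen[name] != version:
            # Documented behavior: a modified file only raises a warning, so
            # data appended to an existing file is never read.
            warnings.warn(f"{name} was modified; the change is not processed")
        seen[name] = version
    return new
```

A timer would call poll_once at the configured frequency (0D00:00:05 by default) and hand any returned names to the reader, which is why a watching pipeline never reaches a natural finish point.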
File watching should not be used with the directWrite option in .qsp.write.toDatabase
Do not set watch to 1b when using the directWrite option of the Database Writer, since directWrite relies on a definitive finish point, which is not guaranteed when using the file watcher.
This is not an issue with .qsp.v2.write.toDatabase.
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
 the pipeline. If chunking is set to auto, the reader will determine the size of
 the target object and if it is sufficiently large (more than a few megabytes) it will
 be read in chunks.
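The interaction of the chunking and chunkSize options can be sketched as below. parse_size, split_object and auto_threshold are hypothetical: the 4MB threshold stands in for "more than a few megabytes", and treating size suffixes as binary multiples (1KB = 1024 bytes) is an assumption, not documented behavior.

```python
import re

# Assumption: size suffixes are binary multiples (1KB = 1024 bytes).
_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_size(size):
    """Accept a byte count (int) or a string such as "1MB"; return bytes."""
    if isinstance(size, int):
        return size
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B)?\s*", size.upper())
    if match is None:
        raise ValueError(f"unrecognised size: {size!r}")
    return int(match.group(1)) * _UNITS[match.group(2) or "B"]


def split_object(data: bytes, chunking="auto", chunk_size="1MB", auto_threshold="4MB"):
    """Return the batches that would be streamed for one object (hypothetical)."""
    if chunking not in ("enabled", "disabled", "auto"):
        raise ValueError(f"chunking must be enabled, disabled or auto, got {chunking!r}")
    size = parse_size(chunk_size)
    if size <= 0:
        raise ValueError("chunk size must be positive")
    # In auto mode, small objects are read whole; auto_threshold is a stand-in.
    if chunking == "disabled" or (chunking == "auto" and len(data) <= parse_size(auto_threshold)):
        return [data]
    return [data[i:i + size] for i in range(0, len(data), size)]
```

Under these assumptions, a 5MB object read with the defaults is streamed as five 1MB batches, while a small object is pushed as a single batch.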
Operator can only read from one tenant
When reading multiple files, a single tenant must be used to access all file paths passed to the operator.
Examples
Processing a Google Cloud Storage object:
.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple Google Cloud Storage objects:
.qsp.run
  .qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..
Processing multiple Google Cloud Storage objects using glob pattern matching:
.qsp.run
  .qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 80
2021.09.10D10:28:37.177672179 | 91
2021.09.10D10:28:37.177672179 | 642
2021.09.10D10:28:37.177672179 | 485
2021.09.10D10:28:37.177672179 | 32
..
File Watching:
.qsp.run
  .qsp.read.fromGoogleStorage["gs://bucket/numbers*.txt"; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 80
2023.06.15D10:49:20.352682229 | 91
2023.06.15D10:49:20.352682229 | 642
2023.06.15D10:49:20.352682229 | 485
2023.06.15D10:49:20.352682229 | 32
If numbersThree.txt is then uploaded to the bucket, the output is:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:
.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/numbers.txt; ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
  .qsp.read.fromGoogleStorage[(`:gs://bucket/numbersOne.txt;`:gs://bucket/numbersTwo.txt);
     ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 405
2021.09.10D10:28:37.177672179 | 215
2021.09.10D10:28:37.177672179 | 564
2021.09.10D10:28:37.177672179 | 50
2021.09.10D10:28:37.177672179 | 765
..
Reading with a custom tenant:
.qsp.run
  .qsp.read.fromGoogleStorage[`:gs://bucket/object; .qsp.use ``tenant!(""; "custom")]
  .qsp.write.toVariable[`output]
Subscribing to file events:
An API is available for subscribing to file progress events and tracking the progress of file readers. The file events documentation describes how to use it, with examples.
sp.read.from_google_storage('gs://productionBucket/closingPrices.csv')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | string or string[] or symbol or symbol[] | The GS URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| project | string | The Google Cloud Storage project ID. | "" | 
| mode | symbol | A FileMode enum value, or its string equivalent. The value must be a member of the sp.read.FileMode enumeration. | binary | 
| tenant | string | The authentication tenant. | "" | 
| offset | long | How many bytes into the file reading should begin. | 0 | 
| chunking | symbol | A FileChunking enum value, or its string equivalent. | auto | 
| chunk_size | long or string | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | "1MB" | 
| domain | string | A custom Google Cloud Storage domain. | "" | 
| credentials | string | Name of a credentials secret to mount. Refer to the authentication section below for more information. | "" | 
| watch | boolean or dictionary | Either True, False, or a dictionary containing method (a string) and frequency (a timedelta). The only current method is "timer", and the default frequency is timedelta(seconds=5). If watch is set to True, the watch defaults are used. | 0b | 
Returns:
A from_google_storage reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(
        sp.read.from_google_storage('gs://myBucket/numbers.txt', mode='text')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"
Required Permissions
This reader lists and retrieves objects in the specified Google Cloud Storage bucket, so the credentials provided by the user must grant the storage.objects.list, storage.objects.get and storage.buckets.get IAM permissions. See here for more information.
Authentication
The Google Cloud Storage reader uses kurl for credential discovery.
See credential discovery
in the GCP tab for more information. When running on Google-provisioned cloud infrastructure,
credential discovery automatically uses the credentials of the user that launched the
instance.
Environment variable authentication:
To set up authentication using an environment variable, set GOOGLE_STORAGE_TOKEN to the output of
running gcloud auth print-access-token. To install the Google SDK, refer to
these instructions.
For Docker-based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    environment:
      GOOGLE_STORAGE_TOKEN: "123abc"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "gs-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { GOOGLE_STORAGE_TOKEN: "123abc" }
    }' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a generic secret. Take care to create the secret in the correct namespace.
kubectl create secret generic --from-literal token=$(gcloud auth print-access-token) gscreds
Next, add the secret name to the Google Cloud Storage reader configuration:
.qsp.read.fromGoogleStorage[`:gs://bucket/hello; .qsp.use``credentials!``gscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "gs-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["gscreds"] }
    }' | jq -asR .)"
HTTP
Requests data from an HTTP(S) endpoint.
.qsp.read.fromHTTP[url]
.qsp.read.fromHTTP[url; method]
.qsp.read.fromHTTP[url; method; .qsp.use (!) . flip (
    (`body            ; body);
    (`header          ; header);
    (`onResponse      ; onResponse);
    (`followRedirects ; followRedirects);
    (`maxRedirects    ; maxRedirects);
    (`maxRetryAttempts; maxRetryAttempts);
    (`timeout         ; timeout);
    (`tenant          ; tenant);
    (`insecure        ; insecure);
    (`binary          ; binary);
    (`sync            ; sync);
    (`rejectErrors    ; rejectErrors);
    (`trigger         ; trigger)
    )]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| url | string or symbol | The URL to send a request to. | Required | 
| method | string or symbol | The HTTP method for the HTTP request (ex. GET, POST, etc.). | GET | 
options:
| name | type | description | default | 
|---|---|---|---|
| body | string | The payload of the HTTP request. | "" | 
| header | dict | A map of header fields to send as part of the request. | ()!() | 
| onResponse | function | Called after each response; allows the response to be preprocessed or another request to be sent. This function receives the result of the last request as a list where the first element is the return code, the second is the body and the third is the headers of the response. Returning :: processes the response from the original request immediately. Returning a string issues another request with the return value as the URL. Returning a dictionary allows any of the operator parameters to be reset and a new HTTP request issued. A special response key can be used in the returned dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline. | :: | 
| followRedirects | boolean | If set, any redirects will automatically be followed up to the maximum number of redirects. | 1b | 
| maxRedirects | long | The maximum number of redirects to follow before reporting an error. | 5 | 
| maxRetryAttempts | long | The maximum number of times to retry a request that fails due to a timeout. | 10 | 
| timeout | long | The duration in milliseconds to wait for a request to be completed before reporting an error. A timeout of 0 means no limit. | 5000 | 
| tenant | symbol | The request tenant to use for providing request authentication details. | "" | 
| insecure | boolean | Indicates if unverified server SSL/TLS certificates should be trusted. | 0b | 
| binary | boolean | Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. | 0b | 
| sync | boolean | Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. | 0b | 
| rejectErrors | boolean | Non-successful response codes will generate an error and stop the pipeline. | 1b | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
For all common arguments, refer to configuring operators
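The onResponse contract in the table above can be modelled as a small request loop. This is a sketch, not the reader's implementation: run_http_reader, send and max_requests are hypothetical, and it assumes that each response is pushed (unless a response key replaces or suppresses it) before any follow-up request is issued, and that a dictionary containing only the response key ends the loop.

```python
def run_http_reader(send, url, on_response=None, max_requests=100, **params):
    """Return the payloads a hypothetical HTTP reader would push.

    send(url, **params) -> (status, body, headers) stands in for the HTTP call.
    on_response(res) may return None (q ::), a URL string, or a dict.
    """
    pushed = []
    for _ in range(max_requests):
        res = send(url, **params)
        action = on_response(res) if on_response else None
        if action is None:
            # :: -> process the original response as-is; no further requests.
            pushed.append(res[1])
            return pushed
        if isinstance(action, str):
            # Assumption: the current body is pushed before following the new URL.
            pushed.append(res[1])
            url = action
            continue
        if isinstance(action, dict):
            action = dict(action)  # do not mutate the caller's dict
            payload = action.pop("response", res[1])
            if payload is not None:  # response set to None (q ::) pushes nothing
                pushed.append(payload)
            if not action:  # only the response key was given: stop here
                return pushed
            # Any other keys reset operator parameters and trigger a new request.
            url = action.pop("url", url)
            params.update(action)
            continue
        raise TypeError(f"unsupported onResponse return: {type(action).__name__}")
    raise RuntimeError("too many chained requests")
```

A paging handler that returns the next URL while a next-page header is present, like the q nextPage example below, makes this loop issue one request per page; a handler returning a dictionary with only a response key, like the header example, replaces the payload of a single request.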
Nondeterminism
This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.
Return data from a GET request:
.qsp.run
  .qsp.read.fromHTTP["https://example.com"]
  .qsp.write.toVariable[`output]
Read paged data:
// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"
// @overview
// An 'onResponse' handler that pages through the results of a web API.
// In this example, the response header (`res[2]`) contains a "next-page" key
// with the index of the next page to read. If the key is present, it returns
// a new URL for the HTTP reader to get data from. This will repeat the request
// with the next page until all the data has been read and there are no more pages.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {string|null} A string return is a URL for the HTTP reader to issue
//   a new request to.
nextPage: {[res] $[0 < count page: res[2]"S"$"next-page"; URL,"?page=",page; ::]}
.qsp.run
 .qsp.read.fromHTTP[URL,"?page=0"; .qsp.use``onResponse!(::;nextPage)]
 .qsp.write.toVariable[`output]
Return data from header:
// @overview
// A static example URL.
// @type {string}
URL: "https://example.com"
// @overview
// An onResponse handler that extracts the headers from the response and
// overwrites the data that is pushed into the pipeline with them.
//
// @param res {(int;string;dict)} The response of the last request.
//
// @return {dict (response: dict)}
onResponse: {[res] enlist[`response]!enlist res 2 }
.qsp.run
 .qsp.read.fromHTTP[URL; .qsp.use``onResponse!(::;onResponse)]
 .qsp.write.toVariable[`output]
sp.read.from_http('http://example.com', 'GET')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| url | string or symbol | The URL to send a request to. | Required | 
| method | string or symbol | The HTTP method for the HTTP request (ex. GET, POST, etc.). | GET | 
options:
| name | type | description | default | 
|---|---|---|---|
| body | string | The payload of the HTTP request. | "" | 
| header | dict | A map of header fields to their corresponding values. | ()!() | 
| on_response | function | After a response, allows the response to be preprocessed or to send another request. Returning 'None' will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to 'None', no data is pushed into the pipeline. | :: | 
| follow_redirects | boolean | If set, any redirect return will automatically be followed up to the maximum number of redirects. | 1b | 
| max_redirects | long | The maximum number of redirects to follow before reporting an error. | 5 | 
| max_retry_attempts | long | The number of times to retry a connection after a request timeout. | 10 | 
| timeout | long | The duration in milliseconds to wait for a request to be completed before reporting an error. | 5000 | 
| tenant | symbol | The request tenant to use for providing request authentication details. | "" | 
| insecure | boolean | Indicates if unverified server SSL/TLS certificates should be trusted. | 0b | 
| binary | boolean | Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. | 0b | 
| sync | boolean | Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. | 0b | 
| reject_errors | boolean | Non-successful response codes will generate an error and stop the pipeline. | 1b | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
Returns:
A pipeline consisting of the from_http reader, which can be joined to other pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_http('http://example.com', 'GET')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"<!doctype html>\\n<html>\\n<head>\\n    <title>Example Domain</title>\\n\\n...
Nondeterminism
This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.
Kafka
Consumes data from a Kafka topic.
.qsp.read.fromKafka[topic]
.qsp.read.fromKafka[topic; brokers]
.qsp.read.fromKafka[topic; brokers; .qsp.use (!) . flip (
    (`retries      ; retries);
    (`retryWait    ; retryWait);
    (`pollLimit    ; pollLimit);
    (`offset       ; offset);
    (`registry     ; registry);
    (`asList       ; asList);
    (`options      ; options))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| topic | symbol or string | The name of a topic. | Required | 
| brokers | string or string[] or symbol or symbol[] | One or more brokers identified as host:port pairs. | "localhost:9092" | 
options:
| name | type | description | default | 
|---|---|---|---|
| retries | long | Max retry attempts for Kafka API calls. | 10 | 
| retryWait | timespan | Time to wait between retry attempts. | 0D00:00:02 | 
| pollLimit | long | Maximum number of records to process in a single poll loop. | 1000 | 
| offset | dictionary | Partitions to offsets to start reading from. | (`int$())!() | 
| registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload decoding. | "" | 
| asList | boolean | Set to true for Kafka Schema Registry messages to omit field names when decoding Protocol Buffer schemas, and instead return only the list of values. | 0b | 
| options | dictionary | Options to be passed through to Kafka. | ()!() | 
For all common arguments, refer to configuring operators
This operator acts as a Kafka consumer, consuming data from a Kafka topic and pushing it to the pipeline.
A Kafka consumer will distribute load across parallel readers to maximize the throughput of a given topic.
Workers that join the stream will start reading from the end of the stream unless an
explicit offset is provided. To start by reading from the beginning of the stream, a special start
symbol can be used as the offset value, as in the Custom Offsets example below.
Besides the above keys, all available Kafka consumer
configuration options
are supported using the options dictionary.
This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.
Reserved keys
group.id, metadata.broker.list, and consumer.id are reserved values and are maintained
 directly by the Kafka reader. The Kafka reader will error if these options are used.
Maximum Poll Limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is
 imposed on the number of messages to read in a single cycle to avoid locking the
 process in a read loop only serving Kafka messages. By default this limit is set to
 1000 messages. Setting the configuration option pollLimit changes this value.
 This limit is global, so if multiple readers set this value, only one of the values is used.
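The effect of the poll limit can be illustrated with a bounded drain loop. poll_cycle and event_loop are hypothetical names, and the in-memory deque stands in for messages already fetched from Kafka; the point is only that each cycle takes at most pollLimit messages before the process gets a chance to do other work.

```python
from collections import deque


def poll_cycle(pending: deque, poll_limit: int = 1000) -> list:
    """Take at most poll_limit messages from the pending queue in one cycle."""
    batch = []
    while pending and len(batch) < poll_limit:
        batch.append(pending.popleft())
    return batch


def event_loop(pending: deque, poll_limit: int = 1000) -> list:
    """Alternate bounded polling with other work; return each batch size."""
    sizes = []
    while pending:
        sizes.append(len(poll_cycle(pending, poll_limit)))
        # Timers, other readers and downstream operators would run here.
    return sizes
```

With a backlog of 2,500 messages and the default limit, the loop yields control after batches of 1000, 1000 and 500 messages rather than serving Kafka alone until the backlog is empty.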
Running with TLS
It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to the TLS tutorial.
Offset Committing and Group Permissions
.qsp.read.fromKafka will automatically commit offsets when reading from a Kafka broker.
 This is essential for exactly-once semantics and fault tolerance.
 The error "Local: Waiting for coordinator" will occur when attempting to commit
 offsets if you do not have sufficient group permissions. The broker should be configured so that
 the user has group Describe permissions. Since .qsp.read.fromKafka will generate a
 random group ID, ensure the user is permissioned for all group names on the broker.
Localhost broker:
// Read topic 'numbers' with a default broker at 'localhost:9092'.
// This topic is generating random numbers between 0 and 10
.qsp.run .qsp.read.fromKafka[`numbers] .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Multiple brokers:
// Read topic 'numbers' from multiple brokers. This topic is generating
// random numbers between 0 and 10
.qsp.run
  .qsp.read.fromKafka[.qsp.use `topic`brokers!(`numbers;("localhost:1234";"localhost:1235"))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Advanced configuration:
// Reads topic 'numbers' with a default broker setting custom values for advanced
// consumer configuration.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`socket.timeout.ms         ; 60000);       // Wait 1 minute for socket timeout
        (`fetch.message.max.bytes   ; 4*1024*1024); // Allow 4MiB max message size
        (`fetch.wait.max.ms         ; 500))))]      // Wait up to 500ms for fetch cycle
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Custom offsets:
// Reads topic 'numbers' from the beginning of the data stream. This includes any
// old messages in the stream, not only those published after a given worker
// joins the stream. Note that by default a worker starts reading from the end
// of the stream.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic  ; `numbers);
    (`offset ; `start))]   // The 'offset' field can be set to a single value
                            // to apply to all partitions. Alternatively it can
                            // be set to a dictionary of partitions and
                            // offsets. Offsets can either be the special
                            // symbols `start or `end or a literal offset index
                            // as a number. Example: 0 1i!(`start; 10)
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 8
2021.03.08D19:19:08.171915000 | 1
2021.03.08D19:19:08.172081000 | 9
..
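The offset rules in the example above (a single value for all partitions, or a dictionary of partitions to offsets, where each offset is start, end or a literal index) can be sketched as a resolution step. starting_offsets and the watermarks argument are hypothetical, and treating partitions missing from the dictionary as using the default (end) is an assumption.

```python
def starting_offsets(watermarks, offset=None):
    """Resolve an offset setting to a starting offset per partition.

    watermarks: partition -> (low, high), the earliest retained offset and the
                offset that the next produced message will receive.
    offset:     None, "start", "end", an int, or a dict partition -> one of those.
    """
    def resolve(partition, spec):
        low, high = watermarks[partition]
        if spec is None or spec == "end":
            return high  # default: only messages produced after joining
        if spec == "start":
            return low   # replay everything still retained
        if isinstance(spec, int):
            return spec  # literal offset index
        raise ValueError(f"bad offset for partition {partition}: {spec!r}")

    if isinstance(offset, dict):
        # Assumption: partitions missing from the dict use the default (end).
        return {p: resolve(p, offset.get(p)) for p in watermarks}
    # A single value applies to every partition.
    return {p: resolve(p, offset) for p in watermarks}
```

Here {0: "start", 1: 10} plays the role of the q dictionary 0 1i!(`start; 10) from the comment above.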
TLS authentication:
For more information about configuring TLS, refer to the TLS tutorial.
// Reads topic 'numbers' from a broker configured at `kafka:9092` using
// custom TLS/SSL certificates.
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`brokers; "kafka:9092");
    (`topic  ; `numbers);
    (`options; (!) . flip (
        (`ssl.secret       ; "certs");
        (`ssl.key.password ; "iamsecure"))))]
  .qsp.write.toConsole[]
2021.03.08D19:19:08.171805000 | 4
2021.03.08D19:19:08.171915000 | 7
2021.03.08D19:19:08.172081000 | 3
..
Kafka Schema Registry:
.qsp.run
  .qsp.read.fromKafka[.qsp.use (!) . flip (
    (`topic;`numbers);
    (`brokers;"localhost:9092");
    (`registry;"http://localhost:8081"))]
  .qsp.write.toConsole[];
sp.read.from_kafka('trades')
Maximum poll limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the configuration option pollLimit changes this value. This limit is global, so if multiple readers set this value, only one of the values is used.
Reserved keys
group.id, metadata.broker.list, and consumer.id are reserved values and are maintained
directly by the Kafka reader. The Kafka reader will error if these options are used.
Running with TLS
It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to the TLS tutorial.
Offset Committing and Group Permissions
sp.read.from_kafka will automatically commit offsets when reading from a Kafka broker.
This is essential for exactly-once semantics and fault tolerance.
The error "Local: Waiting for coordinator" will occur when attempting to commit
offsets if you do not have sufficient group permissions. The broker should be configured so that
the user has group Describe permissions. Since sp.read.from_kafka will generate a
random group ID, ensure the user is permissioned for all group names on the broker.
Kafka configuration options
Besides the above keys, all available Kafka consumer
configuration options
are supported using the options dictionary.
This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.
Parameters:
| name | type | description | default | 
|---|---|---|---|
| topic | symbol or string | The name of a topic. | Required | 
| brokers | string or string[] or symbol or symbol[] | Brokers identified as a 'host:port' string, or a list of 'host:port' strings. | "localhost:9092" | 
options:
| name | type | description | default | 
|---|---|---|---|
| retries | long | Maximum number of retries that will be attempted for Kafka API calls. | 10 | 
| retry_wait | timespan | How long to wait between retry attempts. | 0D00:00:02 | 
| poll_limit | long | Maximum number of records to process in a single poll loop. | 1000 | 
| offset | dictionary | Dictionary mapping from partition IDs to their offsets. | (`int$())!() | 
| options | dictionary | Dictionary of Kafka consumer options. | ()!() | 
| registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for payload decoding. | "" | 
| as_list | boolean | When set to True, Kafka Schema Registry messages omit field names when decoding Protocol Buffer schemas and return the values as a list. | 0b | 
Returns:
A from_kafka reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_kafka('numbers')
        | sp.write.to_variable('out'))
>>> kx.q('out')
4
7
3
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_kafka('numbers', options={
        'socket.timeout.ms': '60000',
        'socket.send.buffer.bytes': '4194304',
        'max.in.flight': '100000'})
        | sp.write.to_variable('out'))
>>> kx.q('out')
4
7
3
kdb Insights Database
Reads data from a kdb Insights Database.
.qsp.read.fromDatabase[sql]
.qsp.read.fromDatabase[.qsp.use (!) . flip (
    (`table           ; table);
    (`startTS         ; startTS);
    (`endTS           ; endTS);
    (`filter          ; filter);
    (`groupBy         ; groupBy);
    (`agg             ; agg);
    (`fill            ; fill);
    (`temporality     ; temporality);
    (`sortCols        ; sortCols);
    (`labels          ; labels);
    (`retries         ; retries);
    (`retryWait       ; retryWait);
    (`trigger         ; trigger))]
.qsp.read.fromDatabase[.qsp.use enlist[`uda]!enlist(`.myns.myUdaName; udaArgs)]
.qsp.read.fromDatabase[.qsp.use (!) . flip (
    (`uda             ; uda);
    (`labels          ; labels);
    (`trigger         ; trigger))]
Parameters:
options:
| name | type | description | default | 
|---|---|---|---|
| sql | string | SQL query to execute on the database. Leave blank for getData requests or when using UDAs. Enable SQL2 by following these steps. | "" | 
| uda | list | List of the UDA registered name and required UDA parameters. Omit for getData or SQL query requests. Follows the example defined here. | () | 
| table | symbol | Name of table to retrieve data from. Cannot be combined with the uda option above. | ` | 
| startTS | timestamp | Inclusive start time of period of interest. | -0Wp | 
| endTS | timestamp | Exclusive end time of period of interest. | 0Wp | 
| filter | list | List of triadic lists of the form (function;column name;parameter). | () | 
| groupBy | symbol[] | List of columns to group aggregation result by. | () | 
| agg | symbol[] | List of triples of aggregations, or columns to select. For example, an aggregation: (`c1`avg`price;`c2`sum`size); a basic select: `price`size. | () | 
| fill | symbol | How to handle nulls in the data. Supported values are zero and forward. The zero option fills numeric types with zeroes. The forward option fills nulls with the previous non-null entry. This parameter has no effect when aggregating, that is, when the agg parameter is a list of triples such as (`c1`avg`price;`c2`sum`size). | `zero | 
| temporality | symbol | Sets the range of data in view for each day within the query. Two types of temporality are supported: snapshot, which takes a continuous range of the data, and slice, which selects data between the time values of the startTS and endTS parameters for each date within the start and end timestamps. | `snapshot | 
| sortCols | symbol[] | Columns to sort result data on (ascending). | () | 
| labels | dict | Map of label selectors for specifying Assemblies. | ()!() | 
| retries | long | Number of times to retry the connection to the database. | 60 | 
| retryWait | timespan | Time to wait between database connection attempts. | 0D00:00:03 | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
For all common arguments, refer to configuring operators
This operator issues a query either using SQL or a functional 'get data' query. If a single
string parameter is provided, a SQL query is issued. Otherwise, the configuration dictionary is
used, and table, startTS and endTS are considered required fields.
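The two temporality settings described in the options table differ in which time ranges a get data query covers. The sketch below models this with Python datetimes; query_windows is a hypothetical helper. It follows the documented inclusive startTS and exclusive endTS, and including the end date itself in a slice is an assumption.

```python
from datetime import datetime, timedelta


def query_windows(start_ts, end_ts, temporality="snapshot"):
    """Return the [start, end) time ranges a query would cover (hypothetical)."""
    if temporality == "snapshot":
        # One continuous range from startTS (inclusive) to endTS (exclusive).
        return [(start_ts, end_ts)]
    if temporality == "slice":
        # The same time-of-day range on every date from startTS's date to endTS's date.
        windows = []
        day = start_ts.date()
        while day <= end_ts.date():
            windows.append((datetime.combine(day, start_ts.time()),
                            datetime.combine(day, end_ts.time())))
            day += timedelta(days=1)
        return windows
    raise ValueError(f"temporality must be snapshot or slice, got {temporality!r}")
```

For a startTS of 09:30 on one day and an endTS of 16:00 two days later, snapshot covers everything in between, while slice covers only 09:30 to 16:00 on each of the three dates.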
Database Host
In an Insights Enterprise deployment, the database reader will use the deployment release name to
locate the query service gateway. When using an Insights SDK configuration, or to override the
host address of the database, set $KXI_SP_DATABASE_HOST to the hostname. The hostname should be the
full URL of the Service Gateway including the port to connect to.
SQL query:
.qsp.run
  .qsp.read.fromDatabase["SELECT * FROM stocks"]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
Get data API:
.qsp.run
  .qsp.read.fromDatabase[.qsp.use `table`startTS`endTS!(`stocks; .z.p-1D; .z.p)]
  .qsp.write.toConsole[]
                             | date        id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 2021.09.10  1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2021.09.10  2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 2021.09.10  3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 2021.09.10  4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 2021.09.10  5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 2021.09.10  6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 2021.09.10  7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
Enrich data with stream
stocks: .qsp.read.fromDatabase["select * from stocks"];
// Reads from a stream and filters on a table called 'trades', windows
// the data into 5 second intervals, then enriches the data with a join
// on time. Finally the data is written to a table called 'avgPrice'.
.qsp.run
  .qsp.read.fromStream[`trades]
  .qsp.window.tumbling[0D00:00:05; `time]
  .qsp.merge[stocks; { x lj `sym xkey y }]  // Join stocks to trades on sym
  .qsp.map[{ select avg price by sym from x }]
  .qsp.write.toStream[`avgPrice]
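To show roughly what this pipeline computes, here is a plain-Python sketch (not the Stream Processor API) of three steps: a 5-second tumbling window on time, a left join of each trade to its stock record on sym, and an average price by sym. The row layout and the sample trades are hypothetical. The helpers tumbling_windows and enrich_and_average are illustrative only.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_windows(rows, size):
    """Group rows into fixed, non-overlapping windows keyed by window start time."""
    windows = defaultdict(list)
    for row in rows:
        index = (row["time"] - EPOCH) // size  # timedelta // timedelta gives an int
        windows[EPOCH + index * size].append(row)
    return dict(sorted(windows.items()))

def enrich_and_average(window_rows, stocks):
    """Left-join each trade to its stock record on sym, then average price by sym."""
    stocks_by_sym = {s["sym"]: s for s in stocks}
    # Columns from the stock record are added to (and override) the trade's columns.
    joined = [{**row, **stocks_by_sym.get(row["sym"], {})} for row in window_rows]
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in joined:
        sums[row["sym"]] += row["price"]
        counts[row["sym"]] += 1
    return {sym: sums[sym] / counts[sym] for sym in sums}

# Hypothetical trades: two AAPL trades share a window, and one GOOG trade falls in the next.
t0 = datetime(2022, 2, 23, 17, 38, 30)
trades = [
    {"time": t0, "sym": "AAPL", "price": 10.0},
    {"time": t0 + timedelta(seconds=1), "sym": "AAPL", "price": 20.0},
    {"time": t0 + timedelta(seconds=6), "sym": "GOOG", "price": 5.0},
]
windows = tumbling_windows(trades, timedelta(seconds=5))
print(enrich_and_average(windows[t0], [{"sym": "AAPL", "market": "NASDAQ"}]))  # {'AAPL': 15.0}
```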
If the pipeline wrote to the console with the toConsole writer instead, the output might be as follows.
                             | sym  price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP  737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA  191.2175
..
Utilize a registered UDA
args:`table`startTS`endTS`byCols!(`trade;"p"$.z.D-3;"p"$.z.D-2;`date`sym);
.qsp.run
  .qsp.read.fromDatabase[.qsp.use enlist[`uda]!enlist(`.customUDA.countBy; args)]
  .qsp.write.toConsole[]
                             | sym  price
-----------------------------| -------------
2022.02.23D17:38:31.256064000| AAPL 360.1039
2022.02.23D17:38:31.256064000| GOOG 658.463
2022.02.23D17:38:31.256064000| HBP  737.0521
2022.02.23D17:38:31.256064000| NFLX 289.6375
2022.02.23D17:38:31.256064000| PEA  191.2175
..
sp.read.from_database(
    table='weather',
    start_ts='2022.07.28D00:00:00.000000000',
    end_ts='2022.07.29D00:00:00.000000000',
    agg=['eventTimestamp', 'instrumentID', 'closingPrice']
)
This operator is built on the kdb Insights query APIs.
The default mode is to use the getData API to query for
a specific table and time range.
It also supports executing SQL queries; however, this is disabled by default. To enable it, follow these steps for sql2.
For more information on supported functionality read the API docs.
Parameters:
options:
| name | type | description | default | 
|---|---|---|---|
| sql | string | SQL query to execute on the database. Leave blank for getData requests or when utilizing UDAs. | "" | 
| uda | list | List of the UDA registered name and required UDA parameters. Omit for getData or query requests. | () | 
| table | symbol | Table name in kdb Insights Database. Cannot be combined with above uda option. | ` | 
| start_ts | timestamp | Inclusive start time of period of interest. | -0Wp | 
| end_ts | timestamp | Exclusive end time of period of interest. | 0Wp | 
| filter | list | List of triadic lists of the form (function;column name;parameter). | () | 
| group_by | symbol[] | List of columns to group aggregation result by. | () | 
| agg | symbol[] | List of triples of aggregations or columns to select, e.g. [['c1', 'avg', 'valFloat'], ['c2', 'min', 'qual']]. | () | 
| fill | symbol | How to handle nulls in the data. Supported values are zero and forward. | `zero | 
| temporality | symbol | Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot, which takes a continuous range of the data, and slice, which grabs data between the time values of the start_ts and end_ts parameters for each date within the start and end timestamp. | `snapshot | 
| sort_cols | symbol[] | Columns to sort result data on (ascending), e.g. ['sensorID', 'readTS']. | () | 
| labels | dict | Map of label selectors for specifying Assemblies. | ()!() | 
| retries | long | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | 60 | 
| retry_wait | timespan | How long to wait between retry attempts, e.g. timedelta(milliseconds=2000) or (2, 's'). | 0D00:00:03 | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
Returns:
A from_database reader, which can be joined to other operators or pipelines.
Database Host
In an Insights Enterprise deployment, the database reader uses the deployment release name to
locate the query service gateway. When using an Insights SDK configuration, or to override the
host address of the database, set the KXI_SP_DATABASE_HOST environment variable to the full URL
of the Service Gateway, including the port to connect to.
Aggregation
The agg parameter can be used to select a subset of table columns in a getData query,
e.g. ['valFloat','qual'].
Fills
The zero option fills numeric types with zeroes. The forward option fills nulls
with the previous non-null entry. This parameter has no effect when aggregating,
that is, when the agg parameter is a list of triples, e.g.
[['c1', 'avg', 'valFloat'], ['c2', 'min', 'qual']].
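Below is a plain-Python sketch of the two fill strategies, applied to a single column in which None stands for a null. It illustrates the semantics only and does not reproduce how the database implements fills. The treatment of leading nulls in forward_fill (left as null, because there is no earlier value) is an assumption.

```python
def zero_fill(values):
    """Replace nulls (None) with 0, as the zero option does for numeric columns."""
    return [0 if v is None else v for v in values]

def forward_fill(values):
    """Replace each null with the previous non-null entry (the forward option).
    Assumption: leading nulls have no previous value, so they stay None."""
    filled, last = [], None
    for v in values:
        if v is not None:
            last = v
        filled.append(last)
    return filled

print(zero_fill([1.5, None, 3.0]))                 # [1.5, 0, 3.0]
print(forward_fill([None, 2, None, None, 5, None]))  # [None, 2, 2, 2, 5, 5]
```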
Timestamps
The start_ts and end_ts parameters can be either a Python datetime object or a
date-formatted string, e.g. 2022.07.29D00:00:00.000000000.
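The two accepted forms correspond directly. The hypothetical helper below formats a naive Python datetime as the equivalent kdb+ timestamp string. Python datetimes only have microsecond precision, so the last three nanosecond digits are always zero. Passing the datetime itself is enough, so this conversion is not needed. The sketch only shows how the two forms line up.

```python
from datetime import datetime

def to_kdb_timestamp(dt: datetime) -> str:
    """Format a naive datetime as a kdb+ timestamp string (YYYY.MM.DDDhh:mm:ss.nnnnnnnnn)."""
    # Pad microseconds to nanoseconds by appending three zeros.
    return dt.strftime("%Y.%m.%dD%H:%M:%S.") + f"{dt.microsecond:06d}000"

print(to_kdb_timestamp(datetime(2022, 7, 29)))  # 2022.07.29D00:00:00.000000000
```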
UDA parameter types
When using this operator with the UDA query mode, it's important to ensure the UDA parameters are correctly typed or they may not be handled correctly by the server. See the UDA example below.
Examples:
>>> from datetime import datetime
>>> date_str = "2024-10-14 14:00:45"
>>> date_format = "%Y-%m-%d %H:%M:%S"
>>> start_ts = datetime.strptime(date_str, date_format)
getData API:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_database( table='weather',
                                  start_ts='2022.07.28D00:00:00.000000000',
                                  end_ts='2022.07.29D00:00:00.000000000',
                                  agg=['timestamp', 'sensor', 'airtemp', 'name', 'borough', 'longitude', 'latitude'],
                                  retries=3,
                                  retry_wait=(2, 's'))
        | sp.write.to_variable('out'))
>>> kx.q('out')
timestamp                     sensor   airtemp  name                      borough       longitude latitude
----------------------------------------------------------------------------------------------------------
2022.07.28D00:00:00.000000000 Q-JC_04  75.05117 Fox Hills                 Staten Island -74.08174 40.61731
2022.07.28D00:00:00.000000000 Bk-BR_17 54.27767 Rosedale                  Queens        -73.73526 40.65982
2022.07.28D00:00:00.000000000 Bk-EF_09 54.828   Far Rockaway              Queens        -73.75498 40.60313
2022.07.28D00:00:00.000000000 Bk-EN_14 54.00883 Beechhurst                Queens        -73.80436 40.79278
2022.07.28D00:00:00.000000000 Bk-SH_08 54.364   St. George                Staten Island -74.07935 40.64498
2022.07.28D00:00:00.000000000 Bx-EC_01 52.84533 Stapleton                 Staten Island -74.0779  40.62693
2022.07.28D00:00:00.000000000 Bx-EC_02 50.88533 Rosebank                  Staten Island -74.06981 40.6153
2022.07.28D00:00:00.000000000 Bx-EC_03 52.4225  West Brighton             Staten Island -74.10718 40.63188
2022.07.28D00:00:00.000000000 Bx-EC_04 52.45167 Grymes Hill               Staten Island -74.08725 40.62418
2022.07.28D00:00:00.000000000 Bx-EC_05 51.70583 Todt Hill                 Staten Island -74.11133 40.59707
Advanced getData API:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_database( table='weather',
                            start_ts='2022.07.28D00:00:00.000000000',
                            end_ts='2022.07.29D00:00:00.000000000',
                            filter=[['<=', 'airtemp', 60], ['within', 'longitude', [-75, -72]]],
                            fill='zero',
                            temporality='slice',
                            sort_cols=['sensor', 'timestamp'],
                            agg=['timestamp', 'sensor', 'airtemp', 'name', 'borough', 'longitude', 'latitude'],
                            retries=3,
                            retry_wait=(2, 's'))
        | sp.write.to_variable('out'))
>>> kx.q('out')
date       label_kxname  timestamp                     sensor   airtemp  name            borough longitude latitude
-------------------------------------------------------------------------------------------------------------------
2022.07.28 insights-demo 2022.07.28D01:00:00.000000000 Bk-BR_02 59.20733 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D02:00:00.000000000 Bk-BR_02 57.5775  Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D03:00:00.000000000 Bk-BR_02 56.317   Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D04:00:00.000000000 Bk-BR_02 55.57333 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D05:00:00.000000000 Bk-BR_02 54.50233 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D06:00:00.000000000 Bk-BR_02 53.62383 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D07:00:00.000000000 Bk-BR_02 53.74717 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D08:00:00.000000000 Bk-BR_02 55.60933 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D09:00:00.000000000 Bk-BR_02 57.8435  Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D10:00:00.000000000 Bk-BR_02 59.26417 Cambria Heights Queens  -73.73527 40.69277
SQL API:
>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import timedelta
>>> sp.run(sp.read.from_database(sql="select * from weather",
                                 retries=3,
                                 retry_wait=timedelta(milliseconds=2000))
        | sp.write.to_variable('out'))
>>> kx.q('out')
timestamp                     sensor   airtemp  name                borough       longitude latitude
----------------------------------------------------------------------------------------------------
2022.07.28D07:00:00.000000000 Q-CH_19  46.256   Queens Village      Queens        -73.73871 40.71889
2022.07.28D06:00:00.000000000 Q-CH_19  46.7232  Queens Village      Queens        -73.73871 40.71889
2022.07.28D06:00:00.000000000 Q-CH_10  47.63167 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D05:00:00.000000000 Q-CH_19  47.70583 Queens Village      Queens        -73.73871 40.71889
2022.07.28D07:00:00.000000000 Q-CH_10  47.88417 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D00:00:00.000000000 Q-CH_29  48.04817 Madison             Brooklyn      -73.94842 40.60938
2022.07.28D00:00:00.000000000 Q-CH_31  48.47183 Bronxdale           Bronx         -73.86173 40.85272
2022.07.28D05:00:00.000000000 Q-CH_10  48.70867 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D01:00:00.000000000 Q-CH_31  48.85617 Bronxdale           Bronx         -73.86173 40.85272
Utilize a registered UDA:
>>> from kxi import sp
>>> import pykx as kx
# explicitly type the UDA params to avoid issues on the server side
>>> args=['.example.multiplierAPI', {'table': kx.SymbolAtom('weather'),
        'column': kx.CharVector('airtemp'), 'multiplier': kx.FloatAtom(10.0)}]
>>> sp.run(sp.read.from_database(uda=args)
        | sp.write.to_variable('out'))
>>> kx.q('out')
original_col multiplied_col
---------------------------
46.256        462.56
46.7232       467.232
47.63167      476.3167
..
kdb Insights Stream
Reads data using a kdb Insights Stream.
.qsp.read.fromStream[table]
.qsp.read.fromStream[table; stream]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`position]!enlist position]
.qsp.read.fromStream[table; stream; .qsp.use enlist[`index]!enlist index]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| table | symbol or string | A table name to filter incoming messages by. Leave blank to receive all messages. | No filtering | 
| stream | symbol or string | The name of the inbound stream to subscribe to. | $RT_SUB_TOPIC | 
options:
| name | type | description | default | 
|---|---|---|---|
| position | symbol or string | The position in the stream to replay from. Options are start or end of the stream. | start | 
| index (DEPRECATED) | long | The integer position in the stream to replay from. If both index and position are specified, the position parameter takes priority. | 0 | 
For all common arguments, refer to configuring operators
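The precedence rules between position and the deprecated index option fit in a few lines. resolve_replay_point is a hypothetical helper, not part of the Stream Processor API. The sketch assumes that an unset option is represented as None, so that index is consulted only when position is not given.

```python
def resolve_replay_point(position=None, index=None):
    """Hypothetical: position wins over the deprecated index; with neither, replay from start."""
    if position is not None:
        if position not in ("start", "end"):
            raise ValueError("position must be 'start' or 'end'")
        return position
    if index is not None:
        return index
    return "start"

print(resolve_replay_point(position="end", index=5))  # end
```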
This operator reads data using kdb Insights Reliable Transport Streams.
sp.read.from_stream('trace', 'production-stream-package')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| table | symbol or string | Name of the table to filter the stream on. By default, no filtering is performed. | No filtering | 
| stream | symbol or string | Name of stream to subscribe to. By default, the stream specified by the $RT_SUB_TOPIC environment variable is used. | $RT_SUB_TOPIC | 
options:
| name | type | description | default | 
|---|---|---|---|
| position | symbol or string | The position in the stream to replay from. Options are start or end of the stream. If no position is specified, defaults to start. | start | 
| index (DEPRECATED) | long | The integer position in the stream to replay from. If both index and position are set, the position parameter takes priority. | 0 | 
| prefix (DEPRECATED) | string | Prefix to add to the hostname for RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used. | "" | 
| assembly (DEPRECATED) | string | The kdb Insights assembly to read from. By default, no assembly is used. | "" | 
| insights (DEPRECATED) | string | Whether the stream being subscribed to uses Insights message formats. | "" | 
Returns:
A from_stream reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_stream('trace', 'north')
        | sp.write.to_variable('out'))
>>> kx.q('out')
date       val
--------------
2023.09.26 0
2023.09.26 1
2023.09.26 2
2023.09.26 3
2023.09.26 4
2023.09.26 5
2023.09.26 6
2023.09.26 7
2023.09.26 8
2023.09.26 9
Microsoft Azure Storage
Reads files from Microsoft Azure Storage.
.qsp.read.fromAzureStorage[path]
.qsp.read.fromAzureStorage[path; account]
.qsp.read.fromAzureStorage[path; account; .qsp.use (!) . flip (
    (`mode       ; mode);
    (`tenant     ; tenant);
    (`chunking   ; chunking);
    (`chunkSize  ; chunkSize);
    (`credentials; credentials);
    (`watch      ; watch))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | string or string[] or symbol or symbol[] | The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an ms:// URL, not an https:// URL. Glob patterns are supported. | Required | 
| account | string or symbol | The name of the Microsoft Azure Storage account to read an object from. | $AZURE_STORAGE_ACCOUNT | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | Either binary or text. | binary | 
| tenant | string | The authentication tenant. | "" | 
| chunking | symbol | Either enabled, disabled or auto. | auto | 
| chunkSize | long or string | The size of chunks to read when chunking is enabled, given either as an integer number of bytes or as a string with a size suffix, e.g. "1MB". Sizes specified as strings are always interpreted as bytes. | "1MB" | 
| credentials | string | Name of a credentials secret to mount. See the authentication section below for details. | "" | 
| watch | boolean or dictionary | Either 0b, 1b, or a dictionary with keys method and frequency. The only current method is timer. The default frequency is 0D00:00:05. If watch is set to 1b, the watch defaults are used. | 0b | 
For all common arguments, refer to configuring operators
ms:// URLs
This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv,
so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv
would be written as .qsp.read.fromAzureStorage["ms://myContainerName/path/to/file.csv"; "myStorageAccount"]
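The hypothetical helper below mechanically converts an https:// Blob URL into the ms:// URL and account name that this operator expects, following the mapping described above. It handles only the standard blob.core.windows.net host and does not cover custom domains.

```python
from urllib.parse import urlparse

def https_to_ms(url: str):
    """Split an Azure Blob https:// URL into (ms:// URL, storage account name)."""
    parsed = urlparse(url)
    host_suffix = ".blob.core.windows.net"
    if parsed.scheme != "https" or not parsed.netloc.endswith(host_suffix):
        raise ValueError(f"not a standard Azure Blob URL: {url}")
    account = parsed.netloc[: -len(host_suffix)]
    # parsed.path already starts with "/<container>/...", so "ms:/" + path gives "ms://<container>/...".
    return "ms:/" + parsed.path, account

print(https_to_ms("https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv"))
# ('ms://myContainerName/path/to/file.csv', 'myStorageAccount')
```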
Object mode
Setting the object mode to binary reads content as a byte vector. When an
object is read as text, data is read as strings split on newlines. Each string
represents a single line.
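The difference between the two modes can be shown with plain Python bytes. read_object is a hypothetical stand-in and does not reproduce the reader itself. It assumes UTF-8 content and '\n' line endings, and it assumes that a trailing newline does not produce an extra empty line.

```python
def read_object(content: bytes, mode: str = "binary"):
    """Hypothetical: binary returns the raw bytes; text returns one string per line."""
    if mode == "binary":
        return content
    if mode == "text":
        lines = content.decode("utf-8").split("\n")
        if lines and lines[-1] == "":
            lines.pop()  # assumption: ignore the empty string after a trailing newline
        return lines
    raise ValueError("mode must be 'binary' or 'text'")

print(read_object(b"556\n465\n63\n", "text"))  # ['556', '465', '63']
```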
Partitioning
The pipeline automatically scales the number of workers to the minimum of the number
of file paths and the maxWorkers setting of the pipeline. Each worker reads a
disjoint subset of the paths. You can disable partitioning by setting maxWorkers
to 1. Note that partitioning is not currently possible when there is more than one reader.
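The worker count rule, and the requirement that the workers' subsets are disjoint, can be sketched in plain Python. The round-robin assignment is an assumed strategy for illustration only, because the actual distribution of paths is not described here. assign_paths is a hypothetical name.

```python
def assign_paths(paths, max_workers):
    """Use min(len(paths), max_workers) workers; each path goes to exactly one worker.
    Round-robin assignment is an assumption made for this sketch."""
    n_workers = max(1, min(len(paths), max_workers))
    assignments = [[] for _ in range(n_workers)]
    for i, path in enumerate(paths):
        assignments[i % n_workers].append(path)
    return assignments

print(assign_paths(["a.txt", "b.txt", "c.txt"], 2))  # [['a.txt', 'c.txt'], ['b.txt']]
```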
File watching
Setting the file watch to 1b enables file watching with default settings. Usually
this is paired with a glob pattern, or a list of glob patterns, for the file path.
The reader continually watches for new files matching the file path(s) provided, using
the watch method. Partitioning is not possible when using file watching. Note that when
watching is enabled, the pipeline continues watching until there is manual intervention to finish
the pipeline. Also note that, at present, the file watcher does nothing other than print a
warning if a file is modified. In particular, for files that are appended to, the new data
is not processed.
File watching should not be used with the directWrite option in .qsp.write.toDatabase
You should not set file watch to 1b when using the directWrite option of the Database Writer, since directWrite relies on a definitive finish point, which is not guaranteed when using the file watcher.
This is not an issue with .qsp.v2.write.toDatabase.
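The timer method can be thought of as a periodic poll that compares the current listing against what has already been read. In this sketch, poll_once and the listing dictionary (mapping each path to a last-modified marker) are hypothetical. The sketch encodes only the behavior described above: new matching files are read, and modified files only produce a warning. With the default settings, one poll would run every 5 seconds.

```python
from fnmatch import fnmatchcase

def poll_once(listing, pattern, seen):
    """One timer tick. Returns (new_paths, warnings); `seen` records what was already read."""
    new, warnings = [], []
    for path, modified in sorted(listing.items()):
        if not fnmatchcase(path, pattern):
            continue
        if path not in seen:
            new.append(path)          # a new matching file is read
            seen[path] = modified
        elif seen[path] != modified:
            warnings.append(f"{path} was modified; new data will not be read")
            seen[path] = modified     # warn once per modification
    return new, warnings

seen = {}
listing = {"ms://bucket/fileOne.txt": 1, "ms://bucket/fileTwo.txt": 1, "ms://bucket/other.csv": 1}
print(poll_once(listing, "ms://bucket/file*.txt", seen))
# (['ms://bucket/fileOne.txt', 'ms://bucket/fileTwo.txt'], [])
```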
Object chunking
Chunking an object splits the object into smaller batches and streams the batches through
the pipeline. If chunking is set to auto, the reader determines the size of
the target object and, if it is sufficiently large (more than a few megabytes), reads it
in chunks.
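Below is a minimal sketch of chunked reading and of the chunking decision. The 5 MiB threshold in should_chunk is a hypothetical stand-in for "more than a few megabytes", because the real cut-off is not documented here. Both helper names are illustrative.

```python
def iter_chunks(content: bytes, chunk_size: int):
    """Yield successive chunk_size-byte slices of an object, as a chunked read would stream them."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]

def should_chunk(object_size: int, chunking: str = "auto", threshold: int = 5 * 1024 * 1024):
    """enabled/disabled are explicit; auto compares against a hypothetical size threshold."""
    if chunking == "enabled":
        return True
    if chunking == "disabled":
        return False
    return object_size > threshold

print(list(iter_chunks(b"abcdefg", 3)))  # [b'abc', b'def', b'g']
```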
Operator can only read from one account, domain and tenant
When reading in multiple files there must only be one account used to access all file paths passed into the operator. The same goes for domain and tenant.
Examples
Processing a Microsoft Azure Storage object:
.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
Processing multiple Microsoft Azure Storage objects:
.qsp.run
  .qsp.read.fromAzureStorage[(`:ms://bucket/fileOne.txt;`:ms://bucket/fileTwo.txt); `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..
Processing multiple Microsoft Azure Storage objects using glob pattern matching:
.qsp.run
  .qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 556
2021.09.10D10:28:37.177672179 | 465
2021.09.10D10:28:37.177672179 | 63
2021.09.10D10:28:37.177672179 | 106
2021.09.10D10:28:37.177672179 | 46
..
2021.09.10D10:28:37.177672179 | 32
2021.09.10D10:28:37.177672179 | 62
2021.09.10D10:28:37.177672179 | 567
2021.09.10D10:28:37.177672179 | 20
2021.09.10D10:28:37.177672179 | 13
..
File Watching:
.qsp.run
  .qsp.read.fromAzureStorage["ms://bucket/file*.txt"; `myaccount; .qsp.use ``watch!(`;1b)]
  .qsp.map[{"J"$"\n" vs x}]
  .qsp.write.toConsole[]
2023.06.15D10:49:19.795351384 | 556
2023.06.15D10:49:19.795351384 | 465
2023.06.15D10:49:19.795351384 | 63
2023.06.15D10:49:19.795351384 | 106
2023.06.15D10:49:19.795351384 | 46
2023.06.15D10:49:20.352682229 | 32
2023.06.15D10:49:20.352682229 | 62
2023.06.15D10:49:20.352682229 | 567
2023.06.15D10:49:20.352682229 | 20
2023.06.15D10:49:20.352682229 | 13
If fileThree.txt is then uploaded to the bucket, the following is output:
2023.06.15D10:49:42.323727244 | 908
2023.06.15D10:49:42.323727244 | 360
2023.06.15D10:49:42.323727244 | 522
2023.06.15D10:49:42.323727244 | 257
2023.06.15D10:49:42.323727244 | 858
Reading a text object by line:
.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/numbers.txt; `myaccount; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
Reading multiple text objects by line:
.qsp.run
  .qsp.read.fromAzureStorage[(`:ms://bucket/numbersOne.txt;`:ms://bucket/numbersTwo.txt);
     `myaccount; .qsp.use ``mode!``text]
  .qsp.map["J"$]
  .qsp.write.toConsole[]
2021.09.10D10:28:37.177672179 | 44
2021.09.10D10:28:37.177672179 | 858
2021.09.10D10:28:37.177672179 | 390
2021.09.10D10:28:37.177672179 | 60
2021.09.10D10:28:37.177672179 | 631
..
2021.09.10D10:28:37.177672179 | 324
2021.09.10D10:28:37.177672179 | 214
2021.09.10D10:28:37.177672179 | 877
2021.09.10D10:28:37.177672179 | 96
2021.09.10D10:28:37.177672179 | 102
..
Reading with a custom tenant:
.qsp.run
  .qsp.read.fromAzureStorage[`:ms://bucket/object; .qsp.use `account`tenant!("myaccount"; "custom")]
  .qsp.write.toVariable[`output]
Subscribing to file events:
An API exists which allows you to subscribe to file progress events and track the progress of file readers. The docs describe how to utilize it with examples.
sp.read.from_azure_storage('ms://mycontainer/numbers.txt')
ms:// URL format
This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv,
so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv
would be written as
sp.read.from_azure_storage('ms://myContainerName/path/to/file.csv', account='myStorageAccount')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | string or string[] or symbol or symbol[] | The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an ms:// URL, not an https:// URL. Glob patterns are supported. | Required | 
| account | string or symbol | The Azure account to read from. | $AZURE_STORAGE_ACCOUNT | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration. | binary | 
| tenant | string | The authorization tenant. | "" | 
| offset | long | DEPRECATED. How many bytes into the file reading should begin. | 0 | 
| chunking | symbol | A FileChunking enum value, or string equivalent. | auto | 
| chunk_size | long or string | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | "1MB" | 
| domain | string | A custom Azure domain. | "" | 
| credentials | string | The secret name for the Azure credentials. Refer to the authentication section below for more information. | "" | 
| watch | boolean or dictionary | Either True, False, or a dictionary containing method (a string) and frequency (a timedelta). The only current method is "timer". The default frequency is timedelta(seconds=5). If watch is set to True, the watch defaults are used. | False | 
Returns:
A from_azure_storage reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_azure_storage('ms://mycontainer/numbers.txt',
            account='myaccount',mode='text')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"
Required Permissions
This reader requires the ability to list and retrieve objects on the defined Azure blob storage
buckets, and as such requires the credentials provided by the user to have access to the
following operations: List Blobs and Get Blob. Refer to the Azure Blob Storage REST API
documentation for these operations for more information.
Authentication
The Microsoft Azure Storage reader uses kurl for credential discovery.
See credential discovery
in the Azure tab for more information. When running on Azure provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To set up authentication using environment variables, set AZURE_STORAGE_ACCOUNT to the name
of the storage account to read from and AZURE_STORAGE_SHARED_KEY to one of the keys for
that account. For details on using shared keys, refer to the Microsoft Azure Storage documentation.
For Docker-based configurations, set the variables in the worker image configuration.
docker-compose:
  
version: "3.3"
services:
  worker:
    image: portal.dl.kx.com/kxi-sp-worker:1.13.0
    environment:
      AZURE_STORAGE_ACCOUNT: "123abc"
      AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "ms-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            AZURE_STORAGE_ACCOUNT: "123abc",
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
        }
    }' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-literal token="aAabBbcCcdDd111222333" mscreds
Next, add the secret name and the account name to the Azure Storage reader configuration.
.qsp.read.fromAzureStorage[`:ms://bucket/hello; "abc123"; .qsp.use``credentials!``mscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "ms-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["mscreds"] }
    }' | jq -asR .)"
MQTT
Subscribes to an MQTT topic.
.qsp.read.fromMQTT[topic;broker]
.qsp.read.fromMQTT[topic;broker; .qsp.use (!) . flip (
    (`username; username);
    (`password; password))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| topic | string or symbol | The MQTT topic to subscribe to. | Required | 
| broker | string or symbol | URI of the MQTT broker in the form protocol://host:port, where protocol must be one of "tcp" or "ssl". SSL options such as sslCert are ignored when the protocol is "tcp". | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| username | string or symbol | Username for the MQTT broker connection. | "" | 
| password | string or symbol | Password for the MQTT broker connection. | "" | 
| sslCert | string or symbol | Path of SSL/TLS certificate PEM file. | "" | 
| sslKey | string or symbol | Path of SSL/TLS key PEM file. | "" | 
| sslKeyPassword | string or symbol | Password of private key file if encrypted. | "" | 
| sslCAFile | string or symbol | Path of SSL/TLS public certificate chain. | "" | 
| sslCAPath | string or symbol | Path to directory of trusted certificates for SSL connection. | "" | 
| sslSecret | string or symbol | Name of Kubernetes secret to read SSL configuration from. | "" | 
For all common arguments, refer to configuring operators
This operator subscribes to an MQTT broker, pushing messages published to that topic to the pipeline.
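The broker URI format described in the parameter table can be checked with the Python standard library before you deploy a pipeline. parse_broker is a hypothetical pre-flight helper, not part of the Stream Processor. It splits protocol://host:port, rejects protocols other than tcp and ssl, and reports whether the SSL options would apply.

```python
from urllib.parse import urlparse

def parse_broker(uri: str):
    """Return (protocol, host, port, uses_ssl) for a protocol://host:port broker URI."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("tcp", "ssl"):
        raise ValueError(f"protocol must be 'tcp' or 'ssl', got {parsed.scheme!r}")
    if parsed.hostname is None or parsed.port is None:
        raise ValueError("broker URI must include a host and a port")
    # SSL options such as sslCert only take effect when the protocol is ssl.
    return parsed.scheme, parsed.hostname, parsed.port, parsed.scheme == "ssl"

print(parse_broker("tcp://localhost:1883"))  # ('tcp', 'localhost', 1883, False)
```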
Quality of Service
This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.
Determinism
This reader is non-deterministic, as only quality of service 0 (at most once messaging) has been implemented. As such, the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.
Subscribe to topic "readings":
.qsp.run
    .qsp.read.fromMQTT["readings"; "tcp://localhost:1883"]
    .qsp.write.toConsole[];
$ mosquitto_pub -t readings -m "Hello, World!"
2022.07.21D14:51:45.023073601 | "Hello, World!"
Subscribe to topic "readings" using SSL:
.qsp.run
     .qsp.read.fromMQTT[.qsp.use (!) . flip (
         (`topic         ; "readings");
         (`broker        ; "ssl://localhost:1883");
         (`sslCert       ; "/ssl/cert.pem");
         (`sslKey        ; "/ssl/key.pem");
         (`sslKeyPassword; "iamsecure");
         (`sslCAFile     ; "/ssl/ca.pem"))]
     .qsp.write.toConsole[]
sp.read.from_mqtt('trades', 'tcp://localhost:1883')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| topic | string or symbol | The name of the topic to subscribe to. | Required | 
| broker | string or symbol | The address of the MQTT broker. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| username | string or symbol | Username for the MQTT broker. | "" | 
| password | string or symbol | Password for the MQTT broker. | "" | 
| sslCert | string or symbol | Path of SSL/TLS certificate PEM file. | "" | 
| sslKey | string or symbol | Path of SSL/TLS key PEM file. | "" | 
| sslKeyPassword | string or symbol | Password of private key file if encrypted. | "" | 
| sslCAFile | string or symbol | Path of SSL/TLS public certificate chain. | "" | 
| sslCAPath | string or symbol | Path to directory of trusted certificates for SSL connection. | "" | 
| sslSecret | string or symbol | Name of Kubernetes secret to read SSL configuration from. | "" | 
Returns:
A from_mqtt reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_mqtt('readings', 'tcp://localhost:1883')
        | sp.write.to_variable('out'))
$ mosquitto_pub -t readings -m "Hello, World!"
>>> kx.q('out')
"Hello, World!"
Quality of Service
This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.
Determinism
This reader is non-deterministic, and the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.
Parquet
Reads parquet file contents into a pipeline.
The parquet file reader reads the given parquet file, entering the parquet file contents (table or mixed list of arrays) into the pipeline.
Performance and Memory Efficiency
The defaults are generally tuned towards good performance. The plugin may not handle parquet files which are larger than the total amount of memory. To handle such files, you can partition them into separate tabular datasets to be efficiently processed by the arrowkdb APIs for partitioning and metadata.
.qsp.read.fromParquet[path]
.qsp.read.fromParquet[path; .qsp.use (!) . flip (
    (`mode         ; mode);
    (`metadata     ; metadata);
    (`region       ; region);
    (`certificates ; certificates);
    (`credentials  ; credentials);
    (`connection   ; connection);
    (`project      ; project);
    (`domain       ; domain);
    (`tenant       ; tenant);
    (`watch        ; watch))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | A file path or list of file paths of parquet file(s). Glob patterns are supported. | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | Either table or lists. | table | 
| metadata | dictionary | Metadata keys with their types to be applied. | (::) | 
| region | string | The AWS region to authenticate against. | us-east-1 | 
| certificates | string | Location of default trust store of SSL certificates. | "/opt/kx/ca-bundle.crt" | 
| credentials | string | Name of a credentials secret to mount. | "" | 
| connection | string | The Azure Storage account connection string. | "" | 
| project | string | The Google Cloud Storage project ID. | "" | 
| domain | string | The name of your domain. | "" | 
| tenant | symbol | The authentication tenant. | ` | 
| watch | boolean or dictionary | Either 0b, 1b or a dictionary with keys method and frequency. The only method currently supported is timer. The default frequency is 0D00:00:05. If watch is set to 1b, the watch defaults are used. | 0b | 
For all common arguments, refer to configuring operators
This operator decodes a Parquet file into either a kdb+ table or mixed list of arrays.
Parquet mode
Setting mode to table reads the content as a native kdb+ table.
Setting mode to lists reads the data as a list of arrays, where each array represents a single column.
Parquet metadata
Metadata maps keys (given as symbols or strings) to kdb+ datatype names, as listed at https://code.kx.com/q/basics/datatypes/. Valid examples of such metadata are `year`month!`int`short or ("year";"month")!("int";"short").
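In the Azure watching example, the metadata keys year and month match Hive-style key=value directory names in the path (year=2023/month=*) and appear as extra columns in the output. The Python sketch below shows, under that assumption, how such values might be pulled from a path and cast according to a metadata dictionary. The function partition_values and the small CASTS table are hypothetical illustrations, not part of the Stream Processor API.

```python
# Hypothetical casts for a few kdb+ type names; not the reader's own mapping.
CASTS = {"int": int, "short": int, "long": int, "float": float, "symbol": str}


def partition_values(path: str, metadata: dict) -> dict:
    """Return {key: value} for key=value path segments named in metadata."""
    values = {}
    for segment in path.split("/"):
        key, sep, raw = segment.partition("=")
        # Only segments of the form key=value whose key is in metadata count.
        if sep and key in metadata:
            values[key] = CASTS[metadata[key]](raw)
    return values
```

For example, partition_values("ms://bucket/retention/year=2023/month=7/intervalJul.parquet", {"year": "int", "month": "short"}) yields {"year": 2023, "month": 7}, which corresponds to the year and month columns added to each row.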
Partitioning
The pipeline automatically scales the number of workers to the minimum
of the number of file paths and the maxWorkers pipeline setting.
Each worker reads a disjoint subset of the paths. You can disable
partitioning by setting maxWorkers to 1. Note that partitioning is not
currently possible when there is more than one reader.
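The worker-count rule above can be sketched in a few lines of Python. The round-robin assignment is an assumption made for illustration; the documentation only states that the worker count is the minimum of the number of paths and maxWorkers, and that the workers read disjoint subsets. assign_paths is a hypothetical helper.

```python
from __future__ import annotations


def assign_paths(paths: list[str], max_workers: int) -> list[list[str]]:
    """Split paths into disjoint groups, one group per worker.

    Worker count follows the documented rule: min(len(paths), max_workers).
    Round-robin placement is an illustrative assumption.
    """
    n_workers = min(len(paths), max_workers)
    groups: list[list[str]] = [[] for _ in range(n_workers)]
    for i, path in enumerate(paths):
        groups[i % n_workers].append(path)
    return groups
```

With max_workers set to 1, every path lands in a single group, which matches disabling partitioning by setting maxWorkers to 1.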
Parquet file watching
Setting watch to 1b enables Parquet file watching with the default
settings. This is usually paired with a glob pattern, or a list of glob
patterns, for the Parquet file path. The reader continually watches, using
the watch method, for new Parquet files matching the path(s) provided.
Partitioning is not possible when using file watching. Note that with
watching enabled, the pipeline keeps running until it is finished by manual
intervention. Also note that, at present, the Parquet file watcher only
prints a warning when an existing file is modified. In particular, for
Parquet files that are appended to, the new data is not processed.
File watching should not be used with the directWrite option in .qsp.write.toDatabase
Do not set watch to 1b when using the directWrite option of the Database Writer,
since directWrite relies on a definitive finish point, which is not guaranteed when using the file watcher.
This is not an issue with .qsp.v2.write.toDatabase.
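The timer-based file watching described above can be pictured as a polling loop. The Python sketch below is illustrative only: normalize_watch and poll are hypothetical helpers, glob over a local directory stands in for listing a cloud bucket, and the 5-second default mirrors the documented 0D00:00:05. Like the reader, it reports new files and only warns when a known file changes.

```python
from __future__ import annotations

import glob
import os
import warnings
from datetime import timedelta

# Mirrors the documented defaults: method timer, frequency 0D00:00:05.
DEFAULT_WATCH = {"method": "timer", "frequency": timedelta(seconds=5)}


def normalize_watch(watch):
    """Turn False, True or a partial dict into a full watch config (or None)."""
    if watch is False:
        return None
    if watch is True:
        return dict(DEFAULT_WATCH)
    return {**DEFAULT_WATCH, **watch}


def poll(pattern: str, seen: dict[str, float]) -> list[str]:
    """One timer tick: return files not seen before; warn on modified ones."""
    new_files = []
    for path in sorted(glob.glob(pattern)):
        mtime = os.path.getmtime(path)
        if path not in seen:
            new_files.append(path)
        elif mtime != seen[path]:
            # Appended or rewritten files are not re-read, only reported.
            warnings.warn(f"{path} was modified; new data is not processed")
        seen[path] = mtime
    return new_files
```

A driver would call poll(pattern, seen) once per frequency interval (for example, sleeping for cfg["frequency"].total_seconds() between calls) and forward each returned path. Nothing ends such a loop on its own, which is why a watching pipeline has no definitive finish point.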
Parquet chunking
This reader does not support chunking. Every Parquet file ends with a mandatory footer that holds the file metadata, so a file cannot be parsed by simply splitting it into pieces. To partition Parquet files, go to parquet.apache.org and follow the guide to splitting Parquet files.
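To see why byte-range chunking does not work, consider the Parquet file layout: a file begins and ends with the 4-byte magic PAR1, and the 4 bytes before the trailing magic give the little-endian length of the footer metadata that locates every row group. The Python sketch below is a hypothetical helper, not part of the reader; it locates that metadata and rejects anything that is not a complete file.

```python
from __future__ import annotations

import struct

MAGIC = b"PAR1"


def footer_metadata_range(data: bytes) -> tuple[int, int]:
    """Return (start, end) byte offsets of the footer metadata.

    Layout: PAR1 | column chunks | metadata | 4-byte LE length | PAR1
    """
    if len(data) < 12 or data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("not a complete Parquet file")
    (meta_len,) = struct.unpack("<I", data[-8:-4])
    end = len(data) - 8
    start = end - meta_len
    if start < 4:
        raise ValueError("footer length exceeds file size")
    return start, end
```

Applied to a byte range cut from the start or middle of a file, the helper raises ValueError because the trailing magic and length are missing: without the footer, no chunk can be decoded on its own.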
Operator can only watch from one domain and tenant
When reading multiple files, all of the file paths passed to the operator must be accessed using a single domain and tenant.
Support of Apache Arrow compression algorithms
This reader currently only supports Parquet data stored in Snappy compression format.
Examples
These examples assume you have Parquet files stored locally or in object storage buckets.
Decode a Parquet file into a kdb+ table:
/ -- Reading to a kdb+ table
input: "/tmp/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Decode a Parquet file into a mixed list of arrays:
/ -- Reading the Parquet file to a list of arrays
input: "/tmp/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``mode!``lists]
    .qsp.write.toVariable[`output]
output
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading a Parquet file from an AWS S3 bucket:
input: "s3://bucket/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``region!(`;"us-east-1")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet file watching from an AWS S3 bucket:
input: "s3://bucket/retention/interval*.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``region`watch!(`;"us-east-1";1b)]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
If we next upload intervalJul.parquet to the bucket, we see:
tm                            sym      px    qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO"   106.4 60
2023.07.19D16:00:00.000000000 "OAIE"   107.3 70
2023.07.19D17:00:00.000000000 "RDIV"   108.2 80
2023.07.19D18:00:00.000000000 "KBWB"   109.1 90
2023.07.19D19:00:00.000000000 "BIGB"   110.0 100
Decode a Parquet file from Microsoft Azure Storage:
/ -- Reading to a kdb+ table
input: "ms://bucket/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``connection!(`;"<REDACTED>")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet watching of tabular dataset from Azure:
input: "ms://bucket/retention/year=2023/month=*/interval*.parquet"
metadata: `year`month!`int`short
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``connection`metadata`watch!(`;"<REDACTED>";metadata;1b)]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty year month
----------------------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10  2023 6
2023.06.09D11:00:00.000000000 "HSI"   102.8 20  2023 6
2023.06.09D12:00:00.000000000 "DIA"   103.7 30  2023 6
2023.06.09D13:00:00.000000000 "SPY"   104.6 40  2023 6
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50  2023 6
If we next upload intervalJul.parquet to the bucket, we see:
tm                            sym    px    qty year month
---------------------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60  2023 7
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70  2023 7
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80  2023 7
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90  2023 7
2023.07.19D19:00:00.000000000 "BIGB" 110   100 2023 7
Decode a Parquet file from Google Cloud Storage:
/ -- Reading to a kdb+ table
input: "gs://bucket/table.parquet"
.qsp.run
    .qsp.read.fromParquet[input; .qsp.use``project!(`;"<REDACTED>")]
    .qsp.write.toVariable[`output]
output
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Subscribing to file events:
An API is available for subscribing to file progress events, which lets you track the progress of file readers. Its documentation describes how to use it, with examples.
sp.read.from_parquet('/tmp/trades.parquet')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| path | symbol or symbol[] or string or string[] | An object URI of a parquet file or multiple parquet files to read from AWS/Azure/GCP or local file system. Note that this must be an [ s3:// \| ms:// \| | Required | 
options:
| name | type | description | default | 
|---|---|---|---|
| mode | symbol | How the content of the parquet file should be interpreted by the reader. | table | 
| metadata | dictionary | Dictionary mapping metadata keys to the types to apply when reading Arrow tabular datasets. | (::) | 
| region | string | The AWS region to authenticate against. | us-east-1 | 
| certificates | string | Location of default trust store of SSL certificates. | "/opt/kx/ca-bundle.crt" | 
| credentials | string | The secret name for the Amazon S3 credentials. Refer to the authentication section below for more information. | "" | 
| connection | string | The Azure Storage account connection string. | "" | 
| project | string | The Google Cloud Storage project ID. | "" | 
| domain | string | A custom Amazon S3 domain. | "" | 
| tenant | symbol | The authorization tenant. | ` | 
| watch | boolean or dictionary | Enables file watcher mode for Parquet files in cloud storage. | 0b | 
Returns:
A from_parquet reader, which can be joined to other operators or pipelines.
Examples: These examples assume you have Parquet files stored locally or in object storage buckets.
Reading a parquet file to a table:
>>> from kxi import sp
>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet')
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading a parquet file to a list of arrays:
>>> from kxi import sp
>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet', mode=sp.read.ParquetMode.lists)
        | sp.write.to_console(timestamp='none'))
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading a Parquet file from an AWS S3 bucket:
>>> from kxi import sp
>>> parquet = 's3://kx-insights-nm-support/parquet.parquet'
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1')
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet file watching from AWS S3:
>>> from kxi import sp
>>> parquet = 's3://bucket/retention/interval*.parquet'
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1', watch=True)
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
If we next upload intervalJul.parquet to the bucket, we see:
tm                            sym      px    qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO"   106.4 60
2023.07.19D16:00:00.000000000 "OAIE"   107.3 70
2023.07.19D17:00:00.000000000 "RDIV"   108.2 80
2023.07.19D18:00:00.000000000 "KBWB"   109.1 90
2023.07.19D19:00:00.000000000 "BIGB"   110.0 100
Parquet watching of tabular dataset from Azure:
>>> from kxi import sp
>>> parquet = 'ms://blob/year=2023/month=*/interval*.parquet'
>>> metadata = {'year': 'int', 'month': 'short'}
>>> sp.run(sp.read.from_parquet(parquet,
        connection='<REDACTED>',
        metadata=metadata,
        watch=True)
    | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty year month
----------------------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10  2023 6
2023.06.09D11:00:00.000000000 "HSI"   102.8 20  2023 6
2023.06.09D12:00:00.000000000 "DIA"   103.7 30  2023 6
2023.06.09D13:00:00.000000000 "SPY"   104.6 40  2023 6
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50  2023 6
If we next upload intervalJul.parquet to the bucket, we see:
tm                            sym    px    qty year month
---------------------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60  2023 7
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70  2023 7
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80  2023 7
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90  2023 7
2023.07.19D19:00:00.000000000 "BIGB" 110   100 2023 7
Reading a Parquet file from Microsoft Azure Storage:
>>> from kxi import sp
>>> parquet = 'ms://bucket/table.parquet'
>>> sp.init()
>>> sp.run(sp.read.from_parquet(parquet, connection='<REDACTED>')
    | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Authentication
Docker authentication:
For Docker based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.13.0
        environment:
            AWS_ACCESS_KEY_ID: "abcd"
            AWS_SECRET_ACCESS_KEY: "iamasecret"
            AZURE_STORAGE_ACCOUNT: "123abc"
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
            AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
            GOOGLE_STORAGE_TOKEN: "123abc"
            GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
Kubernetes secret authentication:
To set up authentication in a Kubernetes cluster, combine the credentials of your cloud providers into a single generic secret in the corresponding namespace.
kubectl create secret generic \
--from-file credentials=$HOME/.aws/credentials \
--from-literal account="123abc" \
--from-literal token="aAabBbcCcdDd111222333" \
--from-literal connection="<REDACTED>" \
--from-literal token="123abc" \
--from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
pqtcreds
AWS
The Parquet file watcher uses kurl for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To set up authentication using environment variables, set AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.13.0
        environment:
            AWS_ACCESS_KEY_ID: "abcd"
            AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH to point to the configuration
directory.
version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.13.0
        volumes:
            - $HOME/.aws/:/config/pqtcreds
        environment:
            KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the Parquet reader configuration:
.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
When the pipeline is deployed, it reads the credentials from the mounted AWS credentials file. Note
that the file must be named credentials.
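Putting the Docker pieces together: with $HOME/.aws/ mounted at /config/pqtcreds, KXI_SP_CONFIG_PATH set to /config and credentials set to pqtcreds, the file should end up at /config/pqtcreds/credentials. The Python sketch below shows that path resolution and reads a profile with the standard-library configparser. The helper names are hypothetical, and the resolution rule is inferred from the example above rather than taken from the reader's source.

```python
import configparser
import os
from pathlib import Path


def credentials_file(secret: str) -> Path:
    """Inferred location: $KXI_SP_CONFIG_PATH/<secret>/credentials."""
    return Path(os.environ["KXI_SP_CONFIG_PATH"]) / secret / "credentials"


def read_profile(path: Path, profile: str = "default") -> dict:
    """Read one [profile] section of an AWS-style credentials file."""
    parser = configparser.ConfigParser()
    parser.read(path)
    return dict(parser[profile])
```

If the directory is mounted under a different name than the credentials option, or the file inside it is not called credentials, the resolved path points at nothing, which is a quick way to spot a misconfigured mount.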
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
    }' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials pqtcreds
Note that the credentials file must use lower-case key names, with a space on either side of the equals sign, as follows:
[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234
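Because a badly formatted credentials file is easy to miss until the pipeline is deployed, it can help to check it before running kubectl create secret. The Python sketch below is a hypothetical pre-flight check that tests only the two rules stated above: lower-case key names and a single space on either side of the equals sign. Section headers such as [default], blank lines and # comments are skipped.

```python
from __future__ import annotations

import re

# A lower-case key, exactly " = ", then a non-empty value.
KEY_LINE = re.compile(r"^[a-z0-9_]+ = \S+$")
SECTION_LINE = re.compile(r"^\[[^\]]+\]$")


def format_problems(text: str) -> list[str]:
    """Return the lines that break the documented credentials-file format."""
    problems = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or SECTION_LINE.match(stripped):
            continue
        if not KEY_LINE.match(stripped):
            problems.append(line)
    return problems
```

An empty result means the file follows both rules; any returned line, such as AWS_ACCESS_KEY_ID=abcd1234, should be fixed before the secret is created.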
Next, add the secret name to the Parquet reader configuration:
.qsp.read.fromParquet["s3://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"
Azure
The Parquet Microsoft Azure Storage watcher uses kurl for credential discovery.
See credential discovery
in the Azure tab for more information. When running on Azure provisioned cloud
infrastructure, the credential discovery will automatically use the credentials of
the user that launched the instance.
Environment variable authentication:
To set up authentication using environment variables,
refer to Configure Azure Storage connection strings
and set AZURE_STORAGE_CONNECTION_STRING to download Parquet files from the storage.
For the Parquet file watcher,
set AZURE_STORAGE_ACCOUNT to the name of the storage account to watch
and AZURE_STORAGE_SHARED_KEY to one of the keys for that account.
To use shared keys, refer to these instructions.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.13.0
        environment:
            AZURE_STORAGE_ACCOUNT: "123abc"
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333"
            AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            AZURE_STORAGE_ACCOUNT: "123abc",
            AZURE_STORAGE_SHARED_KEY: "aAabBbcCcdDd111222333",
            AZURE_STORAGE_CONNECTION_STRING: "<REDACTED>"
        }
    }' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic \
    --from-literal account="123abc" \
    --from-literal token="aAabBbcCcdDd111222333" \
    --from-literal connection="<REDACTED>" pqtcreds
Next, add the secret name and the connection string to the Parquet reader configuration (this example also enables file watching):
.qsp.read.fromParquet["ms://bucket/*.parquet"; .qsp.use``credentials`connection`watch!(`;`pqtcreds;"<REDACTED>";1b)]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"
GCP
The Google Cloud Storage watcher uses kurl for credential discovery.
See credential discovery
in the GCP tab for more information. When running on Google provisioned cloud infrastructure,
the credential discovery will automatically use the credentials of the user that launched the
instance.
Environment variable authentication:
To set up authentication using an environment variable,
refer to How Application Default Credentials works
and set GOOGLE_APPLICATION_CREDENTIALS to download Parquet files from the storage.
For the Parquet file watcher, set GOOGLE_STORAGE_TOKEN to
the output of running gcloud auth print-access-token.
To install the Google Cloud SDK, refer to
these instructions.
By default, Google Cloud API tokens expire after 1 hour.
However, you can request a longer expiration time of up to 12 hours by specifying the "expires_in" parameter when you authenticate.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose:
version: "3.3"
services:
    worker:
        image: portal.dl.kx.com/kxi-sp-worker:1.13.0
        environment:
            GOOGLE_STORAGE_TOKEN: "123abc"
            GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name     : "parquet-reader",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            GOOGLE_STORAGE_TOKEN: "123abc",
            GOOGLE_APPLICATION_CREDENTIALS: "/path/to/application_default_credentials.json"
        }
    }' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a generic secret. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic \
    --from-literal token=$(gcloud auth print-access-token) \
    --from-file application_default_credentials.json=$HOME/.config/gcloud/application_default_credentials.json \
    pqtcreds
Next, add the secret name to the Google Cloud Storage Parquet reader configuration:
.qsp.read.fromParquet["gs://bucket/hello.parquet"; .qsp.use``credentials!``pqtcreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" \
    '{
        name       : "parquet-reader",
        type       : "spec",
        config     : { content: $spec },
        settings   : { minWorkers: "1", maxWorkers: "10" },
        kubeConfig : { secrets : ["pqtcreds"] }
    }' | jq -asR .)"
PostgreSQL
Executes a query on a PostgreSQL database.
.qsp.read.fromPostgres[query]
.qsp.read.fromPostgres[query; database]
.qsp.read.fromPostgres[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| query | string | Query to execute on the Postgres database. | Required | 
| database | string | Name of the database to connect to. | $KXI_SP_POSTGRES_DATABASE | 
options:
| name | type | description | default | 
|---|---|---|---|
| server | string | Address of the database to connect to. | $KXI_SP_POSTGRES_SERVER | 
| port | string | Port of database. | $KXI_SP_POSTGRES_PORT | 
| username | string | Username to authenticate with. | $KXI_SP_POSTGRES_USERNAME | 
| password | string | Password to authenticate with. | $KXI_SP_POSTGRES_PASSWORD | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
For all common arguments, refer to configuring operators
This operator executes a query on a PostgreSQL database and pushes the result into the pipeline.
After the query has completed, the Postgres reader signals a 'finish' command, which tears down the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
  .qsp.read.fromPostgres["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
sp.read.from_postgres('SELECT * FROM trades')
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format $KXI_SP_POSTGRES_<param> where <param> is the
parameter name in uppercase.
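The environment-variable fallback can be summarized in a few lines of Python. This sketch is illustrative: resolve_params is a hypothetical helper, not part of kxi.sp, and it simply applies the $KXI_SP_POSTGRES_<PARAM> naming rule to every parameter other than the query. Passing prefix="KXI_SP_SQLSERVER" gives the equivalent rule for the SQL Server reader.

```python
import os

# Every parameter except the query may fall back to an environment variable.
PARAMS = ("database", "server", "port", "username", "password")


def resolve_params(prefix: str = "KXI_SP_POSTGRES", **given) -> dict:
    """Fill omitted parameters from $<prefix>_<PARAM> (PARAM in uppercase)."""
    resolved = {}
    for name in PARAMS:
        value = given.get(name)
        if value is None:
            value = os.environ.get(f"{prefix}_{name.upper()}")
        resolved[name] = value
    return resolved
```

Explicit arguments always win; only parameters left out (None) are looked up, and a parameter with neither an argument nor a variable resolves to None.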
Auto-teardown
The pipeline is torn down after processing a Postgres query. Once the query has completed, the Postgres reader signals a 'finish' command, which tears down the pipeline if there are no other pending requests.
Parameters:
| name | type | description | default | 
|---|---|---|---|
| query | string | Query to execute on the Postgres database. | Required | 
| database | string | Name of the database to connect to. Defaults to $KXI_SP_POSTGRES_DATABASE. | $KXI_SP_POSTGRES_DATABASE | 
options:
| name | type | description | default | 
|---|---|---|---|
| server | string | Address of the database to connect to. Defaults to $KXI_SP_POSTGRES_SERVER. | $KXI_SP_POSTGRES_SERVER | 
| port | string | Port of the database. Defaults to $KXI_SP_POSTGRES_PORT. | $KXI_SP_POSTGRES_PORT | 
| username | string | Username to authenticate with. Defaults to $KXI_SP_POSTGRES_USERNAME. | $KXI_SP_POSTGRES_USERNAME | 
| password | string | Password to authenticate with. Defaults to $KXI_SP_POSTGRES_PASSWORD. | $KXI_SP_POSTGRES_PASSWORD | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
Returns:
A from_postgres reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_postgres('SELECT * FROM stocks')
        | sp.write.to_variable('out'))
>>> kx.q('out')
id  sym       market   name
----------------------------------------------------------------
1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."
2   "CIA"     "NYSE"   "Citizens, Inc."
3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment..."
4   "SPWR"    "NASDAQ" "SunPower Corporation"
5   "DVA"     "NYSE"   "DaVita Inc."
6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."
7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."
SQL Server
Executes a query on an SQL Server database.
.qsp.read.fromSQLServer[query]
.qsp.read.fromSQLServer[query; database]
.qsp.read.fromSQLServer[query; database; .qsp.use (!) . flip (
    (`server  ; server);
    (`port    ; port);
    (`username; username);
    (`password; password))]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| query | string | Query to execute on the SQL Server database. | Required | 
| database | string | Name of the database to connect to. | $KXI_SP_SQLSERVER_DATABASE | 
options:
| name | type | description | default | 
|---|---|---|---|
| server | string | Address of the database. | $KXI_SP_SQLSERVER_SERVER | 
| port | string | Port of database. | $KXI_SP_SQLSERVER_PORT | 
| username | string | Username to authenticate with. | $KXI_SP_SQLSERVER_USERNAME | 
| password | string | Password to authenticate with. | $KXI_SP_SQLSERVER_PASSWORD | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
For all common arguments, refer to configuring operators
This operator executes a query on a Microsoft SQL Server database and pushes the result into the pipeline.
After the query has completed, the SQL Server reader signals a 'finish' command, which tears down the pipeline if there are no other pending requests.
Reading from a table 'stocks':
.qsp.run
  .qsp.read.fromSQLServer["select * from stocks"]
  .qsp.write.toConsole[]
                             | id  sym       market   name                                                  cap
-----------------------------| ---------------------------------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."                           "$229.56M"
2021.09.10D18:29:31.124921700| 2   "CIA"     "NYSE"   "Citizens, Inc."                                      "$350.57M"
2021.09.10D18:29:31.124921700| 3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment Trust"           "$785.28M"
2021.09.10D18:29:31.124921700| 4   "SPWR"    "NASDAQ" "SunPower Corporation"                                "$1.06B"
2021.09.10D18:29:31.124921700| 5   "DVA"     "NYSE"   "DaVita Inc."                                         "$12.88B"
2021.09.10D18:29:31.124921700| 6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."                   "n/a"
2021.09.10D18:29:31.124921700| 7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."                      "$446.33M"
..
sp.read.from_sqlserver('SELECT * FROM trades')
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format $KXI_SP_SQLSERVER_<param> where <param> is the
parameter name in uppercase.
Parameters:
| name | type | description | default | 
|---|---|---|---|
| query | string | Query to execute on the SQLServer database. | Required | 
| database | string | Name of the database to connect to. Defaults to $KXI_SP_SQLSERVER_DATABASE. | $KXI_SP_SQLSERVER_DATABASE | 
options:
| name | type | description | default | 
|---|---|---|---|
| server | string | Address of the database to connect to. Defaults to $KXI_SP_SQLSERVER_SERVER. | $KXI_SP_SQLSERVER_SERVER | 
| port | string | Port of the database. Defaults to $KXI_SP_SQLSERVER_PORT. | $KXI_SP_SQLSERVER_PORT | 
| username | string | Username to authenticate with. Defaults to $KXI_SP_SQLSERVER_USERNAME. | $KXI_SP_SQLSERVER_USERNAME | 
| password | string | Password to authenticate with. Defaults to $KXI_SP_SQLSERVER_PASSWORD. | $KXI_SP_SQLSERVER_PASSWORD | 
| trigger | symbol or list | How this reader should be triggered. See pull reader options for details. | `once | 
Returns:
A from_sqlserver reader, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_sqlserver('SELECT * FROM stocks')
...         | sp.write.to_variable('out'))
>>> kx.q('out')
id  sym       market   name
----------------------------------------------------------------
1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."
2   "CIA"     "NYSE"   "Citizens, Inc."
3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment..."
4   "SPWR"    "NASDAQ" "SunPower Corporation"
5   "DVA"     "NYSE"   "DaVita Inc."
6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."
7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."
Upload
Reads data supplied through an HTTP endpoint.
.qsp.read.fromUpload[uploadName]
Parameters:
| name | type | description | default | 
|---|---|---|---|
| uploadName | symbol or string | Unique name for this fromUpload operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form: https://insights.kx.com/streamprocessor/upload/{uploadName} . The uploadName is passed through in the operator metadata. | Required | 
Fully Deterministic
The upload reader is fully deterministic by default. It uses event journaling to record
the raw data it is provided. In a recovery scenario, it replays the journal (from the latest
checkpoint) and returns to the state it was in prior to crashing. Since all read events are journaled,
it requires enough disk space to write that data. To turn this feature off, set the environment variable
KXI_SP_EVENT_JOURNAL to false.
Limits and HTTP chunking
The upload limit is 10MB. The reader does not currently support HTTP chunking; to upload
larger files, split them manually and make multiple upload requests.
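Since larger files must be split by hand, the following Python sketch shows one way to do it for CSV data. It is an illustration under stated assumptions, not part of the Stream Processor: chunk_csv is a hypothetical helper that splits on line boundaries so no row is cut in half, repeats the header line in every chunk so that a CSV decoder configured to expect a header (as in the example in this section) can parse each request on its own, and reads "10MB" conservatively as 10,000,000 bytes.

```python
UPLOAD_LIMIT = 10_000_000  # assumption: "10MB" read conservatively as 10,000,000 bytes


def chunk_csv(data: bytes, limit: int = UPLOAD_LIMIT) -> list[bytes]:
    """Split CSV bytes into pieces of at most `limit` bytes.

    Rows are never split, and the header line is repeated at the start of
    every piece so that each upload can be decoded independently.
    """
    lines = data.splitlines(keepends=True)
    if not lines:
        return []
    header, rows = lines[0], lines[1:]
    chunks: list[bytes] = []
    current, size = [header], len(header)
    for row in rows:
        if len(header) + len(row) > limit:
            raise ValueError("a single row plus the header exceeds the upload limit")
        if size + len(row) > limit:
            # Current piece is full: emit it and start a new one with the header.
            chunks.append(b"".join(current))
            current, size = [header], len(header)
        current.append(row)
        size += len(row)
    chunks.append(b"".join(current))
    return chunks
```

Each returned chunk would then be sent as its own upload request.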
No High Availability
Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.
kdb Insights Enterprise only
The .qsp.read.fromUpload reader operates only within kdb Insights Enterprise deployments of
the Stream Processor at this time. This functionality does not operate within the kdb Insights
Stream Processor Microservice.
Optional HTTP parameters
The optional HTTP parameters are table and finish.
The finish parameter finishes the operator and, subsequently, the pipeline. This can be useful
when the upload reader is paired with a direct write.
The table parameter is unused by the upload operator; however, if provided, it is passed
through in the metadata and can be used by a downstream node, for example a map node.
The HTTP request to upload data follows this template (see the OpenAPI spec for full details):
curl -X POST "https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true" \
   --header "Content-Type: application/octet-stream" \
   --header "Authorization: Bearer $INSIGHTS_TOKEN" \
   --data-binary "@my/file.csv"
The @ symbol in "@my/file.csv" is required; it tells curl to read the data from the file
rather than send the string literally.
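For clients that prefer Python over curl, the template above can be expressed with the standard library's urllib. This is a minimal sketch: build_upload_request is a hypothetical helper, BASE_URL is the placeholder hostname from the template, and the request is only constructed here, not sent (it could be passed to urllib.request.urlopen to send it).

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

BASE_URL = "https://insights.kx.com"  # placeholder: replace with your deployment's hostname


def build_upload_request(upload_name, data, token, table=None, finish=False):
    """Hypothetical helper: build the POST described by the upload template."""
    query = {}
    if table is not None:
        query["table"] = table  # passed through to the operator metadata
    if finish:
        query["finish"] = "true"  # finish the operator once this request is processed
    url = f"{BASE_URL}/streamprocessor/upload/{quote(upload_name, safe='')}"
    if query:
        url += "?" + urlencode(query)
    return Request(
        url,
        data=data,  # raw bytes, for example the contents of a CSV file
        method="POST",
        headers={
            "Content-Type": "application/octet-stream",  # the only accepted type
            "Authorization": f"Bearer {token}",
        },
    )
```

Building the URL with urlencode avoids the shell-quoting pitfalls of the curl form, where an unquoted & would otherwise end the command.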
Example:
Start pipeline using the fromUpload reader.
.qsp.run
  .qsp.read.fromUpload["myUniqueName"]
  .qsp.decode.csv[.qsp.use``header!``always]
  .qsp.write.toConsole[]
Send an HTTP request to the coordinator to upload your file data.
>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
      --header "Authorization: Bearer $INSIGHTS_TOKEN" \
      --data-binary "@quotes.csv"
Worker logs:
                             | time                            ticker bid           bidSize ask           askSize
-----------------------------| ----------------------------------------------------------------------------------
2023.07.20D15:37:31.641574003| "2023-01-09D06:03:54.726000000" "AAPL" "58.79616"    "890"   "5.538282"    "100"
2023.07.20D15:37:31.641574003| "2023-01-09D09:05:49.815000000" "AAPL" "12.70558"    "790"   "47.71454"    "990"
2023.07.20D15:37:31.641574003| "2023-01-10D09:24:01.982000000" "AAPL" "4.636787"    "620"   "50.61855"    "480"
2023.07.20D15:37:31.641574003| "2023-01-10D03:58:45.586000000" "MSFT" "81.40808"    "970"   "23.05427"    "500"
2023.07.20D15:37:31.641574003| "2023-01-10D03:16:20.405000000" "FB"   "44.11634"    "560"   "74.11803"    "330"
2023.07.20D15:37:31.641574003| "2023-01-08D09:43:50.155000000" "KX"   "78.12387"    "650"   "26.48796"    "40"
2023.07.20D15:37:31.641574003| "2023-01-09D10:55:05.862000000" "AAPL" "71.60091"    "840"   "66.77192"    "130"
2023.07.20D15:37:31.641574003| "2023-01-09D09:03:34.867000000" "KX"   "37.10555"    "130"   "69.64077"    "940"
2023.07.20D15:37:31.641574003| "2023-01-08D07:59:00.020000000" "TSLA" "67.64414"    "500"   "43.57184"    "160"
Use the finish parameter to finish the reader after the data has been processed. If it is not provided, it defaults to false.
>> curl -X POST "https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true" \
      --header "Authorization: Bearer $INSIGHTS_TOKEN" \
      --data-binary "@quotes.csv"
Additionally, we can finish the operator without sending any data.
>> curl -X POST "https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true" \
      --header "Authorization: Bearer $INSIGHTS_TOKEN"
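When a file is sent in several pieces, only the last request needs finish=true; if there is nothing left to send, a body-less finish request does the same job. The Python sketch below builds such a request plan. plan_uploads is a hypothetical helper, the hostname is the placeholder used in this section, and the function sends nothing itself.

```python
def plan_uploads(upload_name, chunks, base_url="https://insights.kx.com"):
    """Hypothetical helper: return (url, body) pairs for a multi-part upload.

    Every chunk is posted in order, and only the final request carries
    finish=true so the reader finishes after the last data is processed.
    """
    url = f"{base_url}/streamprocessor/upload/{upload_name}"
    plan = [(url, chunk) for chunk in chunks]
    if plan:
        last_url, last_body = plan[-1]
        plan[-1] = (last_url + "?finish=true", last_body)
    else:
        # No data to send: finish the operator with an empty request.
        plan.append((url + "?finish=true", None))
    return plan
```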
Note that the only Content-Type the HTTP requests can use is application/octet-stream (the default).
Requests that use any other Content-Type will fail.
For example, to upload a JSON file, use the content type application/octet-stream
(as opposed to application/json) and use a JSON decoder in the pipeline.
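As a minimal sketch of the JSON case, assuming the records are already in memory, the payload can be serialized to bytes and sent with the octet-stream content type. build_json_upload is a hypothetical helper; the request is constructed but not sent, and parsing is left to the JSON decoder in the pipeline.

```python
import json
from urllib.request import Request


def build_json_upload(url, records, token):
    """Hypothetical helper: serialize records to JSON bytes in an octet-stream POST."""
    body = json.dumps(records).encode("utf-8")
    return Request(
        url,
        data=body,
        method="POST",
        headers={
            # JSON is sent as raw bytes; application/json would be rejected.
            "Content-Type": "application/octet-stream",
            "Authorization": f"Bearer {token}",
        },
    )
```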
sp.read.from_upload('productionEndPoint')
Parameters:
| name | type | description | default | 
|---|---|---|---|
| uploadName | symbol or string | Unique name for this fromUpload operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form: https://insights.kx.com/streamprocessor/upload/{uploadName} . The uploadName is passed through in the operator metadata. | Required | 
Returns:
A from_upload reader, which can be joined to other operators or pipelines.
Fully Deterministic
The upload reader is fully deterministic by default. It uses event journaling to
record the raw data it is provided. In a recovery scenario, it replays the journal
(from the latest checkpoint) and returns to the state it was in prior to crashing.
Since all read events are journaled, it requires enough disk space to write that
data. To turn this feature off, set the environment variable KXI_SP_EVENT_JOURNAL
to false.
Limits and HTTP chunking
The upload limit is 10MB. The reader does not currently support HTTP chunking;
to upload larger files, split them manually and make multiple upload requests.
No High Availability
Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.
kdb Insights Enterprise only
The .qsp.read.fromUpload reader operates only within kdb Insights Enterprise
deployments of the Stream Processor at this time. This functionality does not
operate within the kdb Insights Stream Processor Microservice.
Optional HTTP parameters
The optional HTTP parameters are table and finish.
The finish parameter finishes the operator and, subsequently, the pipeline. This can be
useful when the upload reader is paired with a direct write. The table parameter is
unused by the upload operator; however, if provided, it is passed through in the
metadata and can be used by a downstream node, for example a map node.
The HTTP request to upload data follows this template (see the OpenAPI spec for full details):
curl -X POST \
    "https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true" \
    --header "Content-Type: application/octet-stream" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@my/file.csv"
The @ symbol in "@my/file.csv" is required; it tells curl to read the data from the file
rather than send the string literally.
Examples:
Start pipeline using the from_upload reader:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_upload('myUniqueName')
...         | sp.decode.csv(kx.q('''([]time:`$(); ticker:`$(); bid:`$();
...             bidSize:`$(); ask:`$(); askSize:`$())'''), header=sp.decode.CSVHeader.always)
...         | sp.write.to_console(timestamp='none'))
>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@quotes.csv"
Worker logs:
time                            ticker bid           bidSize
------------------------------------------------------------
"2023-01-09D06:03:54.726000000" "AAPL" "58.79616"    "890"..
"2023-01-09D09:05:49.815000000" "AAPL" "12.70558"    "790"..
"2023-01-10D09:24:01.982000000" "AAPL" "4.636787"    "620"..
"2023-01-10D03:58:45.586000000" "MSFT" "81.40808"    "970"..
"2023-01-10D03:16:20.405000000" "FB"   "44.11634"    "560"..
"2023-01-08D09:43:50.155000000" "KX"   "78.12387"    "650"..
"2023-01-09D10:55:05.862000000" "AAPL" "71.60091"    "840"..
"2023-01-09D09:03:34.867000000" "KX"   "37.10555"    "130"..
"2023-01-08D07:59:00.020000000" "TSLA" "67.64414"    "500"..
>> curl -X POST "https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@quotes.csv"
>> curl -X POST "https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN"
Note that the only Content-Type the HTTP requests can use is application/octet-stream
(the default). Requests that use any other Content-Type will fail. For example, to
upload a JSON file, use the content type application/octet-stream
(as opposed to application/json) and use a JSON decoder in the pipeline.