kxi.sp.read
Stream Processor readers.
Readers are a specialized type of operator that allow users to feed data from different external
data sources into a streaming pipeline. Each reader has its own `start`, `stop`, `setup`, and
`teardown` functions to handle different streaming lifecycle events. Readers are assumed to be
asynchronous and push data into a pipeline using `sp.push`.
Some readers, such as `from_callback` and `from_kafka`, have implemented a stream-partitioning
interface. When using these readers, the partitions are distributed over multiple Workers,
orchestrated by a Controller.
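As a general pattern, a reader is joined to downstream operators with `|` and the resulting pipeline is started with `sp.run`. A minimal sketch using the `from_callback` reader documented below (the names `publish` and `out` are placeholders, and the output shown is indicative):
>>> from kxi import sp
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')   # reader: data enters the pipeline here
           | sp.map(lambda x: x * 2)          # operator: transform each batch
           | sp.write.to_variable('out'))     # writer: capture the results in a q variable
>>> kx.q('publish', range(3))
>>> kx.q('out')
0 2 4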
FileChunking Objects
class FileChunking(IntEnum)
Enum for file chunking options.
Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.
These enum values can be provided as `True` or `False` for `enabled` and `disabled` respectively.
disabled
Do not split the file into chunks.
enabled
Split the file into chunks.
auto
Automatically determine the size of the target file; if it is sufficiently large (more than a few megabytes), it will be read in chunks.
FileMode Objects
class FileMode(AutoNameEnum)
Enum for file mode options.
These enum values can be provided as enum member objects (e.g. `FileMode.binary`), or as
strings matching the names of members (e.g. `'binary'`).
binary
Read the content of the file into a byte vector.
text
Read the content of the file into strings, and split on newlines.
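For illustration, either spelling can be passed to readers such as `from_file` (documented below). A minimal sketch, assuming the enumerations are exposed as `sp.read.FileMode` and `sp.read.FileChunking`, with a placeholder file path:
>>> from kxi import sp
>>> # Enum members and their string/boolean equivalents select the same behaviour.
>>> r1 = sp.read.from_file('data.txt', mode=sp.read.FileMode.text,
                           chunking=sp.read.FileChunking.enabled)
>>> r2 = sp.read.from_file('data.txt', mode='text', chunking=True)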
ParquetMode Objects
class ParquetMode(AutoNameEnum)
Enum for parquet file mode options.
These enum values can be provided as enum member objects (e.g. `ParquetMode.table`), or as
strings matching the names of members (e.g. `'table'`).
table
Read the content of the parquet file into a table.
lists
Read the content of the parquet file into a list of arrays.
from_callback
@Reader
def from_callback(callback: Union[str, kx.SymbolAtom],
*,
partitions: List = (),
key: Optional[Union[str, kx.SymbolAtom]] = None,
replay: Union[bool, kx.BooleanAtom] = False) -> Reader
Read from a callback function defined in the global q namespace.
Notes:
To enable beta features, set the environment variable `KXI_SP_BETA_FEATURES` to `true`.
Arguments:

- `callback` - The name of the callback function that will be defined in the global q namespace.
- `partitions` - A list of partition identifiers to distribute over available Workers.
- `key` - Name of the field which contains the key of the published event, or `None` for unkeyed data.
- `replay` - (Beta feature) If `True`, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline.
- `external` - Allow users to execute the callback function via an IPC connection.
Returns:
A `from_callback` reader, which can be joined to other operators or pipelines.
This operator defines `callback` as a function in the q global namespace, causing data passed
to the function (locally or over IPC) to enter the pipeline data flow.
If using replay, you must set `$KXI_SP_EVENT_JOURNAL` to `"true"`, and set `$KXI_SP_JOURNAL_DIR`
to the directory where you'd like the event journals to be stored. Event journals are files
containing the messages published to the callback reader. Messages are replayed from the
journals on recovery. If `$KXI_SP_JOURNAL_DIR` is not set, it defaults to
`$KXI_SP_CHECKPOINT_DIR/journals`.
Note that enabling replay adversely affects performance, and is disabled by default for this reason.
Examples:
Calling a callback multiple times, with a window to batch the data:
>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')
| sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
| sp.map(lambda x: x * x)
| sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
Callback with message replay:
>>> from kxi import sp
>>> import pykx as kx
>>> import os
>>> os.makedirs('/tmp/journals')
>>> os.environ['KXI_SP_JOURNAL_DIR'] = '/tmp/journals'
>>> pipeline = (sp.read.from_callback('publish', replay=True)
| sp.write.to_variable('out'))
>>> sp.run(pipeline)
>>> kx.q('publish', 0)
>>> kx.q('publish', 1)
>>> kx.q('out')
0 1
>>> sp.teardown()
>>> sp.run(pipeline)
>>> kx.q('out')
0 1
from_expr
@Reader
def from_expr(expr: Union[str, bytes, Callable]) -> Reader
Evaluate expression or function into the pipeline.
Arguments:
- `expr` - Either a q expression as a string, which will be evaluated to produce data for the pipeline, or a nullary function, which will be called to produce data for the pipeline.
Returns:
A `from_expr` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> sp.run(sp.read.from_expr('til 10')
| sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9
>>> from kxi import sp
>>> def nullary_func():
return range(0, 10)
>>> sp.run(sp.read.from_expr(nullary_func)
| sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9
from_file
@Reader
def from_file(path: Union[os.PathLike, List[os.PathLike]],
mode: FileMode = FileMode.binary,
*,
offset: int = 0,
chunking: FileChunking = FileChunking.auto,
chunk_size: Union[int, CharString] = '1MB') -> Reader
Read file contents into the pipeline.
Arguments:

- `path` - A filepath or list of file paths. Glob patterns are supported.
- `mode` - How the content of the file should be interpreted by the reader. This must be a member of the `sp.read.FileMode` enumeration.
- `offset` - How many bytes into the file reading should begin.
- `chunking` - If/how the file should be split into chunks.
- `chunk_size` - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. `'1MB'`.
Returns:
A `from_file` reader, which can be joined to other operators or pipelines.
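The section above does not include an example, so here is a hedged sketch (the file name, contents, and output are placeholders) of reading a local text file line by line:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.init()
>>> # 'text' mode reads the file into strings, split on newlines
>>> sp.run(sp.read.from_file('numbers.txt', mode='text')
           | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"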
KafkaOffset Objects
class KafkaOffset(IntEnum)
Where to start consuming a Kafka partition.
beginning
Start consumption at the beginning of the partition.
end
Start consumption at the end of the partition.
from_kafka
@Reader
def from_kafka(topic: str,
brokers: Union[str, List[str]] = 'localhost:9092',
*,
retries: int = 10,
retry_wait: Timedelta = (1, 's'),
poll_limit: int = 1000,
offset: Optional[Dict[int, KafkaOffset]] = None,
options: Optional[Dict[str, Any]] = None,
registry: Optional[Union[CharString, str]] = '',
as_list: Optional[bool] = False) -> Reader
Consume data from a Kafka topic.
Note: Maximum poll limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the `poll_limit` option will change this value. This limit is global, so if multiple readers set this value, only one will be used.
Note: Reserved keys
`group.id`, `metadata.broker.list`, and `consumer.id` are reserved values and are maintained
directly by the Kafka reader. The Kafka reader will error if these options are used.
Running with TLS
It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on your deployment type.
Offset Committing and Group Permissions
sp.read.from_kafka
will automatically commit offsets when reading from a kafka broker.
This is essential for exactly once semantics and fault tolerance.
The error "Local: Waiting for coordinator" will occur when attempting to commit
offsets if you do not have sufficient group permissions. The broker should be configured so that
the user has group Describe permissions. Since sp.read.from_kafka
will generate a
random group ID, ensure the user is permissioned for all group names on the broker.
Arguments:

- `topic` - The name of a topic.
- `brokers` - Brokers identified as a `'host:port'` string, or a list of `'host:port'` strings.
- `retries` - Maximum number of retries that will be attempted for Kafka API calls.
- `retry_wait` - How long to wait between retry attempts.
- `poll_limit` - Maximum number of records to process in a single poll loop.
- `offset` - Dictionary mapping from partition IDs to their offsets.
- `options` - Dictionary of [Kafka consumer options](https://docs.confluent.io/5.5.1/clients/librdkafka/md_CONFIGURATION.html).
- `registry` - Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for payload decoding.
- `as_list` - Boolean value which, when set to `True`, results in Kafka Schema Registry messages omitting field names when decoding Protocol Buffer schemas and returning values as a list.
Returns:
A `from_kafka` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> sp.run(sp.read.from_kafka('numbers')
| sp.write.to_variable('out'))
>>> kx.q('out')
4
7
3
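A further hedged sketch (broker addresses, topic, and option values are placeholders, and `KafkaOffset` is assumed to be exposed as `sp.read.KafkaOffset`) showing explicit brokers, a per-partition starting offset, and additional consumer options:
>>> from kxi import sp
>>> sp.run(sp.read.from_kafka('numbers',
                              brokers=['kafka-1:9092', 'kafka-2:9092'],
                              # start partition 0 from the beginning of the log
                              offset={0: sp.read.KafkaOffset.beginning},
                              # extra librdkafka consumer options (reserved keys excluded)
                              options={'fetch.message.max.bytes': '1048576'})
           | sp.write.to_variable('out'))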
from_postgres
@Reader
def from_postgres(query: CharString,
database: Optional[CharString] = None,
server: Optional[CharString] = None,
port: Optional[CharString] = None,
username: Optional[CharString] = None,
password: Optional[CharString] = None) -> Reader
Execute query on a PostgreSQL database.
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format `$KXI_SP_POSTGRES_<param>`, where `<param>` is the
parameter name in uppercase (see the sketch after the sample output below).
Note: The pipeline will be torn down after processing a Postgres query
After the query has completed processing, the Postgres reader will signal a 'finish' command, which will tear down the pipeline if there are no other pending requests.
Arguments:

- `query` - Query to execute on the Postgres database.
- `database` - Name of the database to connect to. Defaults to `$KXI_SP_POSTGRES_DATABASE`.
- `server` - Address of the database to connect to. Defaults to `$KXI_SP_POSTGRES_SERVER`.
- `port` - Port of the database. Defaults to `$KXI_SP_POSTGRES_PORT`.
- `username` - Username to authenticate with. Defaults to `$KXI_SP_POSTGRES_USERNAME`.
- `password` - Password to authenticate with. Defaults to `$KXI_SP_POSTGRES_PASSWORD`.
Returns:
A `from_postgres` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> sp.run(sp.read.from_postgres('SELECT * FROM stocks')
| sp.write.to_variable('out'))
| id sym market name
-----------------------------| ----------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp."
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc."
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment..."
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc."
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp."
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc."
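As referenced above, a hedged sketch of relying on the environment-variable defaults instead of keyword arguments (all connection details shown are placeholders):
>>> import os
>>> from kxi import sp
>>> os.environ['KXI_SP_POSTGRES_SERVER'] = 'postgres.example.com'
>>> os.environ['KXI_SP_POSTGRES_PORT'] = '5432'
>>> os.environ['KXI_SP_POSTGRES_DATABASE'] = 'markets'
>>> os.environ['KXI_SP_POSTGRES_USERNAME'] = 'reader'
>>> os.environ['KXI_SP_POSTGRES_PASSWORD'] = 'secret'
>>> # database, server, port, username, and password are omitted and sourced from the environment
>>> sp.run(sp.read.from_postgres('SELECT * FROM stocks')
           | sp.write.to_variable('out'))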
from_sqlserver
@Reader
def from_sqlserver(query: CharString,
database: Optional[CharString] = None,
server: Optional[CharString] = None,
port: Optional[Union[int, CharString]] = None,
username: Optional[CharString] = None,
password: Optional[CharString] = None) -> Reader
Execute query on a SQLServer database.
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format `$KXI_SP_SQLSERVER_<param>`, where `<param>` is the
parameter name in uppercase.
Arguments:

- `query` - Query to execute on the SQLServer database.
- `database` - Name of the database to connect to. Defaults to `$KXI_SP_SQLSERVER_DATABASE`.
- `server` - Address of the database to connect to. Defaults to `$KXI_SP_SQLSERVER_SERVER`.
- `port` - Port of the database. Defaults to `$KXI_SP_SQLSERVER_PORT`.
- `username` - Username to authenticate with. Defaults to `$KXI_SP_SQLSERVER_USERNAME`.
- `password` - Password to authenticate with. Defaults to `$KXI_SP_SQLSERVER_PASSWORD`.
Returns:
A `from_sqlserver` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> sp.run(sp.read.from_sqlserver('SELECT * FROM stocks')
| sp.write.to_variable('out'))
| id sym market name
-----------------------------| ----------------------------------------------------------------
2021.09.10D18:29:31.124921700| 1 "OXLC" "NASDAQ" "Oxford Lane Capital Corp."
2021.09.10D18:29:31.124921700| 2 "CIA" "NYSE" "Citizens, Inc."
2021.09.10D18:29:31.124921700| 3 "PEI" "NYSE" "Pennsylvania Real Estate Investment..."
2021.09.10D18:29:31.124921700| 4 "SPWR" "NASDAQ" "SunPower Corporation"
2021.09.10D18:29:31.124921700| 5 "DVA" "NYSE" "DaVita Inc."
2021.09.10D18:29:31.124921700| 6 "BHACW" "NASDAQ" "Barington/Hilco Acquisition Corp."
2021.09.10D18:29:31.124921700| 7 "BCRX" "NASDAQ" "BioCryst Pharmaceuticals, Inc."
from_stream
@Reader
def from_stream(table: Optional[str] = None,
stream: Optional[str] = None,
*,
prefix: CharString = '',
assembly: Optional[str] = None,
insights: bool = True,
index: int = 0) -> Reader
Read data using a kdb Insights Stream.
Arguments:

- `table` - Name of the table to filter the stream on. By default, no filtering is performed.
- `stream` - Name of the stream to subscribe to. By default, the stream specified by the `$RT_SUB_TOPIC` environment variable is used.
- `prefix` - Prefix to add to the hostname for the RT cluster. By default, the prefix given by the `$RT_TOPIC_PREFIX` environment variable is used.
- `assembly` - The kdb Insights assembly to read from. By default, no assembly is used.
- `insights` - Whether the stream being subscribed to uses Insights message formats.
- `index` - The position in the stream to replay from.
Returns:
A `from_stream` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_stream('trace', 'north')
| sp.write.to_variable('out'))
>>> kx.q('out')
date val
--------------
2023.09.26 0
2023.09.26 1
2023.09.26 2
2023.09.26 3
2023.09.26 4
2023.09.26 5
2023.09.26 6
2023.09.26 7
2023.09.26 8
2023.09.26 9
from_amazon_s3
@Reader
def from_amazon_s3(path: Union[CharString, List[CharString]],
mode: FileMode = FileMode.binary,
*,
offset: int = 0,
chunking: FileChunking = FileChunking.auto,
chunk_size: Union[int, CharString] = '1MB',
tenant: CharString = '',
domain: CharString = '',
region: CharString = 'us-east-1',
credentials: CharString = '',
watch: Union[bool, dict] = False) -> Reader
Reads a file from Amazon S3.
Arguments:

- `path` - The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported.
- `mode` - How the content of the file should be interpreted by the reader. This must be a member of the `sp.read.FileMode` enumeration.
- `offset` - How many bytes into the file reading should begin.
- `chunking` - A `FileChunking` enum value, or string equivalent.
- `chunk_size` - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. `'1MB'`.
- `tenant` - The authorization tenant.
- `domain` - A custom Amazon S3 domain.
- `region` - The AWS region to authenticate against.
- `credentials` - The secret name for the Amazon S3 credentials. Refer to the authentication section of the `.qsp.read.fromAmazonS3` documentation for more information.
- `watch` - Either `True`, `False`, or a dictionary containing `method` (a string) and `frequency` (a timedelta). The only current method is `"timer"`. The default frequency is `timedelta(seconds=1)`. If watch is set to `True`, use the watch defaults.
Returns:
A `from_amazon_s3` reader, which can be joined to other operators or pipelines.
```python
from kxi import sp
import pykx as kx

sp.run(sp.read.from_amazon_s3('s3://mybucket/numbers1.txt', mode='text', region='us-east-2')
       | sp.write.to_variable('out'))
kx.q('out')
```
```txt
"556"
"465"
"63"
"106"
"46"
```
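A hedged sketch (bucket, glob pattern, and polling interval are placeholders) of enabling the `watch` option described above with an explicit polling frequency:
>>> from datetime import timedelta
>>> from kxi import sp
>>> sp.init()
>>> # poll the bucket every 5 seconds for new objects matching the glob
>>> sp.run(sp.read.from_amazon_s3('s3://mybucket/incoming/*.txt',
                                  mode='text',
                                  region='us-east-2',
                                  watch={'method': 'timer', 'frequency': timedelta(seconds=5)})
           | sp.write.to_variable('out'))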
from_azure_storage
@Reader
def from_azure_storage(path: Union[CharString, List[CharString]],
mode: FileMode = FileMode.binary,
*,
offset: int = 0,
chunking: FileChunking = FileChunking.auto,
chunk_size: Union[int, CharString] = '1MB',
account: CharString = '',
tenant: CharString = '',
domain: CharString = '',
credentials: CharString = '',
watch: Union[bool, dict] = False) -> Reader
Reads a file from Azure Blob Storage.
ms:// URLs
This operator requires URLs to be in the format `ms://yourContainerName/path/to/file.csv`,
so the URL `https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv`
would be written as
`sp.read.from_azure_storage('ms://myContainerName/path/to/file.csv', account='myStorageAccount')`.
Arguments:

- `path` - The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an `ms://` URL, not an `https://` URL. Glob patterns are supported.
- `mode` - How the content of the file should be interpreted by the reader. This must be a member of the `sp.read.FileMode` enumeration.
- `offset` - How many bytes into the file reading should begin.
- `chunking` - A `FileChunking` enum value, or string equivalent.
- `chunk_size` - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. `'1MB'`.
- `account` - The Azure account to read from.
- `tenant` - The authorization tenant.
- `domain` - A custom Azure domain.
- `credentials` - The secret name for the Azure credentials. Refer to the authentication section of the `.qsp.read.fromAzureStorage` documentation for more information.
- `watch` - Either `True`, `False`, or a dictionary containing `method` (a string) and `frequency` (a timedelta). The only current method is `"timer"`. The default frequency is `timedelta(seconds=1)`. If watch is set to `True`, use the watch defaults.
Returns:
A `from_azure_storage` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_azure_storage('ms://mycontainer/numbers.txt',
account='myaccount',mode='text')
| sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"
from_google_storage
@Reader
def from_google_storage(path: Union[CharString, List[CharString]],
mode: FileMode = FileMode.binary,
*,
offset: int = 0,
chunking: FileChunking = FileChunking.auto,
chunk_size: Union[int, CharString] = '1MB',
tenant: Optional[CharString] = None,
domain: CharString = '',
watch: Union[bool, dict] = False) -> Reader
Read a file hosted on Google Cloud Storage.
Arguments:

- `path` - The GS URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported.
- `mode` - A `FileMode` enum value, or the string equivalent. This must be a member of the `sp.read.FileMode` enumeration.
- `offset` - How many bytes into the file reading should begin.
- `chunking` - A `FileChunking` enum value, or string equivalent.
- `chunk_size` - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. `'1MB'`.
- `tenant` - The authentication tenant.
- `domain` - A custom Google Cloud Storage domain.
- `watch` - Either `True`, `False`, or a dictionary containing `method` (a string) and `frequency` (a timedelta). The only current method is `"timer"`. The default frequency is `timedelta(seconds=1)`. If watch is set to `True`, use the watch defaults.
Returns:
A `from_google_storage` reader, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(
sp.read.from_google_storage('gs://myBucket/numbers.txt', mode='text')
| sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"
from_http
@Reader
def from_http(url: CharString,
method: CharString = 'GET',
*,
body: CharString = '',
header: dict = None,
on_response: OperatorFunction = None,
follow_redirects: bool = True,
max_redirects: int = 5,
max_retry_attempts: int = 10,
timeout: int = 0,
tenant: CharString = '',
insecure: bool = False,
binary: bool = False,
sync: bool = False,
reject_errors: bool = True) -> Reader
Requests data from an HTTP endpoint.
Arguments:

- `url` - The URL to send a request to.
- `method` - The HTTP method for the HTTP request (e.g. GET, POST).
- `body` - The payload of the HTTP request.
- `header` - A map of header fields to their corresponding values.
- `on_response` - After a response, allows the response to be preprocessed or to trigger another request. Returning `None` will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special `response` key can be used in the return dictionary to change the payload of the response. If the `response` key is set to `None`, no data is pushed into the pipeline.
- `follow_redirects` - If set, any redirect return will automatically be followed, up to the maximum number of redirects.
- `max_redirects` - The maximum number of redirects to follow before reporting an error.
- `max_retry_attempts` - The number of times to retry a connection after a request timeout.
- `timeout` - The duration in milliseconds to wait for a request to be completed before reporting an error.
- `tenant` - The request tenant to use for providing request authentication details.
- `insecure` - Indicates if unverified server SSL/TLS certificates should be trusted.
- `binary` - Indicates that the resulting payload should be returned as binary data; otherwise text is assumed.
- `sync` - Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed.
- `reject_errors` - Non-successful response codes will generate an error and stop the pipeline.
Returns:
A pipeline comprised of the `from_http` reader, which can be joined to other pipelines.
>>> from kxi import sp
>>> sp.run(sp.read.from_http('http://example.com', 'GET')
| sp.write.to_variable('out'))
>>> kx.q('out')
"<!doctype html>\n<html>\n<head>\n <title>Example Domain</title>\n\n...
Warnings:
Nondeterminism
This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.
from_mqtt
@Reader
def from_mqtt(topic,
broker,
*,
username: str = "",
password: str = "") -> Reader
Read from an MQTT broker.
Arguments:

- `topic` - The name of the topic to subscribe to.
- `broker` - The address of the MQTT broker.
- `username` - Username for the MQTT broker.
- `password` - Password for the MQTT broker.
Returns:
A `from_mqtt` reader, which can be joined to other operators or pipelines.
This operator subscribes to an MQTT broker, pushing messages published to that topic to the pipeline.
Notes:
Quality of Service
This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.
Determinism
This reader is non-deterministic, and the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.
>>> from kxi import sp
>>> sp.run(sp.read.from_mqtt('readings', 'tcp://localhost:1883')
| sp.write.to_variable('out'))
$ mosquitto_pub -t readings -m "Hello, World!"
>>> kx.q('out')
"Hello, World!"
from_parquet
@Reader
def from_parquet(path: Union[CharString, List[CharString]],
mode: ParquetMode = ParquetMode.table,
storage: CharString = '/tmp',
region: CharString = 'us-east-1',
certificates: CharString = '/opt/kx/ca-bundle.crt',
credentials: CharString = '',
connection: CharString = '',
project: CharString = '',
domain: CharString = '',
tenant: CharString = '',
watch: bool = False) -> Reader
Read parquet file contents into the pipeline.
Arguments:

- `path` - An object URI of a parquet file or multiple parquet files to read from AWS/Azure/GCP or the local file system. Note that this must be an [`s3://` | `ms://` | `gs://` | `/fs`] URL, not an `https://` URL. Glob patterns are supported.
- `mode` - How the content of the parquet file should be interpreted by the reader.
- `storage` - Temporary storage of downloaded parquet files.
- `region` - The AWS region to authenticate against.
- `certificates` - Location of the default trust store of SSL certificates.
- `credentials` - The secret name for the Amazon S3 credentials. Refer to the authentication section of the `.qsp.read.fromParquet` documentation for more information.
- `connection` - The Azure Storage account connection string.
- `project` - The Google Cloud Storage project ID.
- `domain` - A custom Amazon S3 domain.
- `tenant` - The authorization tenant.
- `watch` - Enables file-watcher mode for a parquet cloud registry.
Returns:
A `from_parquet` reader, which can be joined to other operators or pipelines.
Examples:
Reading the parquet file to a table:
>>> from kxi import sp
>>> sp.init()
>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet')
| sp.write.to_console(timestamp='none'))
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading the parquet file to a list of arrays:
>>> from kxi import sp
>>> sp.init()
>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet', mode=sp.read.ParquetMode.lists)
| sp.write.to_console(timestamp='none'))
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Reading the parquet file from AWS S3 registry:
>>> from kxi import sp
>>> parquet = 's3://kx-insights-nm-support/parquet.parquet'
>>> sp.init()
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1')
| sp.write.to_console(timestamp='none'))
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
Parquet file watching from AWS S3 registry:
>>> from kxi import sp
>>> parquet = 's3://bucket/retention/interval*.parquet'
>>> sp.init()
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1', watch=True)
| sp.write.to_console(timestamp='none'))
tm sym px qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX" 101.9 10
2023.06.09D11:00:00.000000000 "HSI" 102.8 20
2023.06.09D12:00:00.000000000 "DIA" 103.7 30
2023.06.09D13:00:00.000000000 "SPY" 104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50
If we next upload `intervalJul.parquet` to the bucket, we see:
tm sym px qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90
2023.07.19D19:00:00.000000000 "BIGB" 110.0 100
from_upload
@Reader
def from_upload(uploadName: Union[str, kx.SymbolAtom]) -> Reader
Reads data supplied through an HTTP endpoint.
Arguments:

- `uploadName` - Unique name for this `fromUpload` operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form `https://insights.kx.com/streamprocessor/upload/{uploadName}`. The `uploadName` is passed through in the operator metadata.
Returns:
A `from_upload` reader, which can be joined to other operators or pipelines.
Notes:
Fully Deterministic
The upload reader is fully deterministic by default. It will use event journaling to
record the raw data it is provided. In a recovery scenario it will replay the journal
(from the latest checkpoint) and return to the state it was in prior to crashing.
Since all read events are journalled, it will require enough disk space to write that
data. To turn this feature off, set the environment variable `KXI_SP_EVENT_JOURNAL` to `false`.
Limits and HTTP chunking
The upload limit is set to 10MB. The reader does not currently support HTTP
chunking, therefore the user is responsible for manually chunking their files and
making multiple upload requests if they wish to upload larger files.
No High Availability
Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.
kdb Insights Enterprise only
The `.qsp.read.fromUpload` reader operates only within kdb Insights Enterprise
deployments of the Stream Processor at this time. This functionality does not
operate within the kdb Insights Stream Processor Microservice.
Optional HTTP parameters
The optional HTTP parameters are `table` and `finish`.
`finish` is used to finish the operator and, subsequently, the pipeline. This can be
useful if we pair the upload reader with a direct write. The `table` parameter is
unused by the upload operator; however, if provided it will be passed through in the
metadata and could be used by a node downstream, e.g. a map node.
The template of the HTTP request to upload data is as follows (see the Open API Spec for full details):
curl -X POST \
https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true \
--header "Content-Type: application/octet-stream" \
--header "Authorization: Bearer $INSIGHTS_TOKEN" \
--data-binary "@my/file.csv"
The at sign (@) in "@my/file.csv" is required to signify that the string resolves to a file rather than a literal.
Examples:
Start pipeline using the from_upload reader:
>>> from kxi import sp
>>> sp.init()
>>> sp.run(sp.read.from_upload('myUniqueName')
| sp.decode.csv(kx.q('([]time:`$(); ticker:`$(); bid:`$();
bidSize:`$(); ask:`$(); askSize:`$())'), header=CSVHeader.always)
| sp.write.to_console(timestamp='none'))
>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
--header "Authorization: Bearer $INSIGHTS_TOKEN" \
--data-binary "@quotes.csv"
Worker logs:
time ticker bid bidSize
------------------------------------------------------------
"2023-01-09D06:03:54.726000000" "AAPL" "58.79616" "890"..
"2023-01-09D09:05:49.815000000" "AAPL" "12.70558" "790"..
"2023-01-10D09:24:01.982000000" "AAPL" "4.636787" "620"..
"2023-01-10D03:58:45.586000000" "MSFT" "81.40808" "970"..
"2023-01-10D03:16:20.405000000" "FB" "44.11634" "560"..
"2023-01-08D09:43:50.155000000" "KX" "78.12387" "650"..
"2023-01-09D10:55:05.862000000" "AAPL" "71.60091" "840"..
"2023-01-09D09:03:34.867000000" "KX" "37.10555" "130"..
"2023-01-08D07:59:00.020000000" "TSLA" "67.64414" "500"..
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true \
--header "Authorization: Bearer $INSIGHTS_TOKEN" \
--data-binary "@quotes.csv"
>> curl -X POST https://insights.kx.com/streamprocessor/myUniqueName?finish=true \
--header "Authorization: Bearer $INSIGHTS_TOKEN"
The Content-Type of the upload request must be `application/octet-stream` (the default). Attempts to use any other Content-Type in the request will fail. If, for example, you wish to upload a JSON file, use content type `application/octet-stream` (as opposed to `application/json`) and use a JSON decoder in the pipeline.