Readers
Stream Processor readers.
Readers are a specialized type of operator that allows users to feed data from different external
data sources into a streaming pipeline. Each reader has its own start, stop, setup, and teardown
functions to handle different streaming lifecycle events. Readers are assumed to be
asynchronous and push data into a pipeline using sp.push.
Some readers, such as from_callback and from_kafka, implement a stream-partitioning
interface. When using these readers, the partitions are distributed over multiple Workers,
orchestrated by a Controller.
kxi.sp.read.AmazonS3Reader (Reader)
kxi.sp.read.AzureStorageReader (Reader)
kxi.sp.read.CallbackReader (Reader)
kxi.sp.read.ExprReader (Reader)
kxi.sp.read.FileReader (Reader)
kxi.sp.read.GoogleStorageReader (Reader)
kxi.sp.read.HttpReader (Reader)
kxi.sp.read.KafkaReader (Reader)
kxi.sp.read.PostgresReader (Reader)
kxi.sp.read.SQLServerReader (Reader)
kxi.sp.read.StreamReader (Reader)
kxi.sp.read.FileChunking (IntEnum)
Enum for file chunking options.
Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.
These enum values can be provided as True or False for enabled and disabled respectively.
kxi.sp.read.FileChunking.auto
Determine the size of the target file automatically, and read it in chunks if it is sufficiently large (more than a few megabytes).
kxi.sp.read.FileChunking.disabled
Do not split the file into chunks.
kxi.sp.read.FileChunking.enabled
Split the file into chunks.
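The relationship between the boolean shorthand and the enum members can be illustrated with a small stand-in. This is a sketch only, not the library's definition: the class below is a local substitute for kxi.sp.read.FileChunking, the values 0 and 1 for disabled and enabled are assumptions inferred from the True/False shorthand, and normalize_chunking is a hypothetical helper. Only auto = 2 is taken from the documented default.

```python
from enum import IntEnum


class FileChunking(IntEnum):
    """Local stand-in for kxi.sp.read.FileChunking, for illustration only."""
    disabled = 0  # assumed value: lines up with False
    enabled = 1   # assumed value: lines up with True
    auto = 2      # matches the documented default, <FileChunking.auto: 2>


def normalize_chunking(value):
    """Hypothetical helper: coerce a bool, int, or member into a FileChunking."""
    return FileChunking(int(value))


print(normalize_chunking(True).name)
print(normalize_chunking(False).name)
print(normalize_chunking(FileChunking.auto).name)
```

Because the enum is an IntEnum, a boolean compares equal to the matching member, which is why True and False can stand in for enabled and disabled under these assumed values.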
kxi.sp.read.FileMode (AutoNameEnum)
Enum for file mode options.
These enum values can be provided as enum member objects (e.g. FileMode.binary), or as
strings matching the names of members (e.g. 'binary').
kxi.sp.read.FileMode.binary
Read the content of the file into a byte vector.
kxi.sp.read.FileMode.text
Read the content of the file into strings, and split on newlines.
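As a rough illustration of accepting either a member or its name, and of what the two modes mean for the same bytes, consider the sketch below. The FileMode class here is a local stand-in, and normalize_mode and interpret are hypothetical helpers; the UTF-8 decoding is an assumption, since the reader's text encoding is not stated here.

```python
from enum import Enum


class FileMode(Enum):
    """Local stand-in for kxi.sp.read.FileMode; each value equals its name."""
    binary = 'binary'
    text = 'text'


def normalize_mode(mode):
    """Hypothetical helper: accept a FileMode member or a member name."""
    if isinstance(mode, FileMode):
        return mode
    return FileMode(mode)  # raises ValueError for an unknown name


def interpret(content, mode):
    """Return raw bytes in binary mode, or a list of lines in text mode."""
    if normalize_mode(mode) is FileMode.binary:
        return content
    # Assumption: UTF-8 text; the real reader's decoding may differ.
    return content.decode('utf-8').splitlines()


data = b'alpha\nbeta\n'
print(interpret(data, FileMode.binary))
print(interpret(data, 'text'))
```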
kxi.sp.read.from_callback
Read from a callback function defined in the global q namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Union[str, pykx.wrappers.SymbolAtom] | The name of the callback function that will be defined in the global q namespace. | required |
partitions | List | A list of partition identifiers to distribute over available workers. | () |
Returns:
Type | Description |
---|---|
Reader | A |
Examples:
Calling a callback multiple times, with a window to batch the data:
>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')
...     | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
...     | sp.map(lambda x: x * x)
...     | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
pykx.Identity(pykx.q('::'))
kxi.sp.read.from_expr
Evaluate an expression or function, and feed the result into the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expr | Union[str, bytes, Callable] | Either a q expression as a string, which will be evaluated to produce data for the pipeline, or a nullary function, which will be called to produce data for the pipeline. | required |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_file
Read file contents into the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | PathLike | The path to the file. | required |
mode | FileMode | How the content of the file should be interpreted by the reader. | <FileMode.binary: 'binary'> |
offset | int | How many bytes into the file reading should begin. | 0 |
chunking | FileChunking | If/how the file should be split into chunks. | <FileChunking.auto: 2> |
chunk_size | Union[int, str, bytes, pykx.wrappers.CharVector] | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. | '1MB' |
Returns:
Type | Description |
---|---|
Reader | A |
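To make the interaction of offset and chunk_size concrete, the following is a minimal pure-Python sketch of chunked reading. It does not use or reproduce the reader itself: parse_size and read_chunks are hypothetical helpers, and the unit table assumes binary multiples (1 MB = 1024 * 1024 bytes), which may not match how the reader interprets units.

```python
import os
import re
import tempfile

# Assumed unit table (binary multiples); the reader's own interpretation
# of units such as 'MB' is not specified in this reference.
_UNITS = {'B': 1, 'KB': 1024, 'MB': 1024 ** 2, 'GB': 1024 ** 3}


def parse_size(size):
    """Hypothetical helper: turn 4, '1MB', or b'1MB' into a byte count."""
    if isinstance(size, int):
        return size
    if isinstance(size, bytes):
        size = size.decode()
    match = re.fullmatch(r'\s*(\d+)\s*([A-Za-z]*)\s*', size)
    if match is None:
        raise ValueError(f'unrecognized size: {size!r}')
    number, unit = match.groups()
    return int(number) * _UNITS[unit.upper() or 'B']


def read_chunks(path, offset=0, chunk_size='1MB'):
    """Yield successive byte chunks of a file, starting `offset` bytes in."""
    size = parse_size(chunk_size)
    with open(path, 'rb') as f:
        f.seek(offset)
        while True:
            chunk = f.read(size)
            if not chunk:
                break
            yield chunk


# Usage: a 10-byte file read from offset 2 in 4-byte chunks.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'0123456789')
chunks = list(read_chunks(tmp.name, offset=2, chunk_size=4))
os.remove(tmp.name)
print(chunks)  # [b'2345', b'6789']
```

Each yielded chunk corresponds to one batch that would flow through the pipeline when chunking is enabled.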
kxi.sp.read.from_kafka
Consume data from a Kafka topic.
Maximum poll limit
The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the configuration option pollLimit will change this value. This limit is global so if multiple readers set this value, only one will be used.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic | str | The name of a topic. | required |
brokers | Union[str, List[str]] | Brokers identified a | 'localhost:9092' |
retries | int | Maximum number of retries that will be attempted for Kafka API calls. | 10 |
retry_wait | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | How long to wait between retry attempts. | (1, 's') |
poll_limit | int | Maximum number of records to process in a single poll loop. | 1000 |
offset | Optional[Dict[int, kxi.sp.read.KafkaOffset]] | Dictionary mapping from partition IDs to their offsets. | None |
options | Optional[Dict[str, Any]] | Dictionary of Kafka consumer options. | None |
Returns:
Type | Description |
---|---|
Reader | A |
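The effect of the poll limit described above can be sketched without a Kafka client. The code below is an illustration under stated assumptions, not the reader's implementation: a deque stands in for messages waiting on a topic, and poll_once is a hypothetical function that caps how many messages one cycle may consume.

```python
from collections import deque


def poll_once(queue, poll_limit=1000):
    """Take at most `poll_limit` messages from `queue` in one poll cycle."""
    batch = []
    while queue and len(batch) < poll_limit:
        batch.append(queue.popleft())
    return batch


pending = deque(range(2500))  # stand-in for messages waiting on a topic
cycles = []
while pending:
    batch = poll_once(pending)
    cycles.append(len(batch))
    # Between cycles the process is free to serve other work.
print(cycles)  # [1000, 1000, 500]
```

Capping each cycle keeps a busy topic from monopolizing the process, at the cost of needing several cycles to drain a backlog.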
kxi.sp.read.from_postgres
Execute a query on a PostgreSQL database.
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format $KXI_SP_POSTGRES_<param> where <param> is the
parameter name in uppercase.
The pipeline will be torn down after processing a Postgres query
After the query has completed processing, the Postgres reader will signal a 'finish' command, which will tear down the pipeline if there are no other pending requests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, pykx.wrappers.CharVector] | Query to execute on the Postgres database. | required |
database | Union[str, bytes, pykx.wrappers.CharVector] | Name of the database to connect to. Defaults to | None |
server | Union[str, bytes, pykx.wrappers.CharVector] | Address of the database to connect to. Defaults to | None |
port | Union[str, bytes, pykx.wrappers.CharVector] | Port of the database. Defaults to | None |
username | Union[str, bytes, pykx.wrappers.CharVector] | Username to authenticate with. Defaults to | None |
password | Union[str, bytes, pykx.wrappers.CharVector] | Password to authenticate with. Defaults to | None |
Returns:
Type | Description |
---|---|
Reader | A |
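The environment-variable fallback for omitted parameters can be pictured with a short sketch. This is not the reader's code: resolve_params is a hypothetical helper, the host name and values are made up for illustration, and a plain dictionary is passed in place of the real environment so that the example is reproducible.

```python
import os


def resolve_params(prefix, explicit, environ=None):
    """Fill omitted (None) parameters from <prefix><PARAM> variables."""
    environ = os.environ if environ is None else environ
    resolved = {}
    for name, value in explicit.items():
        if value is None:
            # Parameter name in uppercase, e.g. KXI_SP_POSTGRES_SERVER.
            value = environ.get(prefix + name.upper())
        resolved[name] = value
    return resolved


# Hypothetical environment, for illustration only.
env = {
    'KXI_SP_POSTGRES_SERVER': 'db.example.com',
    'KXI_SP_POSTGRES_PORT': '5432',
}
params = resolve_params(
    'KXI_SP_POSTGRES_',
    {'database': 'trades', 'server': None, 'port': None, 'username': None},
    environ=env,
)
print(params)
```

Explicit arguments win over the environment in this sketch, and a parameter found in neither place stays None.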
kxi.sp.read.from_sqlserver
Execute a query on a SQLServer database.
Any parameter (except for the query) can be omitted, in which case it will be sourced from
an environment variable following the format $KXI_SP_SQLSERVER_<param> where <param> is the
parameter name in uppercase.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, pykx.wrappers.CharVector] | Query to execute on the SQLServer database. | required |
database | Union[str, bytes, pykx.wrappers.CharVector] | Name of the database to connect to. Defaults to | None |
server | Union[str, bytes, pykx.wrappers.CharVector] | Address of the database to connect to. Defaults to | None |
port | Union[int, str, bytes, pykx.wrappers.CharVector] | Port of the database. Defaults to | None |
username | Union[str, bytes, pykx.wrappers.CharVector] | Username to authenticate with. Defaults to | None |
password | Union[str, bytes, pykx.wrappers.CharVector] | Password to authenticate with. Defaults to | None |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_stream
Read data using a KX Insights Stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | Optional[str] | Name of the table to filter the stream on. By default, no filtering is performed. | None |
stream | Optional[str] | Name of stream to subscribe to. By default, the stream specified by the | None |
prefix | Union[str, bytes, pykx.wrappers.CharVector] | Prefix to add to the hostname for RT cluster. By default, the prefix given by the | '' |
assembly | Optional[str] | The KX Insights assembly to read from. By default, no assembly is used. | None |
insights | bool | Whether the stream being subscribed to uses Insights message formats. | True |
index | int | The position in the stream to replay from. | 0 |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_amazon_s3
Reads a file from Amazon S3.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path of an object to read from S3. | required |
mode | FileMode | How the content of the file should be interpreted by the reader. | <FileMode.binary: 'binary'> |
offset | int | How many bytes into the file reading should begin. | 0 |
chunking | FileChunking | A | <FileChunking.auto: 2> |
chunk_size | Union[int, str, bytes, pykx.wrappers.CharVector] | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. | '1MB' |
tenant | Union[str, bytes, pykx.wrappers.CharVector] | The authorization tenant. | '' |
domain | Union[str, bytes, pykx.wrappers.CharVector] | A custom Amazon S3 domain. | '' |
region | Union[str, bytes, pykx.wrappers.CharVector] | The AWS region to authenticate against. | 'us-east-1' |
credentials | Union[str, bytes, pykx.wrappers.CharVector] | The secret name for the Amazon S3 credentials. Refer to the authentication section of the | '' |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_azure_storage
Reads a file from Azure Blob Storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path of an object to read from Azure Blob Storage. | required |
mode | FileMode | How the content of the file should be interpreted by the reader. | <FileMode.binary: 'binary'> |
offset | int | How many bytes into the file reading should begin. | 0 |
chunking | FileChunking | A | <FileChunking.auto: 2> |
chunk_size | Union[int, str, bytes, pykx.wrappers.CharVector] | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. | '1MB' |
account | Union[str, bytes, pykx.wrappers.CharVector] | The Azure account to read from. | '' |
tenant | Union[str, bytes, pykx.wrappers.CharVector] | The authorization tenant. | '' |
domain | Union[str, bytes, pykx.wrappers.CharVector] | A custom Azure domain. | '' |
credentials | Union[str, bytes, pykx.wrappers.CharVector] | The secret name for the Azure credentials. Refer to the authentication section of the | '' |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_google_storage
Read a file hosted on Google Cloud Storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The URL of the file hosted on Google Cloud Storage. | required |
mode | FileMode | A | <FileMode.binary: 'binary'> |
offset | int | How many bytes into the file reading should begin. | 0 |
chunking | FileChunking | A | <FileChunking.auto: 2> |
chunk_size | Union[int, str, bytes, pykx.wrappers.CharVector] | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. | '1MB' |
tenant | Union[str, bytes, pykx.wrappers.CharVector] | The authentication tenant. | None |
Returns:
Type | Description |
---|---|
Reader | A |
kxi.sp.read.from_http
Requests data from an HTTP endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | Union[str, bytes, pykx.wrappers.CharVector] | The URL to send a request to. | required |
method | Union[str, bytes, pykx.wrappers.CharVector] | The HTTP method for the HTTP request (e.g. GET, POST). | 'GET' |
body | Union[str, bytes, pykx.wrappers.CharVector] | The payload of the HTTP request. | '' |
header | dict | A map of header fields to their corresponding values. | None |
on_response | Union[Callable, str] | After a response, allows the response to be preprocessed or to trigger another request. Returning 'None' will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to 'None', no data is pushed into the pipeline. | None |
follow_redirects | bool | If set, any redirect return will automatically be followed up to the maximum number of redirects. | True |
max_redirects | int | The maximum number of redirects to follow before reporting an error. | 5 |
max_retry_attempts | int | The number of times to retry a connection after a request timeout. | 10 |
timeout | int | The duration in milliseconds to wait for a request to be completed before reporting an error. | None |
tenant | Union[str, bytes, pykx.wrappers.CharVector] | The request tenant to use for providing request authentication details. | '' |
insecure | bool | Indicates if unverified server SSL/TLS certificates should be trusted. | False |
binary | bool | Indicates that the resulting payload should be returned as binary data, otherwise text is assumed. | False |
sync | bool | Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed. | False |
reject_errors | bool | Non-successful response codes will generate an error and stop the pipeline. | True |
Returns:
Type | Description |
---|---|
Reader | A pipeline comprised of the |
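The return-value rules described for on_response can be summarized as a small dispatch function. This is a sketch of one reading of those rules, not the reader's implementation: apply_on_response and the action labels 'push', 'drop', and 'request' are hypothetical, the request is modeled as a plain dictionary, and giving the 'response' key precedence over other keys in a returned dictionary is an assumption.

```python
def apply_on_response(request, response, on_response=None):
    """Return (action, request, payload) for one response.

    action is 'push' (send payload into the pipeline), 'drop' (push
    nothing), or 'request' (issue another HTTP request).
    """
    result = None if on_response is None else on_response(response)
    if result is None:
        # Process the original response immediately.
        return 'push', request, response
    if isinstance(result, str):
        # A string is treated as the URL of a follow-up request.
        return 'request', {**request, 'url': result}, None
    if isinstance(result, dict):
        if 'response' in result:
            # Assumption: 'response' overrides the payload; None drops it.
            payload = result['response']
            action = 'drop' if payload is None else 'push'
            return action, request, payload
        # Any other keys reset request parameters for a new request.
        return 'request', {**request, **result}, None
    raise TypeError(f'unsupported return: {type(result).__name__}')


request = {'url': 'https://example.com/page/1', 'method': 'GET'}
print(apply_on_response(request, 'body'))
print(apply_on_response(request, 'body', lambda r: 'https://example.com/page/2'))
print(apply_on_response(request, 'body', lambda r: {'response': r.upper()}))
print(apply_on_response(request, 'body', lambda r: {'response': None}))
```

A callback that returns a new URL until the last page is reached, and None afterwards, is one way such rules could drive pagination.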