
Readers

Stream Processor readers.

Readers are a specialized type of operator that allows users to feed data from different external data sources into a streaming pipeline. Each reader has its own start, stop, setup, and teardown functions to handle the different streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using sp.push.

Some readers, such as from_callback and from_kafka, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by a Controller.
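As a rough illustration of the partitioning idea, the sketch below spreads a list of partition identifiers over a fixed number of workers. The round-robin assignment and the `assign_partitions` helper are assumptions made purely for illustration; the Controller's actual assignment strategy is not documented here.

```python
from typing import Dict, List, Sequence


def assign_partitions(partitions: Sequence, n_workers: int) -> Dict[int, List]:
    """Hypothetical helper: distribute partition IDs over workers round-robin.

    This only models the idea of spreading partitions across Workers;
    it is not the Controller's real algorithm.
    """
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    assignment: Dict[int, List] = {w: [] for w in range(n_workers)}
    for i, partition in enumerate(partitions):
        # Partition number i goes to worker i mod n_workers.
        assignment[i % n_workers].append(partition)
    return assignment


print(assign_partitions([0, 1, 2, 3, 4], 2))  # {0: [0, 2, 4], 1: [1, 3]}
```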

kxi.sp.read.AmazonS3Reader (Reader)

kxi.sp.read.AzureStorageReader (Reader)

kxi.sp.read.CallbackReader (Reader)

kxi.sp.read.ExprReader (Reader)

kxi.sp.read.FileReader (Reader)

kxi.sp.read.GoogleStorageReader (Reader)

kxi.sp.read.HttpReader (Reader)

kxi.sp.read.KafkaReader (Reader)

kxi.sp.read.PostgresReader (Reader)

kxi.sp.read.SQLServerReader (Reader)

kxi.sp.read.StreamReader (Reader)

kxi.sp.read.FileChunking (IntEnum)

Enum for file chunking options.

Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.

In addition to the enum members themselves, True and False can be provided for enabled and disabled respectively.

kxi.sp.read.FileChunking.auto

Automatically determine the size of the target file, and read it in chunks if it is sufficiently large (more than a few megabytes).

kxi.sp.read.FileChunking.disabled

Do not split the file into chunks.

kxi.sp.read.FileChunking.enabled

Split the file into chunks.
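Because FileChunking is an IntEnum, passing True or False works naturally if the members are numbered so that disabled equals 0 and enabled equals 1 (in Python, True == 1 and False == 0). The documented default `<FileChunking.auto: 2>` confirms that auto is 2; the values for disabled and enabled below are an assumption inferred from the True/False behaviour, and the class is a local stand-in rather than the real kxi.sp.read.FileChunking.

```python
from enum import IntEnum


class FileChunking(IntEnum):
    """Local stand-in mirroring kxi.sp.read.FileChunking (values partly assumed)."""

    disabled = 0  # assumed: lets False map to disabled
    enabled = 1   # assumed: lets True map to enabled
    auto = 2      # matches the documented default <FileChunking.auto: 2>


# bool is a subclass of int, so the lookup by value finds the matching member.
print(FileChunking(True).name)   # enabled
print(FileChunking(False).name)  # disabled
```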

kxi.sp.read.FileMode (AutoNameEnum)

Enum for file mode options.

These enum values can be provided as enum member objects (e.g. FileMode.binary), or as strings matching the names of members (e.g. 'binary').

kxi.sp.read.FileMode.binary

Read the content of the file into a byte vector.

kxi.sp.read.FileMode.text

Read the content of the file into strings, splitting on newlines.
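The sketch below models the two modes with a local enum whose member values equal their names, so members can be looked up from strings such as 'binary'. The `decode` helper shows the documented difference between returning raw bytes and returning one string per line. Both the enum and `decode` are illustrative stand-ins, not the reader's implementation, and the sketch assumes UTF-8 text.

```python
from enum import Enum
from typing import List, Union


class FileMode(Enum):
    """Local stand-in for kxi.sp.read.FileMode; member values equal their names."""

    binary = "binary"
    text = "text"


def decode(content: bytes, mode: Union[FileMode, str]) -> Union[bytes, List[str]]:
    """Hypothetical helper: interpret file content according to a FileMode."""
    # Accept either an enum member or the string name of a member.
    mode = FileMode(mode) if isinstance(mode, str) else mode
    if mode is FileMode.binary:
        return content  # raw bytes, analogous to a q byte vector
    # Text mode: decode (UTF-8 is an assumption) and split into lines.
    return content.decode("utf-8").splitlines()


print(decode(b"a,b\n1,2\n", "text"))           # ['a,b', '1,2']
print(decode(b"a,b\n1,2\n", FileMode.binary))  # b'a,b\n1,2\n'
```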

kxi.sp.read.from_callback

Read from a callback function defined in the global q namespace.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callback | `Union[str, pykx.wrappers.SymbolAtom]` | The name of the callback function that will be defined in the global q namespace. | required |
| partitions | `List` | A list of partition identifiers to distribute over available workers. | `()` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_callback reader, which can be joined to other operators or pipelines. |

Examples:

Calling a callback multiple times, with a window to batch the data:

>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')
...     | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
...     | sp.map(lambda x: x * x)
...     | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
pykx.Identity(pykx.q('::'))
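To see why nothing is printed until the third call, the sketch below models a count trigger in plain Python: records are buffered and only flushed once at least count_trigger of them have accumulated. This is a simplified assumption about sp.window.timer (it ignores the 10-second timer and any other flush conditions), and the `CountBuffer` class is hypothetical.

```python
from typing import Callable, Iterable, List


class CountBuffer:
    """Hypothetical model of a count-triggered window (timer behaviour omitted)."""

    def __init__(self, count_trigger: int, emit: Callable[[List[int]], None]):
        self.count_trigger = count_trigger
        self.emit = emit
        self.buffer: List[int] = []

    def publish(self, batch: Iterable[int]) -> None:
        self.buffer.extend(batch)
        # Flush everything buffered once the trigger count is reached.
        if len(self.buffer) >= self.count_trigger:
            self.emit(self.buffer)
            self.buffer = []


outputs: List[List[int]] = []
# The downstream step squares each record, mirroring sp.map(lambda x: x * x).
window = CountBuffer(12, lambda batch: outputs.append([x * x for x in batch]))
window.publish(range(4))   # 4 records buffered, no output
window.publish(range(4))   # 8 records buffered, no output
window.publish(range(10))  # 18 records >= 12, so the whole buffer is emitted
print(outputs[0])  # [0, 1, 4, 9, 0, 1, 4, 9, 0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```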

kxi.sp.read.from_expr

Evaluate an expression or call a function to produce data for the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| expr | `Union[str, bytes, Callable]` | Either a q expression as a string, which will be evaluated to produce data for the pipeline, or a nullary function, which will be called to produce data for the pipeline. | required |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_expr reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_file

Read file contents into the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | `PathLike` | The path to the file. | required |
| mode | `FileMode` | How the content of the file should be interpreted by the reader. | `<FileMode.binary: 'binary'>` |
| offset | `int` | How many bytes into the file reading should begin. | `0` |
| chunking | `FileChunking` | Whether and how the file should be split into chunks. | `<FileChunking.auto: 2>` |
| chunk_size | `Union[int, str, bytes, pykx.wrappers.CharVector]` | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | `'1MB'` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_file reader, which can be joined to other operators or pipelines. |
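The offset, chunking, and chunk_size parameters can be pictured with the pure-Python generator below, which seeks offset bytes into a file and then yields fixed-size chunks, or the whole remainder when chunking is disabled. It is a sketch under assumptions: `read_chunks` is hypothetical, a plain bool stands in for FileChunking.enabled/disabled, chunk_size is taken as a byte count (strings such as '1MB' are not parsed), and the auto threshold is not documented, so auto is not modelled.

```python
import os
import tempfile
from typing import Iterator


def read_chunks(path: str, offset: int = 0, chunked: bool = True,
                chunk_size: int = 4096) -> Iterator[bytes]:
    """Hypothetical model of reading a file from an offset, optionally in chunks.

    The default chunk_size here is arbitrary and is not the reader's '1MB'.
    """
    with open(path, "rb") as f:
        f.seek(offset)  # begin reading `offset` bytes into the file
        if not chunked:
            yield f.read()  # chunking disabled: one batch with the rest of the file
            return
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk  # each chunk would travel through the pipeline as a batch


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
try:
    print(list(read_chunks(tmp.name, offset=2, chunk_size=3)))  # [b'234', b'567', b'89']
finally:
    os.remove(tmp.name)
```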

kxi.sp.read.from_kafka

Consume data from a Kafka topic.

Maximum poll limit

The Kafka reader reads multiple messages in a single poll iteration. To avoid locking the process in a read loop that only serves Kafka messages, a global limit is imposed on the number of messages read in a single cycle. By default this limit is 1000 messages; setting the configuration option pollLimit changes it. Because the limit is global, if multiple readers set this value, only one of them will be used.
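The effect of the poll limit can be sketched as a loop that drains at most poll_limit messages per cycle before handing control back to the process. The queue-based `poll_cycle` function is a hypothetical stand-in for the reader's internal loop, not the Kafka client API.

```python
from collections import deque
from typing import Deque, List


def poll_cycle(pending: Deque[str], poll_limit: int = 1000) -> List[str]:
    """Hypothetical model: take at most `poll_limit` messages in one poll cycle.

    Remaining messages stay queued for the next cycle, so the process can
    serve other work in between instead of looping on Kafka alone.
    """
    batch: List[str] = []
    while pending and len(batch) < poll_limit:
        batch.append(pending.popleft())
    return batch


queue = deque(f"msg{i}" for i in range(2500))
sizes = []
while queue:
    sizes.append(len(poll_cycle(queue)))  # other events could be handled here
print(sizes)  # [1000, 1000, 500]
```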

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | `str` | The name of a topic. | required |
| brokers | `Union[str, List[str]]` | Brokers, identified by a 'host:port' string or a list of 'host:port' strings. | `'localhost:9092'` |
| retries | `int` | Maximum number of retries that will be attempted for Kafka API calls. | `10` |
| retry_wait | `Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec]` | How long to wait between retry attempts. | `(1, 's')` |
| poll_limit | `int` | Maximum number of records to process in a single poll loop. | `1000` |
| offset | `Optional[Dict[int, kxi.sp.read.KafkaOffset]]` | Dictionary mapping partition IDs to their offsets. | `None` |
| options | `Optional[Dict[str, Any]]` | Dictionary of Kafka consumer options. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_kafka reader, which can be joined to other operators or pipelines. |
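The retries and retry_wait parameters describe a bounded retry policy. The sketch below shows one plausible reading, in which a failing call is attempted once and then retried up to `retries` more times with a fixed wait in between. The `call_with_retries` helper, the exact counting of attempts, and the injectable `sleep` function (used so the example runs instantly) are assumptions, not the reader's implementation.

```python
import time
from datetime import timedelta
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], retries: int = 10,
                      retry_wait: timedelta = timedelta(seconds=1),
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Hypothetical helper: call `fn`, retrying up to `retries` times on error."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(retry_wait.total_seconds())  # wait before the next attempt
    raise AssertionError("unreachable")


failures = iter([True, True, False])  # fail twice, then succeed


def flaky() -> str:
    if next(failures):
        raise ConnectionError("broker unavailable")
    return "ok"


waits = []
print(call_with_retries(flaky, retries=3, sleep=waits.append))  # ok
print(waits)  # [1.0, 1.0]
```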

kxi.sp.read.from_postgres

Execute query on a PostgreSQL database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_POSTGRES_<param> where <param> is the parameter name in uppercase.
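The environment-variable fallback can be sketched as follows: each omitted argument is looked up under `KXI_SP_POSTGRES_<PARAM>`. The `resolve_params` helper and the example hostname are hypothetical and only mirror the documented naming rule; from_sqlserver follows the same rule with the `KXI_SP_SQLSERVER_` prefix.

```python
import os
from typing import Dict, Optional


def resolve_params(prefix: str, **params: Optional[str]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: fill omitted (None) parameters from the environment.

    A parameter named `database` falls back to $<prefix>_DATABASE, and so on.
    The query itself is never sourced this way, so it is not passed in here.
    """
    resolved = {}
    for name, value in params.items():
        if value is None:
            value = os.environ.get(f"{prefix}_{name.upper()}")
        resolved[name] = value
    return resolved


os.environ["KXI_SP_POSTGRES_SERVER"] = "db.example.internal"
os.environ["KXI_SP_POSTGRES_PORT"] = "5432"
print(resolve_params("KXI_SP_POSTGRES", database="sales", server=None, port=None))
# {'database': 'sales', 'server': 'db.example.internal', 'port': '5432'}
```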

The pipeline will be torn down after processing a Postgres query

After the query has completed processing, the Postgres reader signals a 'finish' command, which tears down the pipeline if there are no other pending requests.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | `Union[str, bytes, pykx.wrappers.CharVector]` | Query to execute on the Postgres database. | required |
| database | `Union[str, bytes, pykx.wrappers.CharVector]` | Name of the database to connect to. Defaults to $KXI_SP_POSTGRES_DATABASE. | `None` |
| server | `Union[str, bytes, pykx.wrappers.CharVector]` | Address of the database to connect to. Defaults to $KXI_SP_POSTGRES_SERVER. | `None` |
| port | `Union[str, bytes, pykx.wrappers.CharVector]` | Port of the database. Defaults to $KXI_SP_POSTGRES_PORT. | `None` |
| username | `Union[str, bytes, pykx.wrappers.CharVector]` | Username to authenticate with. Defaults to $KXI_SP_POSTGRES_USERNAME. | `None` |
| password | `Union[str, bytes, pykx.wrappers.CharVector]` | Password to authenticate with. Defaults to $KXI_SP_POSTGRES_PASSWORD. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_postgres reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_sqlserver

Execute query on a SQLServer database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_SQLSERVER_<param> where <param> is the parameter name in uppercase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | `Union[str, bytes, pykx.wrappers.CharVector]` | Query to execute on the SQLServer database. | required |
| database | `Union[str, bytes, pykx.wrappers.CharVector]` | Name of the database to connect to. Defaults to $KXI_SP_SQLSERVER_DATABASE. | `None` |
| server | `Union[str, bytes, pykx.wrappers.CharVector]` | Address of the database to connect to. Defaults to $KXI_SP_SQLSERVER_SERVER. | `None` |
| port | `Union[int, str, bytes, pykx.wrappers.CharVector]` | Port of the database. Defaults to $KXI_SP_SQLSERVER_PORT. | `None` |
| username | `Union[str, bytes, pykx.wrappers.CharVector]` | Username to authenticate with. Defaults to $KXI_SP_SQLSERVER_USERNAME. | `None` |
| password | `Union[str, bytes, pykx.wrappers.CharVector]` | Password to authenticate with. Defaults to $KXI_SP_SQLSERVER_PASSWORD. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_sqlserver reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_stream

Read data using a KX Insights Stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table | `Optional[str]` | Name of the table to filter the stream on. By default, no filtering is performed. | `None` |
| stream | `Optional[str]` | Name of the stream to subscribe to. By default, the stream specified by the $RT_SUB_TOPIC environment variable is used. | `None` |
| prefix | `Union[str, bytes, pykx.wrappers.CharVector]` | Prefix to add to the hostname for the RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used. | `''` |
| assembly | `Optional[str]` | The KX Insights assembly to read from. By default, no assembly is used. | `None` |
| insights | `bool` | Whether the stream being subscribed to uses Insights message formats. | `True` |
| index | `int` | The position in the stream to replay from. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_stream reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_amazon_s3

Read a file from Amazon S3.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | `str` | The path of an object to read from S3. | required |
| mode | `FileMode` | How the content of the file should be interpreted by the reader. | `<FileMode.binary: 'binary'>` |
| offset | `int` | How many bytes into the file reading should begin. | `0` |
| chunking | `FileChunking` | A FileChunking enum value, or string equivalent. | `<FileChunking.auto: 2>` |
| chunk_size | `Union[int, str, bytes, pykx.wrappers.CharVector]` | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | `'1MB'` |
| tenant | `Union[str, bytes, pykx.wrappers.CharVector]` | The authorization tenant. | `''` |
| domain | `Union[str, bytes, pykx.wrappers.CharVector]` | A custom Amazon S3 domain. | `''` |
| region | `Union[str, bytes, pykx.wrappers.CharVector]` | The AWS region to authenticate against. | `'us-east-1'` |
| credentials | `Union[str, bytes, pykx.wrappers.CharVector]` | The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.read.fromAmazon documentation for more information. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_amazon_s3 reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_azure_storage

Read a file from Azure Blob Storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | `str` | The path of an object to read from Azure Blob Storage. | required |
| mode | `FileMode` | How the content of the file should be interpreted by the reader. | `<FileMode.binary: 'binary'>` |
| offset | `int` | How many bytes into the file reading should begin. | `0` |
| chunking | `FileChunking` | A FileChunking enum value, or string equivalent. | `<FileChunking.auto: 2>` |
| chunk_size | `Union[int, str, bytes, pykx.wrappers.CharVector]` | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | `'1MB'` |
| account | `Union[str, bytes, pykx.wrappers.CharVector]` | The Azure account to read from. | `''` |
| tenant | `Union[str, bytes, pykx.wrappers.CharVector]` | The authorization tenant. | `''` |
| domain | `Union[str, bytes, pykx.wrappers.CharVector]` | A custom Azure domain. | `''` |
| credentials | `Union[str, bytes, pykx.wrappers.CharVector]` | The secret name for the Azure credentials. Refer to the authentication section of the .qsp.read.fromAzureStorage documentation for more information. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_azure_storage reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_google_storage

Read a file hosted on Google Cloud Storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | `str` | The URL of the file hosted on Google Cloud Storage. | required |
| mode | `FileMode` | A FileMode enum value, or the string equivalent. | `<FileMode.binary: 'binary'>` |
| offset | `int` | How many bytes into the file reading should begin. | `0` |
| chunking | `FileChunking` | A FileChunking enum value, or string equivalent. | `<FileChunking.auto: 2>` |
| chunk_size | `Union[int, str, bytes, pykx.wrappers.CharVector]` | The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'. | `'1MB'` |
| tenant | `Union[str, bytes, pykx.wrappers.CharVector]` | The authentication tenant. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A from_google_storage reader, which can be joined to other operators or pipelines. |

kxi.sp.read.from_http

Request data from an HTTP endpoint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | `Union[str, bytes, pykx.wrappers.CharVector]` | The URL to send a request to. | required |
| method | `Union[str, bytes, pykx.wrappers.CharVector]` | The HTTP method for the request (for example GET or POST). | `'GET'` |
| body | `Union[str, bytes, pykx.wrappers.CharVector]` | The payload of the HTTP request. | `''` |
| header | `dict` | A map of header fields to their corresponding values. | `None` |
| on_response | `Union[Callable, str]` | Invoked after a response is received; allows the response to be preprocessed or another request to be triggered. Returning None processes the response from the original request immediately. Returning a string issues another request, using the returned value as the URL. Returning a dictionary allows any of the operator parameters to be reset and a new HTTP request to be issued. A special 'response' key in the returned dictionary replaces the payload of the response; if it is set to None, no data is pushed into the pipeline. | `None` |
| follow_redirects | `bool` | If True, redirect responses are followed automatically, up to max_redirects. | `True` |
| max_redirects | `int` | The maximum number of redirects to follow before reporting an error. | `5` |
| max_retry_attempts | `int` | The number of times to retry a connection after a request timeout. | `10` |
| timeout | `int` | The duration in milliseconds to wait for a request to complete before reporting an error. | `None` |
| tenant | `Union[str, bytes, pykx.wrappers.CharVector]` | The request tenant to use for providing request authentication details. | `''` |
| insecure | `bool` | Whether unverified server SSL/TLS certificates should be trusted. | `False` |
| binary | `bool` | If True, the response payload is returned as binary data; otherwise it is treated as text. | `False` |
| sync | `bool` | Whether the request is made synchronously or asynchronously. A synchronous request blocks the process until it completes. | `False` |
| reject_errors | `bool` | If True, non-successful response codes generate an error and stop the pipeline. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Reader` | A pipeline consisting of the from_http reader, which can be joined to other pipelines. |
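The on_response protocol can be modelled as a small driver loop. In the sketch below, `send` stands in for the HTTP request and `run_request` is hypothetical. It follows one reading of the rules in the parameter table: None pushes the original response, a string issues a new request to that URL, and a dictionary overrides request parameters and issues a new request, unless it contains a 'response' key, in which case that value is pushed instead (or nothing, if it is None). How the real reader combines a 'response' key with other keys is not specified here, and the `max_requests` guard is an addition of this sketch.

```python
from typing import Any, Callable, Dict, List, Optional


def run_request(send: Callable[[Dict[str, Any]], Any],
                on_response: Callable[[Any], Any],
                params: Dict[str, Any],
                push: Callable[[Any], None],
                max_requests: int = 10) -> None:
    """Hypothetical driver modelling the from_http on_response rules."""
    params = dict(params)
    for _ in range(max_requests):  # sketch-only guard against endless chains
        response = send(params)
        action = on_response(response)
        if action is None:
            push(response)  # process the original response as-is
            return
        if isinstance(action, str):
            params["url"] = action  # new request to the returned URL
            continue
        if isinstance(action, dict):
            if "response" in action:
                if action["response"] is not None:
                    push(action["response"])  # replaced payload
                return  # a None response pushes nothing
            params.update(action)  # reset parameters and issue a new request
            continue
        raise TypeError(f"unsupported on_response return: {action!r}")
    raise RuntimeError("too many chained requests")


pages = {"page1": {"items": [1, 2], "next": "page2"},
         "page2": {"items": [3], "next": None}}
collected: List[int] = []


def follow_next(resp: Dict[str, Any]) -> Optional[Any]:
    collected.extend(resp["items"])  # accumulate items from every page
    if resp["next"]:
        return resp["next"]  # request the next page
    return {"response": collected}  # push all items once at the end


pushed: List[Any] = []
run_request(lambda p: pages[p["url"]], follow_next, {"url": "page1"}, pushed.append)
print(pushed)  # [[1, 2, 3]]
```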