
Readers

Stream Processor readers.

Readers are a specialized type of operator that allows users to feed data from different external data sources into a streaming pipeline. Each reader has its own start, stop, setup, and teardown functions to handle the different lifecycle events of a stream. Readers are assumed to be asynchronous, and they push data into a pipeline using sp.push.
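As a rough illustration of the lifecycle described above, the following pure-Python sketch models an object with setup, start, stop, and teardown hooks that pushes batches downstream through a callback. The class name `ToyReader`, the `push` callback (standing in for sp.push), and the call order shown are assumptions for illustration. This is not the kxi.sp.read.Reader API.

```python
from typing import Callable, List


class ToyReader:
    """Hypothetical reader with lifecycle hooks; not part of kxi.sp."""

    def __init__(self, batches: List[list], push: Callable[[list], None]):
        self.batches = batches
        self.push = push              # stand-in for sp.push (assumption)
        self.events: List[str] = []   # records which hooks ran, in order
        self.running = False

    def setup(self) -> None:
        self.events.append("setup")      # e.g. open connections or files

    def start(self) -> None:
        self.events.append("start")
        self.running = True
        for batch in self.batches:       # a real reader would push asynchronously
            if not self.running:
                break
            self.push(batch)

    def stop(self) -> None:
        self.events.append("stop")
        self.running = False

    def teardown(self) -> None:
        self.events.append("teardown")   # e.g. release resources


received: List[list] = []
reader = ToyReader([[1, 2], [3]], push=received.append)
reader.setup()
reader.start()
reader.stop()
reader.teardown()
print(received)       # [[1, 2], [3]]
print(reader.events)  # ['setup', 'start', 'stop', 'teardown']
```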

Some readers, such as from_callback and from_kafka, implement a stream-partitioning interface. When these readers are used, their partitions are distributed over multiple Workers, orchestrated by a Controller.
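The strategy the Controller uses to assign partitions is not described here. As a hypothetical illustration only, a round-robin split of partition identifiers over workers could look like the following. The function name `distribute` and the round-robin policy are assumptions.

```python
from typing import Dict, Hashable, List, Sequence


def distribute(partitions: Sequence[Hashable], n_workers: int) -> Dict[int, List[Hashable]]:
    """Round-robin assignment of partitions to worker indices (illustrative only)."""
    if n_workers < 1:
        raise ValueError("need at least one worker")
    assignment: Dict[int, List[Hashable]] = {w: [] for w in range(n_workers)}
    for i, partition in enumerate(partitions):
        assignment[i % n_workers].append(partition)  # worker i mod n gets partition i
    return assignment


print(distribute([0, 1, 2, 3, 4], 2))  # {0: [0, 2, 4], 1: [1, 3]}
```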

kxi.sp.read.__all__ = ['FileChunking', 'FileMode', 'from_amazon_s3', 'from_azure_storage', 'from_callback', 'from_expr', 'from_file', 'from_google_storage', 'from_http', 'from_kafka', 'from_postgres', 'from_sqlserver', 'from_stream'] module-attribute

kxi.sp.read.Reader

Bases: Operator

kxi.sp.read.FileChunking

Bases: IntEnum

Enum for file chunking options.

Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.

These enum values can be provided as True or False for enabled and disabled respectively.

kxi.sp.read.FileChunking.disabled = 0 class-attribute

Do not split the file into chunks.

kxi.sp.read.FileChunking.enabled = 1 class-attribute

Split the file into chunks.

kxi.sp.read.FileChunking.auto = 2 class-attribute

Automatically determine the size of the target file; if it is sufficiently large (more than a few megabytes), read it in chunks.
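Because FileChunking is an IntEnum, True and False can stand in for enabled and disabled: they compare equal to 1 and 0, so looking them up by value yields those members. The sketch below uses a local stand-in enum with the documented values. It also accepts member names such as 'auto', which is one assumed reading of the "string equivalent" accepted by the cloud-storage readers. The `auto` cutoff of 4 MiB is a hypothetical value, because the exact threshold is not documented.

```python
from enum import IntEnum
from typing import Union


class FileChunking(IntEnum):
    """Local stand-in mirroring the documented values of kxi.sp.read.FileChunking."""
    disabled = 0
    enabled = 1
    auto = 2


# Hypothetical cutoff for "more than a few megabytes"; the real threshold is not documented here.
AUTO_THRESHOLD_BYTES = 4 * 1024 * 1024


def coerce_chunking(value: Union[FileChunking, bool, int, str]) -> FileChunking:
    """Accept a member, True/False (equal to 1/0), or a member name such as 'auto'."""
    if isinstance(value, str):
        return FileChunking[value.lower()]  # name lookup (assumed string form)
    return FileChunking(value)              # value lookup: True -> enabled, False -> disabled


def should_chunk(chunking: Union[FileChunking, bool, int, str], file_size: int) -> bool:
    """Decide whether a file of `file_size` bytes would be read in chunks."""
    mode = coerce_chunking(chunking)
    if mode is FileChunking.auto:
        return file_size > AUTO_THRESHOLD_BYTES
    return mode is FileChunking.enabled


print(coerce_chunking(True).name)                   # enabled
print(should_chunk("auto", 10 * 1024 * 1024))       # True
print(should_chunk(FileChunking.auto, 512 * 1024))  # False
```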

kxi.sp.read.FileMode

Bases: AutoNameEnum

Enum for file mode options.

These enum values can be provided as enum member objects (e.g. FileMode.binary), or as strings matching the names of members (e.g. 'binary').

kxi.sp.read.FileMode.binary = auto() class-attribute

Read the content of the file into a byte vector.

kxi.sp.read.FileMode.text = auto() class-attribute

Read the content of the file into strings, and split on newlines.
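The sketch below shows how an AutoNameEnum-style enum lets either a member or its name string be passed, and how the two modes differ. The `AutoNameEnum` definition is an assumption about how the library's base class behaves, namely that auto() assigns each member its own name. The `interpret` helper and its UTF-8 decoding are hypothetical and not part of kxi.sp.

```python
import tempfile
from enum import Enum, auto
from pathlib import Path
from typing import List, Union


class AutoNameEnum(Enum):
    """Assumed behaviour: auto() gives each member its own name as its value."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class FileMode(AutoNameEnum):
    """Local stand-in mirroring kxi.sp.read.FileMode."""
    binary = auto()
    text = auto()


def interpret(path: Path, mode: Union[FileMode, str] = FileMode.binary) -> Union[bytes, List[str]]:
    """Hypothetical helper: read a file as raw bytes (binary) or as a list of lines (text)."""
    mode = FileMode(mode)  # accepts FileMode.text or the string 'text'
    data = path.read_bytes()
    if mode is FileMode.binary:
        return data
    return data.decode("utf-8").splitlines()  # UTF-8 decoding is an assumption


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.txt"
    sample.write_bytes(b"first\nsecond\n")
    text_result = interpret(sample, "text")
    binary_result = interpret(sample, FileMode.binary)

print(text_result)    # ['first', 'second']
print(binary_result)  # b'first\nsecond\n'
```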

kxi.sp.read.KafkaOffset

Bases: IntEnum

Where to start consuming a Kafka partition.

kxi.sp.read.KafkaOffset.beginning = -2 class-attribute

Start consumption at the beginning of the partition.

kxi.sp.read.KafkaOffset.end = -1 class-attribute

Start consumption at the end of the partition.
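The values -2 and -1 are the same sentinels that librdkafka uses for "beginning" and "end" offsets. The following sketch, which uses a local stand-in enum and a hypothetical `start_position` helper, shows what each sentinel means. Given a partition's earliest retained offset (`low`) and the offset the next produced message will receive (`high`), `beginning` replays retained history and `end` only sees new messages. The watermark values are invented inputs.

```python
from enum import IntEnum
from typing import Dict, Union


class KafkaOffset(IntEnum):
    """Local stand-in mirroring the documented values of kxi.sp.read.KafkaOffset."""
    beginning = -2
    end = -1


def start_position(requested: Union[KafkaOffset, int], low: int, high: int) -> int:
    """Hypothetical helper: map a KafkaOffset sentinel to a concrete partition offset.

    low:  earliest offset still retained in the partition.
    high: offset that the next produced message will receive.
    """
    sentinel = KafkaOffset(requested)  # raises ValueError for anything but -2 or -1
    return low if sentinel is KafkaOffset.beginning else high


offset: Dict[int, KafkaOffset] = {0: KafkaOffset.beginning, 1: KafkaOffset.end}
starts = {p: start_position(o, low=100, high=250) for p, o in offset.items()}
print(starts)  # {0: 100, 1: 250}
```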

kxi.sp.read.__dir__

kxi.sp.read.from_callback

Read from a callback function defined in the global q namespace.

Parameters:

Name Type Description Default
callback Union[str, kx.SymbolAtom]

The name of the callback function that will be defined in the global q namespace.

required
partitions List

A list of partition identifiers to distribute over available workers.

()

Returns:

Type Description
Reader

A from_callback reader, which can be joined to other operators or pipelines.

Examples:

Calling a callback multiple times, with a window to batch the data:

>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')
...     | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
...     | sp.map(lambda x: x * x)
...     | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
pykx.Identity(pykx.q('::'))
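Nothing is printed after the first two calls because the timer window buffers records until count_trigger (12) records have accumulated, or until the 10-second timer fires. The pure-Python model below reproduces the printed output through the count-trigger path alone. The class `CountTriggerBuffer` is hypothetical and ignores the timer. It assumes that the count is checked after each incoming batch is appended, which is consistent with the whole third batch being emitted.

```python
from typing import Iterable, List, Optional


class CountTriggerBuffer:
    """Hypothetical model of a window's count trigger (the timer path is omitted)."""

    def __init__(self, count_trigger: int):
        self.count_trigger = count_trigger
        self.buffer: List[int] = []

    def push(self, batch: Iterable[int]) -> Optional[List[int]]:
        self.buffer.extend(batch)
        if len(self.buffer) >= self.count_trigger:
            window, self.buffer = self.buffer, []  # emit everything buffered so far
            return window
        return None                                # keep buffering


window = CountTriggerBuffer(count_trigger=12)
emitted = []
for batch in (range(4), range(4), range(10)):
    out = window.push(batch)
    if out is not None:
        emitted.append([x * x for x in out])  # the sp.map(lambda x: x * x) step
print(emitted)  # [[0, 1, 4, 9, 0, 1, 4, 9, 0, 1, 4, 9, 16, 25, 36, 49, 64, 81]]
```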

kxi.sp.read.from_expr

Evaluate an expression or function, and push the result into the pipeline.

Parameters:

Name Type Description Default
expr Union[str, bytes, Callable]

Either a q expression as a string, which will be evaluated to produce data for the pipeline, or a nullary function, which will be called to produce data for the pipeline.

required

Returns:

Type Description
Reader

A from_expr reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_file

Read file contents into the pipeline.

Parameters:

Name Type Description Default
path Union[os.PathLike, List[os.PathLike]]

A file path or a list of file paths.

required
mode FileMode

How the content of the file should be interpreted by the reader.

FileMode.binary
offset int

How many bytes into the file reading should begin.

0
chunking FileChunking

Whether and how the file should be split into chunks.

FileChunking.auto
chunk_size Union[int, CharString]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'

Returns:

Type Description
Reader

A from_file reader, which can be joined to other operators or pipelines.
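To make the offset, chunking, and chunk_size parameters concrete, here is a pure-Python sketch that skips `offset` bytes and then yields fixed-size chunks. The helpers `parse_chunk_size` and `read_chunks` are hypothetical. The unit table also reflects an assumption: this excerpt does not say whether the reader treats '1MB' as 10**6 or 2**20 bytes, and the sketch uses binary multiples.

```python
import re
import tempfile
from pathlib import Path
from typing import Iterator, Union

# Assumed binary multipliers; the reader's actual interpretation of '1MB' is not stated here.
_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_chunk_size(size: Union[int, str]) -> int:
    """Convert 4096 or '1MB'-style sizes to a byte count (hypothetical helper)."""
    if isinstance(size, int):
        return size
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B)\s*", size.upper())
    if match is None:
        raise ValueError(f"unrecognised chunk size: {size!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def read_chunks(path: Path, offset: int = 0, chunk_size: Union[int, str] = "1MB") -> Iterator[bytes]:
    """Yield successive chunks of a file, starting `offset` bytes in."""
    step = parse_chunk_size(chunk_size)
    with open(path, "rb") as handle:
        handle.seek(offset)          # begin reading `offset` bytes into the file
        while True:
            chunk = handle.read(step)
            if not chunk:            # end of file
                break
            yield chunk


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "data.bin"
    sample.write_bytes(b"0123456789")
    chunks = list(read_chunks(sample, offset=2, chunk_size=3))
print(chunks)  # [b'234', b'567', b'89']
```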

kxi.sp.read.from_kafka

Consume data from a Kafka topic.

Maximum poll limit

The Kafka reader reads multiple messages in a single poll iteration. To avoid locking the process in a read loop that only serves Kafka messages, a global limit is imposed on the number of messages read in a single cycle. By default, this limit is 1000 messages; setting the pollLimit configuration option (exposed here as the poll_limit parameter) changes it. Because the limit is global, if multiple readers set this value, only one of them will be used.
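The effect of the poll limit can be modelled in plain Python: each cycle drains at most `poll_limit` queued messages and then returns control, so a backlog is spread across several cycles instead of monopolizing the process. The `poll_cycle` function and the in-memory queue are hypothetical stand-ins, not the reader's Kafka client.

```python
from collections import deque
from typing import Deque, List


def poll_cycle(queue: Deque[str], poll_limit: int = 1000) -> List[str]:
    """Drain at most poll_limit queued messages, leaving the rest for the next cycle."""
    batch: List[str] = []
    while queue and len(batch) < poll_limit:
        batch.append(queue.popleft())
    return batch


pending = deque(f"msg{i}" for i in range(2500))
cycles = []
while pending:
    cycles.append(len(poll_cycle(pending, poll_limit=1000)))
    # between cycles, the process is free to serve other work (timers, queries, ...)
print(cycles)  # [1000, 1000, 500]
```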

Parameters:

Name Type Description Default
topic str

The name of a topic.

required
brokers Union[str, List[str]]

Brokers, identified by a 'host:port' string or a list of 'host:port' strings.

'localhost:9092'
retries int

Maximum number of retries that will be attempted for Kafka API calls.

10
retry_wait Timedelta

How long to wait between retry attempts.

(1, 's')
poll_limit int

Maximum number of records to process in a single poll loop.

1000
offset Optional[Dict[int, KafkaOffset]]

Dictionary mapping partition IDs to their starting offsets.

None
options Optional[Dict[str, Any]]

Dictionary of Kafka consumer options.

None

Returns:

Type Description
Reader

A from_kafka reader, which can be joined to other operators or pipelines.
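The brokers parameter accepts either one 'host:port' string or a list of them. The hypothetical helper below normalizes both forms into the comma-separated list used by Kafka's bootstrap.servers setting and rejects malformed entries. How from_kafka forwards brokers internally is an assumption, and the broker names in the example are invented.

```python
from typing import List, Union


def normalize_brokers(brokers: Union[str, List[str]] = "localhost:9092") -> str:
    """Hypothetical helper: join 'host:port' entries into a comma-separated string."""
    items = [brokers] if isinstance(brokers, str) else list(brokers)
    for item in items:
        host, sep, port = item.rpartition(":")  # split on the last colon
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected 'host:port', got {item!r}")
    return ",".join(items)


print(normalize_brokers())                                  # localhost:9092
print(normalize_brokers(["kafka-0:9092", "kafka-1:9092"]))  # kafka-0:9092,kafka-1:9092
```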

kxi.sp.read.from_postgres

Execute a query on a PostgreSQL database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_POSTGRES_<param> where <param> is the parameter name in uppercase.
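The environment-variable fallback described above amounts to "explicit argument first, then $KXI_SP_POSTGRES_<PARAM>". The hypothetical helper below models that rule. Passing prefix='KXI_SP_SQLSERVER_' gives the equivalent fallback documented for from_sqlserver. The database name 'finance' is an invented example value.

```python
import os
from typing import Optional


def resolve_param(name: str, value: Optional[str], prefix: str = "KXI_SP_POSTGRES_") -> Optional[str]:
    """Return the explicit value, else $<prefix><NAME> from the environment, else None."""
    if value is not None:
        return value
    return os.environ.get(prefix + name.upper())


os.environ["KXI_SP_POSTGRES_DATABASE"] = "finance"  # example environment setup
print(resolve_param("database", None))              # finance
print(resolve_param("database", "analytics"))       # analytics
```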

The pipeline will be torn down after processing a Postgres query

After the query has completed processing, the Postgres reader will signal a 'finish' command, which tears down the pipeline if there are no other pending requests.

Parameters:

Name Type Description Default
query CharString

Query to execute on the Postgres database.

required
database Optional[CharString]

Name of the database to connect to. Defaults to $KXI_SP_POSTGRES_DATABASE.

None
server Optional[CharString]

Address of the database to connect to. Defaults to $KXI_SP_POSTGRES_SERVER.

None
port Optional[CharString]

Port of the database. Defaults to $KXI_SP_POSTGRES_PORT.

None
username Optional[CharString]

Username to authenticate with. Defaults to $KXI_SP_POSTGRES_USERNAME.

None
password Optional[CharString]

Password to authenticate with. Defaults to $KXI_SP_POSTGRES_PASSWORD.

None

Returns:

Type Description
Reader

A from_postgres reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_sqlserver

Execute a query on a SQL Server database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_SQLSERVER_<param> where <param> is the parameter name in uppercase.

Parameters:

Name Type Description Default
query CharString

Query to execute on the SQLServer database.

required
database Optional[CharString]

Name of the database to connect to. Defaults to $KXI_SP_SQLSERVER_DATABASE.

None
server Optional[CharString]

Address of the database to connect to. Defaults to $KXI_SP_SQLSERVER_SERVER.

None
port Optional[Union[int, CharString]]

Port of the database. Defaults to $KXI_SP_SQLSERVER_PORT.

None
username Optional[CharString]

Username to authenticate with. Defaults to $KXI_SP_SQLSERVER_USERNAME.

None
password Optional[CharString]

Password to authenticate with. Defaults to $KXI_SP_SQLSERVER_PASSWORD.

None

Returns:

Type Description
Reader

A from_sqlserver reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_stream

Read data using a KX Insights Stream.

Parameters:

Name Type Description Default
table Optional[str]

Name of the table to filter the stream on. By default, no filtering is performed.

None
stream Optional[str]

Name of stream to subscribe to. By default, the stream specified by the $RT_SUB_TOPIC environment variable is used.

None
prefix CharString

Prefix to add to the hostname for the RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.

''
assembly Optional[str]

The KX Insights assembly to read from. By default, no assembly is used.

None
insights bool

Whether the stream being subscribed to uses Insights message formats.

True
index int

The position in the stream to replay from.

0

Returns:

Type Description
Reader

A from_stream reader, which can be joined to other operators or pipelines.
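The defaults for stream and prefix come from environment variables, and table acts as an optional filter. The pure-Python model below applies those documented defaults and filters messages by table. It assumes an empty prefix means "use $RT_TOPIC_PREFIX". The (table, payload) message shape, the helper names, and the example values are all hypothetical.

```python
import os
from typing import Iterable, List, Optional, Tuple

Message = Tuple[str, object]  # (table name, payload): a hypothetical message shape


def stream_settings(stream: Optional[str] = None, prefix: str = "") -> Tuple[Optional[str], str]:
    """Apply the documented environment-variable defaults for stream and prefix."""
    resolved_stream = stream if stream is not None else os.environ.get("RT_SUB_TOPIC")
    # Assumption: an empty prefix (the default) falls back to $RT_TOPIC_PREFIX.
    resolved_prefix = prefix if prefix else os.environ.get("RT_TOPIC_PREFIX", "")
    return resolved_stream, resolved_prefix


def filter_table(messages: Iterable[Message], table: Optional[str] = None) -> List[Message]:
    """Keep only messages for `table`; with table=None nothing is filtered."""
    return [m for m in messages if table is None or m[0] == table]


os.environ["RT_SUB_TOPIC"] = "trades-stream"  # example environment setup
os.environ["RT_TOPIC_PREFIX"] = "rt-"
settings = stream_settings()
msgs = [("trade", {"px": 1.0}), ("quote", {"bid": 0.9}), ("trade", {"px": 1.1})]
trades = filter_table(msgs, "trade")
print(settings)     # ('trades-stream', 'rt-')
print(len(trades))  # 2
```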

kxi.sp.read.from_amazon_s3

Reads a file from Amazon S3.

Parameters:

Name Type Description Default
path Union[CharString, List[CharString]]

The path of an object or multiple objects to read from S3.

required
mode FileMode

How the content of the file should be interpreted by the reader.

FileMode.binary
offset int

How many bytes into the file reading should begin.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

FileChunking.auto
chunk_size Union[int, CharString]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
tenant CharString

The authorization tenant.

''
domain CharString

A custom Amazon S3 domain.

''
region CharString

The AWS region to authenticate against.

'us-east-1'
credentials CharString

The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.read.fromAmazonS3 documentation for more information.

''

Returns:

Type Description
Reader

A from_amazon_s3 reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_azure_storage

Reads a file from Azure Blob Storage.

Parameters:

Name Type Description Default
path Union[CharString, List[CharString]]

The path of an object or multiple objects to read from Microsoft Azure Storage.

required
mode FileMode

How the content of the file should be interpreted by the reader.

FileMode.binary
offset int

How many bytes into the file reading should begin.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

FileChunking.auto
chunk_size Union[int, CharString]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
account CharString

The Azure account to read from.

''
tenant CharString

The authorization tenant.

''
domain CharString

A custom Azure domain.

''
credentials CharString

The secret name for the Azure credentials. Refer to the authentication section of the .qsp.read.fromAzureStorage documentation for more information.

''

Returns:

Type Description
Reader

A from_azure_storage reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_google_storage

Read a file hosted on Google Cloud Storage.

Parameters:

Name Type Description Default
path Union[CharString, List[CharString]]

The path of an object or multiple objects to read from Google Cloud Storage.

required
mode FileMode

A FileMode enum value, or the string equivalent.

FileMode.binary
offset int

How many bytes into the file reading should begin.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

FileChunking.auto
chunk_size Union[int, CharString]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
tenant Optional[CharString]

The authentication tenant.

None

Returns:

Type Description
Reader

A from_google_storage reader, which can be joined to other operators or pipelines.

kxi.sp.read.from_http

Requests data from an HTTP endpoint.

Parameters:

Name Type Description Default
url CharString

The URL to send a request to.

required
method CharString

The HTTP method for the request (e.g. GET or POST).

'GET'
body CharString

The payload of the HTTP request.

''
header dict

A map of header fields to their corresponding values.

None
on_response OperatorFunction

A function called after each response, which can preprocess the response or trigger another request. If it returns None, the response of the original request is processed immediately. If it returns a string, another request is issued with that string as the URL. If it returns a dictionary, any of the operator parameters can be reset and a new HTTP request issued; the special 'response' key in that dictionary changes the payload of the response, and if 'response' is set to None, no data is pushed into the pipeline.

None
follow_redirects bool

If set, redirect responses are followed automatically, up to max_redirects redirects.

True
max_redirects int

The maximum number of redirects to follow before reporting an error.

5
max_retry_attempts int

The number of times to retry a connection after a request timeout.

10
timeout int

The duration in milliseconds to wait for a request to be completed before reporting an error.

None
tenant CharString

The request tenant to use for providing request authentication details.

''
insecure bool

Indicates if unverified server SSL/TLS certificates should be trusted.

False
binary bool

Indicates that the resulting payload should be returned as binary data, otherwise text is assumed.

False
sync bool

Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed.

False
reject_errors bool

If set, non-successful response codes generate an error and stop the pipeline.

True

Returns:

Type Description
Reader

A pipeline consisting of the from_http reader, which can be joined to other pipelines.
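The on_response contract can be modelled in pure Python, for example to page through an API. This is one reading of it, under stated assumptions: a string return discards the current response, and a dictionary whose only key is 'response' ends the request chain. Here, `fetch` stands in for the HTTP transport and `max_requests` is a hypothetical guard; neither is a from_http parameter. The example.com URLs and the page contents are invented.

```python
from typing import Any, Callable, Dict, List, Optional, Union

OnResponse = Callable[[Any], Union[None, str, Dict[str, Any]]]


def run_requests(fetch: Callable[[Dict[str, Any]], Any], params: Dict[str, Any],
                 on_response: Optional[OnResponse] = None, max_requests: int = 10) -> List[Any]:
    """Model of the on_response rules; returns the payloads that would be pushed."""
    pushed: List[Any] = []
    params = dict(params)
    for _ in range(max_requests):
        response = fetch(params)
        action = on_response(response) if on_response is not None else None
        if action is None:
            pushed.append(response)        # process this response as-is
            return pushed
        if isinstance(action, str):
            params["url"] = action         # issue another request to the returned URL
            continue
        action = dict(action)
        if "response" in action:
            replacement = action.pop("response")
            if replacement is not None:    # response=None pushes nothing
                pushed.append(replacement)
        if not action:                     # nothing to reset: chain ends (assumption)
            return pushed
        params.update(action)              # reset parameters, then request again
    raise RuntimeError("max_requests exceeded")


pages = {"https://example.com/p1": ("a", "https://example.com/p2"),
         "https://example.com/p2": ("b", None)}


def fake_fetch(params: Dict[str, Any]) -> Dict[str, Any]:
    body, next_url = pages[params["url"]]
    return {"body": body, "next": next_url}


def paginate(resp: Dict[str, Any]) -> Dict[str, Any]:
    if resp["next"] is None:
        return {"response": resp["body"]}                   # last page: push body, stop
    return {"url": resp["next"], "response": resp["body"]}  # push body, fetch next page


result = run_requests(fake_fetch, {"url": "https://example.com/p1"}, paginate)
print(result)  # ['a', 'b']
```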

kxi.sp.read.from_mqtt

Read from an MQTT broker.

Parameters:

Name Type Description Default
topic

The name of the topic to subscribe to.

required
broker

The address of the MQTT broker.

required
username str

Username for the MQTT broker.

''
password str

Password for the MQTT broker.

''

Returns:

Type Description
Reader

A from_mqtt reader, which can be joined to other operators or pipelines.