Readers

Stream Processor readers.

Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline. Each reader has its own start, stop, setup, and teardown functions to handle different streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using sp.push.

Some readers, such as from_callback and from_kafka, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by a Controller.

kxi.sp.read.FileChunking (IntEnum)

Enum for file chunking options.

Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.

These enum values can also be provided as the booleans True and False, which correspond to enabled and disabled respectively.

kxi.sp.read.FileChunking.auto

Automatically determine the size of the target file; if it is sufficiently large (more than a few megabytes), read it in chunks.

kxi.sp.read.FileChunking.disabled

Do not split the file into chunks.

kxi.sp.read.FileChunking.enabled

Split the file into chunks.

kxi.sp.read.FileMode (AutoNameEnum)

Enum for file mode options.

These enum values can be provided as enum member objects (e.g. FileMode.binary), or as strings matching the names of members (e.g. 'binary').

kxi.sp.read.FileMode.binary

Read the content of the file into a byte vector.

kxi.sp.read.FileMode.text

Read the content of the file into strings, and split on newlines.
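As a sketch of the options above, the mode and chunking parameters of from_file can be given either as enum members or as their string/boolean equivalents; the file path here is a hypothetical placeholder:

```python
from kxi import sp

sp.init()

# Two equivalent ways of selecting text mode with chunking disabled;
# 'logs.txt' is a placeholder path for illustration.
pipeline = sp.read.from_file('logs.txt', mode=sp.read.FileMode.text,
                             chunking=sp.read.FileChunking.disabled)
pipeline = sp.read.from_file('logs.txt', mode='text', chunking=False)
```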

kxi.sp.read.from_callback

Read from a callback function defined in the global q namespace.

Parameters:

Name Type Description Default
callback Union[str, pykx.wrappers.SymbolAtom]

The name of the callback function that will be defined in the global q namespace.

required
partitions List

A list of partition identifiers to distribute over available workers.

()

Returns:

Type Description
Pipeline

A pipeline comprised of the from_callback reader, which can be joined to other pipelines.

Examples:

Calling a callback multiple times, with a window to batch the data:

>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx
>>> sp.init()
>>> sp.run(sp.read.from_callback('publish')
...     | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
...     | sp.map(lambda x: x * x)
...     | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(4))
pykx.Identity(pykx.q('::'))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81
pykx.Identity(pykx.q('::'))

kxi.sp.read.from_expr

Evaluate an expression or call a function to feed data into the pipeline.

Parameters:

Name Type Description Default
expr Union[str, bytes, Callable]

Either a q expression as a string, which will be evaluated to produce data for the pipeline, or a nullary function, which will be called to produce data for the pipeline.

required

Returns:

Type Description
Pipeline

A pipeline comprised of the from_expr reader, which can be joined to other pipelines.
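A minimal sketch, following the style of the from_callback example above: a q expression is evaluated once to seed the pipeline, and the results are written to the console.

```python
from kxi import sp

sp.init()

# 'til 10' is evaluated in q, producing the numbers 0 through 9,
# which are squared and printed to the console.
sp.run(sp.read.from_expr('til 10')
    | sp.map(lambda x: x * x)
    | sp.write.to_console(timestamp='none'))
```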

kxi.sp.read.from_file

Read file contents into the pipeline.

Parameters:

Name Type Description Default
path PathLike

The path to the file.

required
mode FileMode

How the content of the file should be interpreted by the reader.

<FileMode.binary: 'binary'>
offset int

How many bytes into the file to begin reading.

0
chunking FileChunking

If/how the file should be split into chunks.

<FileChunking.auto: 2>
chunk_size Union[int, str, bytes, pykx.wrappers.CharVector]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'

Returns:

Type Description
Pipeline

A pipeline comprised of the from_file reader, which can be joined to other pipelines.
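A sketch reading a file as text in forced 1MB chunks; the path is a placeholder for illustration.

```python
from kxi import sp

sp.init()

# Stream 'data.csv' (placeholder path) through the pipeline as text,
# splitting the file into 1MB chunks regardless of its size.
sp.run(sp.read.from_file('data.csv', mode='text',
                         chunking=True, chunk_size='1MB')
    | sp.write.to_console(timestamp='none'))
```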

kxi.sp.read.from_google_storage

Read a file hosted on Google Cloud Storage.

Parameters:

Name Type Description Default
path str

The URL of the file hosted on Google Cloud Storage.

required
mode FileMode

A FileMode enum value, or the string equivalent.

<FileMode.binary: 'binary'>
offset int

How many bytes into the file to begin reading.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

<FileChunking.auto: 2>
chunk_size Union[int, str, bytes, pykx.wrappers.CharVector]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
tenant Union[str, bytes, pykx.wrappers.CharVector]

The authentication tenant.

None

Returns:

Type Description
Pipeline

A pipeline comprised of the from_google_storage reader, which can be joined to other pipelines.
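A sketch, assuming a gs:// object URL; the bucket and object names are placeholders.

```python
from kxi import sp

sp.init()

# 'gs://example-bucket/data.csv' is a placeholder URL for illustration.
sp.run(sp.read.from_google_storage('gs://example-bucket/data.csv',
                                   mode='text')
    | sp.write.to_console(timestamp='none'))
```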

kxi.sp.read.from_kafka

Consume data from a Kafka topic.

Maximum poll limit

The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages read in a single cycle, to avoid locking the process in a read loop that serves only Kafka messages. By default this limit is 1000 messages. Setting the configuration option pollLimit changes this value. Because the limit is global, if multiple readers set it, only one value is used.

Parameters:

Name Type Description Default
topic str

The name of a topic.

required
brokers Union[str, List[str]]

Brokers identified as a 'host:port' string, or a list of 'host:port' strings.

'localhost:9092'
retries int

Maximum number of retries that will be attempted for Kafka API calls.

10
retry_wait Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec]

How long to wait between retry attempts.

(1, 's')
poll_limit int

Maximum number of records to process in a single poll loop.

1000
offset Optional[Dict[int, kxi.sp.read.KafkaOffset]]

Dictionary mapping from partition IDs to their offsets.

None
options Optional[Dict[str, Any]]

Dictionary of Kafka consumer options.

None

Returns:

Type Description
Pipeline

A pipeline comprised of the from_kafka reader, which can be joined to other pipelines.
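A sketch consuming from a topic with a reduced poll limit; the topic name and broker address are placeholders.

```python
from kxi import sp

sp.init()

# 'trades' and 'localhost:9092' are placeholders for illustration.
# poll_limit caps how many records one poll loop may process.
sp.run(sp.read.from_kafka('trades', brokers='localhost:9092',
                          poll_limit=500)
    | sp.write.to_console(timestamp='none'))
```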

kxi.sp.read.from_postgres

Execute query on a PostgreSQL database.

Any parameter (except for the query) can be omitted, in which case it is sourced from an environment variable of the form $KXI_SP_POSTGRES_<param>, where <param> is the parameter name in uppercase.

The pipeline will be torn down after processing a Postgres query

After the query has completed processing, the Postgres reader will signal a 'finish' command, which will tear down the pipeline if there are no other pending requests.

Parameters:

Name Type Description Default
query Union[str, bytes, pykx.wrappers.CharVector]

Query to execute on the Postgres database.

required
database Union[str, bytes, pykx.wrappers.CharVector]

Name of the database to connect to. Defaults to $KXI_SP_POSTGRES_DATABASE.

''
server Union[str, bytes, pykx.wrappers.CharVector]

Address of the database to connect to. Defaults to $KXI_SP_POSTGRES_SERVER.

''
port Union[str, bytes, pykx.wrappers.CharVector]

Port of the database. Defaults to $KXI_SP_POSTGRES_PORT.

''
username Union[str, bytes, pykx.wrappers.CharVector]

Username to authenticate with. Defaults to $KXI_SP_POSTGRES_USERNAME.

''
password Union[str, bytes, pykx.wrappers.CharVector]

Password to authenticate with. Defaults to $KXI_SP_POSTGRES_PASSWORD.

''

Returns:

Type Description
Pipeline

A pipeline comprised of the from_postgres reader, which can be joined to other pipelines.
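A sketch running a query with some connection details passed explicitly and the rest sourced from the environment; the query, database name, and credentials are placeholders.

```python
import os

from kxi import sp

sp.init()

# Parameters omitted from the call are read from the environment,
# e.g. $KXI_SP_POSTGRES_SERVER and $KXI_SP_POSTGRES_PORT.
os.environ['KXI_SP_POSTGRES_SERVER'] = 'localhost'  # placeholder
os.environ['KXI_SP_POSTGRES_PORT'] = '5432'         # placeholder

# Placeholder query and credentials; the pipeline tears down
# once the query results have been processed.
sp.run(sp.read.from_postgres('SELECT * FROM trades',
                             database='marketdata',
                             username='user', password='pass')
    | sp.write.to_console(timestamp='none'))
```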

kxi.sp.read.from_stream

Read data using a KX Insights Stream.

Parameters:

Name Type Description Default
table Optional[str]

Name of the table to filter the stream on. By default, no filtering is performed.

None
stream Optional[str]

Name of stream to subscribe to. By default, the stream specified by the $RT_SUB_TOPIC environment variable is used.

None
prefix Union[str, bytes, pykx.wrappers.CharVector]

Prefix to add to the hostname for the RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.

''
assembly Optional[str]

The KX Insights assembly to read from. By default, no assembly is used.

None
insights bool

Whether the stream being subscribed to uses Insights message formats.

True
index int

The position in the stream to replay from.

0

Returns:

Type Description
Pipeline

A pipeline comprised of the from_stream reader, which can be joined to other pipelines.
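A sketch subscribing to the stream named by $RT_SUB_TOPIC, filtered to a single table and replayed from the beginning; the table name is a placeholder.

```python
from kxi import sp

sp.init()

# 'trades' is a placeholder table name. With stream=None, the stream
# named by $RT_SUB_TOPIC is used; index=0 replays from the start.
sp.run(sp.read.from_stream(table='trades', index=0)
    | sp.write.to_console(timestamp='none'))
```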

kxi.sp.read.from_amazon_s3

Read a file from Amazon S3.

Parameters:

Name Type Description Default
path str

The path of an object to read from S3.

required
mode FileMode

How the content of the file should be interpreted by the reader.

<FileMode.binary: 'binary'>
offset int

How many bytes into the file to begin reading.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

<FileChunking.auto: 2>
chunk_size Union[int, str, bytes, pykx.wrappers.CharVector]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
tenant Union[str, bytes, pykx.wrappers.CharVector]

The authorization tenant.

''
domain Union[str, bytes, pykx.wrappers.CharVector]

A custom Amazon S3 domain.

''
region Union[str, bytes, pykx.wrappers.CharVector]

The AWS region to authenticate against.

'us-east-1'
credentials Union[str, bytes, pykx.wrappers.CharVector]

The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.read.fromAmazon documentation for more information.

''

Returns:

Type Description
Pipeline

A pipeline comprised of the from_amazon_s3 reader, which can be joined to other pipelines.
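A sketch reading an object from S3 as text; the object path and credentials secret name are placeholders.

```python
from kxi import sp

sp.init()

# 's3://example-bucket/data.csv' and 'my-s3-secret' are placeholders.
# 'credentials' names the secret holding the S3 credentials.
sp.run(sp.read.from_amazon_s3('s3://example-bucket/data.csv',
                              mode='text', region='us-east-1',
                              credentials='my-s3-secret')
    | sp.write.to_console(timestamp='none'))
```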

kxi.sp.read.from_azure_storage

Read a file from Azure Blob Storage.

Parameters:

Name Type Description Default
path str

The path of an object to read from Azure Blob Storage.

required
mode FileMode

How the content of the file should be interpreted by the reader.

<FileMode.binary: 'binary'>
offset int

How many bytes into the file to begin reading.

0
chunking FileChunking

A FileChunking enum value, or string equivalent.

<FileChunking.auto: 2>
chunk_size Union[int, str, bytes, pykx.wrappers.CharVector]

The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.

'1MB'
account Union[str, bytes, pykx.wrappers.CharVector]

The Azure account to read from.

''
tenant Union[str, bytes, pykx.wrappers.CharVector]

The authorization tenant.

''
domain Union[str, bytes, pykx.wrappers.CharVector]

A custom Azure domain.

''
credentials Union[str, bytes, pykx.wrappers.CharVector]

The secret name for the Azure credentials. Refer to the authentication section of the .qsp.read.fromAzureStorage documentation for more information.

''

Returns:

Type Description
Pipeline

A pipeline comprised of the from_azure_storage reader, which can be joined to other pipelines.
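A sketch reading a blob from Azure Blob Storage; the object path, account, and credentials secret name are all placeholders for illustration.

```python
from kxi import sp

sp.init()

# Path, account, and secret name below are placeholders.
# 'credentials' names the secret holding the Azure credentials.
sp.run(sp.read.from_azure_storage('ms://example-container/data.csv',
                                  account='exampleaccount',
                                  credentials='my-azure-secret')
    | sp.write.to_console(timestamp='none'))
```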