Writers

Stream Processor writers.

Writers are a specialized type of operator that allow users to push data from a streaming pipeline to different external data sources.

Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.

kxi.sp.write.__all__ = ['ConsoleTimestamp', 'TargetMode', 'VariableMode', 'Writer', 'to_amazon_s3', 'to_console', 'to_kafka', 'to_kdb', 'to_process', 'to_stream', 'to_variable'] module-attribute

kxi.sp.write.Writer

Bases: Operator

kxi.sp.write.ConsoleTimestamp

Bases: AutoNameEnum

Enum for to_console timestamp options.

These enum values can be provided as enum member objects (e.g. ConsoleTimestamp.utc), or as strings matching the names of the members (e.g. 'utc').
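The string form can work because each member's value is expected to equal its name. The following is a minimal sketch of that pattern using only the standard library. The `AutoNameEnum` class and the `resolve` helper shown here are hypothetical stand-ins for illustration, not the actual kxi implementation.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Hypothetical stand-in: auto() yields the member's own name."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class ConsoleTimestamp(AutoNameEnum):
    local = auto()
    utc = auto()
    none = auto()
    default = auto()


def resolve(option):
    """Accept either a member object or the string naming a member."""
    if isinstance(option, ConsoleTimestamp):
        return option
    # Lookup by value, which equals the member name in this sketch.
    return ConsoleTimestamp(option)
```

With this sketch, `resolve('utc')` and `resolve(ConsoleTimestamp.utc)` return the same member, which mirrors the two accepted forms described above.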

kxi.sp.write.ConsoleTimestamp.local = auto() class-attribute

Prefix each output line with a local timestamp.

kxi.sp.write.ConsoleTimestamp.utc = auto() class-attribute

Prefix each output line with a UTC timestamp.

kxi.sp.write.ConsoleTimestamp.none = auto() class-attribute

Do not prefix any output lines with a timestamp.

kxi.sp.write.ConsoleTimestamp.default = auto() class-attribute

Equivalent to 'none' if using qlog, and equivalent to 'utc' otherwise. The default option allows qlog to use its own timestamps, instead of ones provided by the writer.
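The rule for the default option can be written as a small function. This is only a sketch of the documented behavior; `effective_timestamp` is a hypothetical helper name and is not part of kxi.

```python
def effective_timestamp(timestamp: str = 'default', qlog: bool = False) -> str:
    """Resolve 'default' to a concrete timestamp option.

    Hypothetical helper: any explicit option is returned unchanged.
    """
    if timestamp != 'default':
        return timestamp
    # With qlog, the logger supplies its own timestamps, so none is added.
    return 'none' if qlog else 'utc'
```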

kxi.sp.write.AmazonS3Teardown

Bases: AutoNameEnum

Enum for to_amazon_s3 teardown options.

These enum values can be provided as enum member objects (e.g. AmazonS3Teardown.complete), or as strings matching the names of the members (e.g. 'complete').

kxi.sp.write.AmazonS3Teardown.none = auto() class-attribute

Leave any partial uploads in a pending state to be resumed by a future pipeline.

kxi.sp.write.AmazonS3Teardown.abort = auto() class-attribute

Abort any pending partial uploads. This means any processed data that is still pending will be lost on teardown.

kxi.sp.write.AmazonS3Teardown.complete = auto() class-attribute

Mark any partial uploads as complete. This will flush any partial data buffers to S3 to ensure that any in-flight data is saved. However, once the data is saved, it cannot be appended to.

kxi.sp.write.TargetMode

Bases: AutoNameEnum

The kind of object a specified target in a kdb+ process is.

These enum values can be provided as enum member objects (e.g. TargetMode.table), or as strings matching the names of members (e.g. 'table').

kxi.sp.write.TargetMode.function = auto() class-attribute

The target is a function defined in the kdb+ process. It will be called with the data being written to the process.

kxi.sp.write.TargetMode.table = auto() class-attribute

The target is a table defined in the kdb+ process. The data being written to the process will be upserted into it.

kxi.sp.write.VariableMode

Bases: AutoNameEnum

How to set/update the specified kdb+ variable.

These enum values can be provided as enum member objects (e.g. VariableMode.upsert), or as strings matching the names of members (e.g. 'upsert').

kxi.sp.write.VariableMode.append = auto() class-attribute

The data from the stream will be appended to the variable.

kxi.sp.write.VariableMode.overwrite = auto() class-attribute

The variable will be set to the last output of the pipeline.

kxi.sp.write.VariableMode.upsert = auto() class-attribute

The tabular data from the stream will be upserted to the variable, which must be a table.
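The difference between the three modes can be illustrated with plain Python containers. This is a simplified model rather than the kxi implementation: `apply_mode` is a hypothetical helper, lists stand in for q vectors, and an unkeyed table is modelled as a list of row dictionaries.

```python
def apply_mode(current, batch, mode='append'):
    """Return the new value of the variable after one batch arrives."""
    if mode == 'overwrite':
        # Only the most recent output is kept.
        return batch
    if mode == 'append':
        return list(current or []) + list(batch)
    if mode == 'upsert':
        # Upsert requires tabular data; rows are dicts in this model.
        if not all(isinstance(row, dict) for row in batch):
            raise TypeError('upsert expects tabular data')
        return list(current or []) + list(batch)
    raise ValueError(f'unknown mode: {mode!r}')
```

In this model, upserting into an unkeyed table adds rows just as appending does; the distinction that matters is that upsert rejects non-tabular batches.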

kxi.sp.write.__dir__

kxi.sp.write.to_amazon_s3

Writes data to an object in an Amazon S3 bucket.

Parameters:

Name Type Description Default
path Union[CharString, OperatorFunction]

The path to write objects to in S3, or a function that sets the path for each message processed in the stream. If a function is provided, it must return a string giving the file path to write the current batch of data to. By default, the function receives the message data as its argument; parameters can be configured so that it also receives metadata.

required
is_complete Optional[Union[CharString, OperatorFunction]]

A binary function that accepts a metadata dictionary as the first argument and data as its second. This function is invoked on each message that is processed and must return a boolean indicating if the current file processing is complete.

None
on_teardown AmazonS3Teardown

An AmazonS3Teardown behavior indicating what to do with any partial uploads that are incomplete when a teardown occurs.

AmazonS3Teardown.none
tenant CharString

The authorization tenant.

''
domain CharString

A custom Amazon S3 domain.

''
region CharString

The AWS region to authenticate against.

'us-east-1'
credentials CharString

The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.write.toAmazonS3 documentation for more information.

''

Returns:

Type Description
Writer

A to_amazon_s3 writer, which can be joined to other operators or pipelines.
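As a rough illustration of the two callable parameters, the sketch below defines a path function and a completion function in plain Python. The bucket name, the shape of `data` (a list of dictionaries with a `date` field), and the `last_batch` metadata key are all hypothetical; in a real pipeline the arguments would be whatever the Stream Processor passes in.

```python
def path_for_batch(data):
    """Return the S3 path for the current batch.

    Assumes, for illustration only, that data is a non-empty list of
    dicts that each carry a 'date' field.
    """
    return f"s3://example-bucket/trades/{data[0]['date']}.csv"


def is_complete(metadata, data):
    """Binary function: metadata first, data second; returns a boolean.

    'last_batch' is a hypothetical metadata key used only in this sketch.
    """
    return bool(metadata.get('last_batch', False))


# With kxi installed, these might be passed to the writer like so:
# sp.write.to_amazon_s3(path_for_batch, is_complete=is_complete,
#                       on_teardown='complete', region='us-east-1')
```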

kxi.sp.write.to_console

Write to the console.

Parameters:

Name Type Description Default
prefix Union[str, bytes]

A prefix for each line of output.

b''
split bool

Whether a vector should be printed on a single line, or split across multiple lines with one element per line.

False
timestamp ConsoleTimestamp

A ConsoleTimestamp enum value, or the string equivalent.

ConsoleTimestamp.default
qlog bool

Whether the output should be through qlog.

False

Returns:

Type Description
Writer

A to_console writer, which can be joined to other operators or pipelines.
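A possible way to combine these options is sketched below. The option values are arbitrary examples, and the import is guarded so the snippet still runs where kxi is not installed; the pipeline is only constructed, not run.

```python
# Example option values; each key is a to_console parameter listed above.
console_options = {
    'prefix': 'out: ',
    'split': True,        # one element per line
    'timestamp': 'utc',   # string form of ConsoleTimestamp.utc
    'qlog': False,
}

try:
    from kxi import sp
except ImportError:
    sp = None  # kxi is not available in this environment

pipeline = None
if sp is not None:
    pipeline = sp.read.from_callback('publish') | sp.write.to_console(**console_options)
    # sp.run(pipeline) would start it inside a Stream Processor worker.
```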

kxi.sp.write.to_kafka

Publish data on a Kafka topic.

Parameters:

Name Type Description Default
topic str

The name of a topic.

required
brokers Union[str, List[str]]

Brokers identified by a 'host:port' string, or a list of 'host:port' strings.

'localhost:9092'
retries int

Maximum number of retries that will be attempted for Kafka API calls.

10
retry_wait Timedelta

How long to wait between retry attempts.

(1, 's')
topic_config Dict[str, Any]

Dictionary of Kafka topic configuration options.

required

Returns:

Type Description
Writer

A to_kafka writer, which can be joined to other operators or pipelines.
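Because `brokers` accepts either a single string or a list, it can help to normalize and check the value before building the writer. The helper below is a hypothetical convenience, not part of kxi, and its validation rule (a non-empty host and a numeric port) is an assumption.

```python
def normalize_brokers(brokers='localhost:9092'):
    """Return a list of 'host:port' strings from either accepted form."""
    items = [brokers] if isinstance(brokers, str) else list(brokers)
    for item in items:
        host, sep, port = item.rpartition(':')
        if not (host and sep and port.isdigit()):
            raise ValueError(f"expected 'host:port', got {item!r}")
    return items


# With kxi installed, the result might be used like this (the topic name
# and the empty topic_config are placeholder values):
# sp.write.to_kafka('trades', brokers=normalize_brokers(['kafka-0:9092']),
#                   retries=10, topic_config={})
```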

kxi.sp.write.to_kdb

Writes an on-disk partitioned table.

Parameters:

Name Type Description Default
path str

The database path.

required
prtn_col str

The name of the partition column.

required
table str

The name of the table.

required

Returns:

Type Description
Writer

A to_kdb writer, which writes an on-disk partitioned table.

kxi.sp.write.to_process

Write data to a kdb+ process over IPC.

Worker is halted until an IPC connection is established or given up on

During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.
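The blocking behavior described above resembles a conventional retry loop. The sketch below is an assumption about the general shape of such a loop, not the Stream Processor's actual code; `connect_with_retries` and its `connect` and `sleep` arguments are hypothetical names.

```python
import time


def connect_with_retries(connect, retries=5, retry_wait=1.0, sleep=time.sleep):
    """Block until connect() succeeds or the retries are exhausted.

    Nothing else runs while this loop is waiting, which models the
    halted Worker. The initial attempt is not counted as a retry.
    """
    attempts = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if attempts >= retries:
                raise  # give up after the maximum number of retries
            attempts += 1
            sleep(retry_wait)
```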

Parameters:

Name Type Description Default
handle str

The kdb+ connection handle in the form ':host:port'. Note the leading colon. The host can be omitted to use localhost, e.g. '::5000'.

required
target str

The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process.

required
mode TargetMode

Whether the target is a function to be called, or a table to be upserted.

TargetMode.function
sync bool

Whether writing should be synchronous or asynchronous.

False
queue_length int

Maximum number of async messages permitted in the queue before flushing.

2 ** 63 - 1
queue_size int

Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte.

1024 * 1024
spread bool

Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if params is set.

False
params List[str]

A list of parameters that should appear before the message data in function mode.

()
retries int

Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost.

5
retry_wait Timedelta

How long to wait between retry attempts.

(1, 's')

Returns:

Type Description
Writer

A to_process writer, which can be joined to other operators or pipelines.
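The interaction between `spread` and `params` in function mode can be pictured with an ordinary Python call. This is a local model of the documented semantics under the assumption that `params` values are simply placed ahead of the data; `call_target` is a hypothetical helper, and no IPC is involved.

```python
def call_target(func, data, spread=False, params=()):
    """Call func the way a function-mode target is described above."""
    if spread and params:
        raise ValueError('spread cannot be combined with params')
    if spread:
        # The data is treated as the list of arguments.
        return func(*data)
    # Any params come first, followed by the message data.
    return func(*params, data)
```

For example, `call_target(lambda a, b: a + b, [1, 2], spread=True)` passes `1` and `2` as separate arguments, while the default passes the whole list as a single argument.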

kxi.sp.write.to_stream

Write data using a KX Insights stream.

Parameters:

Name Type Description Default
table Optional[str]

Name of the table to filter the stream on. By default, no filtering is performed.

None
stream Optional[str]

Name of stream to publish to. By default, the stream specified by the $RT_PUB_TOPIC environment variable is used.

None
prefix CharString

Prefix to add to the hostname for RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.

''
assembly Optional[str]

The KX Insights assembly to write to. By default, no assembly is used.

None
insights bool

Whether the stream being published to uses Insights message formats.

True
deduplicate bool

Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events.

True

Returns:

Type Description
Writer

A to_stream writer, which can be joined to other operators or pipelines.
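The environment-variable fallbacks described in the parameter list can be summarized as follows. This is a sketch of the documented defaults only; `resolve_stream_options` is a hypothetical helper, and treating "set" as "present in the environment" is an assumption.

```python
import os


def resolve_stream_options(stream=None, prefix='', deduplicate=True, env=None):
    """Apply the documented environment fallbacks to to_stream options."""
    env = os.environ if env is None else env
    return {
        # Fall back to $RT_PUB_TOPIC when no stream is named.
        'stream': stream if stream is not None else env.get('RT_PUB_TOPIC'),
        # Fall back to $RT_TOPIC_PREFIX when no prefix is given.
        'prefix': prefix or env.get('RT_TOPIC_PREFIX', ''),
        # Deduplication is forced off when nondeterminism is allowed.
        'deduplicate': deduplicate and 'KXI_ALLOW_NONDETERMINISM' not in env,
    }
```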

kxi.sp.write.to_variable

Write to a local q variable.

The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.

Parameters:

Name Type Description Default
variable str

The name of the q variable the data will be written to.

required
mode VariableMode

How to set/update the specified q variable.

VariableMode.append

Returns:

Type Description
Writer

A to_variable writer, which can be joined to other operators or pipelines.

Examples:

Stream data into a list by appending to it:

>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2'))
>>> kx.q('publish').each(range(0, 11, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2 0 2 4 6 8 10'))

Stream data into a table by upserting it:

>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
pykx.SymbolAtom(pykx.q('`output'))
>>> print(kx.q('output'))
x y
---
1 a
2 b
3 c
4 d
5 e