Writers

Stream Processor writers.

Writers are a specialized type of operator that push data from a streaming pipeline to external systems.

Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.

kxi.sp.write.ConsoleWriter (Writer)

kxi.sp.write.KafkaWriter (Writer)

kxi.sp.write.ProcessWriter (Writer)

kxi.sp.write.StreamWriter (Writer)

kxi.sp.write.VariableWriter (Writer)

kxi.sp.write.Writer (Operator)

kxi.sp.write.ConsoleTimestamp (AutoNameEnum)

Enum for to_console timestamp options.

These enum values can be provided as enum member objects (e.g. ConsoleTimestamp.utc), or as strings matching the names of the members (e.g. 'utc').

kxi.sp.write.ConsoleTimestamp.default

Equivalent to 'none' if using qlog, and equivalent to 'utc' otherwise. The default option allows qlog to use its own timestamps, instead of ones provided by the writer.

kxi.sp.write.ConsoleTimestamp.local

Prefix each output line with a local timestamp.

kxi.sp.write.ConsoleTimestamp.none

Do not prefix any output lines with a timestamp.

kxi.sp.write.ConsoleTimestamp.utc

Prefix each output line with a UTC timestamp.
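
The following standard-library sketch illustrates how a parameter can accept either an enum member or the matching string, and how 'default' resolves depending on qlog. ConsoleTimestampModel, coerce_timestamp and resolve_default are hypothetical names used only for this illustration; they model the documented behaviour and are not the library's implementation.

```python
from enum import Enum


class ConsoleTimestampModel(Enum):
    """Hypothetical stand-in for kxi.sp.write.ConsoleTimestamp."""
    default = 'default'
    local = 'local'
    none = 'none'
    utc = 'utc'


def coerce_timestamp(value):
    """Accept either a member object or a string matching a member name."""
    if isinstance(value, ConsoleTimestampModel):
        return value
    return ConsoleTimestampModel[value]  # raises KeyError for unknown names


def resolve_default(value, qlog):
    """Model the documented rule: 'default' means 'none' with qlog, else 'utc'."""
    member = coerce_timestamp(value)
    if member is ConsoleTimestampModel.default:
        return ConsoleTimestampModel.none if qlog else ConsoleTimestampModel.utc
    return member


print(resolve_default('default', qlog=True).name)   # none
print(resolve_default('default', qlog=False).name)  # utc
```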

kxi.sp.write.TargetMode (AutoNameEnum)

The kind of object a specified target in a kdb+ process is.

These enum values can be provided as enum member objects (e.g. TargetMode.table), or as strings matching the names of members (e.g. 'table').

kxi.sp.write.TargetMode.function

The target is a function defined in the kdb+ process. It will be called with the data being written to the process.

kxi.sp.write.TargetMode.table

The target is a table defined in the kdb+ process. The data being written to the process will be upserted into it.

kxi.sp.write.VariableMode (AutoNameEnum)

How to set/update the specified kdb+ variable.

These enum values can be provided as enum member objects (e.g. VariableMode.upsert), or as strings matching the names of members (e.g. 'upsert').

kxi.sp.write.VariableMode.append

The data from the stream will be appended to the variable.

kxi.sp.write.VariableMode.overwrite

The variable will be set to the last output of the pipeline.

kxi.sp.write.VariableMode.upsert

The tabular data from the stream will be upserted to the variable, which must be a table.
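
As a rough illustration of the difference between append and overwrite, the sketch below models the variable as a Python list that receives two batches. The helper apply_mode is hypothetical and only mimics the documented semantics; upsert is left out because it requires tabular data.

```python
def apply_mode(current, batch, mode):
    """Return the new variable value after one batch is written.

    Hypothetical model: Python lists stand in for q vectors.
    """
    if mode == 'append':
        return list(current) + list(batch)
    if mode == 'overwrite':
        return list(batch)
    raise ValueError(f'unsupported mode in this sketch: {mode!r}')


batches = [[-10, -8], [-6, -4, -2]]

appended, overwritten = [], []
for batch in batches:
    appended = apply_mode(appended, batch, 'append')
    overwritten = apply_mode(overwritten, batch, 'overwrite')

print(appended)     # [-10, -8, -6, -4, -2]
print(overwritten)  # [-6, -4, -2]
```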

kxi.sp.write.to_console

Write to the console.

Parameters:

Name Type Description Default
prefix Union[str, bytes]

A prefix for each line of output.

b''
split bool

Whether a vector should be printed on a single line, or split across multiple lines with one element per line.

False
timestamp ConsoleTimestamp

A ConsoleTimestamp enum value, or the string equivalent.

<ConsoleTimestamp.default: 'default'>
qlog bool

Whether the output should be through qlog.

False

Returns:

Type Description
Writer

A to_console writer, which can be joined to other operators or pipelines.
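
A minimal sketch of how these parameters might be combined, assuming the kxi package is installed and that a callback reader named 'publish' is a suitable source (as in the to_variable examples on this page). The prefix value and the helper name build_console_pipeline are illustrative only.

```python
# Keyword arguments taken from the parameter list above.
console_options = {
    'prefix': 'trades: ',  # hypothetical prefix; str or bytes
    'split': True,         # print one vector element per line
    'timestamp': 'local',  # string form of ConsoleTimestamp.local
    'qlog': False,
}


def build_console_pipeline(callback_name='publish'):
    """Join a callback reader to a console writer.

    kxi is imported lazily so the options can be inspected without it.
    """
    from kxi import sp
    reader = sp.read.from_callback(callback_name)
    return reader | sp.write.to_console(**console_options)
```

The returned pipeline would then be passed to sp.run, for example sp.run(build_console_pipeline()).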

kxi.sp.write.to_kafka

Publish data on a Kafka topic.

Parameters:

Name Type Description Default
topic str

The name of a topic.

required
brokers Union[str, List[str]]

Brokers identified as a 'host:port' string, or a list of 'host:port' strings.

'localhost:9092'
retries int

Maximum number of retries that will be attempted for Kafka API calls.

10
retry_wait Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec]

How long to wait between retry attempts.

(1, 's')
topic_config Dict[str, Any]

Dictionary of Kafka topic configuration options.

required

Returns:

Type Description
Writer

A to_kafka writer, which can be joined to other operators or pipelines.
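
The sketch below shows one way the arguments could be assembled, assuming the kxi package is installed and the brokers are reachable. The topic name, broker addresses and the helper build_kafka_pipeline are hypothetical; topic_config is left empty rather than guessing at option names.

```python
import datetime

kafka_options = {
    'topic': 'trades',                            # hypothetical topic name
    'brokers': ['broker1:9092', 'broker2:9092'],  # hypothetical 'host:port' strings
    'retries': 5,
    'retry_wait': datetime.timedelta(seconds=2),  # one of the accepted types
    'topic_config': {},                           # no topic-level options set here
}


def build_kafka_pipeline(callback_name='publish'):
    """Join a callback reader to a Kafka writer (requires kxi)."""
    from kxi import sp
    reader = sp.read.from_callback(callback_name)
    return reader | sp.write.to_kafka(**kafka_options)
```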

kxi.sp.write.to_process

Write data to a kdb+ process over IPC.

Worker is halted until an IPC connection is established or given up on

During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.
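
To get a feel for how long a Worker might stay halted, the rough estimate below multiplies the number of retries by the wait between attempts. It is an approximation under the assumption that one wait occurs per retry, and it ignores the time taken by each connection attempt itself; max_retry_wait is a hypothetical helper.

```python
import datetime


def max_retry_wait(retries, retry_wait):
    """Approximate upper bound on time spent waiting between attempts."""
    return retries * retry_wait


# Defaults documented for to_process: retries=5, retry_wait=(1, 's').
print(max_retry_wait(5, datetime.timedelta(seconds=1)))  # 0:00:05
```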

Parameters:

Name Type Description Default
handle str

The kdb+ connection handle in the form ':host:port'. Note the leading colon. The host can be omitted to use localhost, e.g. '::5000'.

required
target str

The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process.

required
mode TargetMode

Whether the target is a function to be called, or a table to be upserted.

<TargetMode.function: 'function'>
sync bool

Whether writing should be synchronous or asynchronous.

False
queue_length int

Maximum number of async messages permitted in the queue before flushing.

9223372036854775807
queue_size int

Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte.

1048576
spread bool

Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if params is set.

False
params List[str]

A list of parameters that should appear before the message data in function mode.

()
retries int

Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost.

5
retry_wait Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec]

How long to wait between retry attempts.

(1, 's')

Returns:

Type Description
Writer

A to_process writer, which can be joined to other operators or pipelines.
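
The following sketch formats a connection handle and checks the documented constraints on spread before any writer is built. The helpers make_handle, check_process_options and build_process_pipeline, as well as the target name 'upd' and the parameter 'trade', are hypothetical; the library may perform its own validation differently.

```python
def make_handle(port, host=''):
    """Format a kdb+ handle as ':host:port' (note the leading colon)."""
    return f':{host}:{port}'


def check_process_options(mode='function', spread=False, params=()):
    """Apply the documented rules: spread needs function mode and no params."""
    if spread and mode != 'function':
        raise ValueError('spread can only be set in function mode')
    if spread and params:
        raise ValueError('spread cannot be set if params is set')
    return {'mode': mode, 'spread': spread, 'params': list(params)}


process_options = {
    'handle': make_handle(5000),  # '::5000', host omitted for localhost
    'target': 'upd',              # hypothetical function in the remote process
    **check_process_options(mode='function', params=['trade']),
}


def build_process_pipeline(callback_name='publish'):
    """Join a callback reader to a process writer (requires kxi)."""
    from kxi import sp
    reader = sp.read.from_callback(callback_name)
    return reader | sp.write.to_process(**process_options)
```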

kxi.sp.write.to_stream

Write data using a KX Insights stream.

Parameters:

Name Type Description Default
table Optional[str]

Name of the table to filter the stream on. By default, no filtering is performed.

None
stream Optional[str]

Name of the stream to publish to. By default, the stream specified by the $RT_PUB_TOPIC environment variable is used.

None
prefix Union[str, bytes, pykx.wrappers.CharVector]

Prefix to add to the hostname for the RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.

''
assembly Optional[str]

The KX Insights assembly to write to. By default, no assembly is used.

None
insights bool

Whether the stream being published to uses Insights message formats.

True
deduplicate bool

Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value is forced to false, potentially resulting in duplicate data after failure recovery events.

True

Returns:

Type Description
Writer

A to_stream writer, which can be joined to other operators or pipelines.
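
The environment-variable fallbacks described above can be modelled as follows. This is a sketch of the documented behaviour, not the library's code: resolve_stream_options is a hypothetical helper, and it assumes that any non-empty value of KXI_ALLOW_NONDETERMINISM counts as "set".

```python
import os


def resolve_stream_options(stream=None, prefix='', deduplicate=True, env=None):
    """Model how unset arguments fall back to environment variables."""
    env = os.environ if env is None else env
    return {
        'stream': stream if stream is not None else env.get('RT_PUB_TOPIC'),
        'prefix': prefix if prefix else env.get('RT_TOPIC_PREFIX', ''),
        # Assumption: any non-empty value disables deduplication.
        'deduplicate': bool(deduplicate and not env.get('KXI_ALLOW_NONDETERMINISM')),
    }


example_env = {'RT_PUB_TOPIC': 'data', 'KXI_ALLOW_NONDETERMINISM': '1'}
print(resolve_stream_options(env=example_env))
# {'stream': 'data', 'prefix': '', 'deduplicate': False}
```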

kxi.sp.write.to_variable

Write to a local q variable.

The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.

Parameters:

Name Type Description Default
variable str

The name of the q variable the data will be written to.

required
mode VariableMode

How to set/update the specified q variable.

<VariableMode.append: 'append'>

Returns:

Type Description
Writer

A to_variable writer, which can be joined to other operators or pipelines.

Examples:

Stream data into a list by appending to it:

>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2'))
>>> kx.q('publish').each(range(0, 11, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2 0 2 4 6 8 10'))

Stream data into a table by upserting it:

>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
pykx.SymbolAtom(pykx.q('`output'))
>>> print(kx.q('output'))
x y
---
1 a
2 b
3 c
4 d
5 e