Writers
Stream Processor writers.
Writers are a specialized type of operator that allows users to push data from a streaming pipeline to different external data sources.
Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.
kxi.sp.write.ConsoleWriter (Writer)
kxi.sp.write.KafkaWriter (Writer)
kxi.sp.write.ProcessWriter (Writer)
kxi.sp.write.StreamWriter (Writer)
kxi.sp.write.VariableWriter (Writer)
kxi.sp.write.Writer (Operator)
kxi.sp.write.ConsoleTimestamp (AutoNameEnum)
Enum for to_console timestamp options.
These enum values can be provided as enum member objects (e.g. ConsoleTimestamp.utc), or as strings matching the names of the members (e.g. 'utc').
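The member-or-string convention can be illustrated with the standard library alone. The sketch below is not the kxi implementation: `AutoNameSketch`, `ConsoleTimestampSketch`, and `coerce` are hypothetical names, and it assumes that each member's value is simply its own name.

```python
from enum import Enum, auto


class AutoNameSketch(Enum):
    """Hypothetical base class: auto() yields the member's own name."""

    def _generate_next_value_(name, start, count, last_values):
        return name


class ConsoleTimestampSketch(AutoNameSketch):
    """Stand-in enum using the four member names documented here."""
    local = auto()
    utc = auto()
    none = auto()
    default = auto()


def coerce(value):
    """Return the enum member for either a member or a member name."""
    if isinstance(value, ConsoleTimestampSketch):
        return value
    # Name lookup; raises KeyError for a name that is not a member.
    return ConsoleTimestampSketch[value]


# Both spellings resolve to the same member object.
print(coerce('utc') is coerce(ConsoleTimestampSketch.utc))
```

Accepting both forms lets short scripts pass plain strings while larger code bases keep the checks that enum members provide.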
kxi.sp.write.ConsoleTimestamp.default
Equivalent to 'none' if using qlog, and equivalent to 'utc' otherwise. The default option allows qlog to use its own timestamps, instead of ones provided by the writer.
kxi.sp.write.ConsoleTimestamp.local
Prefix each output line with a local timestamp.
kxi.sp.write.ConsoleTimestamp.none
Do not prefix any output lines with a timestamp.
kxi.sp.write.ConsoleTimestamp.utc
Prefix each output line with a UTC timestamp.
kxi.sp.write.TargetMode (AutoNameEnum)
The kind of object a specified target in a kdb+ process is.
These enum values can be provided as enum member objects (e.g. TargetMode.table), or as strings matching the names of members (e.g. 'table').
kxi.sp.write.TargetMode.function
The target is a function defined in the kdb+ process. It will be called with the data being written to the process.
kxi.sp.write.TargetMode.table
The target is a table defined in the kdb+ process. The data being written to the process will be upserted into it.
kxi.sp.write.VariableMode (AutoNameEnum)
How to set/update the specified kdb+ variable.
These enum values can be provided as enum member objects (e.g. VariableMode.upsert), or as strings matching the names of members (e.g. 'upsert').
kxi.sp.write.VariableMode.append
The data from the stream will be appended to the variable.
kxi.sp.write.VariableMode.overwrite
The variable will be set to the last output of the pipeline.
kxi.sp.write.VariableMode.upsert
The tabular data from the stream will be upserted to the variable, which must be a table.
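The three modes can be compared with a small pure-Python model. This is only a sketch of the semantics described above, not how the writer is implemented: `apply_batch` is a hypothetical helper, a q list is modelled as a Python list, and a table is modelled as a list of row dictionaries without key columns, so upserting adds rows.

```python
def apply_batch(current, batch, mode):
    """Return the new variable value after one batch arrives.

    current: the existing value (a list, or a list of row dicts)
    batch:   the data emitted by the pipeline
    mode:    'append', 'overwrite', or 'upsert'
    """
    if mode == 'overwrite':
        # Only the most recent output of the pipeline is kept.
        return batch
    if mode == 'append':
        # Stream data accumulates at the end of the variable.
        return list(current) + list(batch)
    if mode == 'upsert':
        # Assumption: an unkeyed table, so every row is added.
        columns = set(current[0]) if current else None
        for row in batch:
            if columns is not None and set(row) != columns:
                raise ValueError('row columns do not match the table')
        return list(current) + [dict(row) for row in batch]
    raise ValueError(f'unknown mode: {mode!r}')


table = [{'x': 1, 'y': 'a'}]
table = apply_batch(table, [{'x': 2, 'y': 'b'}], 'upsert')
print(table)
```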
kxi.sp.write.to_console
Write to the console.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix | Union[str, bytes] | A prefix for each line of output. | b'' |
split | bool | Whether a vector should be printed on a single line, or split across multiple lines with one element per line. | False |
timestamp | ConsoleTimestamp | A | <ConsoleTimestamp.default: 'default'> |
qlog | bool | Whether the output should be through qlog. | False |
Returns:
Type | Description |
---|---|
Writer | A |
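How prefix, split, timestamp, and qlog interact can be sketched without the library. The helper names below are hypothetical, and several details are assumptions made for illustration: the timestamp layout, the placement of the timestamp before the prefix, and the use of Python's `str()` to render values. Only the resolution of 'default' (to 'none' with qlog, to 'utc' otherwise) and the one-element-per-line splitting are taken from the descriptions above.

```python
from datetime import datetime, timezone


def resolve_timestamp(timestamp='default', qlog=False):
    """Map 'default' to 'none' when qlog is used, and to 'utc' otherwise."""
    if timestamp == 'default':
        return 'none' if qlog else 'utc'
    return timestamp


def format_lines(data, prefix='', split=False, timestamp='default',
                 qlog=False, now=None):
    """Return the lines that a console writer might emit for one batch."""
    mode = resolve_timestamp(timestamp, qlog)
    if mode == 'none':
        stamp = ''
    else:
        if now is None:
            now = datetime.now(timezone.utc)
        # 'utc' uses UTC; 'local' converts to the system time zone.
        moment = now.astimezone(timezone.utc) if mode == 'utc' else now.astimezone()
        stamp = moment.strftime('%Y-%m-%d %H:%M:%S') + ' '  # assumed layout
    is_vector = isinstance(data, (list, tuple))
    items = list(data) if (split and is_vector) else [data]
    return [f'{stamp}{prefix}{item}' for item in items]


for line in format_lines([1, 2, 3], prefix='out: ', split=True, timestamp='none'):
    print(line)
```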
kxi.sp.write.to_kafka
Publish data on a Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic | str | The name of a topic. | required |
brokers | Union[str, List[str]] | Brokers identified a | 'localhost:9092' |
retries | int | Maximum number of retries that will be attempted for Kafka API calls. | 10 |
retry_wait | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | How long to wait between retry attempts. | (1, 's') |
topic_config | Dict[str, Any] | Dictionary of Kafka topic configuration options. | required |
Returns:
Type | Description |
---|---|
Writer | A |
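The retry_wait default of `(1, 's')` reads as a magnitude paired with a time unit. The sketch below shows one way such a value could be normalized next to a `datetime.timedelta`. It is an assumption, not the library's conversion: `to_timedelta` and the unit table are hypothetical, the unit codes are borrowed from NumPy's timedelta64 conventions, and the numpy and pykx input types are left out to keep the example dependency-free.

```python
from datetime import timedelta

# Assumed unit codes, following NumPy's timedelta64 abbreviations.
_UNITS = {
    'D': 'days',
    'h': 'hours',
    'm': 'minutes',
    's': 'seconds',
    'ms': 'milliseconds',
    'us': 'microseconds',
}


def to_timedelta(value):
    """Normalize a timedelta or a (magnitude, unit) pair to a timedelta."""
    if isinstance(value, timedelta):
        return value
    magnitude, unit = value
    if unit not in _UNITS:
        raise ValueError(f'unsupported unit: {unit!r}')
    return timedelta(**{_UNITS[unit]: magnitude})


print(to_timedelta((1, 's')).total_seconds())
```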
kxi.sp.write.to_process
Write data to a kdb+ process over IPC.
Worker is halted until an IPC connection is established or given up on
During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.
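The blocking behaviour described above can be pictured as a loop that does nothing else until the connection succeeds or the retry budget runs out. This is a sketch only: `connect_with_retries` is a hypothetical helper, the `connect` callable stands in for opening the IPC handle, and counting one initial attempt plus up to `retries` further attempts is an assumption about how the limit is applied.

```python
import time


def connect_with_retries(connect, retries=5, retry_wait=1.0, sleep=time.sleep):
    """Block until connect() succeeds or the retries are exhausted.

    connect:    callable that returns a connection or raises ConnectionError
    retries:    maximum number of attempts after the first failure
    retry_wait: seconds to wait between attempts
    sleep:      injectable wait function, so the loop can be tested quickly
    """
    attempts = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if attempts >= retries:
                raise  # give up and surface the last connection error
            attempts += 1
            sleep(retry_wait)  # nothing else is processed while waiting


# Usage with a stand-in connection that fails twice before succeeding.
state = {'calls': 0}


def flaky_connect():
    state['calls'] += 1
    if state['calls'] < 3:
        raise ConnectionError('refused')
    return 'handle'


print(connect_with_retries(flaky_connect, sleep=lambda seconds: None))
```

Because the loop runs in the same thread as the rest of the processing, a long retry_wait combined with a high retry count translates directly into time during which no data moves.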
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handle | str | The kdb+ connection handle in the form | required |
target | str | The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process. | required |
mode | TargetMode | Whether the target is a function to be called, or a table to be upserted. | <TargetMode.function: 'function'> |
sync | bool | Whether writing should be synchronous or asynchronous. | False |
queue_length | int | Maximum number of async messages permitted in the queue before flushing. | 9223372036854775807 |
queue_size | int | Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte. | 1048576 |
spread | bool | Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if | False |
params | List[str] | A list of parameters that should appear before the message data in function mode. | () |
retries | int | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | 5 |
retry_wait | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | How long to wait between retry attempts. | (1, 's') |
Returns:
Type | Description |
---|---|
Writer | A |
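The effect of mode, spread, and params on what the remote process receives can be modelled locally. In this sketch the remote kdb+ process is replaced by a plain dictionary of names, `deliver` is a hypothetical helper, and tables are lists of row dictionaries. It also assumes that spread and params are not combined, and that params are passed through as given rather than resolved remotely; both are assumptions, not documented behaviour.

```python
def deliver(process, target, data, mode='function', spread=False, params=()):
    """Model one write against a stand-in for the remote process.

    process: dict mapping names to callables or to lists of row dicts
    target:  name of the function to call or the table to add rows to
    """
    if mode == 'table':
        # Table mode: the batch is added to the named table.
        process[target].extend(data)
        return None
    if mode != 'function':
        raise ValueError(f'unknown mode: {mode!r}')
    if spread and params:
        # Assumption: the two options are treated as mutually exclusive.
        raise ValueError('spread and params are not combined in this sketch')
    func = process[target]
    if spread:
        # The data itself is the argument list.
        return func(*data)
    # Any params come first, followed by the message data as one argument.
    return func(*params, data)


remote = {
    'add': lambda a, b: a + b,
    'log': lambda tag, msg: (tag, msg),
    'trades': [],
}
print(deliver(remote, 'add', [1, 2], spread=True))
print(deliver(remote, 'log', [1, 2], params=('tag',)))
deliver(remote, 'trades', [{'sym': 'A', 'px': 1.5}], mode='table')
print(remote['trades'])
```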
kxi.sp.write.to_stream
Write data using a KX Insights stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | Optional[str] | Name of the table to filter the stream on. By default, no filtering is performed. | None |
stream | Optional[str] | Name of stream to publish to. By default, the stream specified by the | None |
prefix | Union[str, bytes, pykx.wrappers.CharVector] | Prefix to add to the hostname for RT cluster. By default, the prefix given by the | '' |
assembly | Optional[str] | The KX Insights assembly to write to. By default, no assembly is used. | None |
insights | bool | Whether the stream being published to uses Insights message formats. | True |
deduplicate | bool | Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If | True |
Returns:
Type | Description |
---|---|
Writer | A |
kxi.sp.write.to_variable
Write to a local q variable.
The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
variable | str | The name of the q variable the data will be written to. | required |
mode | VariableMode | How to set/update the specified q variable. | <VariableMode.append: 'append'> |
Returns:
Type | Description |
---|---|
Writer | A |
Examples:
Stream data into a list by appending to it:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2'))
>>> kx.q('publish').each(range(0, 11, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2 0 2 4 6 8 10'))
Stream data into a table by upserting it:
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
pykx.SymbolAtom(pykx.q('`output'))
>>> print(kx.q('output'))
x y
---
1 a
2 b
3 c
4 d
5 e