Writers
Stream Processor writers.
Writers are a specialized type of operator that allows users to push data from a streaming pipeline to different external data sources.
Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.
kxi.sp.write.__all__ = ['ConsoleTimestamp', 'TargetMode', 'VariableMode', 'Writer', 'to_amazon_s3', 'to_console', 'to_kafka', 'to_kdb', 'to_process', 'to_stream', 'to_variable']
module-attribute
kxi.sp.write.Writer
Bases: Operator
kxi.sp.write.ConsoleTimestamp
Bases: AutoNameEnum
Enum for to_console timestamp options.
These enum values can be provided as enum member objects (e.g. ConsoleTimestamp.utc), or as strings matching the names of the members (e.g. 'utc').
kxi.sp.write.ConsoleTimestamp.local = auto()
class-attribute
Prefix each output line with a local timestamp.
kxi.sp.write.ConsoleTimestamp.utc = auto()
class-attribute
Prefix each output line with a UTC timestamp.
kxi.sp.write.ConsoleTimestamp.none = auto()
class-attribute
Do not prefix any output lines with a timestamp.
kxi.sp.write.ConsoleTimestamp.default = auto()
class-attribute
Equivalent to 'none' if using qlog, and equivalent to 'utc' otherwise. The default option allows qlog to use its own timestamps, instead of ones provided by the writer.
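The member-or-string convention described above can be modelled with the standard library alone. The sketch below is only an illustration of that convention, not the kxi implementation: the `AutoNameEnum` class here is a local stand-in for the real base class, and `resolve` is a hypothetical helper introduced for this example.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Local stand-in for the real base class: each member's value is its name."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class ConsoleTimestamp(AutoNameEnum):
    local = auto()
    utc = auto()
    none = auto()
    default = auto()


def resolve(option):
    """Hypothetical helper: accept a member object or the string name of a member."""
    if isinstance(option, ConsoleTimestamp):
        return option
    return ConsoleTimestamp[option]


print(resolve("utc") is ConsoleTimestamp.utc)   # True
print(resolve(ConsoleTimestamp.local).value)    # local
```

Under this model, passing 'utc' and passing ConsoleTimestamp.utc select the same option, which is the behavior the enum descriptions in this module promise.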
kxi.sp.write.AmazonS3Teardown
Bases: AutoNameEnum
Enum for to_amazon_s3 teardown options.
These enum values can be provided as enum member objects (e.g. AmazonS3Teardown.complete), or as strings matching the names of the members (e.g. 'complete').
kxi.sp.write.AmazonS3Teardown.none = auto()
class-attribute
Leave any partial uploads in a pending state to be resumed by a future pipeline.
kxi.sp.write.AmazonS3Teardown.abort = auto()
class-attribute
Abort any pending partial uploads. This means any processed data that is still pending will be lost on teardown.
kxi.sp.write.AmazonS3Teardown.complete = auto()
class-attribute
Mark any partial uploads as complete. This will flush any partial data buffers to S3 to ensure that any in-flight data is saved. However, once the data is saved, it cannot be appended to.
kxi.sp.write.TargetMode
Bases: AutoNameEnum
The kind of object that the specified target in a kdb+ process refers to.
These enum values can be provided as enum member objects (e.g. TargetMode.table), or as strings matching the names of members (e.g. 'table').
kxi.sp.write.TargetMode.function = auto()
class-attribute
The target is a function defined in the kdb+ process. It will be called with the data being written to the process.
kxi.sp.write.TargetMode.table = auto()
class-attribute
The target is a table defined in the kdb+ process. It will be upserted with the data being written to the process.
kxi.sp.write.VariableMode
Bases: AutoNameEnum
How to set/update the specified kdb+ variable.
These enum values can be provided as enum member objects (e.g. VariableMode.upsert), or as strings matching the names of members (e.g. 'upsert').
kxi.sp.write.VariableMode.append = auto()
class-attribute
The data from the stream will be appended to the variable.
kxi.sp.write.VariableMode.overwrite = auto()
class-attribute
The variable will be set to the last output of the pipeline.
kxi.sp.write.VariableMode.upsert = auto()
class-attribute
The tabular data from the stream will be upserted to the variable, which must be a table.
kxi.sp.write.__dir__
kxi.sp.write.to_amazon_s3
Writes data to an object in an Amazon S3 bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[CharString, OperatorFunction] | The path to write objects to in S3, or a function that changes the path for each message processed in the stream. If a function is provided, it must return a string giving the file path to write the current batch of data to. By default the function receives the message data as its argument; however, parameters can be configured to include metadata. | required |
is_complete | Optional[Union[CharString, OperatorFunction]] | A binary function that accepts a metadata dictionary as its first argument and data as its second. This function is invoked on each message that is processed and must return a boolean indicating whether processing of the current file is complete. | None |
on_teardown | AmazonS3Teardown | An | AmazonS3Teardown.none |
tenant | CharString | The authorization tenant. | '' |
domain | CharString | A custom Amazon S3 domain. | '' |
region | CharString | The AWS region to authenticate against. | 'us-east-1' |
credentials | CharString | The secret name for the Amazon S3 credentials. Refer to the authentication section of the | '' |
Returns:
Type | Description |
---|---|
Writer | A |
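As an illustration of the two callback parameters, here is a minimal sketch of a `path` function and an `is_complete` function written as plain Python callables. Everything specific in it is an assumption made for the example: the bucket name, the `s3://` path layout, the shape of the batch (a list of dicts with a `date` field), and the `final` metadata key are all hypothetical and not taken from the library.

```python
from datetime import date

# Hypothetical bucket and prefix; the exact path format expected is assumed.
BUCKET_PREFIX = "s3://example-bucket/trades"


def path_for_batch(data):
    """Return the object path that the current batch should be written to.

    Assumes each batch is a non-empty list of dicts carrying a 'date' field.
    A real pipeline receives whatever type the upstream operator emits.
    """
    day = data[0]["date"]
    return f"{BUCKET_PREFIX}/{day.isoformat()}.csv"


def batch_is_final(metadata, data):
    """Binary completion check: metadata dictionary first, data second.

    The 'final' key is a made-up marker for this sketch, not a documented field.
    """
    return bool(metadata.get("final", False))


batch = [{"date": date(2024, 1, 2), "price": 10.5}]
print(path_for_batch(batch))                    # s3://example-bucket/trades/2024-01-02.csv
print(batch_is_final({"final": True}, batch))   # True
```

Callables of this shape would be supplied as the `path` and `is_complete` arguments. Keeping them as pure functions makes them easy to test outside a running pipeline.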
kxi.sp.write.to_console
Write to the console.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix | Union[str, bytes] | A prefix for each line of output. | b'' |
split | bool | Whether a vector should be printed on a single line, or split across multiple lines with one element per line. | False |
timestamp | ConsoleTimestamp | A | ConsoleTimestamp.default |
qlog | bool | Whether the output should be through qlog. | False |
Returns:
Type | Description |
---|---|
Writer | A |
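The interaction of `prefix`, `split`, and a timestamp can be pictured with a small stand-alone model. This is a sketch only: `format_lines` is a hypothetical function, and the exact layout of the real console output (separator characters, timestamp format, how vectors are rendered) is assumed rather than taken from the library.

```python
from datetime import datetime, timezone


def format_lines(data, prefix="", split=False, timestamp=None):
    """Hypothetical model of how one batch might be turned into output lines.

    split=False keeps the whole vector on one line; split=True emits one
    line per element. The timestamp, if given, is placed before the prefix.
    """
    stamp = f"{timestamp.isoformat()} " if timestamp is not None else ""
    items = list(data) if split else [list(data)]
    return [f"{stamp}{prefix}{item}" for item in items]


print(format_lines([1, 2, 3], prefix="out: "))
# ['out: [1, 2, 3]']
print(format_lines([1, 2, 3], prefix="out: ", split=True))
# ['out: 1', 'out: 2', 'out: 3']

fixed = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(format_lines([1], split=True, timestamp=fixed))
# ['2024-01-02T03:04:05+00:00 1']
```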
kxi.sp.write.to_kafka
Publish data on a Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic | str | The name of a topic. | required |
brokers | Union[str, List[str]] | Brokers identified a | 'localhost:9092' |
retries | int | Maximum number of retries that will be attempted for Kafka API calls. | 10 |
retry_wait | Timedelta | How long to wait between retry attempts. | (1, 's') |
topic_config | Dict[str, Any] | Dictionary of Kafka topic configuration options. | required |
Returns:
Type | Description |
---|---|
Writer | A |
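The `retries` and `retry_wait` parameters describe a bounded retry loop. The following is a generic sketch of such a loop, not the writer's actual implementation. It assumes that `retries` counts the attempts made after the initial one, that every exception is retryable, and that the wait is expressed as a number of seconds rather than a Timedelta. `call_with_retries` and `flaky_publish` are hypothetical names.

```python
import time


def call_with_retries(action, retries=10, retry_wait=1.0, sleep=time.sleep):
    """Call `action`, retrying up to `retries` times after the first failure.

    Assumption: any exception triggers a retry. The last error is re-raised
    once the retry budget is used up.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    last_error = None
    for attempt in range(retries + 1):
        try:
            return action()
        except Exception as error:
            last_error = error
            if attempt < retries:
                sleep(retry_wait)  # pause before the next attempt
    raise last_error


attempts = []


def flaky_publish():
    """Hypothetical action that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker unavailable")
    return "published"


waits = []  # record the waits instead of actually sleeping
result = call_with_retries(flaky_publish, retries=10, retry_wait=1.0, sleep=waits.append)
print(result, len(attempts), waits)   # published 3 [1.0, 1.0]
```

Injecting the `sleep` function keeps the sketch fast to run and makes the waiting behavior observable.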
kxi.sp.write.to_kdb
Writes data to an on-disk partitioned table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The database path. | required |
prtn_col | str | The name of the partition column. | required |
table | str | The name of the table. | required |
Returns:
Type | Description |
---|---|
Writer | A |
kxi.sp.write.to_process
Write data to a kdb+ process over IPC.
Worker is halted until an IPC connection is established or given up on
During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handle | str | The kdb+ connection handle in the form | required |
target | str | The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process. | required |
mode | TargetMode | Whether the target is a function to be called, or a table to be upserted. | TargetMode.function |
sync | bool | Whether writing should be synchronous or asynchronous. | False |
queue_length | int | Maximum number of async messages permitted in the queue before flushing. | 2 ** 63 - 1 |
queue_size | int | Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte. | 1024 * 1024 |
spread | bool | Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if | False |
params | List[str] | A list of parameters that should appear before the message data in function mode. | () |
retries | int | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | 5 |
retry_wait | Timedelta | How long to wait between retry attempts. | (1, 's') |
Returns:
Type | Description |
---|---|
Writer | A |
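The `queue_length` and `queue_size` parameters together describe a buffer that is flushed when either a message count or a byte count is reached. The class below is a rough stand-alone model of that policy, not the library's implementation. `AsyncQueue` and its `send` callback are hypothetical, and whether the real writer flushes on reaching or on exceeding a limit is an assumption.

```python
class AsyncQueue:
    """Hypothetical buffer that flushes on a message-count or byte-count limit."""

    def __init__(self, send, queue_length=2 ** 63 - 1, queue_size=1024 * 1024):
        self.send = send                  # callback that receives each flushed batch
        self.queue_length = queue_length  # max number of buffered messages
        self.queue_size = queue_size      # max number of buffered bytes
        self.pending = []
        self.pending_bytes = 0

    def push(self, message: bytes):
        self.pending.append(message)
        self.pending_bytes += len(message)
        # Assumption: flush as soon as either limit is reached.
        if (len(self.pending) >= self.queue_length
                or self.pending_bytes >= self.queue_size):
            self.flush()

    def flush(self):
        if self.pending:
            self.send(list(self.pending))
            self.pending.clear()
            self.pending_bytes = 0


sent = []
queue = AsyncQueue(sent.append, queue_length=3, queue_size=10)
for message in (b"ab", b"cd", b"ef"):   # third message reaches the count limit
    queue.push(message)
queue.push(b"0123456789ab")             # 12 bytes reaches the size limit alone
queue.push(b"x")                        # stays buffered
print(sent)            # [[b'ab', b'cd', b'ef'], [b'0123456789ab']]
print(queue.pending)   # [b'x']
```

With the documented defaults, the message-count limit is effectively unbounded, so in this model the one-mebibyte size limit is what would normally trigger a flush.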
kxi.sp.write.to_stream
Write data using a KX Insights stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | Optional[str] | Name of the table to filter the stream on. By default, no filtering is performed. | None |
stream | Optional[str] | Name of stream to publish to. By default, the stream specified by the | None |
prefix | CharString | Prefix to add to the hostname for RT cluster. By default, the prefix given by the | '' |
assembly | Optional[str] | The KX Insights assembly to write to. By default, no assembly is used. | None |
insights | bool | Whether the stream being published to uses Insights message formats. | True |
deduplicate | bool | Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If | True |
Returns:
Type | Description |
---|---|
Writer | A |
kxi.sp.write.to_variable
Write to a local q variable.
The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
variable | str | The name of the q variable the data will be written to. | required |
mode | VariableMode | How to set/update the specified q variable. | VariableMode.append |
Returns:
Type | Description |
---|---|
Writer | A |
Examples:
Stream data into a list by appending to it:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2'))
>>> kx.q('publish').each(range(0, 11, 2))
pykx.SymbolVector(pykx.q('`.`.`.`.`.'))
>>> kx.q('output')
pykx.LongVector(pykx.q('-10 -8 -6 -4 -2 0 2 4 6 8 10'))
Stream data into a table by upserting it:
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
pykx.SymbolAtom(pykx.q('`output'))
>>> print(kx.q('output'))
x y
---
1 a
2 b
3 c
4 d
5 e