kxi.sp.write

Stream Processor writers.

Writers are a specialized type of operator that allow users to push data from a streaming pipeline to different external data sources.

Each writer has its own custom setup and teardown functions to handle different streaming lifecycle events.

ConsoleTimestamp Objects

class ConsoleTimestamp(AutoNameEnum)

Enum for to_console timestamp options.

These enum values can be provided as enum member objects (e.g. ConsoleTimestamp.utc), or as strings matching the names of the members (e.g. 'utc').

local

Prefix each output line with a local timestamp.

utc

Prefix each output line with a utc timestamp.

none

Do not prefix output lines with a timestamp.

default

Equivalent to 'none' if using qlog, and equivalent to 'utc' otherwise. The default option allows qlog to use its own timestamps, instead of ones provided by the writer.
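
For illustration, a minimal sketch of the two equivalent ways to supply a value (this assumes the enum is reachable as sp.write.ConsoleTimestamp, as documented in this module):

>>> from kxi import sp
>>> # Enum member object
>>> sp.write.to_console(timestamp=sp.write.ConsoleTimestamp.utc)
>>> # Member name as a string
>>> sp.write.to_console(timestamp='utc')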

AmazonS3Teardown Objects

class AmazonS3Teardown(AutoNameEnum)

Enum for to_amazon_s3 teardown options.

These enum values can be provided as enum member objects (e.g. AmazonS3Teardown.complete), or as strings matching the names of the members (e.g. 'complete').

none

Leave any partial uploads in a pending state to be resumed by a future pipeline.

abort

Abort any pending partial uploads. This means any processed data that is still pending will be lost on teardown.

complete

Mark any partial uploads as complete. This flushes any partial data buffers to S3, ensuring that any in-flight data is saved. However, once the data is saved, it cannot be appended to.
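
As a sketch of how these options are supplied, the teardown behavior is chosen through the on_teardown parameter of to_amazon_s3 (the bucket path below is a placeholder):

>>> from kxi import sp
>>> # Complete any partial uploads on teardown, using the enum member
>>> sp.write.to_amazon_s3('s3://mybucket/file.csv',
        on_teardown=sp.write.AmazonS3Teardown.complete)
>>> # Equivalent, using the member name as a string
>>> sp.write.to_amazon_s3('s3://mybucket/file.csv', on_teardown='complete')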

to_amazon_s3

@Writer
def to_amazon_s3(path: Union[CharString, OperatorFunction],
                 *,
                 is_complete: Optional[Union[CharString,
                                             OperatorFunction]] = None,
                 on_teardown: AmazonS3Teardown = AmazonS3Teardown.none,
                 tenant: CharString = '',
                 domain: CharString = '',
                 region: CharString = 'us-east-1',
                 credentials: CharString = '') -> Writer

Writes data to an object in an Amazon S3 bucket.

Arguments:

  • path - The path to write objects to in S3, or a function that changes the path for each message processed in the stream (see the sketch following the example below). If a function is provided, it must return a string containing the file path to write the current batch of data to. When a function is provided, the message data is passed as its argument by default; however, parameters can be configured to include metadata.
  • is_complete - A binary function that accepts a metadata dictionary as the first argument and data as its second. This function is invoked on each message that is processed and must return a boolean indicating if the current file processing is complete.
  • on_teardown - An AmazonS3Teardown behavior indicating what to do with any partial uploads that are incomplete when a teardown occurs.
  • tenant - The authorization tenant.
  • domain - A custom Amazon S3 domain.
  • region - The AWS region to authenticate against.
  • credentials - The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.write.toAmazonS3 documentation for more information.

Returns:

A to_amazon_s3 writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.csv(',')
        | sp.write.to_amazon_s3('s3://mybucket/file.csv', region='us-east-1'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
name,id,quantity
a,1,4
b,2,5
c,3,6
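
The path argument can also be a function that chooses the object key per batch. The following is a minimal sketch; the date-stamped key format is an illustrative assumption, and it assumes a plain Python callable returning a str is accepted where OperatorFunction is expected:

>>> import datetime
>>> def choose_path(data):
        # Illustrative only: route each batch to a date-stamped object key
        return f's3://mybucket/data-{datetime.date.today()}.csv'

>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.csv(',')
        | sp.write.to_amazon_s3(choose_path, region='us-east-1'))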

to_console

@Writer
def to_console(prefix: Union[str, bytes] = b'',
               *,
               split: bool = False,
               timestamp: ConsoleTimestamp = ConsoleTimestamp.default,
               qlog: bool = False) -> Writer

Write to the console.

Arguments:

  • prefix - A prefix for each line of output.
  • split - Whether a vector should be printed on a single line, or split across multiple lines with one element per line.
  • timestamp - A ConsoleTimestamp enum value, or the string equivalent.
  • qlog - Whether the output should be through qlog.

Returns:

A to_console writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console(timestamp = 'none'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
name id quantity
----------------
a    1  4
b    2  5
c    3  6
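
As a further sketch, a prefix and split=True can be combined so that each element of a vector is printed on its own prefixed line (the exact console formatting is omitted here):

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console(b'OUT| ', split=True, timestamp='none'))
>>> kx.q('publish', kx.q('til 3'))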

to_database

@Writer
def to_database(table: str,
                assembly: str = None,
                *,
                retries: int = 60,
                retry_wait: Timedelta = (3, 's'),
                timeout: kx.IntAtom = kx.q('0Ni'),
                deduplicate: bool = True,
                directWrite: bool = False,
                overwrite: bool = True,
                mountName: str = "hdb",
                statusTable: str = "") -> Writer

Write to the kdb Insights Database.

Arguments:

  • table - Table in kdb Insights Database.
  • assembly - Assembly defining kdb Insights Database. When a pipeline is deployed within an assembly this parameter will default to the name of that assembly.
  • retries - Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost.
  • retry_wait - How long to wait between retry attempts.
  • timeout - Timeout value.
  • deduplicate - Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to false, potentially resulting in duplicate data after failure recovery events.
  • directWrite - Bypass RDB & IDB and write to HDB directly for historical data. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete.
  • overwrite - When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data.
  • mountName - Name of mount.
  • statusTable - Required when using direct write with an unbounded stream, this is the name of a table to store session statuses.

Returns:

A to_database writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_database('tableName', 'assembly'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"

to_kafka

@Writer
def to_kafka(topic: str = '',
             brokers: Union[str, List[str]] = 'localhost:9092',
             *,
             retries: int = 10,
             retry_wait: Timedelta = (1, 's'),
             options: Dict[str, Any] = None,
             topic_config: Dict[str, Any] = None,
             registry: str = "",
             subject: str = "",
             auto_register: bool = True,
             schema_type: str = "JSON") -> Writer

Publish data on a Kafka topic.

Note: Reserved keys

metadata.broker.list and producer.id are reserved values and are maintained directly by the Kafka writer. The Kafka writer will error if these options are used.

Note: Kafka configuration options

All available Kafka producer configuration options are supported using the options dictionary. This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.

To configure the Kafka topic, you should use the topic configuration options.

Arguments:

  • topic - The name of a topic.
  • brokers - Brokers identified as a 'host:port' string, or a list of 'host:port' strings.
  • retries - Maximum number of retries that will be attempted for Kafka API calls.
  • retry_wait - How long to wait between retry attempts.
  • options - Dictionary of Kafka publisher configuration options.
  • topic_config - Dictionary of Kafka topic configuration options.
  • registry - Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding.
  • subject - A Kafka subject to read schemas from and publish schemas to. If none is provided, the topic name suffixed with '-value' is used.
  • auto_register - Controls whether or not to generate and publish schemas automatically.
  • schema_type - Schema type to generate, one of: "JSON", "PROTOBUF", "AVRO".

Returns:

A to_kafka writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_kafka('topicName'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"

Advanced Kafka configuration

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_kafka('topicName', options = {
            'socket.timeout.ms': 60000,
            'socket.send.buffer.bytes': 4194304,
            'max.in.flight': 100000
          },
          topic_config = {
            'compression.type': 'gzip',
            'delivery.timeout.ms': 100000
          }
        ))
>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
        })

>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"

TargetMode Objects

class TargetMode(AutoNameEnum)

The kind of object that a specified target in a kdb+ process refers to.

These enum values can be provided as enum member objects (e.g. TargetMode.table), or as strings matching the names of members (e.g. 'table').

function

The target is a function defined in the kdb+ process. It will be called with the data being written to the process.

table

The target is a table defined in the kdb+ process. It will be upserted with the data being written to the process.

to_kdb

@Writer
def to_kdb(path: str, prtn_col: str, table: str) -> Writer

(Beta Feature) Writes to an on-disk partitioned table.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
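
For example, one way to enable beta features from Python before constructing the pipeline (this assumes the variable is read when the Stream Processor worker starts):

>>> import os
>>> os.environ['KXI_SP_BETA_FEATURES'] = 'true'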

Arguments:

  • path - The database path.
  • prtn_col - The name of the partition column.
  • table - The name of the table.

Returns:

A to_kdb writer, which writes to an on-disk partitioned table.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_kdb('database', 'id', 'tableName'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
        })

>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"

to_process

@Writer
def to_process(handle: str,
               target: str,
               mode: TargetMode = TargetMode.function,
               *,
               sync: bool = False,
               queue_length: int = 2**63 - 1,
               queue_size: int = 1024 * 1024,
               spread: bool = False,
               params: List[str] = (),
               retries: int = 60,
               retry_wait: Timedelta = (3, 's')) -> Writer

Write data to a kdb+ process over IPC.

Note: Worker is halted until an IPC connection is established or given up on

During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the max number of retries is reached.

Arguments:

  • handle - The kdb+ connection handle in the form ':host:port'. Note the leading colon. The host can be omitted to use localhost, e.g. '::5000'.
  • target - The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process.
  • mode - Whether the target is a function to be called, or a table to be upserted.
  • sync - Whether writing should be synchronous or asynchronous.
  • queue_length - Maximum number of async messages permitted in the queue before flushing.
  • queue_size - Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte.
  • spread - Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if params is set.
  • params - A list of parameters that should appear before the message data in function mode.
  • retries - Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost.
  • retry_wait - How long to wait between retry attempts.

Returns:

A to_process writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_process('::1234', 'tableName'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
name id quantity
----------------
a    1  4
b    2  5
c    3  6
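
To upsert into a remote table rather than call a function, set mode to TargetMode.table (or the string 'table'). A minimal sketch, reusing the handle and table name from the example above; sync=True is an optional choice shown for illustration:

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_process('::1234', 'tableName', mode='table', sync=True))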

to_stream

@Writer
def to_stream(table: Optional[str] = None,
              stream: Optional[str] = None,
              *,
              prefix: CharString = '',
              assembly: Optional[str] = None,
              insights: bool = True,
              deduplicate: bool = True) -> Writer

Write data using a kdb Insights stream.

Notes:

This functionality writes directly to a reliable transport stream. If you wish to persist data to a database, consider using the to_database writer.

Arguments:

  • table - Name of the table to filter the stream on. By default, no filtering is performed.
  • stream - Name of stream to publish to. By default, the stream specified by the $RT_PUB_TOPIC environment variable is used.
  • prefix - Prefix to add to the hostname for RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.
  • assembly DEPRECATED - kdb Insights assembly to write to. By default, no assembly is used.
  • insights DEPRECATED - Whether the stream being published to uses Insights message formats.
  • deduplicate - Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to false, potentially resulting in duplicate data after failure recovery events.

Returns:

A to_stream writer, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_stream('tableName'))

>>> data = pd.DataFrame({
        'name': ['a', 'b', 'c'],
        'id': [1, 2, 3],
        'quantity': [4, 5, 6]
    })

>>> kx.q('publish', data)
name id quantity
----------------
a    1  4
b    2  5
c    3  6
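
The stream and prefix arguments can also be given explicitly instead of relying on the $RT_PUB_TOPIC and $RT_TOPIC_PREFIX environment variables; a sketch with placeholder values:

>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_stream('tableName', 'streamName', prefix='rt-'))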

to_subscriber

@Writer
def to_subscriber(table: str,
                  key_col: Union[str, List[str]],
                  *,
                  publishFrequency: int = 500) -> Writer

Write data to a subscriber through an IPC connection.

Writes an incoming stream of data to a subscriber, filtered via subscription to given values of a keyed column (stock symbols in the example below). Data is written to the subscriber over an IPC connection by a timer function, rather than by sending all relevant data to the subscriber as it becomes available. To receive data on the subscriber process, an upd function must be defined; the example below defines one that shows the implicit 'z' parameter.

For more information on using the Subscriber writer in an end-to-end deployment, see either the kdb Insights Microservices example or the Enterprise example.

Arguments:

  • table - Name of the subscription. Used to subscribe to data emitted from this node.
  • key_col - The keyed column(s) applied against the table.
  • publishFrequency - Interval, in milliseconds, at which the node sends data to its subscribers.

Returns:

A to_subscriber writer, which writes updates across IPC connections.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> kx.q('upd:{show z}')
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_subscriber('data', 'sym'))

>>> updData = pd.DataFrame({
        'sym': ['FDP', 'AAPL', 'MSFT'],
        'id': [1, 2, 3],
        'size': [100, 150, 200]
})

>>> kx.q('publish', updData)
sym  | id   size
-----|------------
FDP  | 1    100
AAPL | 2    150
MSFT | 3    200

VariableMode Objects

class VariableMode(AutoNameEnum)

How to set/update the specified kdb+ variable.

These enum values can be provided as enum member objects (e.g. VariableMode.upsert), or as strings matching the names of members (e.g. 'upsert').

append

The data from the stream will be appended to the variable.

overwrite

The variable will be set to the last output of the pipeline.

upsert

The tabular data from the stream will be upserted to the variable, which must be a table.

to_variable

@Writer
def to_variable(variable: str,
                mode: VariableMode = VariableMode.append) -> Writer

Write to a local q variable.

The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.

Arguments:

  • variable - The name of the q variable the data will be written to.
  • mode - How to set/update the specified q variable.

Returns:

A to_variable writer, which can be joined to other operators or pipelines.

Examples:

Stream data into a list by appending to it:

>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
>>> kx.q('output')
-10 -8 -6 -4 -2
>>> kx.q('publish').each(range(0, 11, 2))
>>> kx.q('output')
-10 -8 -6 -4 -2 0 2 4 6 8 10

Stream data into a table with upsert to get a type error if there's a schema mismatch:

>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
>>> kx.q('output')
x y
---
1 a
2 b
3 c
4 d
5 e
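
For completeness, a sketch of the overwrite mode, where the variable holds only the most recent pipeline output. The variable name 'latest' is a placeholder, and the final output shown reflects the documented overwrite semantics:

>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('latest', 'overwrite'))
>>> kx.q('publish', kx.q('1 2 3'))
>>> kx.q('publish', kx.q('4 5'))
>>> kx.q('latest')
4 5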