kxi.sp.decode

Stream Processor decoders.

CSVHeader Objects

class CSVHeader(AutoNameEnum)

Enum for csv header options.

These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').

none

Encoded data never starts with a header row.

always

Encoded data always starts with a header row.
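
The "member object or matching string" convention described above can be sketched with the standard library alone. This is a stand-in, not the kxi implementation: the `AutoNameEnum` base defined here and the `normalize_header` helper are hypothetical, and only illustrate how an enum whose values equal its member names can be looked up either way.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Hypothetical stand-in: auto() yields the member's own name as its value."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class CSVHeader(AutoNameEnum):
    none = auto()
    always = auto()


def normalize_header(value):
    """Hypothetical helper: accept a CSVHeader member or its name as a string."""
    if isinstance(value, CSVHeader):
        return value
    return CSVHeader[value]  # raises KeyError for an unknown name


member = normalize_header(CSVHeader.always)
by_name = normalize_header("always")
```

Both calls resolve to the same member, which is why either spelling can be passed wherever the enum is expected.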

CSVEncoding Objects

class CSVEncoding(AutoNameEnum)

Enum for csv encoding formats.

These enum values can be provided as enum member objects (e.g. CSVEncoding.UTF8), or as strings matching the names of the members (e.g. 'UTF8').

UTF8

Data is expected to be encoded in UTF8 format.

ASCII

Data is expected to be encoded in ASCII format.

arrow

@Decoder
def arrow(*, as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder

(Beta Feature) Decodes Arrow streams.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • as_list - If true, the decoded result is a list of arrays corresponding only to the data in the Arrow stream. If false, the decoded result is a table corresponding to both the schema and the data in the Arrow stream.

Returns:

An arrow decoder, which can be joined to other operators or pipelines.

csv

@Decoder
def csv(schema: Union[str, kx.Table, dict[str, Union[str, bytes,
                                                     kx.CharAtom]]],
        delimiter: Union[str, bytes, kx.CharAtom] = kx.q('","'),
        *,
        header: CSVHeader = CSVHeader.always,
        exclude: Union[List[str], List[int], kx.SymbolVector,
                       kx.IntegralVector] = kx.q('`long$()'),
        schema_type: Union[str, bytes, kx.CharVector] = 'literal',
        encoding: CSVEncoding = CSVEncoding.UTF8) -> Decoder

Parses CSV data to a table.

Arguments:

  • schema - A dictionary of column names where values are the desired output type, a dictionary of columns and their type characters, or a list of type characters.
  • delimiter - The field separator for records in the encoded data. Defaults to a comma.
  • header - Whether encoded data starts with a header row.
  • exclude - Columns to exclude from the output, specified by name or by index.
  • schema_type - Either 'literal' or 'schema' indicating how to interpret the schema parameter.
  • encoding - The expected encoding of the incoming data. Currently supported options are ASCII and UTF8.
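
To make the `delimiter`, `header`, and `exclude` arguments concrete, here is a rough standard-library sketch of what those options mean. It does not call kxi.sp and is not how the decoder is implemented: `parse_csv` is a hypothetical helper, it leaves every value as a string rather than casting to a schema type, and treating integer entries in `exclude` as zero-based positions is an assumption.

```python
import csv
import io


def parse_csv(text, columns, delimiter=",", header=True, exclude=()):
    """Hypothetical helper: parse CSV text into a dict of column lists.

    columns - column names, in file order
    header  - True if the first row is a header row to skip
    exclude - column names or (assumed zero-based) indices to drop
    """
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    if header:
        rows = rows[1:]  # drop the header row
    # Resolve integer positions to names so both forms are handled alike.
    dropped = {columns[e] if isinstance(e, int) else e for e in exclude}
    kept = [(i, name) for i, name in enumerate(columns) if name not in dropped]
    return {name: [row[i] for row in rows] for i, name in kept}


data = "name|id|quantity\napple|1|2.5\npear|2|4.0\n"
table = parse_csv(data, ["name", "id", "quantity"], delimiter="|", exclude=["id"])
same_by_index = parse_csv(data, ["name", "id", "quantity"], delimiter="|", exclude=[1])
```

Excluding by name and excluding by position give the same two-column result here.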

Notes:

When dealing with non-ASCII encoding schemes, the CSV decoding logic checks for and removes byte order mark (BOM) prefixes on the incoming data. A BOM may or may not be visible to the user, depending on how the data is being viewed, and can lead to unexpected errors.
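
As an illustration of why a byte order mark matters, the following standard-library sketch shows a UTF-8 BOM surviving a plain decode and attaching itself to the first column name. It uses Python's own codecs rather than kxi.sp, so it only demonstrates the general problem, not the decoder's internals.

```python
import codecs

# A UTF-8 payload that starts with a byte order mark.
raw = codecs.BOM_UTF8 + b"name,id\napple,1\n"

naive = raw.decode("utf-8")        # BOM survives as U+FEFF before "name"
cleaned = raw.decode("utf-8-sig")  # BOM is stripped during decoding

naive_header = naive.splitlines()[0].split(",")
clean_header = cleaned.splitlines()[0].split(",")
```

In the naive case the first header is `"\ufeffname"`, which many viewers render identically to `"name"` even though the two strings do not compare equal.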

Returns:

A csv decoder, which can be joined to other operators or pipelines.

json

@Decoder
def json(*, decode_each: bool = False) -> Decoder

Parses JSON data.

JSON data should be a char vector or byte vector, and will be parsed using the .j.k function from q.

Arguments:

  • decode_each - By default, each message passed to the decoder is treated as a single JSON object. Setting decode_each to true indicates that parsing should be done on each value of a message. This is useful when decoding data that has objects separated by newlines, because it allows the pipeline to process partial sets of the JSON file without requiring the entire block to be in memory.
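
The difference between parsing a message as one JSON object and parsing each value separately can be sketched with Python's `json` module. This is a stand-in for illustration only: the decoder itself uses q's .j.k, and the sample data below is made up.

```python
import json

# One message holding a single JSON object: parse it as a whole.
whole = '{"id": 1, "tags": ["a", "b"]}'
single = json.loads(whole)

# One message holding newline-separated objects: parse each value on its own.
lines = ['{"id": 1}', '{"id": 2}', '{"id": 3}']
each = [json.loads(line) for line in lines]

# Parsing the newline-separated objects as one document is rejected.
try:
    json.loads("\n".join(lines))
    failed = False
except json.JSONDecodeError:
    failed = True
```

Parsing per value also means each object can be handled as soon as it arrives, rather than waiting for the full block.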

Returns:

A json decoder, which can be joined to other operators or pipelines.

protobuf

@Decoder
def protobuf(message: Union[str, bytes, kx.SymbolAtom],
             *,
             path: Optional[Union[Path, str, kx.SymbolAtom]] = None,
             format: Optional[CharString] = None,
             as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder

Parses Protocol Buffer messages.

Arguments:

  • message - The name of the Protocol Buffer message type to decode.
  • path - A path to a .proto file containing the message type definition.
  • format - The Protocol Buffer message format to decode.
  • as_list - Whether the output should be a list of values instead of a dict.

Returns:

A protobuf decoder, which can be joined to other operators or pipelines.