
Decoders

Stream Processor decoders.

kxi.sp.decode.__all__ = ['Decoder', 'arrow', 'csv', 'json', 'protobuf'] module-attribute

kxi.sp.decode.Decoder

Bases: Operator

kxi.sp.decode.CSVHeader

Bases: AutoNameEnum

Enum for CSV header options.

These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').

kxi.sp.decode.CSVHeader.none = auto() class-attribute

Encoded data never starts with a header row.

kxi.sp.decode.CSVHeader.always = auto() class-attribute

Encoded data always starts with a header row.
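The string-or-member behavior described above can be modeled with Python's standard `enum` module. This is a minimal sketch, not the library's source: the `AutoNameEnum` base shown here and the `as_header` helper are assumptions written for illustration, and only the two members documented on this page are included.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Hypothetical stand-in: auto() yields the member's own name as its value."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class CSVHeader(AutoNameEnum):
    none = auto()    # encoded data never starts with a header row
    always = auto()  # encoded data always starts with a header row


def as_header(value):
    """Accept either a CSVHeader member or the string name of a member."""
    return value if isinstance(value, CSVHeader) else CSVHeader(value)
```

Because each member's value equals its name, looking a member up by string (`CSVHeader('always')`) returns the same object as `CSVHeader.always`.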

kxi.sp.decode.__dir__

kxi.sp.decode.arrow

Decodes Arrow streams.

Parameters:

Name Type Description Default
as_list Union[bool, kx.BooleanAtom]

If true, the decoded result is a list of arrays corresponding only to the Arrow stream data. If false, the decoded result is a table corresponding to both the schema and the data in the Arrow stream.

False

Returns:

Type Description
Decoder

An Arrow decoder, which can be joined to other operators or pipelines.

kxi.sp.decode.csv

Parses CSV data to a table.

Parameters:

Name Type Description Default
schema Union[str, kx.Table, dict[str, Union[str, bytes, kx.CharAtom]]]

The output schema: a table whose columns have the desired output types, a dictionary mapping column names to their type characters, or a list of type characters.

required
delimiter Union[str, bytes, kx.CharAtom]

The field separator for the records in the encoded data. Defaults to a comma.

kx.q('","')
header CSVHeader

Whether encoded data starts with a header row.

CSVHeader.always
exclude Union[List[str], List[int], kx.SymbolVector, kx.IntegralVector]

Columns to exclude from the output, specified by name or by index.

kx.q('`long$()')
schema_type Union[str, bytes, kx.CharVector]

Either 'literal' or 'schema', indicating how to interpret the schema parameter.

'literal'

Returns:

Type Description
Decoder

A CSV decoder, which can be joined to other operators or pipelines.
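To make the interplay of `schema`, `delimiter`, `header`, and `exclude` concrete, here is a rough model of the same idea built on Python's standard `csv` module. It does not call `kxi.sp`; the `decode_csv` function and the `CASTS` mapping are hypothetical names, and only three q type characters (`j` long, `f` float, `s` symbol) are mapped, as an assumption for the example.

```python
import csv
import io

# Assumed mapping from a few q type characters to Python casts.
CASTS = {"j": int, "f": float, "s": str}


def decode_csv(data, schema, delimiter=",", header="always", exclude=()):
    """Parse CSV text into a column dictionary, casting each column per schema.

    schema  -- dict of column name -> type character, in column order
    header  -- 'always' skips the first row, 'none' treats every row as data
    exclude -- column names or zero-based column indices to drop
    """
    rows = list(csv.reader(io.StringIO(data), delimiter=delimiter))
    if header == "always":
        rows = rows[1:]
    table = {}
    for index, (name, type_char) in enumerate(schema.items()):
        if name in exclude or index in exclude:
            continue
        cast = CASTS[type_char]
        table[name] = [cast(row[index]) for row in rows]
    return table


raw = "sym|price|size\nAAPL|101.5|200\nMSFT|310.25|50\n"
trades = decode_csv(raw, {"sym": "s", "price": "f", "size": "j"},
                    delimiter="|", exclude=["size"])
```

Here `trades` holds the `sym` and `price` columns only; the `size` column is parsed out of each row but excluded from the result.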

kxi.sp.decode.gzip

(Beta Feature) Inflates (decompresses) gzipped data.

Note

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Inflates a gzipped stream of bytes into an uncompressed stream of bytes. This decoder will inflate as much data as is available in the inbound stream and buffer any trailing data until the next byte buffer is received. Once data has been inflated, it is passed to the next node in the pipeline.

Warning

The gzip decoder is marked as beta because it does not yet support fault-tolerant replay. If a pipeline fails and is forced to replay data, the gzip decoder will fail with an incomplete byte stream. Fault tolerance support is coming in a future release.

Returns:

Type Description
Decoder

A gzip decoder, which can be joined to other operators or pipelines.
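The buffering behavior described above resembles incremental inflation with Python's standard `zlib` module. The sketch below is an analogy under that assumption, not the decoder's implementation: `inflate_chunks` and `resumes_cleanly` are hypothetical helpers, and the 7-byte chunk size is arbitrary.

```python
import gzip
import zlib


def inflate_chunks(chunks):
    """Inflate a gzip stream that arrives as arbitrary byte chunks."""
    # wbits = MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    inflater = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)
    for chunk in chunks:
        # Incomplete trailing input is held inside the inflater until the
        # next chunk arrives; whatever can be inflated now is emitted.
        out = inflater.decompress(chunk)
        if out:
            yield out
    tail = inflater.flush()
    if tail:
        yield tail


def resumes_cleanly(chunks, start):
    """Check whether a fresh inflater can start from a mid-stream chunk."""
    try:
        b"".join(inflate_chunks(chunks[start:]))
        return True
    except zlib.error:
        return False


payload = b"time,sym,price\n" * 1000
compressed = gzip.compress(payload)
chunks = [compressed[i:i + 7] for i in range(0, len(compressed), 7)]
restored = b"".join(inflate_chunks(chunks))
```

In this model, `resumes_cleanly(chunks, 1)` is false: a new inflater that starts partway through the stream has no gzip header or prior state to work from, which is comparable to the replay limitation noted in the warning.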

kxi.sp.decode.json

Parses JSON data.

JSON data should be a char vector or byte vector, and will be parsed using the .j.k function from q.

Parameters:

Name Type Description Default
decode_each bool

If true, the data is a list of JSON objects that are each decoded separately. If false, the data is a single JSON object.

False

Returns:

Type Description
Decoder

A JSON decoder, which can be joined to other operators or pipelines.
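The difference `decode_each` makes can be illustrated with Python's standard `json` module. This is a sketch of the idea only: the real decoder parses with q's `.j.k`, and the `decode_json` function below is a hypothetical stand-in that takes a string or bytes for a single document, or a list of them when `decode_each` is true.

```python
import json


def decode_json(data, decode_each=False):
    """Parse one JSON document, or each item of a list of JSON documents."""
    if decode_each:
        # Each element is a separate JSON document, decoded on its own.
        return [json.loads(item) for item in data]
    # The whole input is a single JSON document.
    return json.loads(data)


single = decode_json(b'{"sym": "AAPL", "price": 101.5}')
batch = decode_json(['{"sym": "AAPL"}', '{"sym": "MSFT"}'], decode_each=True)
```

`single` is one dictionary, while `batch` is a list with one decoded dictionary per input message.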

kxi.sp.decode.protobuf

Parses Protocol Buffer messages.

Parameters:

Name Type Description Default
message Union[str, bytes, kx.SymbolAtom]

The name of the Protocol Buffer message type to decode.

required
path Optional[Union[Path, str, kx.SymbolAtom]]

A path to a .proto file containing the message type definition.

None
format Optional[CharString]

The Protocol Buffer message format to decode.

None
as_list Union[bool, kx.BooleanAtom]

Whether the output should be a list of values instead of a dict.

False

Returns:

Type Description
Decoder

A protobuf decoder, which can be joined to other operators or pipelines.