kxi.sp.decode
Stream Processor decoders.
CSVHeader Objects
class CSVHeader(AutoNameEnum)
Enum for csv header options.
These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').
none - Encoded data never starts with a header row.
always - Encoded data always starts with a header row.
CSVEncoding Objects
class CSVEncoding(AutoNameEnum)
Enum for csv encoding formats.
These enum values can be provided as enum member objects (e.g. CSVEncoding.UTF8), or as strings matching the names of the members (e.g. 'UTF8').
UTF8 - Data is expected to be encoded in UTF8 format.
ASCII - Data is expected to be encoded in ASCII format.
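These options appear as parameters of the csv decoder documented below. As a minimal sketch of the two equivalent forms (the schema dictionary of type characters is illustrative):

from kxi import sp

# Passing enum member objects...
decoder = sp.decode.csv({'sym': 's', 'price': 'f'},
                        header=sp.decode.CSVHeader.always,
                        encoding=sp.decode.CSVEncoding.UTF8)

# ...is equivalent to passing the member names as strings.
decoder = sp.decode.csv({'sym': 's', 'price': 'f'}, header='always', encoding='UTF8')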
arrow
@Decoder
def arrow(*, as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder
(Beta Feature) Decodes Arrow streams.
Notes:
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Arguments:
as_list - If true, the decoded result is a list of arrays, corresponding only to the Arrow stream data. If false, the decoded result is a table corresponding to both the schema and the data in the Arrow stream.
Returns:
An arrow decoder, which can be joined to other operators or pipelines.
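For example, a pipeline sketch that decodes incoming Arrow stream bytes into a table (the from_callback reader and to_console writer are illustrative choices, and the environment variable is assumed to be set before the pipeline is created):

import os
os.environ['KXI_SP_BETA_FEATURES'] = 'true'  # enable beta features

from kxi import sp

sp.run(sp.read.from_callback('publish')  # Arrow stream bytes arrive here
    | sp.decode.arrow()                  # as_list=False: decode to a table
    | sp.write.to_console())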
csv
@Decoder
def csv(schema: Union[str, kx.Table, dict[str, Union[str, bytes, kx.CharAtom]]],
        delimiter: Union[str, bytes, kx.CharAtom] = kx.q('","'),
        *,
        header: CSVHeader = CSVHeader.always,
        exclude: Union[List[str], List[int], kx.SymbolVector, kx.IntegralVector] = kx.q('`long$()'),
        schema_type: Union[str, bytes, kx.CharVector] = 'literal',
        encoding: CSVEncoding = CSVEncoding.UTF8) -> Decoder
Parses CSV data to a table.
Arguments:
schema - A dictionary of column names where values are the desired output type, a dictionary of columns and their type characters, or a list of type characters.
delimiter - A field separator for the records in the encoded data; defaults to comma.
header - Whether encoded data starts with a header row.
exclude - Columns to exclude from the output, specified by name or by index.
schema_type - Either 'literal' or 'schema', indicating how to interpret the schema parameter.
encoding - How the data is expected to be encoded when being consumed. Currently supported options are ASCII and UTF8.
Notes:
When dealing with non-ASCII encoding schemes, the CSV decoding logic checks for and removes byte order mark prefixes on the incoming data. A byte order mark may or may not be visible depending on how the data is viewed, and if left in place it can lead to unexpected errors.
Returns:
A csv decoder, which can be joined to other operators or pipelines.
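For example, a pipeline sketch that decodes comma-separated records with a header row (the schema type characters, reader, and writer are illustrative):

from kxi import sp

sp.run(sp.read.from_callback('publish')
    | sp.decode.csv({'time': 'p', 'sym': 's', 'price': 'f'},
                    header='always',
                    exclude=['sym'])  # drop the sym column by name
    | sp.write.to_console())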
json
@Decoder
def json(*, decode_each: bool = False) -> Decoder
Parse JSON data.
JSON data should be a char vector or byte vector, and will be parsed using the .j.k function from q.
Arguments:
decode_each - By default, messages passed to the decoder are treated as a single JSON object. Setting decode_each to true indicates that parsing should be done on each value of a message. This is useful when decoding data that has objects separated by newlines, as it allows the pipeline to process partial sets of the JSON file without requiring the entire block to be in memory.
Returns:
A json decoder, which can be joined to other operators or pipelines.
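For example, a sketch for newline-delimited JSON, where each message delivered to the decoder is a collection of JSON values (the reader and writer are illustrative):

from kxi import sp

sp.run(sp.read.from_callback('publish')
    | sp.decode.json(decode_each=True)  # parse each value of the message individually
    | sp.write.to_console())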
protobuf
@Decoder
def protobuf(message: Union[str, bytes, kx.SymbolAtom],
             *,
             path: Optional[Union[Path, str, kx.SymbolAtom]] = None,
             format: Optional[CharString] = None,
             as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder
Parse Protocol Buffer messages.
Arguments:
message - The name of the Protocol Buffer message type to decode.
path - A path to a .proto file containing the message type definition.
format - The Protocol Buffer message format to decode.
as_list - Whether the output should be a list of values instead of a dict.
Returns:
A protobuf decoder, which can be joined to other operators or pipelines.
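For example, a sketch that decodes a hypothetical Person message defined in a local person.proto file (the message name, file path, reader, and writer are all illustrative):

from kxi import sp

sp.run(sp.read.from_callback('publish')
    | sp.decode.protobuf('Person', path='person.proto')
    | sp.write.to_console())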