
kxi.sp.decode

Stream Processor decoders.

CSVHeader Objects

class CSVHeader(AutoNameEnum)

Enum for csv header options.

These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').

none

Encoded data never starts with a header row.

always

Encoded data always starts with a header row.

first

Encoded data starts with a header row on the first batch only.

CSVEncoding Objects

class CSVEncoding(AutoNameEnum)

Enum for csv encoding formats.

These enum values can be provided as enum member objects (e.g. CSVEncoding.UTF8), or as strings matching the names of the members (e.g. 'UTF8').

UTF8

Data is expected to be encoded in UTF8 format.

ASCII

Data is expected to be encoded in ASCII format.
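
Both enums are consumed by the csv decoder described below, and can be supplied either as enum members or as their string names. A minimal sketch (the schema shown is illustrative):

>>> from kxi import sp
>>> # Equivalent ways to request a header on the first batch only,
>>> # with ASCII-encoded input:
>>> sp.decode.csv({'name': 'string', 'id': 'int'},
        header=sp.decode.CSVHeader.first,
        encoding=sp.decode.CSVEncoding.ASCII)
>>> sp.decode.csv({'name': 'string', 'id': 'int'},
        header='first', encoding='ASCII')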

arrow

@Decoder
def arrow(*, as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder

(Beta Feature) Decodes Arrow streams.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • as_list - A boolean; if true, the decoded result is a list of arrays corresponding only to the Arrow stream data. If false, the decoded result is a table built from both the schema and the data in the Arrow stream.

Returns:

An arrow decoder, which can be joined to other operators or pipelines.

Reading from an Arrow-encoded byte vector:

>>> from kxi import sp
>>> import pykx as kx
>>> pa_encoded=kx.q('0xffffffffa80000001000000000000a000c000600050008000a000000'
                    '000104000c000000080008000000040008000000040000000200000040'
                    '00000004000000d8ffffff000001051000000018000000040000000000'
                    '000002000000783100000400040004000000100014000800060007000c'
                    '00000010001000000000000102100000001c0000000400000000000000'
                    '010000007800000008000c000800070008000000000000014000000000'
                    '000000ffffffffc800000014000000000000000c001600060005000800'
                    '0c000c0000000003040018000000400000000000000000000a0018000c'
                    '00040008000a0000006c00000010000000030000000000000000000000'
                    '0500000000000000000000000000000000000000000000000000000018'
                    '0000000000000018000000000000000000000000000000180000000000'
                    '0000100000000000000028000000000000001500000000000000000000'
                    '0002000000030000000000000000000000000000000300000000000000'
                    '0000000000000000010000000000000002000000000000000300000000'
                    '00000000000000070000000e00000015000000737472696e6731737472'
                    '696e6732737472696e6733000000ffffffff00000000')
>>> bv=kx.ByteVector(pa_encoded)
>>> sp.run(sp.read.from_callback('publish')
         | sp.decode.arrow(as_list=True)
         | sp.write.to_variable('out'))
>>> kx.q('publish', bv)
>>> kx.q('out')
1         2         3
"string1" "string2" "string3"

csv

@Decoder
def csv(schema: Union[str, kx.Table, dict[str, Union[str, bytes,
                                                     kx.CharAtom]]] = "",
        delimiter: Union[str, bytes, kx.CharAtom] = kx.q('","'),
        *,
        header: CSVHeader = CSVHeader.always,
        exclude: Union[List[str], List[int], kx.SymbolVector,
                       kx.IntegralVector] = kx.q('`long$()'),
        schema_type: Union[str, bytes, kx.CharVector] = 'literal',
        encoding: CSVEncoding = CSVEncoding.UTF8,
        newlines: bool = False) -> Decoder

Parses CSV data to a table.

Arguments:

  • schema - A dictionary of column names where values are the desired output type, a dictionary of columns and their type characters, or a list of type characters.
  • delimiter - A field separator for the records in the encoded data, defaults to comma.
  • header - Whether encoded data starts with a header row.
  • exclude - Columns to exclude from the output, specified by name or by index.
  • schema_type - Either 'literal' or 'schema' indicating how to interpret the schema parameter.
  • encoding - How the data is expected to be encoded when being consumed. Currently supported options are ASCII and UTF8.
  • newlines - Indicates whether newlines may be embedded in strings. Can impact performance when enabled.

Notes:

When dealing with non-ASCII encoding schemes, the CSV decoding logic will check for and remove byte order mark prefixes on the incoming data. This may or may not be visible to the user depending on how data is being viewed and can lead to unexpected errors.

Certain target types have strict requirements around the string format in order to be parsed correctly. Please see the Apply Schema documentation for details.

Returns:

A csv decoder, which can be joined to other operators or pipelines.

>>> import pykx as kx
>>> from kxi import sp

>>> sp.run(sp.read.from_callback('publish')
        | sp.decode.csv({'name': 'string', 'id': 'int', 'quantity': 'int'})
        | sp.write.to_variable('out'))

>>> data = kx.CharVector("\n".join(
        ["name,id,quantity",
        "a,1,4",
        "b,2,5",
        "c,3,6"]))

>>> kx.q('publish', data)
>>> kx.q('out')
name id quantity
----------------
,"a" 1  4
,"b" 2  5
,"c" 3  6

gzip

@Decoder
def gzip() -> Decoder

(Beta Feature) Inflates (decompresses) gzipped data.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Inflates a gzipped stream of bytes into an uncompressed stream of bytes. This decoder will inflate as much data as is available in the inbound stream and buffer any trailing data until the next byte buffer is received. Once data has been inflated, it is passed to the next node in the pipeline.

Warnings:

The gzip decoder is currently marked as beta because it does not support fault-tolerant replay. If a pipeline fails and is forced to replay data, the gzip decoder will fail with an incomplete byte stream. Fault-tolerance support is planned for a future release.

Returns:

A gzip decoder, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import random
>>> import string
>>> import gzip
>>> import io
>>> import os

>>> characters = string.ascii_letters + string.digits
>>> data = ''.join(random.choice(characters) for _ in range(10000))
>>> filepath='test/data/decode/test_data/data.gz'

>>> with gzip.open(filepath, 'wb') as file:
        file.write(data.encode('utf-8'))
>>> with open(filepath, 'rb') as gzfile:
        comp = gzfile.read()
>>> os.remove(filepath)

>>> sp.run(sp.read.from_expr(lambda: comp)
        | sp.decode.gzip()
        | sp.write.to_variable('out'))

>>> kx.q('out')
0x596d3139735378486d4d39595f46435a5435763847316f66665...
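
The buffering behavior described above can be illustrated by feeding the compressed bytes through a callback reader in pieces; a minimal sketch (the split point and names are arbitrary):

>>> sp.run(sp.read.from_callback('publish_gz')
        | sp.decode.gzip()
        | sp.write.to_variable('out_gz'))

>>> # The decoder inflates what it can from the first buffer and holds any
>>> # trailing bytes until the next buffer arrives.
>>> kx.q('publish_gz', comp[:100])
>>> kx.q('publish_gz', comp[100:])
>>> # out_gz accumulates the inflated bytes as each buffer is processed.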

json

@Decoder
def json(*, decode_each: bool = False) -> Decoder

Parse JSON data.

JSON data should be a char vector or byte vector, and will be parsed using the .j.k function from q.

Arguments:

  • decode_each - By default, messages passed to the decoder are treated as a single JSON object. Setting decode_each to true indicates that parsing should be done on each value of a message. This is useful when decoding data whose objects are separated by newlines, as it allows the pipeline to process partial sets of the JSON file without requiring the entire block to be in memory.

Returns:

A json decoder, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.decode.json(decode_each=True)
        | sp.write.to_variable('out'))

>>> data = kx.CharVector("\n".join([
    '{"name": "a", "id": 1, "quantity": 4}',
    '{"name": "b", "id": 2, "quantity": 5}',
    '{"name": "c", "id": 3, "quantity": 6}']))

>>> kx.q('publish', data)
>>> kx.q('out')
name id quantity
----------------
,"a" 1  4
,"b" 2  5
,"c" 3  6

protobuf

@Decoder
def protobuf(message: Union[str, bytes, kx.SymbolAtom],
             *,
             path: Optional[Union[Path, str, kx.SymbolAtom]] = None,
             format: Optional[CharString] = None,
             as_list: Union[bool, kx.BooleanAtom] = False) -> Decoder

Parse Protocol Buffer messages.

Arguments:

  • message - The name of the Protocol Buffer message type to decode.
  • path - A path to a .proto file containing the message type definition.
  • format - The Protocol Buffer message format to decode.
  • as_list - Whether the output should be a list of values instead of a dict.

Returns:

A protobuf decoder, which can be joined to other operators or pipelines.

Person.proto:

```proto
syntax="proto3";
message Person {
    string name = 1;
    int32 id = 2;
    string email = 3;
}
```

>>> from kxi import sp
>>> import pykx as kx

>>> from textwrap import dedent
>>> def protobuf_schema(msg_name):
        return dedent(f'''
            syntax="proto3";
            message {msg_name} {{
                string name = 1;
                int32 id = 2;
                string email = 3;
            }}
        ''').strip()
>>> sp.run(sp.read.from_expr(lambda:
        bytes.fromhex('0a046e616d6510651a0f656d61696c40656d61696c2e636f6d'))
       | sp.decode.protobuf('Person2', format=protobuf_schema('Person2'), as_list=True)
       | sp.write.to_variable('out', mode='overwrite'))
>>> kx.q('out')
"name"
101i
"email@email.com"

pcap

@Decoder
def pcap(columns: List[str] = None) -> Decoder

Parse pcap data.

pcap data should be a char vector or byte vector, and is read in as a byte vector by q.

options:

| name    | type      | description                                             | default |
|---------|-----------|---------------------------------------------------------|---------|
| columns | string [] | The columns to include. (Omit to include all columns.)  | ()      |

Arguments:

  • columns - The names of the specific columns to include in the output. The available columns are: ack, dstip, dstmac, dstport, flags, id, ip_checksum, ip_length, offset, payload, proto, proto_checksum, seq, srcmac, srcport, timestamp, tos, ttl, udp_length, urgptr, windowsize.

If no columns are supplied, all columns are included.

Returns:

A pcap decoder, which can be joined to other operators or pipelines.

Limitations: IPv6 packet reading is not yet supported by the pcap decoder, so any IPv6 packets in the file are skipped. Additionally, chunking is not possible with the current decoder: because every pcap file must begin with a mandatory file header, a file cannot be parsed by simply partitioning it. To split pcap files, use a tool such as Wireshark (https://www.wireshark.org/) and follow its guide for splitting capture files.

>>> from kxi import sp
>>> import pykx as kx

>>> file_path = 'test/data/decode/test_data/20180127_IEXTP1_DEEP1.0.pcap'
>>> with open(file_path, 'rb') as file:
        comp = file.read()

>>> sp.run(sp.read.from_expr(lambda: comp)
        | sp.decode.pcap(['timestamp'])
        | sp.write.to_variable('out'))
>>> kx.q('out')
timestamp
-----------------------------
2018.01.27D13:00:15.910701000
2018.01.27D13:00:17.002966000
2018.01.27D13:00:17.009577000
2018.01.27D13:00:17.016564000
...