kxi.sp.encode

Stream Processor encoders.

PayloadType Objects

class PayloadType(AutoNameEnum)

Enum to specify the payload type for a protobuf encoding.

table

table.

dict

dictionary.

array

array.

arrays

arrays.

ArrowPayloadType Objects

class ArrowPayloadType(AutoNameEnum)

Enum to specify the payload type for Arrow encoding.

table

table.

arrays

arrays.

CSVHeader Objects

class CSVHeader(AutoNameEnum)

Enum for csv header options.

These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').

first

Only first batch starts with a header row.

none

Encoded data never starts with a header row.

always

Encoded data always starts with a header row.
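As a rough illustration of how an option can be given either as a member or as a string, the sketch below builds a stand-in enum whose values equal the member names. The `AutoNameEnum` and `CSVHeader` classes here are local re-creations written for this example, and `normalize_header` is a hypothetical helper; none of this is the library's actual implementation.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Stand-in base class (an assumption): each member's value is its name."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class CSVHeader(AutoNameEnum):
    """Local copy of the documented members, for illustration only."""
    first = auto()
    none = auto()
    always = auto()


def normalize_header(header):
    """Hypothetical helper: accept a member or the name of a member."""
    if isinstance(header, CSVHeader):
        return header
    return CSVHeader[header]  # raises KeyError for unknown names


print(normalize_header('always') is CSVHeader.always)  # True
print(CSVHeader.first.value)                           # first
```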

arrow

@Encoder
def arrow(*,
          schema: Optional[Union[kx.Table, Dict[str,
                                                Union[str, int,
                                                      kx.CharAtom]]]] = None,
          match_cols: Union[bool, kx.BooleanAtom] = True,
          payload_type: ArrowPayloadType = ArrowPayloadType.auto) -> Encoder

(Beta Feature) Encodes data as Arrow.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • schema - A table from which the schema is extracted and used when serializing each batch of data. If not provided, schema is inferred at runtime for each batch.
  • match_cols - When set to True and a schema is provided, tables passed in have their columns rearranged to match the column ordering of the schema. No reordering is done otherwise.
  • payload_type - An ArrowPayloadType value indicating the message payload to encode (one of auto, table, or arrays).

Returns:

An arrow encoder, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import pyarrow as pa

>>> kx.q('.spdecode.arrow.i.loadPackage[]')

>>> arrays = [
        np.array([1, 2, 3], dtype=np.longlong),
        np.array([10.1, 10.2, 10.3], dtype=np.float32),
        np.array([10, 20, 30], dtype=np.int16)]

>>> tempTable = pd.DataFrame.from_records(arrays, columns=['x', 'x1', 'x2'])
>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.arrow()
        | sp.write.to_variable('out'))
>>> kx.q('publish', tempTable)
>>> kx.q('out')
0xffffffffc80000001000000000000a000c000600050008000a000000000104000c000000080008000000040...
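The effect of match_cols can be pictured with plain Python dictionaries standing in for tables. This is only a sketch of the reordering rule described in the arguments above; `reorder_columns` is a hypothetical helper, and the real encoder operates on q tables rather than dicts.

```python
def reorder_columns(table, schema_cols, match_cols=True):
    """Return the table with columns in schema order.

    `table` is a dict mapping column name to a list of values (an
    assumption made for this sketch). If no schema is given, or
    match_cols is False, the table is returned unchanged.
    """
    if not match_cols or schema_cols is None:
        return table
    return {name: table[name] for name in schema_cols}


batch = {'x2': [10, 20], 'x': [1, 2], 'x1': [10.1, 10.2]}
print(list(reorder_columns(batch, ['x', 'x1', 'x2'])))
# ['x', 'x1', 'x2']
print(list(reorder_columns(batch, ['x', 'x1', 'x2'], match_cols=False)))
# ['x2', 'x', 'x1']
```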

csv

@Encoder
def csv(delimiter: Union[str, bytes, kx.CharAtom] = kx.q('","'),
        *,
        header: CSVHeader = CSVHeader.first) -> Encoder

Encodes data as CSV.

Arguments:

  • delimiter - The field separator for records in the encoded data. Defaults to a comma.
  • header - Whether encoded data starts with a header row.

Returns:

A csv encoder, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.csv('|')
        | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
          'name': ['a', 'b', 'c'],
          'id': [1, 2, 3],
          'quantity': [4, 5, 6]
      })

>>> kx.q('publish', data)
>>> kx.q('out')
      name|id|quantity
      a|1|4
      b|2|5
      c|3|6
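To show how the three header options differ once more than one batch flows through the encoder, here is a small standard-library sketch. It imitates the documented behaviour with Python's `csv` module; `encode_csv_batches` is a hypothetical function, and batches are assumed to be lists of row dictionaries rather than q tables.

```python
import csv
import io


def encode_csv_batches(batches, delimiter=',', header='first'):
    """Encode each batch (a list of row dicts) as one CSV string.

    header: 'first'  -> only the first batch gets a header row
            'none'   -> no batch gets a header row
            'always' -> every batch gets a header row
    """
    encoded = []
    for index, rows in enumerate(batches):
        buffer = io.StringIO()
        writer = csv.writer(buffer, delimiter=delimiter, lineterminator='\n')
        wants_header = header == 'always' or (header == 'first' and index == 0)
        if rows and wants_header:
            writer.writerow(rows[0].keys())
        for row in rows:
            writer.writerow(row.values())
        encoded.append(buffer.getvalue())
    return encoded


batches = [
    [{'name': 'a', 'id': 1, 'quantity': 4}],
    [{'name': 'b', 'id': 2, 'quantity': 5}],
]
print(encode_csv_batches(batches, '|', 'first'))
# ['name|id|quantity\na|1|4\n', 'b|2|5\n']
```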

json

@Encoder
def json(*, split: bool = False) -> Encoder

Encodes data as JSON.

Arguments:

  • split - Whether a batch should be encoded as a single JSON object, or split into a separate JSON object for each value in the batch. When the input is a table, this encodes each row as a JSON object with the column names as keys mapping to the corresponding row values.

Returns:

A json encoder, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.json()
        | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
          'name': ['a', 'b', 'c'],
          'id': [1, 2, 3],
          'quantity': [4, 5, 6]
      })

>>> kx.q('publish', data)
>>> kx.q('out')
      {"name": "a", "id": 1, "quantity": 4}
      {"name": "b", "id": 2, "quantity": 5}
      {"name": "c", "id": 3, "quantity": 6}
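The difference between a split and an unsplit batch can be sketched with the standard `json` module. This is an illustration only: `encode_json` is a hypothetical function, rows are assumed to be Python dictionaries, and the choice to serialize an unsplit batch as a JSON array of row objects is an assumption, since the exact layout the encoder emits for that case is not specified here.

```python
import json


def encode_json(rows, split=False):
    """Encode a batch of row dicts as a list of JSON strings.

    split=False -> one JSON document for the whole batch (assumed here
                   to be an array of row objects)
    split=True  -> one JSON document per row
    """
    if split:
        return [json.dumps(row) for row in rows]
    return [json.dumps(rows)]


rows = [{'name': 'a', 'id': 1}, {'name': 'b', 'id': 2}]
print(encode_json(rows, split=True))
# ['{"name": "a", "id": 1}', '{"name": "b", "id": 2}']
print(len(encode_json(rows)))
# 1
```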

protobuf

@Encoder
def protobuf(message: str,
             path: Optional[str] = None,
             *,
             format: Optional[str] = None,
             payload_type: Optional[PayloadType] = None) -> Encoder

Encodes data as Protocol Buffers (protobuf).

Arguments:

  • message - The name of the Protocol Buffer message type to encode.
  • path - The path to a .proto file containing the message type definition.
  • format - A string definition of the Protocol Buffer message format to encode.
  • payload_type - A PayloadType enum value, or the string equivalent.

Returns:

A protobuf encoder, which can be joined to other operators or pipelines.
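To make the encoded bytes easier to read, the following sketch decodes the Protocol Buffers wire format by hand for the byte string that the example for this encoder produces. It is a minimal decoder written for this illustration, handling only varint and length-delimited fields; `read_varint` and `decode_fields` are hypothetical helpers, not part of kxi.sp or of any protobuf library.

```python
def read_varint(data, pos):
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_fields(data):
    """Map field numbers to raw values for wire types 0 and 2 only."""
    fields = {}
    pos = 0
    while pos < len(data):
        key, pos = read_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:        # varint, e.g. int32
            value, pos = read_varint(data, pos)
        elif wire_type == 2:      # length-delimited, e.g. string
            length, pos = read_varint(data, pos)
            value = data[pos:pos + length]
            pos += length
        else:
            raise ValueError(f'unsupported wire type {wire_type}')
        fields[field_number] = value
    return fields


# Same bytes as the example output, written with hex escapes.
encoded = b'\n\x04name\x10e\x1a\x0femail@email.com'
print(decode_fields(encoded))
# {1: b'name', 2: 101, 3: b'email@email.com'}
```

Field numbers 1, 2, and 3 correspond to name, id, and email in the Person message definition.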

Person.proto file

syntax="proto3";
message Person {
  string name = 1;
  int32 id = 2;
  string email = 3;
}

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> from enum import Enum
>>> from kxi.sp.encode import PayloadType

>>> proto_file = open('/tmp/Person.proto', 'w')
>>> proto_file.write(
    'syntax="proto3";message Person { string name = 1;int32 id = 2;string email = 3;}')
>>> proto_file.close()

>>> format_string = ('syntax="proto3";message PersonA '
                     '{ string name = 1;int32 id = 2;string email = 3;}')

>>> sp.run(sp.read.from_callback('publish')
        | sp.encode.protobuf("PersonA", format=format_string, payload_type=PayloadType.auto)
        | sp.write.to_variable('out'))

>>> # Define name and email as byte strings, because pykx converts Python strings to symbols by default
>>> data = {
        'name': b'name',
        'id': kx.IntAtom(101),
        'email': b'email@email.com',
        }

>>> kx.q('publish', data)
>>> kx.q('out')
"\n\004name\020e\032\017email@email.com"