kxi.sp.encode
Stream Processor encoders.
PayloadType Objects
class PayloadType(AutoNameEnum)
Enum to specify the payload type for a protobuf encoding.
table
Encode the message payload as a table.
dict
Encode the message payload as a dictionary.
array
Encode the message payload as an array.
arrays
Encode the message payload as arrays.
ArrowPayloadType Objects
class ArrowPayloadType(AutoNameEnum)
Enum to specify the payload type for Arrow encoding.
table
Encode the message payload as a table.
arrays
Encode the message payload as arrays.
CSVHeader Objects
class CSVHeader(AutoNameEnum)
Enum for CSV header options.
These enum values can be provided as enum member objects (e.g. CSVHeader.always), or as strings matching the names of the members (e.g. 'always').
first
Only the first batch starts with a header row.
none
Encoded data never starts with a header row.
always
Encoded data always starts with a header row.
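To illustrate these header options, here is a plain-Python sketch using only the standard library's csv module (an illustration of the semantics, not the kxi implementation), encoding a stream of batches under each option:

```python
import csv
import io

def encode_batches(batches, header='first'):
    """Encode a list of row-dict batches as CSV strings, emitting the
    header row according to the chosen option ('first', 'none', 'always')."""
    out = []
    for i, batch in enumerate(batches):
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=batch[0].keys(),
                                lineterminator='\n')
        # 'always': header on every batch; 'first': only on the first
        # batch; 'none': never.
        if header == 'always' or (header == 'first' and i == 0):
            writer.writeheader()
        writer.writerows(batch)
        out.append(buf.getvalue())
    return out

batches = [[{'id': 1, 'qty': 4}], [{'id': 2, 'qty': 5}]]
print(encode_batches(batches, header='first'))
# ['id,qty\n1,4\n', '2,5\n'] -- only the first batch carries the header
```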
arrow
@Encoder
def arrow(*,
schema: Optional[Union[kx.Table, Dict[str,
Union[str, int,
kx.CharAtom]]]] = None,
match_cols: Union[bool, kx.BooleanAtom] = True,
payload_type: ArrowPayloadType = ArrowPayloadType.auto) -> Encoder
(Beta Feature) Encodes data as Arrow.
Notes:
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Arguments:
schema
- A table from which the schema is extracted and used when serializing each batch of data. If not provided, the schema is inferred at runtime for each batch.
match_cols
- When set to True and a schema is provided, tables passed in have their columns rearranged to match the column ordering of the schema. No reordering is done otherwise.
payload_type
- A symbol indicating the message payload that will be encoded (one of auto, table, or arrays).
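The match_cols behaviour can be sketched in plain Python (an illustration of the column reordering only, not the kxi implementation): when a schema is provided, each batch's columns are rearranged to follow the schema's column order before encoding.

```python
def match_columns(batch: dict, schema_cols: list) -> dict:
    """Rearrange a batch (a column-name -> values mapping) so its
    columns follow the schema's column order."""
    return {col: batch[col] for col in schema_cols}

schema_cols = ['x', 'x1', 'x2']
batch = {'x2': [30], 'x': [1], 'x1': [10.2]}
print(list(match_columns(batch, schema_cols)))
# ['x', 'x1', 'x2'] -- reordered to match the schema
```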
Returns:
An arrow encoder, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import pyarrow as pa
>>> kx.q('.spdecode.arrow.i.loadPackage[]')
>>> arrays = [
np.array([1, 2, 3], dtype=np.longlong),
np.array([10.1, 10.2, 10.3], dtype=np.float32),
np.array([10, 20, 30], dtype=np.int16)]
>>> tempTable = pd.DataFrame.from_records(arrays, columns=['x', 'x1', 'x2'])
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.arrow()
| sp.write.to_variable('out'))
>>> kx.q('publish', tempTable)
>>> kx.q('out')
0xffffffffc80000001000000000000a000c000600050008000a000000000104000c000000080008000000040...
csv
@Encoder
def csv(delimiter: Union[str, bytes, kx.CharAtom] = kx.q('","'),
*,
header: CSVHeader = CSVHeader.first) -> Encoder
Encodes data as CSV.
Arguments:
delimiter
- A field separator for the records in the encoded data; defaults to comma.
header
- Whether encoded data starts with a header row.
Returns:
A csv encoder, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.csv('|')
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name|id|quantity
a|1|4
b|2|5
c|3|6
json
@Encoder
def json(*, split: bool = False) -> Encoder
Encodes data as JSON.
Arguments:
split
- Whether a batch should be encoded as a single JSON object, or split into a separate JSON object for each value in the batch. When the input is a table, this encodes each row as a JSON object with the column names as keys mapping to the corresponding row values.
Returns:
A json encoder, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.json()
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
{"name": "a", "id": 1, "quantity": 4}
{"name": "b", "id": 2, "quantity": 5}
{"name": "c", "id": 3, "quantity": 6}
protobuf
@Encoder
def protobuf(message: str,
path: Optional[str] = None,
*,
format: Optional[str] = None,
payload_type: Optional[PayloadType] = None) -> Encoder
Encodes data as protobuf (Protocol Buffers).
Arguments:
message
- The name of the Protocol Buffer message type to encode.
path
- The path to a .proto file containing the message type definition.
format
- A string definition of the Protocol Buffer message format to encode.
payload_type
- A PayloadType enum value, or the string equivalent.
Returns:
A protobuf encoder, which can be joined to other operators or pipelines.
Person.proto file
syntax="proto3";
message Person {
    string name = 1;
    int32 id = 2;
    string email = 3;
}
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> from enum import Enum
>>> from kxi.sp.encode import PayloadType
>>> proto_file = open('/tmp/Person.proto', 'w')
>>> proto_file.write(
'syntax="proto3";message Person { string name = 1;int32 id = 2;string email = 3;}')
>>> proto_file.close()
>>> format_string = ('syntax="proto3";message PersonA '
'{ string name = 1;int32 id = 2;string email = 3;}')
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.protobuf("PersonA",format=format_string,payload_type=PayloadType.auto)
| sp.write.to_variable('out'))
>>> # Define the name and email as byte strings, pykx by default converts strings into symbols
>>> data = {
'name': b'name',
'id': kx.IntAtom(101),
'email': b'email@email.com',
}
>>> kx.q('publish', data)
>>> kx.q('out')
"\n\004name\020e\032\017email@email.com"