Encoders
This page details methods for encoding data into external formats in the Stream Processor.
Arrow
(Beta Feature) Serializes data into Arrow format.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true. See Beta Feature Usage Terms.
.qsp.encode.arrow[]
.qsp.encode.arrow[.qsp.use (!) . flip (
(`schema ; schema);
(`matchCols ; matchCols);
(`payloadType ; payloadType))]
Parameters:
options:
name | type | description | default |
---|---|---|---|
schema | table or dict | If a schema is provided, then it is used when serializing for every batch of data, giving improved performance. If not provided, schema is inferred at runtime for each batch of data. | :: |
matchCols | bool | This option is only used if a schema is provided and table data is passed in. If set to true, tables can be passed in with any column ordering, and the columns are reordered to match the provided schema. If false, no reordering is done, improving performance. | 1b |
payloadType | symbol | Indicates the message payload that will be encoded (one of auto, table, or arrays). | `auto |
For all common arguments, refer to configuring operators.
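The matchCols option applies only when a schema is provided and table data is passed in. It can be pictured as a column-reordering step that runs before serialization. The sketch below is a hypothetical Python illustration of that behavior, not the Stream Processor's implementation. The helper name `align_to_schema`, the use of a dict of columns to stand in for a table, and the choice to raise on missing columns are all assumptions.

```python
from typing import Dict, List, Sequence


def align_to_schema(
    table: Dict[str, List], schema_cols: Sequence[str], match_cols: bool = True
) -> Dict[str, List]:
    """Hypothetical illustration of matchCols; not the SP implementation.

    `table` maps column names to column values. Python dicts keep insertion
    order, so key order stands in for column order here.
    """
    if not match_cols:
        # No reordering: columns are trusted to already be in schema order,
        # which skips the extra work (the performance trade-off noted above).
        return table
    missing = [c for c in schema_cols if c not in table]
    if missing:
        # Assumption: a schema column absent from the input is an error.
        raise ValueError(f"input is missing schema columns: {missing}")
    # Rebuild the table with columns in the order the schema declares.
    return {c: table[c] for c in schema_cols}


batch = {"c2": [10.1, 10.2], "c1": [1, 2]}
print(list(align_to_schema(batch, ["c1", "c2"])))         # ['c1', 'c2']
print(list(align_to_schema(batch, ["c1", "c2"], False)))  # ['c2', 'c1']
```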
This operator encodes kdb+ table or array data into an Arrow stream.
Only data made up of inferred list types is supported; these types can be found here. Data consisting solely of vector columns/arrays is supported, but for general list columns/arrays, support is limited to string or byte list columns/arrays.
Encode a kdb+ table to an Arrow stream, where the schema and payload type are inferred:
input:([]c1: til 5; c2: 10.1 10.2 10.3 10.4 10.5; c3: (enlist "a";"bbb";"ccc";"ddd";enlist "e"))
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.arrow[]
.qsp.write.toConsole[]
publish input
0xffffffffd80000001000000000000a000c0006000500..
Encode data to an Arrow stream, where the schema is specified and the payload type is arrays:
schema: ([] c1: "j"$(); c2: "f"$(); c3: "h"$())
payLoadType: `arrays
input: (1 2 3j; 10.1 10.2 10.3f; 10 20 30h)
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.arrow[.qsp.use (!) . flip (
(`schema; schema);
(`payloadType; payLoadType)
)]
.qsp.write.toConsole[]
publish input
0xffffffffe00000001000000000000a000c0006000500..
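Both outputs above begin with 0xffffffff, which is consistent with the framing of the Arrow IPC streaming format: each encapsulated message starts with a 0xFFFFFFFF continuation marker followed by a little-endian 32-bit length of the message metadata. Assuming the operator emits standard Arrow IPC stream messages, the first eight bytes can be decoded with the Python standard library as a quick sanity check on encoded output. The helper `metadata_length` is hypothetical and only inspects the prefix; it does not decode the schema or data.

```python
import struct

CONTINUATION = 0xFFFFFFFF  # marker that precedes each encapsulated Arrow IPC message


def metadata_length(stream: bytes) -> int:
    """Return the metadata size declared by the first Arrow IPC message.

    Hypothetical inspection helper; it reads only the 8-byte prefix.
    """
    if len(stream) < 8:
        raise ValueError("need at least 8 bytes")
    # "<Ii": little-endian unsigned 32-bit marker, then signed 32-bit length.
    marker, length = struct.unpack("<Ii", stream[:8])
    if marker != CONTINUATION:
        raise ValueError("missing 0xFFFFFFFF continuation marker")
    return length


# Prefixes copied from the two q outputs above (table example, then arrays example).
print(metadata_length(bytes.fromhex("ffffffffd8000000")))  # 216
print(metadata_length(bytes.fromhex("ffffffffe0000000")))  # 224
```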
sp.encode.arrow()
Parameters:
options:
name | type | description | default |
---|---|---|---|
schema | table or dict | A table from which the schema is extracted and used when serializing each batch of data. If not provided, schema is inferred at runtime for each batch. | :: |
match_cols | bool | When set to True and a schema is provided, tables passed in have their columns rearranged to match the column ordering of the schema. No reordering is done otherwise. | 1b |
payload_type | symbol | A symbol indicating the message payload that will be encoded (one of auto, table, or arrays). | `auto |
Returns:
An arrow encoder, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> arrays = [
np.array([1, 2, 3], dtype=np.longlong),
np.array([10.1, 10.2, 10.3], dtype=np.float32),
np.array([10, 20, 30], dtype=np.int16)]
>>> # Use each array as a column (from_records would treat each array as a row)
>>> tempTable = pd.DataFrame(dict(zip(['x', 'x1', 'x2'], arrays)))
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.arrow()
| sp.write.to_variable('out'))
>>> kx.q('publish', tempTable)
>>> kx.q('out')
0xffffffffc80000001000000000000a000c000600050008000a000000000104000c000000080008000000040...
CSV
Converts data into CSV format.
.qsp.encode.csv[]
.qsp.encode.csv[delimiter]
.qsp.encode.csv[delimiter; .qsp.use enlist[`header]!enlist header]
Parameters:
name | type | description | default |
---|---|---|---|
delimiter | character | Field separator for the records in the encoded data. | "," |
options:
name | type | description | default |
---|---|---|---|
header | symbol | Whether encoded data starts with a header row, either none, first, or always. | first |
For all common arguments, refer to configuring operators.
This operator encodes tables into CSV data, that is, strings of delimiter-separated values.
Encode to CSV from a table:
// Generate a random table of data.
n: 10000
t: ([]
date: n?.z.d + neg til 10;
price: (n?1000)div 100;
item: n?`3;
description: {rand[100]?.Q.an}each til n;
quantity: n?10000)
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.csv["|"]
.qsp.write.toConsole[]
publish t
2022.11.02D09:08:38.553535126 | "date|price|item|description|quantity"
2022.11.02D09:08:38.553535126 | "2022-11-01|2|mkp|F5e|9406"
2022.11.02D09:08:38.553535126 | "2022-10-31|1|igb|1lYkM405Ugfdk_VZLS6XW9zgtkqyNzgoB603WpmS9xV6Hx3Mtjd|4648"
2022.11.02D09:08:38.553535126 | "2022-10-28|9|kij|41EYGoQFLXlPVw50EWPvnkjSyB9HaB7qYtRcu69n|2261"
2022.11.02D09:08:38.553535126 | "2022-10-30|5|dih|DDBVC3uUbhpwdgEQsIt|8552"
2022.11.02D09:08:38.553535126 | "2022-11-02|4|dpb|zXYM60LQRdCQliV1HTtkZmgLrSNLqcKqj0teyswl61vmFkiC1MyatnwgGiAhtXGmX5QBVrOjFHQUt9s|8941"
2022.11.02D09:08:38.553535126 | "2022-10-26|1|hfa|HZ6fSzhC8rt_Zqd18|316"
..
Dictionaries can also be encoded to CSV, provided all values have the same count:
show d: flip 3?t
date | 2022.10.30 2022.10.30 2022.10.29
price | 4 4 1
item | gih gih gdi
description| "tFO1lLCuD" "tFO1lLCuD" "E3XKfjSRYr4MJW3RYtbinQTB3vjVIWlXbvRhUllaoIbviWXBYEb4OH2CsOvWaJd6Cqiyndg"
quantity | 3815 3815 4770
publish d
2022.11.03D11:26:45.226049104 | "2022-10-30|4|gih|tFO1lLCuD|3815"
2022.11.03D11:26:45.226049104 | "2022-10-30|4|gih|tFO1lLCuD|3815"
2022.11.03D11:26:45.226049104 | "2022-10-29|1|gdi|E3XKfjSRYr4MJW3RYtbinQTB3vjVIWlXbvRhUllaoIbviWXBYEb4OH2CsOvWaJd6Cqiyndg|4770"
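The header option controls whether column names are written once, with every batch, or never. The two publishes above show the default first behavior: the header row appears with the table batch but not with the later dictionary batch. The sketch below models that per-batch logic with Python's standard csv module and also accepts a column dictionary whose values all have the same count, mirroring the dictionary example. The class name `CsvBatchEncoder` is hypothetical, and this is an illustration of the documented options rather than how the Stream Processor is implemented.

```python
import csv
import io
from typing import Dict, List, Sequence


class CsvBatchEncoder:
    """Hypothetical model of the header modes: none, first, always."""

    def __init__(self, delimiter: str = ",", header: str = "first") -> None:
        if header not in ("none", "first", "always"):
            raise ValueError(f"unknown header mode: {header}")
        self.delimiter = delimiter
        self.header = header
        self.batches_seen = 0

    def encode(self, columns: Dict[str, Sequence]) -> List[str]:
        """Encode one batch given as column name -> equal-length values."""
        if len({len(v) for v in columns.values()}) > 1:
            raise ValueError("all columns must have the same count")
        buf = io.StringIO()
        writer = csv.writer(buf, delimiter=self.delimiter, lineterminator="\n")
        # "always" writes a header every batch; "first" only on the first batch.
        write_header = self.header == "always" or (
            self.header == "first" and self.batches_seen == 0
        )
        if write_header:
            writer.writerow(columns.keys())
        # zip(*values) turns the column layout into rows.
        writer.writerows(zip(*columns.values()))
        self.batches_seen += 1
        return buf.getvalue().splitlines()


enc = CsvBatchEncoder(delimiter="|")
print(enc.encode({"item": ["gih", "gdi"], "quantity": [3815, 4770]}))
# ['item|quantity', 'gih|3815', 'gdi|4770']
print(enc.encode({"item": ["abc"], "quantity": [1]}))
# ['abc|1']
```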
sp.encode.csv()
sp.encode.csv('|')
Parameters:
name | type | description | default |
---|---|---|---|
delimiter | character | A field separator for the records in the encoded data, defaults to comma. | "," |
options:
name | type | description | default |
---|---|---|---|
header | symbol | Whether encoded data starts with a header row, either none, first, or always. | first |
Returns:
A csv encoder, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.csv('|')
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
>>> kx.q('out')
name|id|quantity
a|1|4
b|2|5
c|3|6
JSON
Serializes data into JSON format.
.qsp.encode.json[]
.qsp.encode.json[.qsp.use enlist[`split]!enlist split]
Parameters:
options:
name | type | description | default |
---|---|---|---|
split | boolean | By default, each batch is encoded as a single JSON object. When split is true, each value in a batch is encoded separately; for a table input, this means each row becomes its own JSON object. | 0b |
For all common arguments, refer to configuring operators.
This operator encodes a table as a JSON array of dictionaries.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.json[]
.qsp.write.toVariable[`output]
publish ([] date: .z.d; time: .z.t; price: 3?100f; quantity: 3?100f)
// Format the output to help with viewing
-1 "},\n {" sv "},{" vs output;
[{"date":"2022-02-10","time":"17:46:36.545","price":48.29742,"quantity":73.15636},
{"date":"2022-02-10","time":"17:46:36.545","price":51.24386,"quantity":3.691923},
{"date":"2022-02-10","time":"17:46:36.545","price":51.5731,"quantity":97.58372}]
Encode a table row by row as JSON:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.json[.qsp.use``split!11b]
.qsp.write.toVariable[`output]
publish ([] date: .z.d; time: .z.t; price: 3?100f; quantity: 3?100f)
// View the output
-1 output;
{"date":"2022-02-10","time":"17:46:36.545","price":48.29742,"quantity":73.15636}
{"date":"2022-02-10","time":"17:46:36.545","price":51.24386,"quantity":3.691923}
{"date":"2022-02-10","time":"17:46:36.545","price":51.5731,"quantity":97.58372}
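The two output shapes above, one JSON array per batch by default versus one JSON object per row with split enabled, can be reproduced with Python's json module. This is a hypothetical model of the shapes only, not the operator's encoder; the helper `encode_batch` is an assumption, and compact separators are used to match the q output above.

```python
import json
from typing import Dict, List, Sequence


def encode_batch(columns: Dict[str, Sequence], split: bool = False) -> List[str]:
    """Hypothetical model of the two JSON shapes for one table batch."""
    names = list(columns)
    # Turn the column layout into one dictionary per row.
    rows = [dict(zip(names, values)) for values in zip(*columns.values())]
    compact = (",", ":")  # no spaces, matching the q output above
    if split:
        # split enabled: one JSON object per row.
        return [json.dumps(row, separators=compact) for row in rows]
    # Default: the whole batch becomes one JSON array of objects.
    return [json.dumps(rows, separators=compact)]


batch = {"price": [48.5, 51.25], "quantity": [73, 3]}
print(encode_batch(batch))
# ['[{"price":48.5,"quantity":73},{"price":51.25,"quantity":3}]']
print(encode_batch(batch, split=True))
# ['{"price":48.5,"quantity":73}', '{"price":51.25,"quantity":3}']
```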
sp.encode.json()
Parameters:
options:
name | type | description | default |
---|---|---|---|
split | boolean | Whether a batch should be encoded as a single JSON object, or split into a separate JSON object for each value in the batch. When the input is a table, this encodes each row as a JSON object with the column names as keys mapping to the corresponding row values. | 0b |
Returns:
A json encoder, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.json(split=True)
| sp.write.to_variable('out'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
>>> kx.q('out')
{"name": "a", "id": 1, "quantity": 4}
{"name": "b", "id": 2, "quantity": 5}
{"name": "c", "id": 3, "quantity": 6}
Protocol Buffer
Serializes data into Protocol Buffers.
.qsp.encode.protobuf[message; file]
.qsp.encode.protobuf[message; file; .qsp.use (!) . flip (
(`format ; format);
(`payloadType; payloadType))]
Parameters:
name | type | description | default |
---|---|---|---|
message | string or symbol | The name of the Protocol Buffer message type to encode. | Required |
file | symbol | The path to a .proto file containing the message type definition. | Required |
options:
name | type | description | default |
---|---|---|---|
format | string | A string definition of the Protocol Buffer message format to encode. | "" |
payloadType | symbol | Indicates the message payload that will be encoded (one of auto, table, dict, array, or arrays). | auto |
For all common arguments, refer to configuring operators.
This operator encodes dictionaries or arrays of values into Protocol Buffer messages.
Import paths
To import your .proto file, the folder containing it is added as an import path. This means the folder is scanned when importing later .proto files, so avoid having .proto files with the same filename in any of the import paths you use.
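Because every folder holding a referenced .proto file becomes an import path, checking for clashing filenames before deploying a pipeline can save debugging time. The sketch below is a hypothetical helper, `duplicate_proto_names`, that scans the folders you intend to use and reports any .proto filename found in more than one of them. It assumes protoc-style resolution, where an import such as "Person.proto" is looked up relative to the root of each import path; it does not reflect how the Stream Processor itself resolves imports.

```python
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List


def duplicate_proto_names(import_dirs: Iterable[str]) -> Dict[str, List[str]]:
    """Hypothetical pre-flight check: report .proto filenames that occur
    in more than one of the given import folders."""
    seen: Dict[str, List[str]] = defaultdict(list)
    # dict.fromkeys drops repeated folders while keeping their order.
    for folder in dict.fromkeys(import_dirs):
        # Only top-level files are checked, since a bare import name is
        # resolved against the root of each import path.
        for proto in sorted(Path(folder).glob("*.proto")):
            seen[proto.name].append(folder)
    return {name: dirs for name, dirs in seen.items() if len(dirs) > 1}


# Usage: two temporary folders that both contain a Person.proto file.
with tempfile.TemporaryDirectory() as first, tempfile.TemporaryDirectory() as second:
    for folder in (first, second):
        Path(folder, "Person.proto").write_text('syntax="proto3";\n')
    print(list(duplicate_proto_names([first, second])))  # ['Person.proto']
```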
Encode Protobuf messages using a Person.proto file:
// Person.proto
syntax="proto3";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
}
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.protobuf[`Person;"Person.proto"]
.qsp.write.toConsole[];
publish `name`id`email!("name"; 101i; "email@email.com")
2021.11.10D12:01:49.089788900 | "\n\004name\020e\032\017email@email.com"
Encode Protobuf messages using format from arrays:
format: "syntax=\"proto3\";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
}";
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.encode.protobuf[.qsp.use `message`format!(`Person;format)]
.qsp.write.toConsole[];
publish ("name"; 101i; "email@email.com")
2021.11.10D12:01:49.089788900 | "\n\004name\020e\032\017email@email.com"
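The bytes printed in both examples follow the Protocol Buffers wire format: each field is written as a key, (field_number << 3) | wire_type, followed by its value. Strings use wire type 2 with a varint length prefix, and int32 values use wire type 0 as varints, so "\n" (10) is the key for field 1, "\020" (16) for field 2, and "e" is the varint for 101. The hand-rolled encoder below reproduces that output for the Person message from either a dictionary keyed by field name or a positional array, matching the two examples above. It is an illustration of the wire format only, assuming the field layout of Person.proto; `PERSON_FIELDS`, `varint`, and `encode_person` are hypothetical names, and real code should use a Protocol Buffers library.

```python
from typing import Dict, List, Sequence, Tuple, Union

# (field name, field number, proto type) for Person.proto, in declaration order.
PERSON_FIELDS: List[Tuple[str, int, str]] = [
    ("name", 1, "string"),
    ("id", 2, "int32"),
    ("email", 3, "string"),
]


def varint(value: int) -> bytes:
    """Base-128 varint; negative int32 values are sign-extended to 64 bits."""
    value &= 0xFFFFFFFFFFFFFFFF
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_person(msg: Union[Dict[str, object], Sequence[object]]) -> bytes:
    """Encode a Person from a dict (by field name) or a sequence (by position)."""
    if isinstance(msg, dict):
        values = [msg.get(name) for name, _, _ in PERSON_FIELDS]
    else:
        values = list(msg)
    out = bytearray()
    for (name, number, ptype), value in zip(PERSON_FIELDS, values):
        if value is None or value == "" or value == 0:
            continue  # proto3 omits scalar fields that hold their default value
        if ptype == "string":
            data = value.encode("utf-8")
            out += varint(number << 3 | 2) + varint(len(data)) + data
        else:  # int32
            out += varint(number << 3 | 0) + varint(value)
    return bytes(out)


expected = b"\n\x04name\x10e\x1a\x0femail@email.com"
assert encode_person({"name": "name", "id": 101, "email": "email@email.com"}) == expected
assert encode_person(["name", 101, "email@email.com"]) == expected
```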
sp.encode.protobuf("MessageA", format=format_string, payload_type=PayloadType.auto)
Parameters:
name | type | description | default |
---|---|---|---|
message | string or symbol | The name of the Protocol Buffer message type to encode. | Required |
path | symbol | The path to a .proto file containing the message type definition. | Required |
options:
name | type | description | default |
---|---|---|---|
format | string | A string definition of the Protocol Buffer message format to encode. | "" |
payload_type | symbol | A PayloadType enum value, or the string equivalent. | auto |
Returns:
A protobuf encoder, which can be joined to other operators or pipelines.
Examples:
Person.proto file:
syntax="proto3";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
}
>>> from kxi import sp
>>> import pykx as kx
>>> from kxi.sp.encode import PayloadType
>>> proto_file = open('/tmp/Person.proto', 'w')
>>> proto_file.write(
'syntax="proto3";message Person { string name = 1;int32 id = 2;string email = 3;}')
>>> proto_file.close()
>>> format_string = ('syntax="proto3";message PersonA '
'{ string name = 1;int32 id = 2;string email = 3;}')
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.protobuf("PersonA", format=format_string, payload_type=PayloadType.auto)
| sp.write.to_variable('out'))
>>> # Define name and email as byte strings, since PyKX converts Python strings to symbols by default
>>> data = {
'name': b'name',
'id': kx.IntAtom(101),
'email': b'email@email.com',
}
>>> kx.q('publish', data)
>>> kx.q('out')
"\\n\\004name\\020e\\032\\017email@email.com"