Decoders
Decode an external data format into the Stream Processor
Decoding converts data from an external format into one that can be processed directly within the Stream Processor. Use a decoder when ingesting data in an external format, before performing other transformations.
Interfaces
A Python interface is included alongside the q interface and can be used if PyKX is enabled. See the Python API for equivalent APIs.
A drag and drop UI is included with kdb Insights Enterprise for building pipelines. See the UI documentation for equivalent UI functions.
Table of Contents
.qsp.decode.arrow (Beta) decode Arrow streams
.qsp.decode.csv decode CSV data into tables
.qsp.decode.gzip (Beta) decode gzipped data
.qsp.decode.json decode JSON data
.qsp.decode.pcap decode pcap data
.qsp.decode.protobuf decode Protocol Buffer messages
.qsp.decode.arrow
(Beta Feature) Decodes Arrow streams
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
.qsp.decode.arrow[]
.qsp.decode.arrow[.qsp.use (!) . flip enlist (`asList; asList)]
options:
name | type | description | default |
---|---|---|---|
asList | bool | If true, the decoded result is a list of arrays, corresponding only to the Arrow stream data. Otherwise, by default the decoded result is a table corresponding to both the schema and data in the Arrow stream. | 0b |
For all common arguments, refer to configuring operators
This operator decodes an Arrow stream into either a kdb+ table or list of arrays.
Decode an Arrow stream into a kdb+ table:
/ -- Reading from Arrow-encoded byte vector.
input:0xffffffffa80000001000000000000a000c000600050008000a000000000104000c00000008000800000004000800000004000000020000004000000004000000d8ffffff000001051000000018000000040000000000000002000000783100000400040004000000100014000800060007000c00000010001000000000000102100000001c0000000400000000000000010000007800000008000c000800070008000000000000014000000000000000ffffffffc800000014000000000000000c0016000600050008000c000c0000000003040018000000400000000000000000000a0018000c00040008000a0000006c000000100000000300000000000000000000000500000000000000000000000000000000000000000000000000000018000000000000001800000000000000000000000000000018000000000000001000000000000000280000000000000015000000000000000000000002000000030000000000000000000000000000000300000000000000000000000000000001000000000000000200000000000000030000000000000000000000070000000e00000015000000737472696e6731737472696e6732737472696e6733000000ffffffff00000000
/ -- Decoding to a kdb+ table
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.arrow[]
.qsp.write.toVariable[`output]
publish input
output
x x1
-----------
1 "string1"
2 "string2"
3 "string3"
Decode an Arrow stream into a list of arrays:
/ -- Reading from Arrow-encoded byte vector.
input:0xffffffffa80000001000000000000a000c000600050008000a000000000104000c00000008000800000004000800000004000000020000004000000004000000d8ffffff000001051000000018000000040000000000000002000000783100000400040004000000100014000800060007000c00000010001000000000000102100000001c0000000400000000000000010000007800000008000c000800070008000000000000014000000000000000ffffffffc800000014000000000000000c0016000600050008000c000c0000000003040018000000400000000000000000000a0018000c00040008000a0000006c000000100000000300000000000000000000000500000000000000000000000000000000000000000000000000000018000000000000001800000000000000000000000000000018000000000000001000000000000000280000000000000015000000000000000000000002000000030000000000000000000000000000000300000000000000000000000000000001000000000000000200000000000000030000000000000000000000070000000e00000015000000737472696e6731737472696e6732737472696e6733000000ffffffff00000000
/ -- Decoding to a list of arrays
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.arrow[.qsp.use (!) . flip enlist (`asList; 1b)]
.qsp.write.toVariable[`output]
publish input
output
1 2 3
"string1" "string2" "string3"
.qsp.decode.csv
Parse CSV data to a table
.qsp.decode.csv[schema]
.qsp.decode.csv[schema; delimiter]
.qsp.decode.csv[schema; delimiter; .qsp.use (!) . flip (
(`header ; header);
(`exclude ; exclude);
(`schemaType ; schemaType);
(`encoding ; encoding);
(`newlines ; newlines))]
Parameters:
name | type | description | default |
---|---|---|---|
schema | table, dict or string | A table with the desired output schema, a dictionary mapping column names to type characters, or "" to treat all columns as strings. | "" |
delimiter | character | Field separator for the records in the encoded data. | "," |
options:
name | type | description | default |
---|---|---|---|
header | symbol | Whether encoded data starts with a header row: one of none, first or always. | first |
exclude | symbol[] or int[] | A list of columns to exclude from the output. | () |
schemaType | symbol | How to interpret the provided schema object. By default the schema is treated as the literal desired output. Alternatively, this can be set to schema, in which case the provided schema object must be a table of the form ([] name: `$(); datatype: `short$()). | literal |
encoding | symbol | The expected encoding of the incoming data; currently supported options are ASCII and UTF8. | UTF8 |
newlines | boolean | Indicates whether line-returns may be embedded in strings. | 0b |
For all common arguments, refer to configuring operators
This operator decodes CSV data from strings or bytes with delimiter-separated values into tables in the given schema.
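When every message in a stream carries its own header row, set header to always; a minimal sketch using a callback stream (the column names a and b are illustrative):
// Each published message begins with its own header row
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.csv[`a`b!"j*"; ","; .qsp.use``header!(::;`always)]
.qsp.write.toVariable[`output]
publish "a,b\n1,x\n2,y";
publish "a,b\n3,z";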
Byte Order Marks
When dealing with non-ASCII encoding schemes, the CSV decoding logic will check for and remove byte order mark prefixes on the incoming data. Depending on how string data is presented, the byte order mark may or may not be visible. This can lead to mysterious errors that are hard to track down, so be sure to use the UTF8 encoding option when processing data prefixed with a byte order mark.
q)bom: "c"$0xEFBBBF;
q)ascii: "Hello, World!";
q)utf8: bom,"Hello, World!";
q)`$ascii
`Hello, World!
q)`$utf8
`Hello, World!
q)(`$ascii) ~ `$utf8
0b
q)ascii
"Hello, World!"
q)utf8
"\357\273\277Hello, World!"
Accepted type formats
Certain target types have strict requirements around the string format in order to be parsed correctly. Please see the Apply Schema documentation for details.
Newlines Parameter
Enabling the newlines parameter incurs a performance penalty even when the input data contains no line-returns within strings, so it is disabled by default.
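With newlines enabled, a quoted field may span multiple lines; a minimal sketch (the column names are illustrative):
// A quoted note field containing an embedded line-return
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.csv[`item`note!"**"; ","; .qsp.use``newlines!(::;1b)]
.qsp.write.toVariable[`output]
publish "item,note\nhammer,\"line one\nline two\"";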
Decode from a CSV file:
// Generate a random table of data and store it in an inventory file.
n: 10000
t: ([]
date: n?.z.d + neg til 10;
price: (n?1000)div 100;
item: n?`3;
description: {rand[100]?.Q.an}each til n;
quantity: n?10000)
`:/tmp/inventory.csv 0: csv 0: t
// Read and parse the data from a file
schema: ([] date: `date$(); price:`int$();
item:`symbol$(); description: (); quantity:`long$());
.qsp.run
.qsp.read.fromFile["/tmp/inventory.csv"]
.qsp.decode.csv[schema]
.qsp.write.toConsole[]
| date price item description quantity
-----------------------------| ---------------------------------------------------------------
2021.07.16D19:45:03.929480200| 2021.07.16 7 ehm "3qupWqmNh6y8TeTzJlW49NlRzv0_0" 2659
2021.07.16D19:45:03.929480200| 2021.07.14 2 iif "_eB_lq" 8257
2021.07.16D19:45:03.929480200| 2021.07.12 7 eod "GhUgGe3PH9Ie2NOw" 3907
2021.07.16D19:45:03.929480200| 2021.07.11 0 goj "Dvmemf3H2P" 6100
2021.07.16D19:45:03.929480200| 2021.07.09 1 bpm "GbSjldDmUprmfiBa0UI8I" 367
..
Decode from a CSV file using a dictionary schema:
// Generate a random table of data and store it in an inventory file.
n: 10000
t: ([]
date: n?.z.d + neg til 10;
price: (n?1000)div 100;
item: n?`3;
description: {rand[100]?.Q.an}each til n;
quantity: n?10000)
`:/tmp/inventory.csv 0: csv 0: t
// Read and parse the data from a file
schema: `date`price`item`description`quantity!"dis*j";
.qsp.run
.qsp.read.fromFile["/tmp/inventory.csv"]
.qsp.decode.csv[schema]
.qsp.write.toConsole[]
| date price item description quantity
-----------------------------| ---------------------------------------------------------------
2021.07.16D19:45:03.929480200| 2021.07.16 7 ehm "3qupWqmNh6y8TeTzJlW49NlRzv0_0" 2659
2021.07.16D19:45:03.929480200| 2021.07.14 2 iif "_eB_lq" 8257
2021.07.16D19:45:03.929480200| 2021.07.12 7 eod "GhUgGe3PH9Ie2NOw" 3907
2021.07.16D19:45:03.929480200| 2021.07.11 0 goj "Dvmemf3H2P" 6100
2021.07.16D19:45:03.929480200| 2021.07.09 1 bpm "GbSjldDmUprmfiBa0UI8I" 367
..
Decode from a CSV file using schemaType schema:
// Generate a random table of data and store it in an inventory file.
n: 10000
t: ([]
date: n?.z.d + neg til 10;
price: (n?1000)div 100;
item: n?`3;
description: {rand[100]?.Q.an}each til n;
quantity: n?10000)
`:/tmp/inventory.csv 0: csv 0: t
// Read and parse the data from a file
schema: ([] name:`date`price`item`description`quantity; datatype:-14 -6 -11 0 -7h);
.qsp.run
.qsp.read.fromFile["inventory.csv"]
.qsp.decode.csv[schema;.qsp.use``schemaType!(::;`schema)]
.qsp.write.toConsole[]
| date price item description quantity
-----------------------------| ---------------------------------------------------------------
2021.07.16D19:45:03.929480200| 2021.07.16 7 ehm "3qupWqmNh6y8TeTzJlW49NlRzv0_0" 2659
2021.07.16D19:45:03.929480200| 2021.07.14 2 iif "_eB_lq" 8257
2021.07.16D19:45:03.929480200| 2021.07.12 7 eod "GhUgGe3PH9Ie2NOw" 3907
2021.07.16D19:45:03.929480200| 2021.07.11 0 goj "Dvmemf3H2P" 6100
2021.07.16D19:45:03.929480200| 2021.07.09 1 bpm "GbSjldDmUprmfiBa0UI8I" 367
..
.qsp.decode.gzip
(Beta Feature) Inflates (decompresses) gzipped data
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Inflates a gzipped stream of bytes into an uncompressed stream of bytes. This decoder will inflate as much data as is available in the inbound stream and buffer any trailing data until the next byte buffer is received. Once data has been inflated, it is passed to the next node in the pipeline.
Fault Tolerance
The gzip decoder is marked as beta because it does not yet support fault-tolerant replay. If a pipeline fails and is forced to replay data, the gzip decoder will fail on the resulting incomplete byte stream. Fault tolerance support is coming in a future release.
.qsp.decode.gzip[]
For all common arguments, refer to configuring operators
This operator inflates a gzipped byte or char stream while preserving the shape and type of the incoming data.
Decode a stream of gzipped CSV data:
Permissions
When running this example inside a kxi-sp-worker image, you must first run system "cd /tmp" to avoid encountering permissions errors.
`:table.csv 0: csv 0: ([] date: .z.d; sym: 100?3?`3; price: 100?1000f; quantity: 100?100)
system "gzip table.csv"
.qsp.run
.qsp.read.fromFile[`:table.csv.gz]
.qsp.decode.gzip[]
.qsp.decode.csv["DSFJ"]
.qsp.write.toVariable[`output]
output
date sym price quantity
--------------------------------
2022.10.26 aci 106.9924 90
2022.10.26 lcn 422.2224 73
2022.10.26 lcn 767.486 90
2022.10.26 bdp 885.1612 43
2022.10.26 bdp 435.8676 90
2022.10.26 lcn 77.88199 84
..
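To see the buffering behavior described above, the same gzipped file can be published in arbitrary byte chunks through a callback; a minimal sketch, assuming table.csv.gz from the previous example exists:
// The decoder inflates what it can and buffers any incomplete
// trailing data until the next byte buffer arrives
bytes: read1 `:table.csv.gz;
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.gzip[]
.qsp.decode.csv["DSFJ"]
.qsp.write.toVariable[`output]
publish 100#bytes;
publish 100_bytes;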
.qsp.decode.json
Parse JSON data
.qsp.decode.json[]
.qsp.decode.json[.qsp.use enlist[`decodeEach]!enlist decodeEach]
options:
name | type | description | default |
---|---|---|---|
decodeEach | boolean | By default, messages passed to the decoder are treated as a single JSON object. Setting decodeEach to true indicates that parsing should be done on each value of a message. This is useful when decoding data that has objects separated by newlines, as it allows the pipeline to process partial sets of the JSON file without requiring the entire block to be in memory. | 0b |
For all common arguments, refer to configuring operators
This operator parses JSON strings to native q types, usually either a dictionary or a table.
Decode JSON from a file:
// Generate a random table of data and write it as JSON data
n: 10000;
t: ([]
date: n?.z.d + neg til 10;
price: (n?1000)div 100;
item: n?`3;
description: {rand[100]?.Q.an}each til n;
quantity: n?10000);
`:/tmp/inventory.json 0: enlist .j.j t;
.qsp.run
.qsp.read.fromFile["/tmp/inventory.json"]
.qsp.decode.json[]
.qsp.write.toConsole[];
| date price item description quantity
-----------------------------| ------------------------------------------------------------------------------------
2021.10.05D19:40:04.536274000| "2021-10-01" 8 "eke" "PlND7JnZejE5j8aKJxSmqLTJycOsxkgTgqz2dB6mH3Q" 5963
2021.10.05D19:40:04.536274000| "2021-10-05" 0 "ldc" "ngctTMTD5PkkTSTOZ_3pwgy2vISuvnJYy" 3057
2021.10.05D19:40:04.536274000| "2021-10-05" 7 "ikb" "nFBU7" 8986
2021.10.05D19:40:04.536274000| "2021-09-28" 9 "lhp" "JH9NSxL7UNBGRZ49MYDX9qu_BUYmZoGu11G_GSV" 9488
2021.10.05D19:40:04.536274000| "2021-10-05" 3 "eoi" "E0hp_zZUBAfKERSPvdz_UZnKX07iBe2sd9TgH4mJmFtsLyap" 1301
..
Decode a stream of JSON:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.json[.qsp.use``decodeEach!11b]
.qsp.write.toConsole[];
publish .j.j each ([] date: .z.d + til 10; price: 10?100f)
| date price
-----------------------------| ---------------------
2021.10.05D19:47:01.576948900| "2021-10-05" 22.56381
2021.10.05D19:47:01.576948900| "2021-10-06" 51.2789
2021.10.05D19:47:01.576948900| "2021-10-07" 34.48978
2021.10.05D19:47:01.576948900| "2021-10-08" 69.06853
2021.10.05D19:47:01.576948900| "2021-10-09" 71.53166
..
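When data arrives as newline-delimited JSON (one object per line), splitting on line-returns upstream lets decodeEach parse each object independently; a minimal sketch with illustrative data:
// Each element of the published list is one JSON object
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.json[.qsp.use``decodeEach!11b]
.qsp.write.toConsole[];
publish "\n" vs "{\"a\":1}\n{\"a\":2}\n{\"a\":3}"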
.qsp.decode.pcap
Decodes pcap files
.qsp.decode.pcap[columns]
options:
name | type | description | default |
---|---|---|---|
columns | symbol[] | The columns to include. (Omit to include all columns) | () |
The available columns are: ack, dstip, dstmac, dstport, flags, id, ip_checksum, ip_length, offset, payload, proto, proto_checksum, seq, srcmac, srcport, timestamp, tos, ttl, udp_length, urgptr, windowsize. If no columns are supplied, all columns are included by default.
For all common arguments, refer to configuring operators
Limitations: IPv6 packet reading is not yet supported by the pcap decoder; any IPv6 packets in the file are skipped. Additionally, chunking is not possible with the current decoder: each pcap file begins with a mandatory file header, so a file cannot be parsed by simply partitioning it. To partition pcap files, use a tool such as Wireshark (https://www.wireshark.org/) to split them.
This operator decodes a pcap file into a kdb+ table.
Decode a pcap file into a kdb+ table:
// Reading to a kdb+ table
.qsp.run
.qsp.read.fromFile["test/data/decode/test_data/20180127_IEXTP1_DEEP1.0.pcap"]
.qsp.decode.pcap[`timestamp`tos`ip_length`id`offset`proto`ip_checksum]
.qsp.write.toVariable[`output]
1#output
timestamp tos ip_length id offset proto ip_checksum
--------------------------------------------------------------------------
2023.05.08D20:17:50.720432000 00 200 9155 0 11 -10827
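Restricting the output to specific columns works the same way; for example, a minimal sketch that keeps only the payload column to inspect packet sizes, assuming the same test file:
// Keep only the payload column and count the bytes in each packet
.qsp.run
.qsp.read.fromFile["test/data/decode/test_data/20180127_IEXTP1_DEEP1.0.pcap"]
.qsp.decode.pcap[enlist `payload]
.qsp.write.toVariable[`pkts]
count each pkts`payload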
.qsp.decode.protobuf
Parse Protocol Buffer messages to a dictionary or list
.qsp.decode.protobuf[message]
.qsp.decode.protobuf[message; .qsp.use (!) . flip (
(`file ; file);
(`format; format);
(`asList; asList))]
Parameters:
name | type | description | default |
---|---|---|---|
message | string or symbol | The name of the Protocol Buffer message type to decode. | Required |
options:
name | type | description | default |
---|---|---|---|
file | symbol | The path to a .proto file containing the message type definition. Either format or file must be provided. | ` |
format | string | A string definition of the Protocol Buffer message format to decode. | "" |
asList | boolean | By default, the output is a dictionary that includes the field names. If this option is set to true, only the list of values is output. | 0b |
For all common arguments, refer to configuring operators
This operator decodes protobuf-encoded messages of the chosen message type, given a protobuf schema containing that message type. The protobuf schema can be provided either as a file using the file option, or as a string using the format option. Decoded messages are output as either a dictionary or a list depending on the value of the asList option.
Import paths
To import your .proto file, the folder containing the .proto file is added as an import path. This means the folder will be scanned when importing future .proto files, so it is important to avoid having .proto files with the same filename present in the import paths you use.
Decode protobuf messages using a Person.proto file:
// Person.proto
syntax="proto3";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
}
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.protobuf[`Person; .qsp.use``file!(::;"Person.proto")]
.qsp.write.toConsole[];
// The bytes listed below are an example encoded Protocol Buffer payload
publish 0x0a046e616d6510651a0f656d61696c40656d61696c2e636f6d;
2021.11.09D19:05:29.933331149 | name | "name"
2021.11.09D19:05:29.933331149 | id | 101i
2021.11.09D19:05:29.933331149 | email| "email@email.com"
Decode protobuf messages using format into lists:
format: "syntax=\"proto3\";
message Person {
string name = 1;
int32 id = 2;
string email = 3;
}";
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.decode.protobuf[.qsp.use `message`format`asList!(`Person;format;1b)]
.qsp.write.toConsole[];
// The bytes listed below are an example encoded Protocol Buffer payload
publish 0x0a046e616d6510651a0f656d61696c40656d61696c2e636f6d;
2021.11.09D19:11:40.422536637 | "name"
2021.11.09D19:11:40.422536637 | 101i
2021.11.09D19:11:40.422536637 | "email@email.com"