Decoders

Decode an external data format into the Stream Processor

Decoding allows data to be converted into a format that can be processed directly within the Stream Processor. Use a decoder when ingesting data from an external format, before performing other transformations.
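
For example, the following minimal sketch decodes JSON arriving through a callback and then applies a further transformation with .qsp.map before writing to the console (the decoder is documented below; .qsp.map and the reader and writer used here are part of the wider Stream Processor API):

// Decode JSON from a callback, then transform the resulting table
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.json[]
    .qsp.map[{update price: 1.1 * price from x}]
    .qsp.write.toConsole[]

// Publish a JSON-encoded table; the decoder converts it back to a kdb+ table
publish .j.j ([] sym: `a`b`c; price: 10 20 30f)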

Interfaces

A Python interface is included alongside the q interface and can be used if PyKX is enabled. See the Python API for equivalent APIs.

A drag-and-drop UI for building pipelines is included with kdb Insights Enterprise. See the UI documentation for equivalent UI functions.

Table of Contents

.qsp.decode.arrow
.qsp.decode.csv
.qsp.decode.gzip
.qsp.decode.json
.qsp.decode.protobuf

.qsp.decode.arrow

(Beta Feature) Decodes Arrow streams

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

.qsp.decode.arrow[]
.qsp.decode.arrow[.qsp.use (!) . flip enlist (`asList; asList)]

options:

| name | type | description | default |
| --- | --- | --- | --- |
| asList | bool | If true, the decoded result is a list of arrays corresponding only to the Arrow stream data. Otherwise, by default, the decoded result is a table corresponding to both the schema and data in the Arrow stream. | 0b |
For all common arguments, refer to configuring operators

This operator decodes an Arrow stream into either a kdb+ table or list of arrays.

Decode an Arrow stream into a kdb+ table:

table:([]c1: til 5; c2: 10.1 10.2 10.3 10.4 10.5; c3: (enlist "a";"bbb";"ccc";"ddd";enlist "e"))
serialized: .arrowkdb.ipc.serializeArrowFromTable[table;::]

/ -- Reading to a kdb+ table
input: serialized

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.arrow[]
    .qsp.write.toVariable[`output]

publish input
output
c1 c2   c3
-------------
0  10.1 ,"a"
1  10.2 "bbb"
2  10.3 "ccc"
3  10.4 "ddd"
4  10.5 ,"e"

Decode an Arrow stream into a list of arrays:

table:([]c1: til 5; c2: 10.1 10.2 10.3 10.4 10.5; c3: (enlist "a";"bbb";"ccc";"ddd";enlist "e"))
serialized: .arrowkdb.ipc.serializeArrowFromTable[table;::]

/ -- Reading to a list of arrays
input: serialized

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.arrow[.qsp.use (!) . flip enlist (`asList; 1b)]
    .qsp.write.toVariable[`output]

publish input
output
0    1     2     3     4
10.1 10.2  10.3  10.4  10.5
,"a" "bbb" "ccc" "ddd" ,"e"

.qsp.decode.csv

Parse CSV data to a table

.qsp.decode.csv[schema]
.qsp.decode.csv[schema; delimiter]
.qsp.decode.csv[schema; delimiter; .qsp.use (!) . flip (
    (`header     ; header);
    (`exclude    ; exclude);
    (`schemaType ; schemaType);
    (`encoding   ; encoding))]

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| schema | table, dict, or string | A table with the desired output schema; a dictionary mapping column names to type characters; or a string of type characters. | Required |
| delimiter | character | Field separator for the records in the encoded data. | "," |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| header | symbol | Whether encoded data starts with a header row: one of none, first or always. | first |
| exclude | symbol[] or int[] | A list of columns to exclude from the output. | () |
| schemaType | symbol | How to interpret the provided schema object. By default the schema is treated as the desired literal output. Alternatively, this can be set to schema, in which case a table of the form ([] name: `$(); datatype: `short$()) can be provided describing the desired output. | literal |
| encoding | symbol | How the data is expected to be encoded when being consumed. Currently supported options are ASCII and UTF8. | UTF8 |

For all common arguments, refer to configuring operators

This operator decodes CSV data, given as strings or bytes with delimiter-separated values, into tables conforming to the given schema.

Note

When dealing with non-ASCII encoding schemes, the CSV decoding logic checks for and removes byte order mark prefixes on the incoming data.

Byte Order Marks

Depending on how string data is presented, the byte order mark may or may not be visible. This can lead to mysterious errors that are hard to track down, so be sure to use the UTF8 encoding option when processing data prefixed with a byte order mark.

q)bom: "c"$0xEFBBBF;
q)ascii: "Hello, World!";
q)utf8: bom,"Hello, World!";
q)`$ascii
`Hello, World!
q)`$utf8
`Hello, World!
q)(`$ascii) ~ `$utf8
0b
q)ascii
"Hello, World!"
q)utf8
"\357\273\277Hello, World!"

Decode from a CSV file:

// Generate a random table of data and store it in an inventory file.
n: 10000
t: ([]
  date: n?.z.d + neg til 10;
  price: (n?1000)div 100;
  item: n?`3;
  description: {rand[100]?.Q.an}each til n;
  quantity: n?10000)

`:/tmp/inventory.csv 0: csv 0: t

// Read and parse the data from a file
schema: ([] date: `date$(); price:`int$();
  item:`symbol$(); description: (); quantity:`long$());

.qsp.run
  .qsp.read.fromFile["/tmp/inventory.csv"]
  .qsp.decode.csv[schema]
  .qsp.write.toConsole[]
                             | date       price item description                      quantity
-----------------------------| ---------------------------------------------------------------
2021.07.16D19:45:03.929480200| 2021.07.16 7     ehm  "3qupWqmNh6y8TeTzJlW49NlRzv0_0"  2659
2021.07.16D19:45:03.929480200| 2021.07.14 2     iif  "_eB_lq"                         8257
2021.07.16D19:45:03.929480200| 2021.07.12 7     eod  "GhUgGe3PH9Ie2NOw"               3907
2021.07.16D19:45:03.929480200| 2021.07.11 0     goj  "Dvmemf3H2P"                     6100
2021.07.16D19:45:03.929480200| 2021.07.09 1     bpm  "GbSjldDmUprmfiBa0UI8I"          367
..

Decode from a CSV file using a dictionary schema:

// Generate a random table of data and store it in an inventory file.
n: 10000
t: ([]
  date: n?.z.d + neg til 10;
  price: (n?1000)div 100;
  item: n?`3;
  description: {rand[100]?.Q.an}each til n;
  quantity: n?10000)

`:/tmp/inventory.csv 0: csv 0: t

// Read and parse the data from a file
schema: `date`price`item`description`quantity!"dis*j";

.qsp.run
  .qsp.read.fromFile["/tmp/inventory.csv"]
  .qsp.decode.csv[schema]
  .qsp.write.toConsole[]
                             | date       price item description                      quantity
-----------------------------| ---------------------------------------------------------------
2021.07.16D19:45:03.929480200| 2021.07.16 7     ehm  "3qupWqmNh6y8TeTzJlW49NlRzv0_0"  2659
2021.07.16D19:45:03.929480200| 2021.07.14 2     iif  "_eB_lq"                         8257
2021.07.16D19:45:03.929480200| 2021.07.12 7     eod  "GhUgGe3PH9Ie2NOw"               3907
2021.07.16D19:45:03.929480200| 2021.07.11 0     goj  "Dvmemf3H2P"                     6100
2021.07.16D19:45:03.929480200| 2021.07.09 1     bpm  "GbSjldDmUprmfiBa0UI8I"          367
..
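
Decode from a CSV file, excluding columns:

The exclude option drops columns from the decoded output. The following is a minimal sketch using the inventory file generated above; it assumes the schema still lists every input column and that the excluded description column is simply omitted from the output.

// Read and parse the data from a file, dropping the description column
schema: `date`price`item`description`quantity!"dis*j";

.qsp.run
  .qsp.read.fromFile["/tmp/inventory.csv"]
  .qsp.decode.csv[schema; ","; .qsp.use enlist[`exclude]!enlist enlist[`description]]
  .qsp.write.toConsole[]

Decode using a name/datatype schema table:

This is a hedged sketch of the schemaType option described above: the schema is supplied as a name/datatype table (standard q type numbers as shorts) rather than a literal table. The /tmp/trades.csv file and its columns are illustrative.

// Write a small CSV file to decode
`:/tmp/trades.csv 0: csv 0: ([] date: 5#.z.d; sym: 5?`3; price: 5?100f; quantity: 5?100)

// Describe the output as a name/datatype table rather than a literal schema
schema: ([] name: `date`sym`price`quantity; datatype: 14 11 9 7h);

.qsp.run
  .qsp.read.fromFile["/tmp/trades.csv"]
  .qsp.decode.csv[schema; ","; .qsp.use enlist[`schemaType]!enlist `schema]
  .qsp.write.toConsole[]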

.qsp.decode.gzip

(Beta Feature) Inflates (decompresses) gzipped data

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Inflates a gzipped stream of bytes into an uncompressed stream of bytes. This decoder will inflate as much data as is available in the inbound stream and buffer any trailing data until the next byte buffer is received. Once data has been inflated, it is passed to the next node in the pipeline.

Fault Tolerance

The gzip decoder is marked as beta because it does not yet support fault-tolerant replay. If a pipeline fails and is forced to replay data, the gzip decoder will fail with an incomplete byte stream. Fault tolerance support is coming in a future release.

.qsp.decode.gzip[]

For all common arguments, refer to configuring operators

This operator inflates a gzipped byte or char stream while preserving the shape and type of the incoming data.

Decode a stream of gzipped CSV data:

Permissions

When running this example inside a kxi-sp-worker image, you must first run system "cd /tmp" to avoid encountering permissions errors.

`:table.csv 0: csv 0: ([] date: .z.d; sym: 100?3?`3; price: 100?1000f; quantity: 100?100)
system "gzip table.csv"

.qsp.run
    .qsp.read.fromFile[`:table.csv.gz]
    .qsp.decode.gzip[]
    .qsp.decode.csv["DSFJ"]
    .qsp.write.toVariable[`output]

output
date       sym price    quantity
--------------------------------
2022.10.26 aci 106.9924 90
2022.10.26 lcn 422.2224 73
2022.10.26 lcn 767.486  90
2022.10.26 bdp 885.1612 43
2022.10.26 bdp 435.8676 90
2022.10.26 lcn 77.88199 84
..
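
The buffering behaviour described above can be illustrated by publishing the compressed bytes in chunks through a callback. This is a minimal sketch: it assumes the table.csv.gz file generated above, uses an arbitrary 64-byte chunk size, and assumes .qsp.write.toVariable supports an append mode for accumulating the inflated output.

// Read the compressed file as raw bytes and publish it in 64-byte chunks;
// the decoder inflates what it can and buffers any trailing bytes until the
// next chunk arrives.
bytes: read1 `:table.csv.gz

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.gzip[]
    .qsp.write.toVariable[`output; `append]

publish each 64 cut bytes

// output now holds the inflated byte stream; cast to characters to inspect it
"c"$output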

.qsp.decode.json

Parse JSON data

.qsp.decode.json[]
.qsp.decode.json[.qsp.use enlist[`decodeEach]!enlist decodeEach]

options:

| name | type | description | default |
| --- | --- | --- | --- |
| decodeEach | boolean | By default, messages passed to the decoder are treated as a single JSON object. Setting decodeEach to true indicates that parsing should be done on each value of a message. This is useful when decoding data with objects separated by newlines, as it allows the pipeline to process partial sets of a JSON file without requiring the entire block to be in memory. | 0b |

For all common arguments, refer to configuring operators

This operator parses JSON strings to native q types, usually either a dictionary or a table.

Decode JSON from a file:

// Generate a random table of data and write it as JSON data
n: 10000;
t: ([]
  date: n?.z.d + neg til 10;
  price: (n?1000)div 100;
  item: n?`3;
  description: {rand[100]?.Q.an}each til n;
  quantity: n?10000);

`:/tmp/inventory.json 0: enlist .j.j t;

.qsp.run
  .qsp.read.fromFile["/tmp/inventory.json"]
  .qsp.decode.json[]
  .qsp.write.toConsole[];
                             | date         price item  description                                        quantity
-----------------------------| ------------------------------------------------------------------------------------
2021.10.05D19:40:04.536274000| "2021-10-01" 8     "eke" "PlND7JnZejE5j8aKJxSmqLTJycOsxkgTgqz2dB6mH3Q"      5963
2021.10.05D19:40:04.536274000| "2021-10-05" 0     "ldc" "ngctTMTD5PkkTSTOZ_3pwgy2vISuvnJYy"                3057
2021.10.05D19:40:04.536274000| "2021-10-05" 7     "ikb" "nFBU7"                                            8986
2021.10.05D19:40:04.536274000| "2021-09-28" 9     "lhp" "JH9NSxL7UNBGRZ49MYDX9qu_BUYmZoGu11G_GSV"          9488
2021.10.05D19:40:04.536274000| "2021-10-05" 3     "eoi" "E0hp_zZUBAfKERSPvdz_UZnKX07iBe2sd9TgH4mJmFtsLyap" 1301
..

Decode a stream of JSON:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.decode.json[.qsp.use``decodeEach!11b]
  .qsp.write.toConsole[];

publish .j.j each ([] date: .z.d + til 10; price: 10?100f)
                             | date         price
-----------------------------| ---------------------
2021.10.05D19:47:01.576948900| "2021-10-05" 22.56381
2021.10.05D19:47:01.576948900| "2021-10-06" 51.2789
2021.10.05D19:47:01.576948900| "2021-10-07" 34.48978
2021.10.05D19:47:01.576948900| "2021-10-08" 69.06853
2021.10.05D19:47:01.576948900| "2021-10-09" 71.53166
..

.qsp.decode.protobuf

Parse Protocol Buffer messages to a dictionary or list

.qsp.decode.protobuf[message]
.qsp.decode.protobuf[message; .qsp.use (!) . flip (
    (`file         ; file);
    (`format       ; format);
    (`asList       ; asList))]

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| message | string or symbol | The name of the Protocol Buffer message type to decode. | Required |

options:

| name | type | description | default |
| --- | --- | --- | --- |
| file | symbol | The path to a .proto file containing the message type definition. Either file or format must be provided. | ` |
| format | string | A string definition of the Protocol Buffer message format to decode. | "" |
| asList | boolean | By default, the output is a dictionary that includes the field names. If this option is set to true, only the list of values is output. | 0b |

For all common arguments, refer to configuring operators

This operator decodes protobuf-encoded messages of the chosen message type, given a protobuf schema containing that message type. The protobuf schema can be provided either as a file using the file option, or as a string using the format option. Decoded messages are output as either a dictionary or a list, depending on the value of the asList option.

Import paths

To import your .proto file, the folder containing it is added as an import path. This folder will also be scanned when importing future .proto files, so avoid having .proto files with the same filename in any of the import paths you use.

Decode protobuf messages using a Person.proto file:

// Person.proto
syntax="proto3";
message Person {
  string name = 1;
  int32 id = 2;
  string email = 3;
}

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.protobuf[`Person;"Person.proto"]
    .qsp.write.toConsole[];

// The bytes listed below are an example encoded Protocol Buffer payload
publish 0x0a046e616d6510651a0f656d61696c40656d61696c2e636f6d;
2021.11.09D19:05:29.933331149 | name | "name"
2021.11.09D19:05:29.933331149 | id   | 101i
2021.11.09D19:05:29.933331149 | email| "email@email.com"

Decode protobuf messages using format into lists:

format: "syntax=\"proto3\";
 message Person {
    string name = 1;
    int32 id = 2;
    string email = 3;
 }";

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.decode.protobuf[.qsp.use `message`format`asList!(`Person;format;1b)]
    .qsp.write.toConsole[];

// The bytes listed below are an example encoded Protocol Buffer payload
publish 0x0a046e616d6510651a0f656d61696c40656d61696c2e636f6d;
2021.11.09D19:11:40.422536637 | "name"
2021.11.09D19:11:40.422536637 | 101i
2021.11.09D19:11:40.422536637 | "email@email.com"