Static file

This example reads a static file, parses each line as a number, filters out the negative values, reduces the remaining values to their average, then sends that average to a function, numberAvg, on another kdb+ process.

The same pipeline API is used for both static/batch and stream processing within the Stream Processor. Replacing .qsp.read.fromFile with .qsp.read.fromCallback applies the same logic to data sent to a stream callback.
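As a rough sketch of that substitution, the pipeline from this example might look as follows with a callback reader. It assumes .qsp.read.fromCallback takes the callback name as a symbol and defines a function of that name once the pipeline is running; the name publish is only an illustrative choice, and the sample batch is made up.

```q
// Sketch only: same parse/filter/average logic, fed by a callback instead of a file.
// `publish is a hypothetical callback name chosen for this illustration.
stream:
    .qsp.read.fromCallback[`publish]
    .qsp.map["J"$]      // Parse the string numbers to q long integers
    .qsp.filter[0<=]    // Keep only the non-negative records
    .qsp.map[avg]       // Reduce each batch to its average
    .qsp.write.toProcess[`handle`mode`target!`::1234`function`numberAvg];

.qsp.run stream

// Each call sends one batch of string numbers through the pipeline
publish string -500 + 10?1000
```

Unlike the file-based version, a callback-driven pipeline has no natural end of input, so it keeps running and processes every batch passed to publish.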

Setup

Before running the pipeline, create a file to process. The line below writes ten random integers between -500 and 499 to /tmp/numbers.txt, one per line.

`:/tmp/numbers.txt 0: string -500 + 10?1000;
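The pipeline also writes to a kdb+ process on port 1234, which needs a numberAvg function defined before the pipeline runs. A minimal sketch of such a receiver is shown here; it assumes the function is called with the result as its single argument, and the lastAvg global is a hypothetical name added purely for inspection.

```q
// Run in a separate q process started with: q -p 1234
lastAvg: 0n;   // hypothetical global holding the most recent result

// Called by the pipeline writer; stores the value and prints it
numberAvg: {[x]
    lastAvg:: x;
    -1 "numberAvg received: ", .Q.s1 x;
    }
```

Storing the value in a global makes it easy to confirm from the receiving process that the pipeline delivered a result.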

Pipeline spec

Run the pipeline to read and process the file.

stream:
    // The file containing a number on each line is read in as a single batch
    .qsp.read.fromFile[":/tmp/numbers.txt"]
    .qsp.map["J"$]      // Parse the string numbers to q long integers
    .qsp.filter[0<=]    // Keep only the non-negative records; the filter is applied to each record in the batch
    .qsp.map[avg]       // .qsp.map can return data of any shape, so it is able to do reductions
    .qsp.write.toProcess[`handle`mode`target!`::1234`function`numberAvg];

// Note: This could also be written as a single step
// .qsp.map[{x: "J"$x; avg x where 0 <= x}]

// When the file has been read, the pipeline will close automatically
.qsp.onTeardown[{show "Pipeline Complete"; exit 0}];

.qsp.run stream
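The parse, filter and average steps can be checked in plain q, outside the Stream Processor. The sketch below uses a small made-up batch in place of the file contents and compares the step-by-step result with the single-function form from the note above; the names batch, parsed, kept, result and oneStep are illustrative only.

```q
// Hypothetical sample batch: one string per line, as it would be read from the file
batch: ("12"; "-7"; "30"; "-100"; "8");

parsed: "J"$batch;               // parse strings to longs: 12 -7 30 -100 8
kept: parsed where 0 <= parsed;  // keep non-negative values: 12 30 8
result: avg kept;                // average of the kept values

// The same logic as a single function
oneStep: {x: "J"$x; avg x where 0 <= x};
oneStep[batch] ~ result          // 1b
```

Because the operators passed to .qsp.map and .qsp.filter are ordinary q functions, this kind of check is a quick way to validate the logic before running it in a pipeline.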