Static file

Read a static file, parse the data, filter and reduce it, then write the resulting average to a function numberAvg on another kdb+ process.

The same pipeline API is used for both static/batch and stream processing within the Stream Processor.

Replacing the file reader with a callback reader (for example, .qsp.read.fromCallback) would apply the same logic to data sent to a stream callback.

Pipeline spec

Run the pipeline to read and process the file. Both specs create a sample file containing numbers to process.

// Create a sample file with one number per line
`:/tmp/numbers.txt 0: string -500 + 10?1000;

stream:
  // The file containing a number on each line is read in as a single batch
  .qsp.read.fromFile[":/tmp/numbers.txt"]
  .qsp.map["J"$]      // Parse the string numbers to q long integers
  .qsp.filter[0<=]    // This will filter individual records out of the batch
  .qsp.map[avg]       // .qsp.map can return data of any shape, so can do reductions
  .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`function`numberAvg]

// Note: This could also be written as a single step
// .qsp.map[{x: "J"$x; avg x where 0 <= x}]

// When the file has been read, the pipeline will close automatically
.qsp.onFinish[{ -1 "Pipeline Complete"; exit 0 }]
.qsp.run stream
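
from kxi import sp
import numpy as np
import pykx as kx

# Create a sample file with one number per line
with open('/tmp/numbers.txt', 'w') as f:
    f.writelines(str(x) + '\n' for x in np.random.randint(-500, 500, 500))

# The bodies of the two map steps are assumed; they mirror the q spec
sp.run(sp.read.from_file('/tmp/numbers.txt', 'text')
    | sp.map(lambda data: kx.q('{"J"$x}', data))   # Parse the string numbers to q long integers
    | sp.filter(lambda data: 0 <= data)            # Filter individual records out of the batch
    | sp.map(lambda data: kx.q('avg', data))       # Reduce the batch to its average
    | sp.write.to_process(handle='::1234', mode='function', target='numberAvg'))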
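
To sanity-check the value that arrives at numberAvg, the parse, filter and average steps can be reproduced outside the Stream Processor. The following is a minimal sketch in plain Python, not part of the pipeline API. The helper name average_of_non_negative is hypothetical, and returning None for an empty result is an assumption (q's avg returns a null float in that case).

```python
from pathlib import Path


def average_of_non_negative(path):
    """Mirror the pipeline steps: parse, filter, then average.

    Hypothetical helper for checking results; it does not use kxi.sp.
    """
    # Read step: the whole file is treated as a single batch of lines
    lines = Path(path).read_text().splitlines()

    # Parse step: convert each non-blank line to an integer
    numbers = [int(line) for line in lines if line.strip()]

    # Filter step: keep only records where 0 <= value
    kept = [n for n in numbers if 0 <= n]

    # Reduce step: average the remaining records
    # (assumption: None when nothing passes the filter)
    return sum(kept) / len(kept) if kept else None
```

Calling average_of_non_negative('/tmp/numbers.txt') on the sample file created by either spec should agree with the value the pipeline sends to numberAvg, up to floating-point representation.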