
Static file

Read a static file, parse the data, filter and reduce it, then write the average to a function numberAvg on another kdb+ process

The same pipeline API is used for both static/batch and stream processing within the Stream Processor.

Replacing the .qsp.read.fromFile reader with .qsp.read.fromCallback would result in the same logic being applied to data sent to a stream callback.
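
As a minimal sketch of that streaming variant (not part of the spec below), assuming string records are pushed to an arbitrarily named callback, here `publish, and writing to the console instead of another process; with a stream source, each pushed batch produces its own average:

// Hypothetical streaming variant: publish["42";"-7";"13"] feeds the pipeline
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map["J"$]
  .qsp.filter[0<=]
  .qsp.map[avg]             // each pushed batch yields its own average
  .qsp.write.toConsole[]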

Pipeline spec

Run the pipeline to read and process the file. Both specs create a sample file containing numbers to process.

q

`:/tmp/numbers.txt 0: string -500 + 10?1000;

stream:
  // The file containing a number on each line is read in as a single batch
  .qsp.read.fromFile[":/tmp/numbers.txt"]
  .qsp.map["J"$]      // Parse the string numbers to q long integers
  .qsp.filter[0<=]    // Keep only non-negative values, filtering records out of the batch
  .qsp.map[avg]       // .qsp.map can return data of any shape, so can do reductions
  .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`function`numberAvg]

// Note: This could also be written as a single step
// .qsp.map[{x: "J"$x; avg x where 0 <= x}]

// When the file has been read, the pipeline will close automatically
.qsp.onFinish[{ -1 "Pipeline Complete"; exit 0 }]

.qsp.run stream

Python

from kxi import sp
import numpy as np


with open('/tmp/numbers.txt', 'w') as f:
    f.writelines(str(x) + '\n' for x in np.random.randint(-500, 500, 500))

sp.run(sp.read.from_file('/tmp/numbers.txt', 'text')
    | sp.map('"J"$')
    | sp.filter(lambda data: 0 <= data)
    | sp.map(lambda data: data.np().mean())
    | sp.write.to_process(handle='::1234', mode='function', target='numberAvg'))
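
The toProcess writer above targets a separate kdb+ process on port 1234 that is expected to define numberAvg. A minimal sketch of that receiving process (the message text is illustrative):

// Receiving process, started with: q -p 1234
// The pipeline's toProcess writer (mode `function) calls numberAvg with the pipeline's output
numberAvg:{[x] -1 "Received average: ", string x;}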