Static file
Read a static file, parse the data, apply a filter and a reduction, then write the resulting average to a function numberAvg on another kdb+ process.
The same pipeline API is used for both static/batch and stream processing within the Stream Processor.
Replacing .qsp.read.fromFile with .qsp.read.fromCallback applies the same logic to data sent to a stream callback.
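For example, a minimal sketch of the callback variant, assuming a callback named numbers and writing to the console rather than to another process (both choices are illustrative, not part of the example below):

// Data published to the numbers callback flows through the same stages
callbackStream: .qsp.read.fromCallback[`numbers]
  .qsp.map["J"$]
  .qsp.filter[0<=]
  .qsp.map[avg]
  .qsp.write.toConsole[]

.qsp.run callbackStream

// Publish a batch of string numbers to the pipeline
numbers string -500 + 10?1000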
Pipeline spec
Run the pipeline to read and process the file. Both specs create a sample file containing numbers to process.
q

`:/tmp/numbers.txt 0: string -500 + 10?1000;
stream:
  // The file, which contains one number per line, is read in as a single batch
  .qsp.read.fromFile[":/tmp/numbers.txt"]
  .qsp.map["J"$]      // Parse the string numbers to q long integers
  .qsp.filter[0<=]    // Filter out negative numbers; the predicate drops individual records from the batch
  .qsp.map[avg]       // .qsp.map can return data of any shape, so it can perform reductions
  .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`function`numberAvg]
// Note: This could also be written as a single step:
// .qsp.map[{x:"J"$x; avg x where 0<=x}]
// When the file has been fully read, the pipeline finishes and this callback runs
.qsp.onFinish[{ -1 "Pipeline Complete"; exit 0 }]
.qsp.run stream
Python

from kxi import sp
import numpy as np
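# Create a sample file of 500 random integers, one per line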
with open('/tmp/numbers.txt', 'w') as f:
    f.writelines(str(x) + '\n' for x in np.random.randint(-500, 500, 500))
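# Build and run the pipeline: parse each line to an integer, drop negatives,
# compute the mean, and send it to the numberAvg function on the process at ::1234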
sp.run(sp.read.from_file('/tmp/numbers.txt', 'text')
    | sp.map('"J"$')
    | sp.filter(lambda data: 0 <= data)
    | sp.map(lambda data: data.np().mean())
    | sp.write.to_process(handle='::1234', mode='function', target='numberAvg'))
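Both specs assume a second kdb+ process is already listening on port 1234 with a numberAvg function defined. A minimal sketch of such a receiver (the logging body is illustrative):

// Run on a separate kdb+ process started with: q -p 1234
// The pipeline writer calls numberAvg with the computed average
numberAvg: {[x] -1 "Received average: ", string x;}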