Static file
Read a static file, parse the data, perform a filter and a reduction on the data, then write the average to a function numberAvg on another kdb+ process.
The same pipeline API is used for both static/batch and stream processing within the Stream Processor. Replacing the .qsp.read.fromFile reader with .qsp.read.fromCallback would apply the same logic to data sent to a stream callback.
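For example, a minimal sketch of the streaming variant, assuming data is pushed to a callback named publish; the callback name and the console writer are illustrative choices, not part of the spec below:
// The same logic driven by a stream callback instead of a file
.qsp.run
    .qsp.read.fromCallback[`publish]  // Data passed to the publish callback enters the pipeline
    .qsp.map["J"$]                    // Parse the string numbers to q long integers
    .qsp.filter[0<=]                  // Keep only the non-negative numbers
    .qsp.map[avg]                     // Average each incoming batch
    .qsp.write.toConsole[]            // Print each batch average

// publish ("12"; "-3"; "7")          // Example: send a batch of string numbers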
Pipeline spec
Run the pipeline to read and process the file. Both specs create a sample file containing numbers to process.
`:/tmp/numbers.txt 0: string -500 + 10?1000;

stream:
    // The file containing a number on each line is read in as a single batch
    .qsp.read.fromFile[":/tmp/numbers.txt"]
    .qsp.map["J"$]    // Parse the string numbers to q long integers
    .qsp.filter[0<=]  // Filters individual records out of the batch, dropping the negative numbers
    .qsp.map[avg]     // .qsp.map can return data of any shape, so can perform reductions
    // Send the average to the function numberAvg on the process listening on port 1234
    .qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`function`numberAvg]
    // Note: the parse, filter, and reduce steps could also be written as a single map
    // .qsp.map[{x: "J"$x; avg x where 0 <= x}]
    // When the file has been read, the pipeline will close automatically
    .qsp.onFinish[{ -1 "Pipeline Complete"; exit 0 }]

.qsp.run stream
from kxi import sp
import numpy as np

# Create a sample file with one number per line
with open('/tmp/numbers.txt', 'w') as f:
    f.writelines(str(x) + '\n' for x in np.random.randint(-500, 500, 500))

sp.run(sp.read.from_file('/tmp/numbers.txt', 'text')  # Read the file in as a single batch
    | sp.map('"J"$')                                   # Parse the string numbers to q long integers
    | sp.filter(lambda data: 0 <= data)                # Keep only the non-negative numbers
    | sp.map(lambda data: data.np().mean())            # Reduce the batch to its average
    | sp.write.to_process(handle='::1234', mode='function', target='numberAvg'))
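Both specs assume another kdb+ process is listening on port 1234 and defines a numberAvg function for the writer to call. A minimal sketch of such a target process, where the body of numberAvg is illustrative:
// Target kdb+ process, started with: q -p 1234
// Illustrative handler: the writer calls it with the computed average as its argument
numberAvg: {[x] -1 "Received average: ", string x;}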