Lifecycle
This page describes the functions for registering handlers that run on pipeline lifecycle events.
Finish Task
Marks a task as finished.
.qsp.finishTask[op;taskID]
Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.
Parameters:
| name | type | description | default |
|---|---|---|---|
| op | .qsp.op | A pipeline operator configuration, or the name of the operator as a symbol. | Required |
| taskID | int | A task identifier. | Required |
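The finishing rule above combines two conditions: the operator has been asked to finish, and none of its registered tasks is still outstanding. The following is a minimal pure-Python model of that bookkeeping. `TaskTracker` and its methods are hypothetical names used only for illustration. They are not part of `.qsp` or `kxi.sp`, and this is not how the Stream Processor implements it.

```python
# Hypothetical model of the finish-task rule; TaskTracker is illustrative only,
# not a .qsp or kxi.sp API.
from itertools import count
from typing import Set


class TaskTracker:
    """Tracks outstanding asynchronous tasks for a single operator."""

    def __init__(self) -> None:
        self._ids = count()             # source of unique task IDs
        self.pending: Set[int] = set()  # registered but not yet finished
        self.finish_requested = False   # has the operator been asked to finish?
        self.finished = False           # is the operator marked as finished?

    def register_task(self) -> int:
        task_id = next(self._ids)
        self.pending.add(task_id)
        return task_id

    def finish_task(self, task_id: int) -> None:
        self.pending.discard(task_id)
        self._maybe_finish()

    def request_finish(self) -> None:
        self.finish_requested = True
        self._maybe_finish()

    def _maybe_finish(self) -> None:
        # Finished only when a finish was requested AND no tasks remain.
        if self.finish_requested and not self.pending:
            self.finished = True


tracker = TaskTracker()
tid = tracker.register_task()
tracker.request_finish()
print(tracker.finished)  # False: the async task is still outstanding
tracker.finish_task(tid)
print(tracker.finished)  # True
```

Marking a task as finished is what eventually lets a finish request complete. An operator that registers a task and never finishes it would never be marked as finished in this model.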
Examples:
Register a task to track a truly asynchronous request made inside .qsp.apply.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task that represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data`url;
        "GET";
        ``callback!(
          ::;
          {[op;md;data;tid;r] // GET request with a callback
            .qsp.finishTask[op;tid]; // Mark the task as finished
            data[`response]: r 1; // Add GET request response to data
            .qsp.push[op;md;data]; // Push enriched data to the next operator
            }[op;md;data;tid]
          )
        )
      }
    ]
  .qsp.write.toConsole[]
publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html ..
See Register Task for more information.
sp.finish_task(operator, task_id)
Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.
Parameters:
| name | type | description | default |
|---|---|---|---|
| operator | Operator | The operator in which the task was created. | Required |
| task_id | int | The task identifier. | Required |
Examples:
Register an asynchronous task using sp.apply.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> data = kx.Dictionary({'url': kx.CharVector('https://www.google.com'), 'response': kx.CharVector('')})
>>> kx.q('''
.my.callback:{[op; md; data]
/ Register a task that represents the unfinished async kurl GET request
tid: .qsp.registerTask[op];
data: .pykx.toq data;
.kurl.async (
data`url;
"GET";
``callback!(
::;
{[op;md;data;tid;r] / GET request with a callback
.pykx.qeval["lambda x,y : sp.finish_task(x,y)"][op;tid];
data[`response]: r 1; / Add GET request response to data
.qsp.push[op;md;data]; / Push enriched data to the next operator
}[op;md;data;tid]
)
)
}
''')
>>> def process(op, md, data):
...     kx.q('.my.callback', op, md, data)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.apply(process)
...     | sp.write.to_console())
>>> kx.q('publish', data)
See Register Task for more information.
On Error
Sets the onError handler for the pipeline - the global handler that will be called when an error occurs.
.qsp.onError[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A ternary function called when a message causes an error in the pipeline. Its arguments are the error message, the operator that threw the error, and the batch of data that caused the error. |
The onError event handler is called when an error occurs in a pipeline. The handler
will be passed an error message, the operator that threw the error, and the
batch that caused the error.
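The dispatch described above can be pictured as a try/except around each operator call that forwards the failure to one registered handler. The following is a minimal sketch under that assumption. `on_error` and `run_operator` here are hypothetical stand-ins written for illustration, not the `kxi.sp` functions. Dropping the failed batch is a choice made by this sketch only.

```python
# Hypothetical sketch of routing operator failures to one global error handler.
# on_error and run_operator are illustrative, not the kxi.sp functions.
from typing import Any, Callable, Dict, List, Optional

ErrorHandler = Callable[[str, Dict[str, Any], Any], None]
_handler: Optional[ErrorHandler] = None


def on_error(handler: ErrorHandler) -> None:
    global _handler
    _handler = handler  # one global handler; a later call replaces it


def run_operator(op: Dict[str, Any], fn: Callable[[Any], Any], batch: Any) -> Any:
    try:
        return fn(batch)
    except Exception as exc:
        if _handler is not None:
            # Arguments: error message, failing operator, batch being processed.
            _handler(str(exc), op, batch)
        return None  # this sketch simply drops the failed batch


errors: List[tuple] = []
on_error(lambda msg, op, data: errors.append((msg, op["id"], data)))
run_operator({"id": "map"}, lambda x: 1 / 0, list(range(10)))
print(errors[0][:2])  # ('division by zero', 'map')
```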
Examples:
Cause a pipeline to throw an error by trying to add an integer to a symbol.
.qsp.onError {[msg; op; data]
  `.errors.cache upsert (`time`msg`op`data)!(.z.p;msg;op;data)};
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[x] 1+`e}]
  .qsp.write.toConsole[];
publish til 10
type - error in operator: map
.sp.compileOp@[37] 'errMsg;
^
.sp.compileOp@[0] {[fn; err; msg] .sputil.Qtrp[fn; msg; err msg]}
^
.sp.i.setPush@[0] {[id;md;data] Runtime[id]@\:(md;data); }
^
{}[0] {.sp.pushid[x`id;y;z]}
^
> publish til 10
^
The error information should be stored in .errors.cache.
time| 2025.12.03D15:34:08.229743758
msg | "type"
op | `id``function`inputs`outputs`config`metadata`edge!(`map;::;{[x] 1+`e};(,`)!,"*";(,`)!,"*";(,`allowPartials)!,1b;`plugin`params`initialState`version`stateful`type!(`map;`symbol$();::;1;0b;`operator);`)
data| ((,`)!,::;0 1 2 3 4 5 6 7 8 9)
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary which can be passed into other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored. |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> errored = False
>>> def on_error(errMsg, op, data):
...     global errored  # rebind the module-level flag, not a local
...     errored = True
>>> sp.lifecycle.on_error(on_error)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(lambda x: 1 / 0)
...     | sp.write.to_console())
>>> kx.q('publish', range(10))
>>> print(errored)
True
On Checkpoint
Sets the onCheckpoint handler - the global handler that will be called when a checkpoint is being created.
.qsp.onCheckpoint[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A nullary function to execute during checkpointing. |
This sets the onCheckpoint handler for the pipeline.
The onCheckpoint handler for the pipeline is called whenever a checkpoint is created.
The return value is saved to the checkpoint. This value will be passed into the
onPostCheckpoint and onRecover handlers, and can be used for tracking files.
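One way to see how a single value travels between the three global handlers is a small pure-Python model. In this sketch, the onCheckpoint return value is stored, handed to the post-checkpoint handler, and later handed to the recover handler. The `Pipeline` class is hypothetical and is not part of `kxi.sp` or `.qsp`. It also assumes the write is acknowledged immediately and that recovery happens in the same process. In a real deployment, recovery reads the checkpoint back after a restart.

```python
# Hypothetical model of the global checkpoint handler chain.
# The Pipeline class is illustrative only; it is not part of kxi.sp or .qsp.
from typing import Any, Callable, Optional


class Pipeline:
    def __init__(self) -> None:
        self.on_checkpoint: Optional[Callable[[], Any]] = None
        self.on_post_checkpoint: Optional[Callable[[Any], None]] = None
        self.on_recover: Optional[Callable[[Any], None]] = None
        self.saved: dict = {}

    def checkpoint(self) -> None:
        # The onCheckpoint handler takes no arguments; its return value is saved.
        custom = self.on_checkpoint() if self.on_checkpoint else None
        self.saved = {"custom": custom}
        # Assumed: the write is acknowledged at once, so the post-checkpoint
        # handler runs immediately with the same value.
        if self.on_post_checkpoint:
            self.on_post_checkpoint(custom)

    def recover(self) -> None:
        # On recovery, the saved value is passed to the onRecover handler.
        if self.on_recover:
            self.on_recover(self.saved["custom"])


seen = []
p = Pipeline()
p.on_checkpoint = lambda: list(range(10))  # mirrors `til 10` in the q example
p.on_post_checkpoint = lambda data: seen.append(("post", data))
p.on_recover = lambda data: seen.append(("recover", data))
p.checkpoint()
p.recover()
print(seen[0][0], seen[1][0])  # post recover
```

Because the handler's return value is the only thing carried across, it is a natural place to record bookkeeping such as the files processed so far.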
Examples:
Set up a handler to run when a checkpoint is created.
checkpointed: 0b;
stream:
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[];
.qsp.onCheckpoint[{ checkpointed:: 1b; til 10 }];
.qsp.run stream;
The value of checkpointed should be 1b after a checkpoint has been created.
checkpointed
1b
sp.lifecycle.on_checkpoint(handler)
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A function that will be called when a checkpoint is being created. It will not be provided any arguments. The return value is converted to a q type, then saved in the checkpoint. This value will be passed into the on_post_checkpoint and on_recover handlers, and can be used for tracking files. |
Examples:
Set up a handler to run when a checkpoint is being created.
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> checkpointed = False
>>> def on_checkpoint():
...     global checkpointed  # rebind the module-level flag
...     checkpointed = True
...     return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
The value of checkpointed should be True after a checkpoint has been created.
>>> print(checkpointed)
True
On Operator Checkpoint
Sets the onOperatorCheckpoint event handler for a specific operator - a per-operator handler that will be called when a checkpoint is being created.
.qsp.onOperatorCheckpoint[id;handler]
Parameters:
| name | type | description |
|---|---|---|
| id | symbol | An operator ID. |
| handler | function | A unary function which executes before an operator performs a checkpoint. |
The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator
state is saved. This provides an opportunity for an operator to make updates to its state
before it is checkpointed. The return value of the handler is also added to the checkpoint,
and will be passed to the operator's onOperatorRecover handler on recovery.
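The per-operator hooks follow a fixed order. First the checkpoint handler runs and may edit state. Then state and the handler's return value are saved. The post-checkpoint handler then sees both. On recovery, state is restored before the recover handler receives the saved return value. The following pure-Python model shows that order. `Operator`, `bump_state`, and the `"files-v1"` value are hypothetical illustrations, not `kxi.sp` types or data.

```python
# Hypothetical per-operator model of the checkpoint, post-checkpoint and recover
# hooks. The Operator class is illustrative only; it is not a kxi.sp type.
from typing import Any, Callable, Dict, Optional


class Operator:
    def __init__(self, op_id: str, state: Any = None) -> None:
        self.id = op_id
        self.state = state
        self.on_checkpoint: Optional[Callable[["Operator"], Any]] = None
        self.on_post_checkpoint: Optional[Callable[["Operator", Any, Any], None]] = None
        self.on_recover: Optional[Callable[["Operator", Any], None]] = None

    def checkpoint(self) -> Dict[str, Any]:
        # 1. The handler runs BEFORE state is captured, so it may still edit it.
        custom = self.on_checkpoint(self) if self.on_checkpoint else None
        # 2. Operator state and the handler's return value are saved together.
        saved = {"state": self.state, "custom": custom}
        # 3. Once saved, the post-checkpoint handler receives both.
        if self.on_post_checkpoint:
            self.on_post_checkpoint(self, saved["state"], saved["custom"])
        return saved

    def recover(self, saved: Dict[str, Any]) -> None:
        # State is restored BEFORE the recover handler runs.
        self.state = saved["state"]
        if self.on_recover:
            self.on_recover(self, saved["custom"])


def bump_state(op: Operator) -> str:
    op.state += 1      # last-minute update that ends up in the checkpoint
    return "files-v1"  # hypothetical bookkeeping value


log = []
op = Operator("identity", state=1)
op.on_checkpoint = bump_state
op.on_post_checkpoint = lambda o, state, custom: log.append(("post", state, custom))
op.on_recover = lambda o, custom: log.append(("recover", o.state, custom))
snapshot = op.checkpoint()
op.state = 99          # state drifts after the checkpoint...
op.recover(snapshot)   # ...and is rolled back on recovery
print(log)
```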
Examples:
Set up a handler to run when a checkpoint is created, before operator state is saved.
operatorCheckpointed: 0b;
stream:
  .qsp.read.fromCallback[`publish]
  .qsp.map[{x}; .qsp.use ``name!``identity]
  .qsp.write.toConsole[];
.qsp.onOperatorCheckpoint[`identity; {[op] operatorCheckpointed:: 1b; til 45 }];
.qsp.run stream;
The value of operatorCheckpointed should be 1b after a checkpoint has been created.
operatorCheckpointed
1b
The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.
sp.lifecycle.on_operator_checkpoint(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
| name | type | description |
|---|---|---|
| operator | OperatorSpecifier | An operator for which to assign the handler. |
| handler | Callable[[kx.Dictionary], Any] | A unary function which executes before an operator performs a checkpoint. |
The on_operator_checkpoint event handler is called when a checkpoint is created, before operator
state is saved. This provides an opportunity for an operator to make updates to its state
before it is checkpointed. The return value of the handler is also added to the checkpoint,
and will be passed to the operator's on_operator_recover handler on recovery.
Examples:
Set up a handler to run when a checkpoint is created, before the operator state is saved.
>>> from kxi import sp
>>> import pykx as kx
>>> operator_checkpointed = False
>>> def on_operator_checkpoint(op):
...     global operator_checkpointed  # rebind the module-level flag
...     operator_checkpointed = True
>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
The value of operator_checkpointed should be True after a checkpoint has been created.
>>> print(operator_checkpointed)
True
On Operator Post Checkpoint
Sets the onOperatorPostCheckpoint event handler for a specific operator - a per-operator handler that will be called after a checkpoint has been saved.
.qsp.onOperatorPostCheckpoint[id;handler]
Parameters:
| name | type | description |
|---|---|---|
| id | symbol | An operator ID. |
| handler | function | A ternary function to execute after a checkpoint has been saved. |
Event Handler
The onOperatorPostCheckpoint event handler is called after a checkpoint has either been
saved to disk or written to the Controller.
It has arguments:
- operator
- checkpointed operator state (i.e. state set via Set)
- data returned by the operator's onOperatorCheckpoint handler
Readers Unsupported
The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
Examples:
Set up a handler to run after a checkpoint has either been saved to disk or written to the Controller.
operatorPostCheckpointed: 0b;
stream:
  .qsp.read.fromCallback[`publish]
  .qsp.map[{x}; .qsp.use ``name!``identity]
  .qsp.write.toConsole[];
.qsp.onOperatorPostCheckpoint[`identity;{[x;y;z] operatorPostCheckpointed:: 1b}];
.qsp.run stream;
.spcp.WrittenCheckpoint: (!) . flip (
  (`checkpoint; .sp.createCheckpoint[]);
  (`id;0j));
The value of operatorPostCheckpointed should be '1b' after checkpoint has been saved.
operatorPostCheckpointed
1b
The handler will be called after a checkpoint has either been saved to disk or written to the Controller.
sp.lifecycle.on_operator_post_checkpoint(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
| name | type | description | default |
|---|---|---|---|
| operator | OperatorSpecifier | An operator for which to assign the handler. | Required |
| handler | Callable[[kx.Dictionary, kx.K, kx.K], None] | A ternary function to execute after a checkpoint has been saved. Return value is converted to a q type, then saved in the checkpoint. | Required |
Event Handler
The on_operator_post_checkpoint event handler is called after a checkpoint has either been
saved to disk or written to the Controller.
It has arguments:
- operator
- checkpointed operator state (i.e. state set via Set)
- data returned by the operator's on_operator_checkpoint handler
Readers Unsupported
The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the sp.read.* collection.
Examples:
Set up a handler to run after a checkpoint has been saved to disk or written to the Controller.
>>> from kxi import sp
>>> import pykx as kx
>>> operator_post_checkpointed = False
>>> def on_operator_post_checkpoint(op, state, data):
...     global operator_post_checkpointed  # rebind the module-level flag
...     operator_post_checkpointed = True
>>> sp.lifecycle.on_operator_post_checkpoint('callback', on_operator_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
The value of operator_post_checkpointed should be True after the checkpoint has been saved.
>>> print(operator_post_checkpointed)
True
On Operator Recover
Sets the onOperatorRecover event handler for a specific operator - a per-operator handler that will be called when a pipeline recovers from a checkpoint.
.qsp.onOperatorRecover[id;handler]
Parameters:
| name | type | description | default |
|---|---|---|---|
| id | symbol | An operator ID. | Required |
| handler | function | A binary function to execute after an operator recovers. | Required |
Sets the onOperatorRecover event handler of the operator with the id.
The onOperatorRecover handler is called when a pipeline recovers from a checkpoint.
Operator state has already been restored once this handler has been called. This provides
an opportunity for operators to make updates to their state before the pipeline starts running.
Readers Unsupported
The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
Examples:
Set up a handler to run on recovery before the pipeline starts.
TABLE: ();
.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];
.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{
    `TABLE upsert x;
    x
    }; .qsp.use ``name!(::;`map)]
  .qsp.write.toConsole[];
Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts.
See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.
sp.lifecycle.on_operator_recover(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
| name | type | description | default |
|---|---|---|---|
| operator | OperatorSpecifier | An operator for which to assign the handler. | Required |
| handler | function | A binary function to execute after an operator recovers. It will be provided 2 arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored. | Required |
Examples:
Set up handler to run after an operator recovers.
>>> from kxi import sp
>>> import pykx as kx
>>> operator_recovered = False
>>> def on_operator_recover(op, data):
...     global operator_recovered  # rebind the module-level flag
...     operator_recovered = True
>>> sp.lifecycle.on_operator_recover('callback', on_operator_recover)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
The value of operator_recovered should be True after recovery.
>>> print(operator_recovered)
True
On Post Checkpoint
Sets the onPostCheckpoint handler - the global handler that will be called after a checkpoint has been created.
.qsp.onPostCheckpoint[handler]
Parameters:
| name | type | description | default |
|---|---|---|---|
| handler | function | A unary function to be called after a checkpoint has been written. | Required |
This sets the onPostCheckpoint handler for the pipeline.
The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written
either to disk or to the controller, and has been acknowledged by the controller.
Its argument is the data returned by the pipeline's onCheckpoint handler.
Examples:
Set up a handler to run after the checkpoint has been written.
result: "";
.qsp.onOperatorPostCheckpoint[`identity;{[op;state;custom] result:: (op;state;custom)}];
.spcp.WrittenCheckpoint: (!) . flip (
  (`checkpoint; checkpoint`checkpoint);
  (`id;0j));
The comparison below should return 1b once the checkpoint has been acknowledged.
result ~ (.sp.getOperator `identity; checkpoint[`checkpoint][`app;`identity]; til 45)
1b
The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.
sp.lifecycle.on_post_checkpoint(handler)
Parameters:
| name | type | description | default |
|---|---|---|---|
| handler | Callable[[kx.K], None] | A function that will be called after a checkpoint has been created. It will be provided one argument, which is the return value of the on_checkpoint handler. Its return value is ignored. | Required |
Examples:
Set up a handler to run after the checkpoint has been written.
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> def on_checkpoint():
...     return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> post_checkpointed = False
>>> def on_post_checkpoint(data):
...     global post_checkpointed  # rebind the module-level flag
...     post_checkpointed = True  # data is the on_checkpoint return value
>>> sp.lifecycle.on_post_checkpoint(on_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
The value of post_checkpointed should be True after the checkpoint has been created and acknowledged by the controller.
>>> print(post_checkpointed)
True
On Recover
Sets the onRecover handler - the global handler that will be called when a pipeline recovers from a checkpoint.
.qsp.onRecover[handler]
Parameters:
| name | type | description | default |
|---|---|---|---|
| handler | function | A unary function to execute after recovery. | Required |
This sets the onRecover handler for the pipeline.
The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint.
Its argument is the data returned by the pipeline's onCheckpoint handler.
Examples:
Set up a handler to run after recovery.
result: (::);
stream:
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[];
.qsp.onCheckpoint[{til 10}];  / the value returned here is passed to onRecover
.qsp.onRecover[{[state] result:: state; }];
.qsp.run stream;
publish til 10
checkpoint:.sp.createCheckpoint[];
Check the value of result after the pipeline has recovered from a checkpoint.
result ~ til 10
1b
sp.lifecycle.on_recover(handler)
Parameters:
| name | type | description | default |
|---|---|---|---|
| handler | function | A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument, which is whatever was returned by the last run checkpoint handler (see on_checkpoint). Its return value is ignored. | Required |
Examples:
Set up handler to run after recovery.
>>> from kxi import sp
>>> import pykx as kx
>>> recovered = False
>>> def on_recover(data):
...     global recovered  # rebind the module-level flag
...     recovered = True  # data is the last on_checkpoint return value
>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
The value of recovered should be True after recovery.
>>> print(recovered)
True
On Setup
Sets the onSetup event handler - the global handler that will be called before a pipeline has finished initializing.
.qsp.onSetup[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A nullary function to execute after all operators in a pipeline are set up. |
This sets the onSetup event handler for the pipeline.
The onSetup event handler is called before the pipeline is finished initializing.
Examples:
.test.setup: 0b;
.qsp.onSetup {.test.setup: 1b};
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`test.cache];
/ .test.setup should be true after pipeline setup
.test.setup
1b
sp.on_setup(handler)
Parameters:
| name | type | description |
|---|---|---|
| handler | Callable[[], None] | A function that will be called before a pipeline has finished initializing, after all operators in a pipeline are set up. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> setup = False
>>> def on_setup():
...     global setup  # rebind the module-level flag
...     setup = True
>>> sp.lifecycle.on_setup(on_setup)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
>>> # setup should be True after the pipeline has finished initializing
>>> print(setup)
True
On Start
Sets the onStart event handler - the global handler that will be called when all operators in a pipeline have started.
.qsp.onStart[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A nullary function to execute after all operators in a pipeline are started. |
This sets the onStart event handler for the pipeline.
The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.
Examples:
.test.start: 0b;
.qsp.onStart {.test.start: 1b};
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`test.cache];
/ .test.start should be true after all operators have started
.test.start
1b
sp.on_start(handler)
Parameters:
| name | type | description |
|---|---|---|
| handler | Callable[[], None] | A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> started = False
>>> def on_start():
...     global started  # rebind the module-level flag
...     started = True
>>> sp.lifecycle.on_start(on_start)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
>>> # started should be True when all operators in the pipeline have started
>>> print(started)
True
On Finish
Sets the onFinish event handler - the global handler that will be called when an operator finishes.
.qsp.onFinish[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A unary function that accepts an operator configuration to execute when a given operator is finished. |
This sets the onFinish event handler for the pipeline.
The onFinish event handler is called when an operator is complete; it returns generic null.
Examples:
.test.finish: 0b;
.qsp.onFinish[{.test.finish: 1b}];
.qsp.run
  .qsp.read.fromExpr["til 5"]
  .qsp.write.toConsole[];
.test.finish
1b
sp.on_finish(handler)
Parameters:
| name | type | description |
|---|---|---|
| handler | Callable[[kx.CharVector], None] | A unary function that accepts an operator configuration to execute when a given operator is finished. |
This sets the on_finish event handler for the pipeline.
The on_finish event handler is called when an operator is complete; it returns None.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> finished = False
>>> def on_finish(event: kx.Dictionary):
...     global finished  # rebind the module-level flag
...     finished = True
>>> sp.lifecycle.on_finish(on_finish)
>>> sp.run(sp.read.from_expr('til 10')
...     | sp.write.to_console())
>>> # finished should be True when the operator has finished
True
On Teardown
Sets the onTeardown event handler - the global handler that will be called before a pipeline is torn down.
.qsp.onTeardown[handler]
Parameters:
| name | type | description |
|---|---|---|
| handler | function | A nullary function to execute before a pipeline is torn down. |
This sets the onTeardown event handler for the pipeline.
The onTeardown event handler is called before the pipeline is torn down; it returns generic null.
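Read together, the onSetup, onStart, onFinish and onTeardown descriptions suggest an ordering for a pipeline that runs to completion. First comes setup, then start, then one finish call per completed operator, and finally teardown. The following is a hypothetical pure-Python sketch of that ordering. `run_pipeline` is an illustrative name, not a `kxi.sp` function. The strict sequence is an assumption of the sketch. For example, a pipeline torn down early would not finish every operator first. The sketch also passes an operator name to the finish handler, whereas the real handler receives an operator configuration.

```python
# Hypothetical sketch of the order in which the global lifecycle handlers fire
# for a pipeline that runs to completion. run_pipeline is illustrative only.
from typing import Callable, Dict, List


def _noop(*_args: object) -> None:
    return None


def run_pipeline(handlers: Dict[str, Callable[..., None]], operators: List[str]) -> None:
    handlers.get("setup", _noop)()         # operators set up, pipeline still initializing
    handlers.get("start", _noop)()         # start has been invoked on the readers
    for op in operators:
        handlers.get("finish", _noop)(op)  # once per operator as it completes
    handlers.get("teardown", _noop)()      # just before the pipeline is torn down


events: List[str] = []
run_pipeline(
    {
        "setup": lambda: events.append("setup"),
        "start": lambda: events.append("start"),
        "finish": lambda op: events.append(f"finish:{op}"),
        "teardown": lambda: events.append("teardown"),
    },
    ["fromExpr", "toConsole"],
)
print(events)
```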
Examples:
.test.teardown: 0b;
.qsp.onTeardown {.test.teardown: 1b};
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toVariable[`test.cache];
.qsp.teardown[]
.test.teardown
1b
A teardown can be triggered by calling sp.teardown.
sp.lifecycle.on_teardown(handler)
Parameters:
| name | type | description |
|---|---|---|
| handler | Callable[[], None] | A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> torndown = False
>>> def on_teardown():
...     global torndown  # rebind the module-level flag
...     torndown = True
>>> sp.lifecycle.on_teardown(on_teardown)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
>>> sp.teardown()
>>> print(torndown)
True
Register Task
Registers a task for an operator.
.qsp.registerTask[op]
Parameters:
| name | type | description |
|---|---|---|
| op | .qsp.op | A pipeline operator. |
This registers a task for the operator and returns an identifier (int) for the task.
A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using Finish Task.
Examples:
Register a task to track a truly asynchronous request made inside Apply:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        "https://www.google.ca";
        "GET";
        ``callback!(::;
          {[op;md;data;tid;r] // GET request with a callback
            .qsp.finishTask[op;tid]; // Mark the task as finished
            data[`response]: r 1; // Add GET request response to data
            .qsp.push[op;md;data]; // Push enriched data to the next operator
            }[op;md;data;tid]
          )
        );
      }
    ]
  .qsp.write.toConsole[]
sp.lifecycle.register_task(operator)
A task represents an asynchronous request that an operator made, and once that request is
finished, the task should be marked as finished using sp.finish_task.
Parameters:
| name | type | description |
|---|---|---|
| operator | OperatorSpecifier | The operator for which a task will be registered. |
Returns:
A task ID, which will be needed to call sp.finish_task.
Subscribe
Subscribes to a particular event with the provided callback.
.qsp.subscribe[eventType; handler]
Parameters:
| name | type | description |
|---|---|---|
| eventType | symbol | Type of an event. |
| handler | function | A unary function to execute after the eventType occurs. Takes the event as argument. |
An event is a dictionary with keys:
| key | q type | description |
|---|---|---|
| type | symbol | Type of the event. |
| time | timestamp | Timestamp. |
| origin | symbol | Origin identifier. |
| data | any | Payload of the event object. |
When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.
Returns:
| type | description |
|---|---|
| list | A 2-element list containing the event type and a subscription identifier to use to unsubscribe, {(symbol;long)}. |
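The subscription model described here and under Unsubscribe works as follows. Handlers are keyed by event type. Each subscription gets an ID. Subscribing returns a `(type, id)` pair. Unsubscribing accepts either a bare type, which removes every subscriber for that type, or a `(type, id)` pair, which removes a single subscriber. The following is a hypothetical in-memory sketch of those semantics. `EventBus` and `emit` are illustrative names, not part of `.qsp` or `kxi.sp`. The sketch also uses a Python tuple where q returns a `(symbol;long)` list.

```python
# Hypothetical in-memory model of subscribe/unsubscribe semantics.
# EventBus is illustrative only; it is not part of kxi.sp or .qsp.
from datetime import datetime, timezone
from itertools import count
from typing import Any, Callable, Dict, Tuple, Union

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    def __init__(self) -> None:
        self._ids = count()
        self._subs: Dict[str, Dict[int, Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> Tuple[str, int]:
        sub_id = next(self._ids)
        self._subs.setdefault(event_type, {})[sub_id] = handler
        return (event_type, sub_id)  # 2-element (type, id) handle

    def unsubscribe(self, key: Union[str, Tuple[str, int]]) -> None:
        if isinstance(key, str):
            self._subs.pop(key, None)  # drop every subscriber of this type
        else:
            event_type, sub_id = key
            self._subs.get(event_type, {}).pop(sub_id, None)

    def emit(self, event_type: str, origin: str, data: Any) -> None:
        # Event shape mirrors the table above: type, time, origin, data.
        event = {"type": event_type, "time": datetime.now(timezone.utc),
                 "origin": origin, "data": data}
        for handler in list(self._subs.get(event_type, {}).values()):
            handler(event)


bus = EventBus()
seen = []
sub = bus.subscribe("file.found", lambda e: seen.append(("a", e["data"])))
bus.subscribe("file.found", lambda e: seen.append(("b", e["data"])))
bus.emit("file.found", "file", {"paths": ["/tmp/numbers.txt"]})
bus.unsubscribe(sub)           # removes only the first subscriber
bus.emit("file.found", "file", {"paths": []})
bus.unsubscribe("file.found")  # removes all remaining subscribers
bus.emit("file.found", "file", {"paths": []})
print(len(seen))  # 3
```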
Examples:
Subscribe to file events:
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];
q).file.events
eventType eventTime origin data
-------------------------------------------------------------------------------------------------------------------------
file.found 2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start 2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
sp.subscribe(event_type, handler)
Parameters:
| name | type | description |
|---|---|---|
| event_type | kx.CharVector | Type of an event. |
| handler | Union[Callable[[kx.Dictionary], None], str] | A unary function to be called for this event. Can be a Python function or a string referencing a q function. |
The callback event will be a dictionary with keys:
| key | q type | Python type | description |
|---|---|---|---|
| type | symbol | str | Type of the event. |
| time | timestamp | kx.TimestampAtom | Timestamp. |
| origin | symbol | str | Origin identifier. |
| data | any | Any | Payload of the event object. |
When the SP emits this event, the handler is invoked with the event object.
A Python handler receives the event as a pykx.Dictionary.
Returns:
| type | description |
|---|---|
| List[Any] | A 2-element list containing the event type and a subscription identifier to use to unsubscribe. |
Examples:
Python function callback
Use a Python function callback.
>>> import pykx as kx
>>> from kxi import sp
>>> # define events to subscribe to and table to store them in
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
...                         'eventTime': kx.TimestampVector([]),
...                         'origin': kx.SymbolVector([]),
...                         'data': kx.Identity(None)})
>>> # define events handler
>>> def update_file_events(event: kx.Dictionary):
...     events.upsert(event.values())
>>> for typ in types:
...     sp.subscribe(typ, update_file_events)
>>> # create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
...     | sp.write.to_variable('out'))
>>> # check events table
>>> events
pykx.Table(pykx.q('
eventType eventTime origin data
---------------------------------------------------------------------------------------------------------
file.found 2025.02.21D12:00:38.673846000 file (,`paths)!,,":/tmp/numbers.txt"
file.start 2025.02.21D12:01:18.920759000 file `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:01:18.921781000 file `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2025.02.21D12:01:18.922243000 file `path`size!(":/tmp/numbers.txt";10)
'))
Use a q function callback
>>> import pykx as kx
>>> from kxi import sp
>>> # define events to subscribe to, an events table schema and set as a q global variable
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
...                         'eventTime': kx.TimestampVector([]),
...                         'origin': kx.SymbolVector([]),
...                         'data': kx.Identity(None)})
>>> kx.q.set('qevents', events)
>>> # define a q event handler
>>> kx.q('updateEvents:{`qevents upsert value x}')
>>> for typ in types:
...     sp.subscribe(typ, 'updateEvents')  # reference the q function by name
>>> # create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
| sp.write.to_variable('out'))
>>> # check events table
>>> kx.q('qevents')
pykx.Table(pykx.q('
eventType eventTime origin data
---------------------------------------------------------------------------------------------------------
file.found 2025.02.21D12:33:39.231703197 file (,`paths)!,,":/tmp/numbers.txt"
file.start 2025.02.21D12:33:41.812599917 file `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:33:41.812831273 file `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2025.02.21D12:33:41.812841521 file `path`size!(":/tmp/numbers.txt";10)
'))
Unsubscribe
Unsubscribes from event callbacks. You can unsubscribe from a single subscription or from all subscriptions for a given event.
.qsp.unsubscribe[id]
Parameters:
| name | q type | description |
|---|---|---|
| id | symbol or (symbol;long) | If given just an event type, all subscriptions are removed for that event. Alternatively, a single subscription can be unsubscribed using the ID returned by the call to .qsp.subscribe. |
Returns:
| type | description |
|---|---|
| null | |
Examples:
Unsubscribe all subscribers for a given event_type
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[`file.found; {`.file.events upsert value x}];
.qsp.unsubscribe[`file.found];
Unsubscribe from a specific event subscription
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
id:.qsp.subscribe[`file.found ; {`.file.events upsert value x}];
.qsp.unsubscribe[id];
sp.unsubscribe(id)
Parameters:
| name | type | description |
|---|---|---|
| id | Union[str, List[Any]] | If given just an event type, all subscriptions are removed for that event. Alternatively, a single subscription can be unsubscribed using the ID returned by the call to sp.subscribe. |
Unsubscribes a particular subscriber or all subscribers for a given event type.
Returns:
| type | description |
|---|---|
| None | |
Examples:
Unsubscribe all subscribers for a given event_type
>>> import pykx as kx
>>> from kxi import sp
>>> def update_events(event):
print(event)
>>> sp.subscribe('file.found', update_events)
>>> # unsubscribe all file.found events
>>> sp.unsubscribe('file.found')
Unsubscribe a specific subscription
>>> import pykx as kx
>>> from kxi import sp
>>> def update_events(event):
print(event)
>>> id = sp.subscribe('file.found', update_events)
>>> sp.unsubscribe(id)