Lifecycle
This page provides an overview of the APIs for running actions in response to pipeline events.
Finish Task
Marks a task as finished.
.qsp.finishTask[op;taskID]
Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.
Parameters:
name | type | description | default |
---|---|---|---|
op | .qsp.op | A pipeline operator configuration, or the name of the operator as a symbol. | Required |
taskID | int | A task identifier. | Required |
Examples:
Register an asynchronous task for true async using .qsp.apply:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.apply[
{[op; md; data]
// Register a task that represents the unfinished async kurl GET request
tid: .qsp.registerTask[op];
.kurl.async (
data`url;
"GET";
``callback!(
::;
{[op;md;data;tid;r] // GET request with a callback
.qsp.finishTask[op;tid]; // Mark the task as finished
data[`response]: r 1; // Add GET request response to data
.qsp.push[op;md;data]; // Push enriched data to the next operator
}[op;md;data;tid]
)
)
}
]
.qsp.write.toConsole[]
publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html ..
finish_task(operator, task_id)
Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.
Parameters:
name | type | description | default |
---|---|---|---|
operator | Operator | The operator in which the task was created. | Required |
task_id | int | The task identifier. | Required |
See Register Task for more information.
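The finishing rule described above can be modelled in a few lines of plain Python. This is only an illustrative sketch: `OperatorTasks` and its methods are hypothetical names, not part of `kxi.sp`, and the bookkeeping inside the Stream Processor may well differ.

```python
class OperatorTasks:
    """Hypothetical model of per-operator task bookkeeping (not the kxi.sp API)."""

    def __init__(self):
        self._next_id = 0
        self._pending = set()
        self._finish_requested = False
        self.finished = False

    def register_task(self):
        # Hand out a new task identifier and remember it as unfinished
        task_id = self._next_id
        self._next_id += 1
        self._pending.add(task_id)
        return task_id

    def request_finish(self):
        # The operator has been asked to finish; it can only do so once idle
        self._finish_requested = True
        self._maybe_finish()

    def finish_task(self, task_id):
        # Mark one task as done, then re-check the finishing rule
        self._pending.discard(task_id)
        self._maybe_finish()

    def _maybe_finish(self):
        # Finished only when a finish was requested AND no tasks remain
        if self._finish_requested and not self._pending:
            self.finished = True


op = OperatorTasks()
a, b = op.register_task(), op.register_task()
op.request_finish()
op.finish_task(a)
still_waiting = not op.finished   # one task is still outstanding
op.finish_task(b)
```

In this model the operator only becomes finished after the second `finish_task` call, because a finish request alone is not enough while tasks are outstanding.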
On Error
Sets the onError handler for the pipeline - the global handler that will be called when an error occurs.
.qsp.onError[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A ternary function that is called with any message that causes an error in the pipeline. Its arguments are the error message, the operator that threw the error, and the batch of data that caused the error. |
The onError event handler is called when an error occurs in a pipeline. The handler will be passed an error message, the operator that threw the error, and the batch that caused the error.
Parameters:
name | type | description |
---|---|---|
handler | function | A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary which can be passed into other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored. |
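As a rough illustration of the three-argument handler described in the table, the sketch below records each failure in memory and logs it. The names `error_log` and `logger` are hypothetical, and the handler is invoked directly with stand-in values rather than by a running pipeline; in a real pipeline it would be registered with `sp.lifecycle.on_error`.

```python
import logging

logger = logging.getLogger("pipeline.errors")
error_log = []  # hypothetical in-memory record of pipeline errors


def on_error(err_msg, op, data):
    """Record the failure; the return value is ignored by the caller."""
    # The message is documented as a character vector, so normalise it to str
    # (assumption: str() gives a readable message for non-bytes values)
    message = err_msg.decode() if isinstance(err_msg, bytes) else str(err_msg)
    error_log.append({"message": message, "batch": data})
    logger.error("Pipeline error: %s", message)


# Direct call with stand-in values, in place of a real pipeline error
on_error(b"type", {"name": "map"}, [1, 2, 3])
```

Keeping the handler free of side effects beyond logging and recording makes it straightforward to exercise outside a pipeline, as done in the last line.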
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> errored = False
>>> def on_error(errMsg, op, data):
        global errored
        errored = True
>>> sp.lifecycle.on_error(on_error)
>>> sp.run(sp.read.from_callback('publish')
| sp.map(lambda x: 1 / 0)
| sp.write.to_console())
>>> assert not errored
>>> kx.q('publish',range(10))
>>> assert errored
On Checkpoint
Sets the onCheckpoint handler - the global handler that will be called when a checkpoint is being created.
.qsp.onCheckpoint[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A nullary function to execute during checkpointing. |
This sets the onCheckpoint handler for the pipeline.
The onCheckpoint handler for the pipeline is called whenever a checkpoint is created. The return value is saved to the checkpoint. This value will be passed into the onPostCheckpoint and onRecover handlers, and can be used for tracking files.
sp.lifecycle.on_checkpoint(my_on_checkpoint_function)
Parameters:
name | type | description |
---|---|---|
handler | function | A function that will be called when a checkpoint is being created. It will not be provided any arguments. The return value is converted to a q type, then saved in the checkpoint. This value will be passed into the on_post_checkpoint and on_recover handlers, and can be used for tracking files. |
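The file-tracking use mentioned above might look like the following sketch. The `processed_files` list is a hypothetical piece of external state, and the handlers are called directly to simulate a checkpoint followed by a recovery. In a real pipeline the saved value is converted to a q type, so `on_recover` would receive a PyKX object rather than a plain list.

```python
import copy

# Hypothetical external state: files the pipeline has fully processed
processed_files = ["a.csv", "b.csv"]


def on_checkpoint():
    # Return a snapshot; the caller stores it alongside the checkpoint
    return copy.deepcopy(processed_files)


def on_recover(saved_files):
    # Replace the live state with whatever the checkpoint held
    processed_files[:] = list(saved_files)


# Simulate: checkpoint, further progress, then recovery from the checkpoint
snapshot = on_checkpoint()
processed_files.append("c.csv")
on_recover(snapshot)
```

Returning a copy rather than the live list matters here: work done after the checkpoint must not leak into the saved value.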
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> checkpointed = False
>>> def on_checkpoint():
        global checkpointed
        checkpointed = True
        return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> recovered = False
>>> def on_recover(auxiliary_data):
        print(f'on_recover called with {auxiliary_data}')
        global recovered
        recovered = True
>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
# Simulate recovery from a checkpoint
>>> assert not checkpointed
>>> kx.q('.spcp.writeCheckpoint[] `id`checkpoint!(0j; .sp.createCheckpoint[])')
>>> assert checkpointed and not recovered
# These are internal API calls, and are liable to change without notice.
# They should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert recovered
On Operator Checkpoint
Sets the onOperatorCheckpoint event handler for a specific operator - a per-operator handler that will be called when a checkpoint is being created.
.qsp.onOperatorCheckpoint[id;handler]
Parameters:
name | type | description |
---|---|---|
id | symbol | An operator ID. |
handler | function | A unary function which executes before an operator performs a checkpoint. |
The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator state is saved. This provides an opportunity for an operator to make updates to its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and will be passed to the operator's onOperatorRecover function on recovery.
Examples:
.qsp.onOperatorCheckpoint[`map;{[op]
log.info ("Operator state:\n%s"; .Q.s .qsp.get[op;::]);
}]
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{[op;md;x]
.qsp.set[op;md] .qsp.get[op;md] upsert x;
x
}; .qsp.use `state`name!(();`map)]
.qsp.write.toConsole[];
The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.
sp.lifecycle.on_operator_checkpoint(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
name | type | description |
---|---|---|
operator | OperatorSpecifier | The operator for which to assign the handler. |
handler | Callable[[kx.Dictionary], Any] | A function that will be called when a checkpoint is being created. It will be provided the operator as an argument, and whatever it returns is converted to a q type, then saved in the checkpoint. Later this data may be passed into the operator's on_operator_post_checkpoint and on_operator_recover handlers. |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> operatorCheckpointed = False
>>> def on_operator_checkpoint(op):
        global operatorCheckpointed
        operatorCheckpointed = True
>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.write.to_console())
>>> assert not operatorCheckpointed
>>> kx.q('.spcp.writeCheckpoint[] (!) . flip ((`checkpoint; .sp.createCheckpoint[]);(`id;0j));')
>>> assert operatorCheckpointed
On Operator Post Checkpoint
Sets the onOperatorPostCheckpoint event handler for a specific operator - a per-operator handler that will be called after a checkpoint has been saved.
.qsp.onOperatorPostCheckpoint[id;handler]
Parameters:
name | type | description |
---|---|---|
id | symbol | An operator ID. |
handler | function | A ternary function to execute after a checkpoint has been saved. |
Event Handler
The onOperatorPostCheckpoint event handler is called after a checkpoint has either been saved to disk or written to the Controller.
It has arguments:
- operator
- checkpointed operator state (i.e. state set via Set)
- data returned by the operator's onOperatorCheckpoint handler
Readers Unsupported
The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
The handler will be called after the checkpoint has been saved to disk or written to the Controller.
sp.lifecycle.on_operator_post_checkpoint(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
name | type | description |
---|---|---|
operator | OperatorSpecifier | The operator for which to assign the handler. |
handler | Callable[[kx.Dictionary, kx.K, kx.K], None] | A function that will be called after a checkpoint has been saved. It will be provided three arguments: the operator, the checkpointed operator state, and the data returned by the operator's on_operator_checkpoint handler. Its return value is ignored. |
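One possible use of a post-checkpoint handler is to promote work only once the checkpoint covering it is durable. The sketch below assumes a hypothetical pair of lists, `staged` and `committed`, and calls the handlers directly with stand-in operator and state values; it does not touch a real pipeline.

```python
# Hypothetical bookkeeping: files written since the last checkpoint
staged = ["part-0001.tmp", "part-0002.tmp"]
committed = []


def on_operator_checkpoint(op):
    # The returned value is stored with the checkpoint
    return list(staged)


def on_operator_post_checkpoint(op, state, data):
    # Called once the checkpoint is durable: promote the files it covered
    for name in data:
        if name in staged:
            staged.remove(name)
            committed.append(name)


# Simulated sequence with stand-in operator and state values
operator = {"name": "callback"}
saved = on_operator_checkpoint(operator)
staged.append("part-0003.tmp")          # arrives after the snapshot
on_operator_post_checkpoint(operator, None, saved)
```

Only the files captured in the checkpoint are promoted; the file that arrived afterwards stays staged until the next checkpoint.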
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> operatorCheckpointed = False
>>> def on_operator_checkpoint(op):
        global operatorCheckpointed
        operatorCheckpointed = True
>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.write.to_console())
>>> assert not operatorCheckpointed
>>> kx.q('.spcp.writeCheckpoint[] (!) . flip ((`checkpoint; .sp.createCheckpoint[]);(`id;0j));')
>>> assert operatorCheckpointed
On Operator Recover
Sets the onOperatorRecover event handler for a specific operator - a per-operator handler that will be called when a pipeline recovers from a checkpoint.
.qsp.onOperatorRecover[id;handler]
Parameters:
name | type | description | default |
---|---|---|---|
id | symbol | An operator ID. | Required |
handler | function | A binary function to execute after an operator recovers. | Required |
Sets the onOperatorRecover event handler of the operator with the given id.
The onOperatorRecover handler is called when a pipeline recovers from a checkpoint. Operator state has already been restored by the time this handler is called. This provides an opportunity for operators to make updates to their state before the pipeline starts running.
Readers Unsupported
The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
Examples:
Checkpointing external state
TABLE: ();
.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];
.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{
`TABLE upsert x;
x
}; .qsp.use ``name!(::;`map)]
.qsp.write.toConsole[];
Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts.
See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.
sp.lifecycle.on_operator_recover(operator, handler)
Readers Unsupported
This cannot be used for reader operators.
Parameters:
name | type | description | default |
---|---|---|---|
operator | str | The operator for which to assign the handler. | Required |
handler | function | A function that will be called when a pipeline recovers from a checkpoint. It will be provided two arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored. | Required |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> operatorRecovered = False
>>> def on_operator_recover(op, data):
        global operatorRecovered
        operatorRecovered = True
>>> sp.lifecycle.on_operator_recover('callback', on_operator_recover)
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.write.to_console())
>>> assert not operatorRecovered
# These are internal API calls used to simulate checkpoint recovery.
# They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert operatorRecovered
On Post Checkpoint
Sets the onPostCheckpoint handler - the global handler that will be called after a checkpoint has been created.
.qsp.onPostCheckpoint[handler]
Parameters:
name | type | description | default |
---|---|---|---|
handler | function | A unary function to be called after a checkpoint has been written. | Required |
This sets the onPostCheckpoint handler for the pipeline.
The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written either to disk or to the controller, and has been acknowledged by the controller. Its argument is the data returned by the pipeline's onCheckpoint handler.
The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.
sp.lifecycle.on_post_checkpoint(handler)
Parameters:
name | type | description | default |
---|---|---|---|
handler | Callable[[kx.K], None] | A function that will be called after a checkpoint has been created. It will be provided one argument, which is the return value of the on_checkpoint handler. Its return value is ignored. | Required |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> def on_checkpoint():
        return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> postCheckpointed = False
>>> def on_post_checkpoint(auxiliary_data):
        global postCheckpointed
        print(f'on_post_checkpoint called with {auxiliary_data}')
        postCheckpointed = True
>>> sp.lifecycle.on_post_checkpoint(on_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
>>> assert not postCheckpointed
>>> kx.q('.spcp.WrittenCheckpoint: (!) . flip ((`checkpoint;.sp.createCheckpoint[]);(`id;0j))')
>>> kx.q('.spwork.acknowledgeCheckpoint[""]')
>>> assert postCheckpointed
On Recover
Sets the onRecover handler - the global handler that will be called when a pipeline recovers from a checkpoint.
.qsp.onRecover[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A unary function to execute after recovery. |
This sets the onRecover handler for the pipeline.
The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint. Its argument is the data returned by the pipeline's onCheckpoint handler.
sp.lifecycle.on_recover(handler)
Parameters:
name | type | description |
---|---|---|
handler | function | A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument, which is whatever was returned by the last run checkpoint handler (see on_checkpoint ). Its return value is ignored. |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> recovered = False
>>> def on_recover(auxiliary_data):
        global recovered
        recovered = True
>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
>>> assert not recovered
# These are internal API calls used to simulate checkpoint recovery.
# They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> recovered
True
On Setup
Sets the onSetup event handler - the global handler that will be called before a pipeline has finished initializing.
.qsp.onSetup[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A nullary function to execute after all operators in a pipeline are set up. |
This sets the onSetup event handler for the pipeline.
The onSetup event handler is called before the pipeline has finished initializing.
sp.lifecycle.on_setup(handler)
Parameters:
name | type | description |
---|---|---|
handler | Callable[[], None] | A function that will be called before a pipeline has finished initializing, after all operators in the pipeline are set up. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> def on_setup():
        global setup
        setup = True
>>> setup = False
>>> assert not setup
>>> sp.lifecycle.on_setup(on_setup)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
>>> assert setup
On Start
Sets the onStart event handler - the global handler that will be called when all operators in a pipeline have started.
.qsp.onStart[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A nullary function to execute after all operators in a pipeline are started. |
This sets the onStart event handler for the pipeline.
The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.
sp.lifecycle.on_start(handler)
Parameters:
name | type | description |
---|---|---|
handler | Callable[[], None] | A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> def on_start():
        global started
        started = True
>>> started = False
>>> sp.lifecycle.on_start(on_start)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
>>> assert started
On Finish
Sets the onFinish event handler.
.qsp.onFinish[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A unary function that accepts an operator configuration to execute when a given operator is finished. |
This sets the onFinish event handler for the pipeline.
The onFinish event handler is called when an operator is complete; it returns generic null.
Coming Soon
On Teardown
Sets the onTeardown event handler - the global handler that will be called before a pipeline is torn down.
.qsp.onTeardown[handler]
Parameters:
name | type | description |
---|---|---|
handler | function | A nullary function to execute before a pipeline is torn down. |
This sets the onTeardown event handler for the pipeline.
The onTeardown event handler is called before the pipeline is torn down; it returns generic null.
A teardown can be triggered by calling sp.teardown.
sp.lifecycle.on_teardown(handler)
Parameters:
name | type | description |
---|---|---|
handler | Callable[[], None] | A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored. |
Examples:
>>> from kxi import sp
>>> torndown = False
>>> def on_teardown():
        global torndown
        torndown = True
>>> sp.lifecycle.on_teardown(on_teardown)
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console())
>>> sp.teardown()
>>> assert torndown
Register Task
Registers a task for an operator.
.qsp.registerTask[op]
Parameters:
name | type | description |
---|---|---|
op | .qsp.op | A pipeline operator. |
This registers a task for the operator and returns an identifier (int) for the task.
A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using Finish Task.
Register an asynchronous task for true async using Apply:
Examples:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.apply[
{[op; md; data]
// Register a task which represents the unfinished async kurl GET request
tid: .qsp.registerTask[op];
.kurl.async (
"https://www.google.ca";
"GET";
``callback!(::;
{[op;md;data;tid;r] // GET request with a callback
.qsp.finishTask[op;tid]; // Mark the task as finished
data[`response]: r 1; // Add GET request response to data
.qsp.push[op;md;data]; // Push enriched data to the next operator
}[op;md;data;tid]
)
);
}
]
.qsp.write.toConsole[]
sp.lifecycle.register_task(operator)
A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using sp.finish_task.
Parameters:
name | type | description |
---|---|---|
operator | OperatorSpecifier | The operator for which a task will be registered. |
Returns:
A task ID, which will be needed to call sp.finish_task.
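Because every registered task must eventually be finished, it can help to pair the two calls structurally. The sketch below is one way to do that with a context manager. `pending_task`, `fake_register` and `fake_finish` are hypothetical names, and the register and finish callables are passed in so the example does not depend on the library itself.

```python
from contextlib import contextmanager


@contextmanager
def pending_task(register, finish, operator):
    """Register a task on entry and always finish it on exit."""
    task_id = register(operator)
    try:
        yield task_id
    finally:
        # Runs on success and on error, so the task is never left open
        finish(operator, task_id)


# Stand-in callables that record calls; a real pipeline would pass the
# library's register/finish functions instead (assumption).
calls = []


def fake_register(op):
    calls.append(("register", op))
    return 7


def fake_finish(op, task_id):
    calls.append(("finish", op, task_id))


try:
    with pending_task(fake_register, fake_finish, "my_op") as tid:
        raise RuntimeError("request failed")
except RuntimeError:
    pass
```

This shape suits code that waits for the response inside the `with` block. For callback-style requests, where the response arrives after the operator function has returned, the task should instead be finished inside the callback.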
Subscribe
Subscribes to a particular event with the provided callback.
.qsp.subscribe[eventType; handler]
Parameters:
name | type | description |
---|---|---|
eventType | symbol | Type of an event. |
handler | function | A unary function to execute after the eventType occurs. Takes the event as argument. |
An event is a dictionary with keys:
key | q type | description |
---|---|---|
type | symbol | Type of the event. |
time | timestamp | Timestamp. |
origin | symbol | Origin identifier. |
data | any | Payload of the event object. |
When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.
Returns:
type | description |
---|---|
list | A 2-element list containing the event type and a subscription identifier to use to unsubscribe, {(symbol;long)} . |
Subscribe to file events:
Examples:
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
.qsp.read.fromFile[`:/tmp/numbers.txt]
.qsp.write.toVariable[`test.output];
q).file.events
eventType eventTime origin data
-------------------------------------------------------------------------------------------------------------------------
file.found 2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start 2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
sp.subscribe(event_type, handler)
Parameters:
name | type | description |
---|---|---|
event_type | kx.CharVector | Type of an event. |
handler | Union[Callable[[kx.Dictionary], None], str] | A unary function to be called for this event. Can be a Python function or a string referencing a q function. |
The callback event will be a dictionary with keys:
key | q type | Python type | description |
---|---|---|---|
type | symbol | str | Type of the event. |
time | timestamp | kx.TimestampAtom | Timestamp. |
origin | symbol | str | Origin identifier. |
data | any | Any | Payload of the event object. |
When the SP emits an event of this type, the handler will be invoked with the event object.
A Python handler will receive the event as a pykx.Dictionary object.
Returns:
type | description |
---|---|
List[Any] | A 2-element list containing the event type and a subscription identifier to use to unsubscribe. |
Examples:
Python function callback
Use a Python function callback.
>>> import pykx as kx
>>> from kxi import sp
>>> # define events to subscribe to and table to store them in
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
'eventTime': kx.TimestampVector([]),
'origin': kx.SymbolVector([]),
'data': kx.Identity(None)})
>>> # define events handler
>>> def update_file_events(event: kx.Dictionary):
        events.upsert(event.values())
>>> for typ in types:
        sp.subscribe(typ, update_file_events)
# create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
| sp.write.to_variable('out'))
# check events table
events
pykx.Table(pykx.q('
eventType eventTime origin data
---------------------------------------------------------------------------------------------------------
file.found 2025.02.21D12:00:38.673846000 file (,`paths)!,,":/tmp/numbers.txt"
file.start 2025.02.21D12:01:18.920759000 file `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:01:18.921781000 file `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2025.02.21D12:01:18.922243000 file `path`size!(":/tmp/numbers.txt";10)
'))
Use a q function callback
>>> import pykx as kx
>>> from kxi import sp
>>> # define events to subscribe to, an events table schema and set as a q global variable
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
'eventTime': kx.TimestampVector([]),
'origin': kx.SymbolVector([]),
'data': kx.Identity(None)})
>>> kx.q.set('qevents', events)
>>> # define a q event handler
>>> kx.q('updateEvents:{`qevents upsert value x}')
>>> for typ in types:
        sp.subscribe(typ, 'updateEvents')
# create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
| sp.write.to_variable('out'))
>>> # check events table
>>> kx.q('qevents')
pykx.Table(pykx.q('
eventType eventTime origin data
---------------------------------------------------------------------------------------------------------
file.found 2025.02.21D12:33:39.231703197 file (,`paths)!,,":/tmp/numbers.txt"
file.start 2025.02.21D12:33:41.812599917 file `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:33:41.812831273 file `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2025.02.21D12:33:41.812841521 file `path`size!(":/tmp/numbers.txt";10)
'))
Unsubscribe
Unsubscribes from event callbacks. You can unsubscribe from a single subscription or from all subscriptions for a given event.
.qsp.unsubscribe[id]
Parameters:
name | q type | description |
---|---|---|
id | symbol or (symbol;long) | If given just an event type, all subscriptions are removed for that event. Alternatively, a single subscription can be removed using the ID returned by the call to .qsp.subscribe. |
Returns:
type | description |
---|---|
null |
Examples:
Unsubscribe all subscribers for a given event_type
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[`file.found; {`.file.events upsert value x}];
.qsp.unsubscribe[`file.found];
Unsubscribe from a specific event subscription
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
id:.qsp.subscribe[`file.found ; {`.file.events upsert value x}];
.qsp.unsubscribe[id];
sp.unsubscribe(id)
Parameters:
name | type | description |
---|---|---|
id | Union[str, List[Any]] | If given just an event type, all subscriptions are removed for that event. Alternatively, a single subscription can be removed using the ID returned by the call to sp.subscribe. |
Unsubscribes a particular subscriber or all subscribers for a given event type.
Returns:
type | description |
---|---|
None |
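The two forms of the `id` argument can be illustrated with a small in-memory model. `EventBus` is a hypothetical class written for this sketch, not part of `kxi.sp`; it mirrors only the documented return shape of subscribe and the two unsubscribe behaviours, and its events omit the time and origin keys for brevity.

```python
from collections import defaultdict
from itertools import count


class EventBus:
    """Hypothetical in-memory model of subscribe/unsubscribe (not kxi.sp)."""

    def __init__(self):
        self._handlers = defaultdict(dict)   # event type -> {id: handler}
        self._ids = count()

    def subscribe(self, event_type, handler):
        sub_id = next(self._ids)
        self._handlers[event_type][sub_id] = handler
        # Mirror the documented return shape: [event type, subscription id]
        return [event_type, sub_id]

    def unsubscribe(self, ident):
        if isinstance(ident, str):
            # Just an event type: drop every subscription for it
            self._handlers.pop(ident, None)
        else:
            event_type, sub_id = ident
            self._handlers[event_type].pop(sub_id, None)

    def emit(self, event_type, data):
        # Simplified event: the time and origin keys are left out
        event = {"type": event_type, "data": data}
        for handler in list(self._handlers.get(event_type, {}).values()):
            handler(event)


bus = EventBus()
seen = []
first = bus.subscribe("file.found", lambda e: seen.append(("first", e["data"])))
bus.subscribe("file.found", lambda e: seen.append(("second", e["data"])))

bus.unsubscribe(first)            # remove one subscription by its identifier
bus.emit("file.found", "a.txt")
bus.unsubscribe("file.found")     # remove all remaining subscriptions
bus.emit("file.found", "b.txt")
```

After the first unsubscribe only the second handler sees `a.txt`; after the second, nothing is delivered for `b.txt`.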
Examples:
Unsubscribe all subscribers for a given event_type
>>> import pykx as kx
>>> from kxi import sp
>>> def update_events(event):
        print(event)
>>> sp.subscribe('file.found', update_events)
# unsubscribe all file.found events
>>> sp.unsubscribe('file.found')
Unsubscribe a specific subscription
>>> import pykx as kx
>>> from kxi import sp
>>> def update_events(event):
        print(event)
>>> id = sp.subscribe('file.found', update_events)
>>> sp.unsubscribe(id)