Skip to content

Lifecycle

This page provides an overview for managing actions upon various pipeline events.

Finish Task

Marks a task as finished.

.qsp.finishTask[op;taskID]

Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.

Parameters:

name type description default
op .qsp.op A pipeline operator configuration, or the name of the operator as a symbol. Required
taskID int A task identifier. Required

Examples:

Register an asynchronous task for true async using .qsp.apply.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task that represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data`url;
        "GET";
        ``callback!(
          ::;
          {[op;md;data;tid;r]        // GET request with a callback
            .qsp.finishTask[op;tid]; //   Mark the task as finished
            data[`response]: r 1;    //   Add GET request response to data
            .qsp.push[op;md;data];   //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      )
    }
  ]
  .qsp.write.toConsole[]

publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url     | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html  ..

See Register Task for more information

sp.finish_task(operator, task_id)

Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.

Parameters:

name type description default
operator Operator The operator in which the task was created. Required
task_id int The task identifier. Required

Examples:

Register an asynchronous task using sp.apply.

>>> from kxi import sp
>>> import pandas as pd

>>> import pykx as kx

>>> data = kx.Dictionary({'url': kx.CharVector('https://www.google.com'), 'response': kx.CharVector('')})

>>> kx.q('''
.my.callback:{[op; md; data]
   / Register a task that represents the unfinished async kurl GET request
   tid: .qsp.registerTask[op];
   data: .pykx.toq data;
   .kurl.async (
     data`url;
     "GET";
     ``callback!(
     ::;
     {[op;md;data;tid;r]        / GET request with a callback
       .pykx.qeval["lambda x,y : sp.finish_task(x,y)"][op;tid];
       data[`response]: r 1;    /   Add GET request response to data
       .qsp.push[op;md;data];   /   Push enriched data to the next operator
     }[op;md;data;tid]
     )
   )
 }
 ''')

>>> def process(op, md, data):
        kx.q('.my.callback', op, md, data)

>>> sp.run(sp.read.from_callback('publish')
            | sp.apply(process)
            | sp.write.to_console())

>>> kx.q('publish', data)      

See Register Task for more information.

On Error

Sets the onError handler for the pipeline - the global handler that will be called when an error occurs.

.qsp.onError[handler]

Parameters:

name type description
handler function A ternary function that is passed any messages that cause an error in the pipeline. The first argument is the error message, the operator that threw the error and the batch of data that cause the error.

The onError event handler is called when an error occurs in a pipeline. The handler will be passed an error message, the operator that threw the error, and the batch that caused the error.

Examples:

Cause a pipeline to throw an error by trying to add a symbol to int.

.qsp.onError {[msg; op; data]
    `.errors.cache upsert (`time`msg`op`data)!(.z.p;msg;op;data)};

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[x] 1+`e}]
    .qsp.write.toConsole[];

publish til 10
type - error in operator: map
.sp.compileOp@[37]           'errMsg;
                            ^
.sp.compileOp@[0]   {[fn; err; msg] .sputil.Qtrp[fn; msg; err msg]}
                                    ^
.sp.i.setPush@[0]   {[id;md;data] Runtime[id]@\:(md;data); }
                                            ^
{}[0]   {.sp.pushid[x`id;y;z]}
        ^
>     publish til 10
    ^

The error information should be stored in .errors.cache.

time| 2025.12.03D15:34:08.229743758
msg | "type"
op  | `id``function`inputs`outputs`config`metadata`edge!(`map;::;{[x] 1+`e};(,`)!,"*";(,`)!,"*";(,`allowPartials)!,1b;`plugin`params`initialState`version`stateful`type!(`map;`symbol$();::;1;0b;`operator);`)
data| ((,`)!,::;0 1 2 3 4 5 6 7 8 9)

Parameters:

name type description
handler function A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary which can be passed into other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> errored = False

>>> def on_error(errMsg, op, data):
        errored = True

>>> sp.lifecycle.on_error(on_error)
>>> sp.run(sp.read.from_callback('publish')
        | sp.map(lambda x: 1 / 0)
        | sp.write.to_console())
>>> kx.q('publish',range(10))
>>> print(errored)
True

On Checkpoint

Sets the onCheckpoint handler - the global handler that will be called when a checkpoint is being created.

.qsp.onCheckpoint[handler]

Parameters:

name type description
handler function A nullary function to execute during checkpointing.

This sets the onCheckpoint handler for the pipeline.

The onCheckpoint handler for the pipeline is called whenever a checkpoint is created. The return value is saved to the checkpoint. This value will be passed into the onPostCheckpoint and onRecover handlers, and can be used for tracking files.

Examples:

Set up a handler to run after checkpoint has been created.

checkpointed:: 0b;

stream:
    .qsp.read.fromCallback[`publish]
    .qsp.write.toConsole[];
.qsp.onCheckpoint[{ checkpointed:: 1b; til 10 }];
.qsp.run stream;

The value of checkpointed should be '1b' after checkpoint has been created.

checkpointed
1b

sp.lifecycle.on_checkpoint(my_on_checkpoint_function)

Parameters:

name type description
handler function A function that will be called when a checkpoint is being created. It will not be provided any arguments. The return value is converted to a q type, then saved in the checkpoint. This value will be passed into the on_post_checkpoint and on_recover handlers, and can be used for tracking files.

Examples:

Set up a handler to run when a checkpoint is being created.

>>> from kxi import sp
>>> import pykx as kx
>>> import datetime

>>> checkpointed = False

>>> def on_checkpoint():
    checkpointed = True
    return datetime.datetime.now()

>>> sp.lifecycle.on_checkpoint(on_checkpoint)

>>> sp.run(sp.read.from_callback('publish')
    | sp.write.to_console())

The value of recovered should be 'True' after recovery.

print(checkpointed)
True

On Operator Checkpoint

Sets the onOperatorCheckpoint event handler for a specific operator - a per-operator handler that will be called when a checkpoint is being created.

.qsp.onOperatorCheckpoint[id;handler]

Parameters:

name type description
id symbol An operator ID.
handler function A unary function which executes before an operator performs a checkpoint.

The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator state is saved. This provides an opportunity for an operator to make updates to its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and will be passed to the operators onOperatorRecover function on recovery.

Examples:

Set up a handler to run when a checkpoint is created, before operator state is saved.

operatorCheckpointed:: 0b;
stream:
    .qsp.read.fromCallback[`publish]
    .qsp.map[{x}; .qsp.use ``name!``identity]
    .qsp.write.toConsole[];
.qsp.onOperatorCheckpoint[`identity; {[op] operatorCheckpointed:: 1b; til 45 }];
.qsp.run stream;

The value of operatorCheckpointed should be true after checkpoint has been created.

operatorCheckpointed
1b

The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.

sp.lifecycle.on_operator_checkpoint(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description
operator OperatorSpecifier An operator for which to assign the handler.
handler Callable[[kx.Dictionary], Any] A unary function which executes before an operator performs a checkpoint.

The on_operator_checkpoint event handler is called when a checkpoint is created, before operator state is saved. This provides an opportunity for an operator to make updates to its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and will be passed to the operators on_operator_recover function on recovery.

Examples:

Set up a handler to run when a checkpoint is created, before the operator state is saved.

>>> from kxi import sp
>>> import pykx as kx

>>> operator_checkpointed = False
>>> def on_operator_checkpoint(op):
>>>     operator_checkpointed = True

>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
        | sp.write.to_console())

The value of operator_checkpointed should be 'True' after checkpoint has been created.

print(operator_checkpointed)
True

On Operator Post Checkpoint

Sets the onOperatorPostCheckpoint event handler for a specific operator - a per-operator handler that will be called when a checkpoint is being created.

.qsp.onOperatorPostCheckpoint[id;handler]

Parameters:

name type description
id symbol An operator ID.
handler function A ternary function to execute after a checkpoint has been saved.

Event Handler

The onOperatorPostCheckpoint event handler is called after a checkpoint has either been saved to disk or written to the Controller.

It has arguments:

  1. operator
  2. checkpointed operator state (i.e. state set via Set)
  3. data returned by the operator's onCheckpoint handler

Readers Unsupported

The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.

Examples:

Set up a handler to run after a checkpoint has either been saved to disk or written to the Controller.

operatorPostCheckpointed:: 0b;

stream:
    .qsp.read.fromCallback[`publish]
    .qsp.map[{x}; .qsp.use ``name!``identity]
    .qsp.write.toConsole[];
.qsp.onOperatorPostCheckpoint[`identity;{[x;y;z] operatorPostCheckpointed:: 1b}];
.qsp.run stream;
.spcp.WrittenCheckpoint: (!) . flip (
    (`checkpoint; .sp.createCheckpoint[]);
    (`id;0j));

The value of operatorPostCheckpointed should be '1b' after checkpoint has been saved.

operatorPostCheckpointed
1b

The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.

sp.lifecycle.on_operator_post_checkpoint(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description default
operator OperatorSpecifier An operator for which to assign the handler. Required
handler Callable[[kx.Dictionary, kx.K, kx.K] , None] A ternary function to execute after a checkpoint has been saved.
Return value is converted to a q type, then saved in the checkpoint. Required

Event Handler

The on_operator_post_checkpoint event handler is called after a checkpoint has either been saved to disk or written to the Controller.

It has arguments:

  1. operator
  2. checkpointed operator state (i.e. state set via Set)
  3. data returned by the operator's on_checkpoint handler

Readers Unsupported

The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the sp.read.* collection.

Examples:

Set up a handler to run before the operator's state is saved.

>>> from kxi import sp
>>> import pykx as kx

>>> operator_post_checkpointed = False
>>> def on_operator_post_checkpoint(op, state, data):
        operator_post_checkpointed = True

>>> sp.lifecycle.on_operator_post_checkpoint('callback', on_operator_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
        | sp.write.to_console())

The value of operator_post_checkpointed should be 'True' after the checkpoint has been saved.

print(operator_post_checkpointed)
True

On Operator Recover

Sets the onOperatorRecover event handler for a specific operator - a per-operator handler that will be called when a pipeline recovers from a checkpoint.

.qsp.onOperatorRecover[id;handler]

Parameters:

name type description default
id symbol An operator ID. Required
handler Callable[[kx.Dictionary, kx.K], None] A binary function to execute after an operator recovers. Required

Sets the onOperatorRecover event handler of the operator with the id. The onOperatorRecover handler is called when a pipeline recovers from a checkpoint. Operator state has already been restored once this handler has been called. This provides an opportunity for operators to make updates to their state before the pipeline starts running.

Reader Unsupported

The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection."

Examples:

Set up handler to run on recovery before the pipeline starts.

TABLE: ();

.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];

.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{
        `TABLE upsert x;
        x
        }; .qsp.use ``name!(::;`map)]
    .qsp.write.toConsole[];

Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts. See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.

sp.lifecycle.on_operator_recover(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description default
operator OperatorSpecifier An operator for which to assign the handler. Required
handler function A binary function to execute after an operator recovers. It will be provided 2 arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored. Required

Examples:

Set up handler to run after an operator recovers.

>>> from kxi import sp
>>> import pykx as kx

>>> operator_recovered = False

>>> def on_operator_recover(op, data):
    operator_recovered = True

>>> sp.lifecycle.on_operator_recover('callback', on_operator_recover)
>>> sp.run(sp.read.from_callback('publish', name='callback')
       | sp.write.to_console())

The value of operator_recovered should be 'True' after recovery.

print(operator_recovered)
True

On Post Checkpoint

Sets the onPostCheckpoint handler - the global handler that will be called after a checkpoint has been created.

.qsp.onPostCheckpoint[handler]

Parameters:

name type description default
handler function A unary function to be called after a checkpoint has been written. Required

This sets the onPostCheckpoint handler for the pipeline.

The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written either to disk or to the controller, and has been acknowledged by the controller.

Its argument is the data returned by the pipeline's onCheckpoint handler.

Examples:

Set up a handler to run after the checkpoint has been written.

result:: "";
.qsp.onOperatorPostCheckpoint[`identity;{[op;state;custom] result:: (op;state;custom)}];
.spcp.WrittenCheckpoint: (!) . flip (
    (`checkpoint; checkpoint`checkpoint);
    (`id;0j));

The value of the result should be '1b' once the checkpoint has been acknowledged.

result ~ (.sp.getOperator identity; checkpoint[checkpoint][app;identity]; til 45)

```txt
1b

The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.

sp.lifecycle.on_post_checkpoint(handler)

Parameters:

name type description default
handler Callable[[kx.K], None] A function that will be called after a checkpoint has been created. It will be provided one argument, which is the return value of the on_checkpoint handler. Its return value is ignored. Required

Examples:

Set up a handler to run after the checkpoint has been written.

>>> from kxi import sp
>>> import pykx as kx
>>> import datetime

>>> def on_checkpoint():
    return datetime.datetime.now()

>>> sp.lifecycle.on_checkpoint(on_checkpoint)

>>> post_checkpointed = False
>>> def on_post_checkpoint():
    post_checkpointed = True

>>> sp.lifecycle.on_post_checkpoint(on_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())
The value of on_post_checkpoint should be 'True' after checkpoint has been created and acknowledged by the controller.

print(post_checkpointed)   
True

On Recover

Sets the onRecover handler - the global handler that will be called when a pipeline recovers from a checkpoint.

.qsp.onRecover[handler]

Parameters:

name type description default
handler function A unary function to execute after recovery. Required

This sets the onRecover handler for the pipeline.

The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint.

Its argument is the data returned by the pipeline's onCheckpoint handler.

Examples:

Set up handler to run after recovery.

result:: (::);
stream:
    .qsp.read.fromCallback[`publish]
    .qsp.write.toConsole[];

.qsp.onRecover[{[state] result:: state; }];
.qsp.run stream;

publish til 10
checkpoint:.sp.createCheckpoint[];

Check the value of result after the pipeline has recovered from a checkpoint.

result ~ til 10    
1b

sp.lifecycle.on_recover(handler)

Parameters:

name type description default
handler function A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument, which is whatever was returned by the last run checkpoint handler (see on_checkpoint). Its return value is ignored. Required

Examples:

Set up handler to run after recovery.

>>> from kxi import sp
>>> import pykx as kx

>>> recovered = False
>>> def on_recover():
        recovered = True

>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish')
       | sp.write.to_console())

The value of recovered should be 'True' after recovery.

print(recovered)
True

On Setup

Sets the onSetup event handler - the global handler that will be called before a pipeline has finished initializing.

.qsp.onSetup[handler]

Parameters:

name type description
handler function A nullary function to execute after all operators in a pipeline are setup.

This sets the onSetup event handler for the pipeline.

The onSetup event handler is called before the pipeline is finished initializing.

Examples:

.test.setup: 0b;

.qsp.onSetup {.test.setup: 1b};    
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.write.toVariable[`test.cache];

/ .test.setup should be true after pipeline setup
.test.setup
1b

sp.on_setup(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called before a pipeline has finished initializing, after all operator in a pipeline are setup. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> setup = False

>>> def on_setup():
        setup = True

>>> sp.lifecycle.on_setup(on_setup)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())

>>> # setup should be True after pipeline has finished initalizing
>>> print(setup)
True

On Start

Sets the onStart event handler - the global handler that will be called when all operators in a pipeline have started.

.qsp.onStart[handler]

Parameters:

name type description
handler function A nullary function to execute after all operators in a pipeline are started.

This sets the onStart event handler for the pipeline.

The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.

Examples:

.test.start: 0b;

.qsp.onStart {.test.start: 1b};    
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.write.toVariable[`test.cache];

/ .test.setup should be true after pipeline setup
.test.start
1b

sp.on_start(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> started = False

>>> def on_start():
        started = True

>>> sp.lifecycle.on_start(on_start)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())

>>> # started should be True when all operators in the pipeline have started
>>> print(started)            
True

On Finish

Sets the onFinish event handler.

.qsp.onFinish[handler]

Parameters:

name type description
handler function A unary function that accepts an operator configuration to execute when a given operator is finished.

This sets the onFinish event handler for the pipeline.

The onFinish event handler is called when an operator is complete; it returns generic null.

Examples:

.test.finish: 0b;
.qsp.onFinish[{.test.finish: 1b}];
.qsp.run
    .qsp.read.fromExpr["til 5"]
    .qsp.write.toConsole[];
.test.finish
1b

sp.on_finish(handler)

Parameters:

name type description
handler Callable[[kx.CharVector], None] A unary function that accepts an operator configuration to execute when a given operator is finished.

This sets the on_finish event handler for the pipeline.

The on_finish event handler is called when an operator is complete; it returns None.

Examples:

>>> from kxi import sp

>>> finished = False
>>> def on_finish(event: kx.Dictionary):
        finished = True

>>> sp.lifecycle.on_finish(on_finish)

>>> sp.run(sp.read.from_expr('til 10')
            | sp.write.to_console())

>>> # finished should be True when the operator has finished
>>> print(finished)
True

On Teardown

Sets the onTeardown event handler - the global handler that will be called before a pipeline is torn down.

.qsp.onTeardown[handler]

Parameters:

name type description
handler function A nullary function to execute before a pipeline is torn down.

This sets the onTeardown event handler for the pipeline.

The onTeardown event handler is called after the pipeline is torn down; it returns generic null.

Examples:

.test.teardown: 0b;
.qsp.onTeardown {.test.teardown: 1b};
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.write.toVariable[`test.cache];
.qsp.teardown[]
.test.teardown
1b

A teardown can be triggered by calling sp.teardown.

sp.lifecycle.on_teardown(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> torndown = False

>>> def on_teardown():
        torndown = True

>>> sp.lifecycle.on_teardown(on_teardown)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())

>>> sp.teardown()
>>> assert torndown
True

Register Task

Registers a task for an operator.

.qsp.registerTask[op]

Parameters:

name type description
op .qsp.op A pipeline operator.

This registers a task for the operator and returns an identifier (int) for the task.

A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using Finish Task.

Register an asynchronous task for true async using Apply:

Examples:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        "https://www.google.ca";
        "GET";
        ``callback!(::;
          {[op;md;data;tid;r]         // GET request with a callback
            .qsp.finishTask[op;tid];  //   Mark the task as finished
            data[`response]: r 1;     //   Add GET request response to data
            .qsp.push[op;md;data];    //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      );
    }
  ]
  .qsp.write.toConsole[]
sp.lifecycle.register_task(operator)

A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using sp.finish_task.

Parameters:

name type description
operator OperatorSpecifier The operator for which a task will be registered.

Returns:

A task ID, which will be needed to call sp.finish_task.

Subscribe

Subscribes to a particular event with the provided callback.

.qsp.subscribe[eventType; handler]

Parameters:

name type description
eventType symbol Type of an event.
handler function A unary function to execute after the eventType occurs. Takes the event as argument.

An event is a dictionary with keys:

key q type description
type symbol Type of the event.
time timestamp Timestamp.
origin symbol Origin identifier.
data any Payload of the event object.

When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.

Returns:

type description
list A 2-element list containing the event type and a subscription identifier to use to unsubscribe, {(symbol;long)}.

Subscribe to file events File:

Examples:

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;

.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];
q).file.events
eventType     eventTime                     origin                 data
-------------------------------------------------------------------------------------------------------------------------
file.found    2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start    2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)

sp.subscribe(event_type, handler)

Parameters:

name type description
event_type kx.CharVector Type of an event.
handler Union[Callable[[kx.Dictionary], None], str] A unary function to be called for this event. Can be a Python function or a string referencing a q function.

The callback event will be a dictionary with keys:

key q type Python type description
type symbol str Type of the event.
time timestamp kx.TimestampAtom Timestamp.
origin symbol str Origin identifier.
data any Any Payload of the event object.

When the SP notifies this event, the handler will be invoked with the event object. A Python handler will return a pykx.Dictionary object.

Returns:

type description
List[Any] A 2-element list containing the event type and a subscription identifier to use to unsubscribe.

Examples:

Python function callback

Use a Python function callback.

>>> import pykx as kx
>>> from kxi import sp

>>> # define events to subscribe to and table to store them in
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
        'eventTime': kx.TimestampVector([]),
        'origin': kx.SymbolVector([]),
        'data': kx.Identity(None)})

>>> # define events handler
>>> def update_file_events(event: kx.Dictionary):
        events.upsert(event.values())

>>> for typ in types:
        sp.subscribe(typ, update_file_events)

>>> # create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
        | sp.write.to_variable('out'))

>>> # check events table
events
pykx.Table(pykx.q('
eventType     eventTime                     origin data
---------------------------------------------------------------------------------------------------------
file.found    2025.02.21D12:00:38.673846000 file   (,`paths)!,,":/tmp/numbers.txt"
file.start    2025.02.21D12:01:18.920759000 file   `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:01:18.921781000 file   `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2025.02.21D12:01:18.922243000 file   `path`size!(":/tmp/numbers.txt";10)
'))

Use a q function callback

>>> import pykx as kx
>>> from kxi import sp

>>> # define events to subscribe to, an events table schema and set as a q global variable
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
        'eventTime': kx.TimestampVector([]),
        'origin': kx.SymbolVector([]),
        'data': kx.Identity(None)})
>>> kx.q.set('qevents', events)

>>> # define a q event handler
>>> kx.q('updateEvents:{`qevents upsert value x}')

>>> for typ in types:
        sp.subscribe(typ, update_file_events)

# create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
        | sp.write.to_variable('out'))

>>> # check events table
>>> kx.q('qevents')
pykx.Table(pykx.q('
eventType          eventTime                     origin data
---------------------------------------------------------------------------------------------------------
file.found    2025.02.21D12:33:39.231703197 file   (,`paths)!,,":/tmp/numbers.txt"
file.start    2025.02.21D12:33:41.812599917 file   `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:33:41.812831273 file   `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2025.02.21D12:33:41.812841521 file   `path`size!(":/tmp/numbers.txt";10)
'))

Unsubscribe

Unsubscribes from event callbacks. You can subscribe from a single subscription or from all subscriptions for a given event.

.qsp.unsubscribe[id]

Parameters:

name q type description
id symbol or (symbol;long) If given just an event type, all subscriptions are removed for that event. Alternatively a single subscription can be subscribed using the ID returned by the call to .qsp.subscribe.

Returns:

type description
null

Examples:

Unsubscribe all subscribers for a given event_type

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[`file.found; {`.file.events upsert value x}];
.qsp.unsubscribe[`file.found];

Unsubscribe from a specific event subscription

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
id:.qsp.subscribe[`file.found ; {`.file.events upsert value x}];
.qsp.unsubscribe[id];
sp.unsubscribe(id)

Parameters:

name type description
id Union[str, List[Any]] If given just an event type, all subscriptions are removed for that event. Alternatively a single subscription can be subscribed using the ID returned by the call to .qsp.subscribe.

Unsubscribes a particular subscriber or all subscribers for a given event type.

Returns:

type description
None

Examples:

Unsubscribe all subscribers for a given event_type

>>> import pykx as kx
>>> from kxi import sp

>>> def update_events(event):
        print(event)
>>> sp.subscribe('file.found', update_events)

>>> # unsubscribe all file.found events
>>> sp.unsubscribe('file.found')

Unsubscribe a specific subscription

>>> import pykx as kx
>>> from kxi import sp

>>> def update_events(event):
        print(event)

>>> id = sp.subscribe('file.found', update_events)
>>> sp.unsubscribe(id)