Lifecycle

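This page describes the functions for managing actions on pipeline events, including setup, start, checkpointing, recovery, errors, and teardown, as well as task registration and event subscriptions.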

Finish Task

Marks a task as finished.

.qsp.finishTask[op;taskID]

Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.

Parameters:

name type description default
op .qsp.op A pipeline operator configuration, or the name of the operator as a symbol. Required
taskID int A task identifier. Required

Examples:

Register an asynchronous task for true async using .qsp.apply:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task that represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data`url;
        "GET";
        ``callback!(
          ::;
          {[op;md;data;tid;r]        // GET request with a callback
            .qsp.finishTask[op;tid]; //   Mark the task as finished
            data[`response]: r 1;    //   Add GET request response to data
            .qsp.push[op;md;data];   //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      )
    }
  ]
  .qsp.write.toConsole[]

publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url     | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html  ..

finish_task(operator, task_id)

Indicate that the specified task in the specified operator has finished processing. If all tasks for the operator are finished, and the operator has been requested to finish, the operator is marked as finished.

Parameters:

name type description default
operator Operator The operator in which the task was created. Required
task_id int The task identifier. Required

See Register Task for more information.
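The finishing rule described above can be illustrated with a small, self-contained model. This is only a sketch of the bookkeeping the text describes, not the Stream Processor's implementation; the `TaskTracker` class and all of its members are hypothetical names introduced for illustration.

```python
class TaskTracker:
    """Hypothetical model of per-operator task bookkeeping (not the SP implementation)."""

    def __init__(self):
        self._next_id = 0              # next task identifier to hand out
        self._open = set()             # identifiers of unfinished tasks
        self.finish_requested = False  # set once the operator is asked to finish

    def register_task(self):
        """Record a new unfinished task and return its identifier."""
        tid = self._next_id
        self._next_id += 1
        self._open.add(tid)
        return tid

    def finish_task(self, tid):
        """Mark one task as finished."""
        self._open.discard(tid)

    @property
    def finished(self):
        """Finished only when a finish was requested and no tasks remain open."""
        return self.finish_requested and not self._open


tracker = TaskTracker()
first = tracker.register_task()
second = tracker.register_task()

tracker.finish_requested = True     # the operator has been asked to finish
tracker.finish_task(first)
still_waiting = tracker.finished    # one task is still outstanding

tracker.finish_task(second)
now_finished = tracker.finished     # all tasks done and finish requested
```

In this model the operator only counts as finished once both conditions hold, which is why an asynchronous callback should always mark its task as finished.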

On Error

Sets the onError handler for the pipeline - the global handler that will be called when an error occurs.

.qsp.onError[handler]

Parameters:

name type description
handler function A ternary function that is called with any message that causes an error in the pipeline. Its arguments are the error message, the operator that threw the error, and the batch of data that caused the error.

The onError event handler is called when an error occurs in a pipeline. The handler will be passed an error message, the operator that threw the error, and the batch that caused the error.

sp.lifecycle.on_error(handler)

Parameters:

name type description
handler function A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary which can be passed into other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> errored = False

>>> def on_error(errMsg, op, data):
        global errored
        errored = True

>>> sp.lifecycle.on_error(on_error)
>>> sp.run(sp.read.from_callback('publish')
        | sp.map(lambda x: 1 / 0)
        | sp.write.to_console())
>>> assert not errored
>>> kx.q('publish',range(10))
>>> assert errored
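As a complement to the example above, the following sketch shows a handler that keeps the error details instead of only setting a flag. It assumes nothing beyond the three-argument shape described in the parameter table; the handler is called directly with stand-in values so that the block runs without a pipeline, and the stand-in operator dictionary is hypothetical.

```python
error_log = []  # one entry per failed batch


def record_error(err_msg, op, data):
    """Keep the error message and the offending batch for later inspection."""
    # `op` is accepted to match the handler signature but is not used here.
    error_log.append({"message": err_msg, "batch": data})


# Inside a pipeline this would be registered with:
#     sp.lifecycle.on_error(record_error)
# Here the handler is called directly with stand-in values to show its shape.
# The operator dictionary below is a hypothetical placeholder.
record_error("division by zero", {"name": "map"}, [1, 2, 3])
```

In a real pipeline the arguments arrive as PyKX objects, so a handler may need to convert them (for example with `.py()`) before storing them.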

On Checkpoint

Sets the onCheckpoint handler - the global handler that will be called when a checkpoint is being created.

.qsp.onCheckpoint[handler]

Parameters:

name type description
handler function A nullary function to execute during checkpointing.

This sets the onCheckpoint handler for the pipeline.

The onCheckpoint handler for the pipeline is called whenever a checkpoint is created. The return value is saved to the checkpoint. This value will be passed into the onPostCheckpoint and onRecover handlers, and can be used for tracking files.

sp.lifecycle.on_checkpoint(my_on_checkpoint_function)

Parameters:

name type description
handler function A function that will be called when a checkpoint is being created. It will not be provided any arguments. The return value is converted to a q type, then saved in the checkpoint. This value will be passed into the on_post_checkpoint and on_recover handlers, and can be used for tracking files.

Examples:

>>> from kxi import sp
>>> import pykx as kx
>>> import datetime

>>> checkpointed = False
>>> def on_checkpoint():
    global checkpointed
    checkpointed = True
    return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)

>>> recovered = False
>>> def on_recover(auxiliary_data):
    print(f'on_recover called with {auxiliary_data}')
    global recovered
    recovered = True
>>> sp.lifecycle.on_recover(on_recover)

>>> sp.run(sp.read.from_callback('publish')
    | sp.write.to_console())

# Simulate recovery from a checkpoint
>>> assert not checkpointed
>>> kx.q('.spcp.writeCheckpoint[] `id`checkpoint!(0j; .sp.createCheckpoint[])')
>>> assert checkpointed and not recovered
# These are internal API calls, and are liable to change without notice.
# They should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert recovered
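The text above notes that the value returned by the checkpoint handler can be used for tracking files. The sketch below shows one way that data could flow between the three global handlers. It is a plain-Python illustration under stated assumptions: the handlers are called directly rather than by a pipeline, the file paths and list names are hypothetical, and in a real pipeline the saved value would be converted to a q type before being passed back.

```python
written_files = []    # files produced so far (hypothetical bookkeeping)
confirmed_files = []  # files covered by a checkpoint that has been written


def on_checkpoint():
    """Return the data to store alongside the checkpoint."""
    return list(written_files)


def on_post_checkpoint(saved):
    """Receive the value returned by on_checkpoint once the checkpoint is written."""
    confirmed_files[:] = saved


def on_recover(saved):
    """Receive the same value when the pipeline recovers from the checkpoint."""
    written_files[:] = saved


# Stand-in for the lifecycle, calling the handlers directly in order.
written_files.append("/tmp/part-0.csv")
snapshot = on_checkpoint()
on_post_checkpoint(snapshot)

written_files.append("/tmp/part-1.csv")  # written after the checkpoint
on_recover(snapshot)                     # recovery rolls the list back
```

Returning a copy of the list from `on_checkpoint` keeps the saved value independent of later changes to `written_files`.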

On Operator Checkpoint

Sets the onOperatorCheckpoint event handler for a specific operator - a per-operator handler that will be called when a checkpoint is being created.

.qsp.onOperatorCheckpoint[id;handler]

Parameters:

name type description
id symbol An operator ID.
handler function A unary function which executes before an operator performs a checkpoint.

The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator state is saved. This provides an opportunity for an operator to make updates to its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and will be passed to the operator's onOperatorRecover function on recovery.

Examples:

.qsp.onOperatorCheckpoint[`map;{[op]
    log.info ("Operator state:\n%s"; .Q.s .qsp.get[op;::]);
    }]

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;x]
        .qsp.set[op;md] .qsp.get[op;md] upsert x;
        x
        }; .qsp.use `state`name!(();`map)]
    .qsp.write.toConsole[];

The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.

sp.lifecycle.on_operator_checkpoint(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description
operator OperatorSpecifier The operator for which to assign the handler.
handler Callable[[kx.Dictionary], Any] A function that will be called when a checkpoint is being created. It will be provided the operator as an argument, and whatever it returns is converted to a q type, then saved in the checkpoint. Later this data may be passed into the operator's on_operator_post_checkpoint and on_operator_recover handlers.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> operatorCheckpointed = False
>>> def on_operator_checkpoint(op):
        global operatorCheckpointed
        operatorCheckpointed = True

>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
        | sp.write.to_console())

>>> assert not operatorCheckpointed
>>> kx.q('.spcp.writeCheckpoint[] (!) . flip ((`checkpoint; .sp.createCheckpoint[]);(`id;0j));')
>>> assert operatorCheckpointed

On Operator Post Checkpoint

Sets the onOperatorPostCheckpoint event handler for a specific operator - a per-operator handler that will be called after a checkpoint has been saved.

.qsp.onOperatorPostCheckpoint[id;handler]

Parameters:

name type description
id symbol An operator ID.
handler function A ternary function to execute after a checkpoint has been saved.

Event Handler

The onOperatorPostCheckpoint event handler is called after a checkpoint has either been saved to disk or written to the Controller.

It has arguments:

  1. operator
  2. checkpointed operator state (i.e. state set via Set)
  3. data returned by the operator's onOperatorCheckpoint handler

Readers Unsupported

The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.

The handler will be called after the checkpoint has been saved to disk or written to the Controller.

sp.lifecycle.on_operator_post_checkpoint(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description
operator OperatorSpecifier The operator for which to assign the handler.
handler Callable[[kx.Dictionary, kx.K, kx.K], None] A function that will be called after a checkpoint has been saved. It will be provided three arguments: the operator, the checkpointed operator state, and the data returned by the operator's on_operator_checkpoint handler. Its return value is ignored.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> operatorCheckpointed = False
>>> def on_operator_checkpoint(op):
        global operatorCheckpointed
        operatorCheckpointed = True

>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
        | sp.write.to_console())

>>> assert not operatorCheckpointed
>>> kx.q('.spcp.writeCheckpoint[] (!) . flip ((`checkpoint; .sp.createCheckpoint[]);(`id;0j));')
>>> assert operatorCheckpointed
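For the post-checkpoint hook specifically, a handler takes three arguments, as listed earlier in this section. The following is a minimal sketch of such a handler, called directly with stand-in values so that it runs without a pipeline. The operator name `'map'`, the stand-in state, and the stand-in checkpoint data are all hypothetical.

```python
post_checkpoints = []  # record of every post-checkpoint notification


def on_operator_post_checkpoint(op, state, data):
    """Three-argument handler: operator, checkpointed state, checkpoint handler data."""
    post_checkpoints.append({"state": state, "data": data})


# Registration inside a pipeline, for a non-reader operator named 'map'
# (hypothetical name):
#     sp.lifecycle.on_operator_post_checkpoint('map', on_operator_post_checkpoint)
# Direct call with stand-in values:
on_operator_post_checkpoint({"name": "map"}, {"count": 3}, "batch-0003")
```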

On Operator Recover

Sets the onOperatorRecover event handler for a specific operator - a per-operator handler that will be called when a pipeline recovers from a checkpoint.

.qsp.onOperatorRecover[id;handler]

Parameters:

name type description default
id symbol An operator ID. Required
handler function A binary function to execute after an operator recovers. Required

Sets the onOperatorRecover event handler of the operator with the given id. The onOperatorRecover handler is called when a pipeline recovers from a checkpoint. Operator state has already been restored by the time this handler is called, which gives operators an opportunity to update their state before the pipeline starts running.

Readers Unsupported

The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.

Examples:

Checkpointing external state

TABLE: ();

.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];

.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{
        `TABLE upsert x;
        x
        }; .qsp.use ``name!(::;`map)]
    .qsp.write.toConsole[];

Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts. See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.

sp.lifecycle.on_operator_recover(operator, handler)

Readers Unsupported

This cannot be used for reader operators.

Parameters:

name type description default
operator str The operator for which to assign the handler. Required
handler function A function that will be called when a pipeline recovers from a checkpoint. It will be provided two arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored. Required

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> operatorRecovered = False
>>> def on_operator_recover(op, data):
        global operatorRecovered
        operatorRecovered = True

>>> sp.lifecycle.on_operator_recover('callback', on_operator_recover)
>>> sp.run(sp.read.from_callback('publish', name='callback')
       | sp.write.to_console())
>>> assert not operatorRecovered
# These are internal API calls used to simulate checkpoint recovery.
# They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert operatorRecovered

On Post Checkpoint

Sets the onPostCheckpoint handler - the global handler that will be called after a checkpoint has been created.

.qsp.onPostCheckpoint[handler]

Parameters:

name type description default
handler function A unary function to be called after a checkpoint has been written. Required

This sets the onPostCheckpoint handler for the pipeline.

The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written either to disk or to the controller, and has been acknowledged by the controller.

Its argument is the data returned by the pipeline's onCheckpoint handler.

The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.

sp.lifecycle.on_post_checkpoint(handler)

Parameters:

name type description default
handler Callable[[kx.K], None] A function that will be called after a checkpoint has been created. It will be provided one argument, which is the return value of the on_checkpoint handler. Its return value is ignored. Required

Examples:

>>> from kxi import sp
>>> import pykx as kx
>>> import datetime

>>> def on_checkpoint():
    return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)

>>> postCheckpointed = False
>>> def on_post_checkpoint(auxiliary_data):
        global postCheckpointed
        print(f'on_post_checkpoint called with {auxiliary_data}')
        postCheckpointed = True


>>> sp.lifecycle.on_post_checkpoint(on_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())
>>> assert not postCheckpointed
>>> kx.q('.spcp.WrittenCheckpoint: (!) . flip ((`checkpoint;.sp.createCheckpoint[]);(`id;0j))')
>>> kx.q('.spwork.acknowledgeCheckpoint[""]')
>>> assert postCheckpointed

On Recover

Sets the onRecover handler - the global handler that will be called when a pipeline recovers from a checkpoint.

.qsp.onRecover[handler]

Parameters:

name type description
handler function A unary function to execute after recovery.

This sets the onRecover handler for the pipeline.

The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint.

Its argument is the data returned by the pipeline's onCheckpoint handler.

sp.lifecycle.on_recover(handler)

Parameters:

name type description
handler function A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument, which is whatever was returned by the last run checkpoint handler (see on_checkpoint). Its return value is ignored.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> recovered = False
>>> def on_recover(auxiliary_data):
        global recovered
        recovered = True

>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish')
       | sp.write.to_console())

>>> assert not recovered
# These are internal API calls used to simulate checkpoint recovery.
# They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> recovered
True

On Setup

Sets the onSetup event handler - the global handler that will be called before a pipeline has finished initializing.

.qsp.onSetup[handler]

Parameters:

name type description
handler function A nullary function to execute after all operators in a pipeline are set up.

This sets the onSetup event handler for the pipeline.

The onSetup event handler is called before the pipeline has finished initializing.

sp.lifecycle.on_setup(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called before a pipeline has finished initializing, after all operators in the pipeline are set up. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> def on_setup():
        global setup
        setup = True
>>> setup = False

>>> assert not setup
>>> sp.lifecycle.on_setup(on_setup)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())
>>> assert setup

On Start

Sets the onStart event handler - the global handler that will be called when all operators in a pipeline have started.

.qsp.onStart[handler]

Parameters:

name type description
handler function A nullary function to execute after all operators in a pipeline are started.

This sets the onStart event handler for the pipeline.

The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.

sp.lifecycle.on_start(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> def on_start():
        global started
        started = True

>>> started = False

>>> sp.lifecycle.on_start(on_start)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())
>>> assert started

On Finish

Sets the onFinish event handler.

.qsp.onFinish[handler]

Parameters:

name type description
handler function A unary function to execute when a given operator is finished. It accepts the operator configuration as its argument.

This sets the onFinish event handler for the pipeline.

The onFinish event handler is called when an operator is complete; it returns generic null.

Coming Soon

On Teardown

Sets the onTeardown event handler - the global handler that will be called before a pipeline is torn down.

.qsp.onTeardown[handler]

Parameters:

name type description
handler function A nullary function to execute before a pipeline is torn down.

This sets the onTeardown event handler for the pipeline.

The onTeardown event handler is called before the pipeline is torn down; it returns generic null.

A teardown can be triggered by calling sp.teardown.

sp.lifecycle.on_teardown(handler)

Parameters:

name type description
handler Callable[[], None] A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored.

Examples:

>>> from kxi import sp

>>> torndown = False
>>> def on_teardown():
        global torndown
        torndown = True
>>> sp.lifecycle.on_teardown(on_teardown)
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_console())
>>> sp.teardown()
>>> assert torndown

Register Task

Registers a task for an operator.

.qsp.registerTask[op]

Parameters:

name type description
op .qsp.op A pipeline operator.

This registers a task for the operator and returns an identifier (int) for the task.

A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using Finish Task.

Examples:

Register an asynchronous task for true async using Apply:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        "https://www.google.ca";
        "GET";
        ``callback!(::;
          {[op;md;data;tid;r]         // GET request with a callback
            .qsp.finishTask[op;tid];  //   Mark the task as finished
            data[`response]: r 1;     //   Add GET request response to data
            .qsp.push[op;md;data];    //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      );
    }
  ]
  .qsp.write.toConsole[]

sp.lifecycle.register_task(operator)

A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using sp.finish_task.

Parameters:

name type description
operator OperatorSpecifier The operator for which a task will be registered.

Returns:

A task ID, which will be needed to call sp.finish_task.

Subscribe

Subscribes to a particular event with the provided callback.

.qsp.subscribe[eventType; handler]

Parameters:

name type description
eventType symbol Type of an event.
handler function A unary function to execute after the eventType occurs. Takes the event as argument.

An event is a dictionary with keys:

key q type description
type symbol Type of the event.
time timestamp Timestamp.
origin symbol Origin identifier.
data any Payload of the event object.

When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.

Returns:

type description
list A 2-element list containing the event type and a subscription identifier to use to unsubscribe, {(symbol;long)}.

Examples:

Subscribe to file events:

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;

.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];

q).file.events
eventType     eventTime                     origin                 data
-------------------------------------------------------------------------------------------------------------------------
file.found    2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start    2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)

sp.subscribe(event_type, handler)

Parameters:

name type description
event_type kx.CharVector Type of an event.
handler Union[Callable[[kx.Dictionary], None], str] A unary function to be called for this event. Can be a Python function or a string referencing a q function.

The callback event will be a dictionary with keys:

key q type Python type description
type symbol str Type of the event.
time timestamp kx.TimestampAtom Timestamp.
origin symbol str Origin identifier.
data any Any Payload of the event object.

When the SP emits this event, the handler will be invoked with the event object. A Python handler will receive the event as a pykx.Dictionary object.

Returns:

type description
List[Any] A 2-element list containing the event type and a subscription identifier to use to unsubscribe.

Examples:

Use a Python function callback

>>> import pykx as kx
>>> from kxi import sp

>>> # define events to subscribe to and table to store them in
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
        'eventTime': kx.TimestampVector([]),
        'origin': kx.SymbolVector([]),
        'data': kx.Identity(None)})

>>> # define events handler
>>> def update_file_events(event: kx.Dictionary):
        events.upsert(event.values())

>>> for typ in types:
        sp.subscribe(typ, update_file_events)

# create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
        | sp.write.to_variable('out'))

>>> # check events table
>>> events
pykx.Table(pykx.q('
eventType     eventTime                     origin data
---------------------------------------------------------------------------------------------------------
file.found    2025.02.21D12:00:38.673846000 file   (,`paths)!,,":/tmp/numbers.txt"
file.start    2025.02.21D12:01:18.920759000 file   `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:01:18.921781000 file   `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2025.02.21D12:01:18.922243000 file   `path`size!(":/tmp/numbers.txt";10)
'))

Use a q function callback

>>> import pykx as kx
>>> from kxi import sp

>>> # define events to subscribe to, an events table schema and set as a q global variable
>>> types = ['file.found','file.start','file.progress','file.end']
>>> events = kx.Table(data={'eventType': kx.SymbolVector([]),
        'eventTime': kx.TimestampVector([]),
        'origin': kx.SymbolVector([]),
        'data': kx.Identity(None)})
>>> kx.q.set('qevents', events)

>>> # define a q event handler
>>> kx.q('updateEvents:{`qevents upsert value x}')

>>> for typ in types:
        sp.subscribe(typ, 'updateEvents')

# create a file and run pipeline to read it
>>> kx.q('`:/tmp/numbers.txt 0: string til 5;')
>>> sp.run(sp.read.from_file('/tmp/numbers.txt')
        | sp.write.to_variable('out'))

>>> # check events table
>>> kx.q('qevents')
pykx.Table(pykx.q('
eventType     eventTime                     origin data
---------------------------------------------------------------------------------------------------------
file.found    2025.02.21D12:33:39.231703197 file   (,`paths)!,,":/tmp/numbers.txt"
file.start    2025.02.21D12:33:41.812599917 file   `path`size!(":/tmp/numbers.txt";10)
file.progress 2025.02.21D12:33:41.812831273 file   `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2025.02.21D12:33:41.812841521 file   `path`size!(":/tmp/numbers.txt";10)
'))
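Beyond storing every event in a table, a handler can act on the event payload. The sketch below tracks read progress from `file.progress` events, using the event keys and payload fields shown in the output above. It is an illustration only: the event is a plain Python dictionary standing in for the `pykx.Dictionary` a real handler receives, so a real handler may first need to convert the event (for example with `.py()`).

```python
progress = {}  # path -> fraction of the file read so far


def track_progress(event):
    """Update progress from a 'file.progress' event; ignore other event types."""
    if event["type"] != "file.progress":
        return
    payload = event["data"]
    progress[payload["path"]] = payload["bytesRead"] / payload["totalBytes"]


# Stand-in event built from the keys and payload shown in the output above.
track_progress({
    "type": "file.progress",
    "time": "2025.02.21D12:01:18.921781000",
    "origin": "file",
    "data": {"path": ":/tmp/numbers.txt", "totalBytes": 10, "bytesRead": 10},
})

# An event of another type leaves the progress table untouched.
track_progress({
    "type": "file.end",
    "time": "2025.02.21D12:01:18.922243000",
    "origin": "file",
    "data": {"path": ":/tmp/numbers.txt", "size": 10},
})
```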

Unsubscribe

Unsubscribes from event callbacks. You can unsubscribe from a single subscription or from all subscriptions for a given event.

.qsp.unsubscribe[id]

Parameters:

name q type description
id symbol or (symbol;long) If given just an event type, all subscriptions are removed for that event. Alternatively a single subscription can be unsubscribed using the ID returned by the call to .qsp.subscribe.

Returns:

type description
null

Examples:

Unsubscribe all subscribers for a given event_type

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[`file.found; {`.file.events upsert value x}];
.qsp.unsubscribe[`file.found];

Unsubscribe from a specific event subscription

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
id:.qsp.subscribe[`file.found ; {`.file.events upsert value x}];
.qsp.unsubscribe[id];

sp.unsubscribe(id)

Parameters:

name type description
id Union[str, List[Any]] If given just an event type, all subscriptions are removed for that event. Alternatively a single subscription can be unsubscribed using the ID returned by the call to sp.subscribe.

Unsubscribes a particular subscriber or all subscribers for a given event type.

Returns:

type description
None

Examples:

Unsubscribe all subscribers for a given event_type

>>> import pykx as kx
>>> from kxi import sp

>>> def update_events(event):
        print(event)
>>> sp.subscribe('file.found', update_events)

# unsubscribe all file.found events
>>> sp.unsubscribe('file.found')

Unsubscribe a specific subscription

>>> import pykx as kx
>>> from kxi import sp

>>> def update_events(event):
        print(event)

>>> id = sp.subscribe('file.found', update_events)
>>> sp.unsubscribe(id)
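The two forms of the `id` argument can be illustrated with a small model of a subscription registry. This is a hypothetical sketch of the semantics described above, not the Stream Processor's implementation; the `EventBus` class and its methods are invented for illustration.

```python
import itertools


class EventBus:
    """Hypothetical model of subscribe/unsubscribe semantics (not the SP implementation)."""

    def __init__(self):
        self._ids = itertools.count()  # source of subscription identifiers
        self._subs = {}                # event type -> {subscription id: handler}

    def subscribe(self, event_type, handler):
        """Store the handler and return [event type, subscription id]."""
        sub_id = next(self._ids)
        self._subs.setdefault(event_type, {})[sub_id] = handler
        return [event_type, sub_id]

    def unsubscribe(self, target):
        """Accept an event type (remove all) or an [event type, id] pair (remove one)."""
        if isinstance(target, str):
            self._subs.pop(target, None)
        else:
            event_type, sub_id = target
            self._subs.get(event_type, {}).pop(sub_id, None)

    def count(self, event_type):
        """Number of active subscriptions for an event type."""
        return len(self._subs.get(event_type, {}))


bus = EventBus()
first = bus.subscribe("file.found", print)
second = bus.subscribe("file.found", print)

bus.unsubscribe(first)
after_single = bus.count("file.found")  # only `second` remains

bus.unsubscribe("file.found")
after_all = bus.count("file.found")     # nothing remains
```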