Skip to content

Pipeline Read Triggering

This page provides an introduction to reader triggering. It explains which readers this feature applies to and provides examples with the different trigger read options.

Reader Types

When a pipeline starts, the default behavior is for the reader(s) to begin reading immediately. Readers can be divided into two broad categories: Bounded or Unbounded.

  • Bounded Readers: When reading data, an action is taken by the reader to request, or pull, data from a source. These readers perform this action and then push the resulting data down the pipeline. These readers are considered bounded because once the read is complete, their task is finished and they cannot return to a running state. The kdb Insights Database and Expression readers are typical examples of bounded readers. The file/cloud readers are considered bounded if file watching is not enabled. They can read a file or list of files, and once all the files have been are read, the reader is finished.

  • Unbounded Readers. These readers continue to read indefinitely. Streaming and file watching readers are common examples of unbounded readers. If the file/cloud readers are configured to watch for files, they watch for files forever and do not finish, unless they are manually finished.

Triggering

In many cases, it is desirable for pull readers to execute more than once during a pipeline's lifespan. For example, a database reader pulling reference data from a database needs to refresh periodically in long-running pipelines joining streaming data to reference data. Performing this action once at pipeline startup is insufficient.

To achieve this, the trigger mechanism (provided as an argument to the reader) transforms the reader from bounded to unbounded. The following examples demonstrate how this can be achieved natively within the Stream Processor.

Examples

In these examples, the trigger option has been added to the pull readers. Refer to pull reader options for further details on these options.

Some of the following examples include the name parameter, the API trigger can optionally take the operator name as an argument. Refer to configuring operators for further details.

Once

The default behavior (if no trigger argument is provided) is for the read to be performed once at pipeline startup. This can also be explicitly set using once as the trigger value. The example below shows a simple expression reader explicitly set to trigger once. It executes the expression provided on pipeline startup, pushes the data down the pipeline and then the pipeline finishes.

    .qsp.run
        .qsp.read.fromExpr[{"sym,price\nKX,100\n"}; .qsp.use enlist[`trigger]!enlist `once]
        .qsp.decode.csv[]
        .qsp.write.toVariable[`.test.cache]
    >>> import pykx as kx
    >>> from kxi import sp
    >>> from kxi.sp.read import TriggerMode

    >>> sp.run(sp.read.from_expr('"sym,price\nKX,100\n"', trigger=TriggerMode.once)
            | sp.decode.csv('')
            | sp.write.to_variable('.test.cache'))

The result is the same as running the pipeline without the trigger option.

API

With this option, the read is triggered only by an API call, and can be triggered repeatedly.

You can use either q (for q pipelines), python (for python pipelines) or REST (for either q or python pipelines) APIs which can be used to trigger a read if the operator is configured as such:

The example below shows how two expression readers in a pipeline can be triggered by an API. Note when API mode is chosen for a reader the reader does not start reading automatically on startup, it only begins reading when it is triggered by an API call.

    triggerApi: enlist[`trigger]!enlist `api;

    namedRead: .qsp.read.fromExpr[{"sym,price\nKX,100\n"}; .qsp.use @[triggerApi;`name;:;`myExpr]];
    unnamedRead: .qsp.read.fromExpr[{"sym,price\nKX,100\n"}; .qsp.use triggerApi];

    .qsp.run
      namedRead
      .qsp.merge[unnamedRead]
      .qsp.decode.csv[]
      .qsp.write.toVariable[`.test.cache]

The read can now be triggered logically using the q API - .qsp.triggerRead

    // Triggers only the named `myExpr reader
    .qsp.triggerRead[`myExpr];

    // Triggers all pull readers configured with triggering
    .qsp.triggerRead[];
    >>> import pykx as kx
    >>> from kxi import sp
    >>> from kxi.sp.read import TriggerMode
    >>> import pandas as pd

    >>> def price_data():
            df1 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'price': [150, 2800]})
            df1.set_index('symbol', inplace=True)
            return df1

    >>> def size_data():
            return pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'size': [100, 200]})

    >>> namedRead = sp.read.from_expr(price_data, name='myExpr', trigger=TriggerMode.api)
    >>> unnamedRead = sp.read.from_expr(size_data, TriggerMode.api)
    >>>     sp.run(namedRead
            | sp.merge(unnamedRead,
                lambda x, y: pd.merge(x.pd(), y.pd(), on='symbol',how='left'))
            | sp.write.to_variable('.test.cache'))

The read can be triggered logically using the Python API - sp.trigger_read

    >>> import pykx as kx
    >>> from kxi import sp

    # Triggers only the named 'myExpr' reader
    >>> sp.trigger_read('myExpr')
    # Triggers all pull readers configured with triggering
    >>> sp.trigger_read()

We can also trigger reads using the REST API in both q and python pipelines: /pipeline/{id}/admin/triggerRead

Timer

In cases where the read has to be performed continually based on a timer, the trigger option can be configured as a 2 or 3 element array, with timer as the first value. The remaining items represent the period and start time of the read:

  • Period: Sets how often the timer fires, and is specified as a timespan type in q or a datetime.timedelta in python.

  • Start Time: This is the time of the first read. When this is provided, it must be timestamp or time type in q, and either a datetime.datetime or datetime.time in python. If a start time is not provided the default is now, that is the deployment time.

  • Note if somehow the start time is in the past, the period is added n times until it gets a time in the future. E.g. if start time is 12:00:00.000 with a period of 3 hours, but its deployed at 14:00:00.000, the first read time moves to 12:00:00.000 + 3 hours so is at 15:00:00.000.

    // Timer fires immediately, then every 5 seconds
    triggerTimer: enlist[`trigger]!enlist (`timer; 0D00:00:05);

    // Timer waits until midday, and then reads every 5 seconds
    triggerTimer: enlist[`trigger]!enlist (`timer; 0D00:00:05; 12:00:00.000);

    .qsp.run
      pipe: .qsp.read.fromExpr[{"sym,price\nKX,100\n"}; .qsp.use triggerTimer]
      .qsp.decode.csv[]
      .qsp.write.toVariable[`.test.cache]
    >>> import pykx as kx
    >>> from datetime import datetime, time, timedelta
    >>> from kxi import sp
    >>> from kxi.sp.read import TriggerMode

    # Timer fires immediately, then every 24 hours
    >>> triggerTimer = [TriggerMode.timer, timedelta(days=1)]
    # Timer waits until midnight, and then reads every 24 hours
    >>> triggerTimer = [TriggerMode.timer, timedelta(days=1), time(0, 0, 0)]

    >>> sp.run(sp.read.from_expr('"sym,price\nKX,100\n"', trigger=triggerTimer)
        | sp.decode.csv('')
        | sp.write.to_variable('.test.cache'))

Note that when configured with a timer, the reader can also still be triggered with the Trigger Read q/python/REST APIs.