General
This page provides an overview of methods for managing various pipeline behaviours.
Some of these methods take a pipeline operator's metadata as an input. The Stream Processor exposes this metadata across many APIs, such as state management and pushing data further along a pipeline from asynchronous nodes. A pipeline operator's metadata is a dictionary containing the following keys.
Metadata Keys
These keys will be extended as new capabilities are added to the Stream Processor engine.
- key: An optional partition key for the events in this message. This is used to group related collections of data based on a common attribute. Events that do not share the same key are considered independent and may be processed independently of other keys. (any type)
- window: Indicates the start time or index of a window of data. (temporal)
- offset: A number or time representing the current message offset to the stream reader. (int or temporal) This value is used to recover data if messages are lost downstream. If messages need to be replayed, the message offset indicates the last successfully processed message.
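For example, a message carrying one window of sensor data might arrive with metadata such as the following (the key, timestamp, and offset values are purely illustrative):
md: `key`window`offset!(`sensor1; 2024.01.01D09:30:00.000000000; 42)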
Add One Shot Timer
Adds or updates a once-off timer event.
.qsp.timer.add1shot[id;x;ofs]
Parameters:
name | type | description |
---|---|---|
id | symbol | Specifies the timer ID. If this ID exists, it is replaced. |
x | list | Specifies the expression to execute (typically a function name followed by its parameters). |
ofs | int, timespan | Specifies the offset to the first run, as either a value in milliseconds or a timespan. .tm.nextt can be used to synchronize the timer event with a specific point in time. |
Returns:
Type | Description |
---|---|
null |
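Examples:
A minimal sketch of scheduling a once-off event 5 seconds after the pipeline starts (the flushed variable and function names are illustrative):
flushed:0b
flushFn:{[] flushed::1b}
onStart:{[] .qsp.timer.add1shot[`flushOnce;(`flushFn;::);5000]}
.qsp.onStart[onStart]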
sp.timer_add1shot(function_id, function_and_args, offset)
Parameters:
name | type | description |
---|---|---|
function_id | Union[str, kx.SymbolAtom] | Specifies the timer ID. If this ID exists, it is replaced. |
function_and_args | List[Any] | Specifies the expression to execute (typically a function name followed by its parameters). |
offset | Union[int, Any] | Specifies the offset to the first run, in milliseconds. |
Returns:
Type | Description |
---|---|
None |
Add Timer
Adds or updates a frequent timer event.
.qsp.timer.add[id;x;per;ofs]
Parameters:
name | type | description | default |
---|---|---|---|
id | symbol | Specifies the timer ID. If this ID exists, it is replaced. | Required |
x | list | Specifies the expression to execute. This is a function name followed by its parameters (generic null '::' if no parameters exist). | Required |
per | int, timespan | Specifies the timer period, as either a value in milliseconds or a timespan. If this value is a 2-element vector, an exponential backoff is applied to repeated invocations up to the maximum period specified by the second element. | Required |
ofs | int, timespan | Specifies the offset to the first run, as either a value in milliseconds or a timespan. If this value is zero, the timer is scheduled for the next timer run ("immediate"). .tm.nextt can be used to synchronize the timer event with a specific point in time. |
Returns:
Type | Description |
---|---|
null |
Examples:
counter:0
timerFn:{[] counter+:1}
onStart:{[] .qsp.timer.add[`timerTick;(`timerFn;::);20000;0]}
.qsp.onStart[onStart]
sp.timer_add(function_id, function_and_args, period, offset)
Adds a function to run on a timer, acting in a deterministic manner
(see Determinism)
with respect to other data events in the pipeline. It behaves
identically to .tm.add in 'unsafe mode' and acts deterministically
in 'safe mode' (when the environment variable KXI_SP_UNSAFE_MODE
is false).
Parameters:
name | type | description | default |
---|---|---|---|
function_id | Union[str, kx.SymbolAtom] | Specifies the timer ID. If this ID exists, it is replaced. | Required |
function_and_args | List[Any] | Specifies the expression to execute. This is a function name followed by its parameters (None if no parameters exist). | Required |
period | Union[int, Any] | Specifies the timer period in milliseconds. | Required |
offset | Union[int, Any] | Specifies the offset to the first run in milliseconds. If this value is zero, the timer is scheduled for the next timer run ("immediate"). | Required |
Returns:
Type | Description |
---|---|
None |
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> counter = 0
>>> def on_start():
        def timer():
            global counter
            counter += 1
        sp.timer_add("timer_tick", [timer, None], 20000, 0)
>>> sp.lifecycle.on_start(on_start)
Clear Data Trace
Deprecated function that will be removed in a future release. Refer to the documentation for Reset Data Trace instead.
Clear Trace
Disables program tracing logs.
.qsp.clearTrace[]
Clears trace logging and sets the log level to its previous level.
sp.clear_trace()
Clears trace-level logging and resets the logging level.
Config Path
Gets the location of user mounted configurations.
.qsp.configPath[]
.qsp.configPath[object]
Parameters:
name | type | description | default |
---|---|---|---|
object | string | The configuration object to get the path of. | The KXI_SP_CONFIG_PATH environment variable |
sp.config_path()
sp.config_path(obj)
Parameters:
name | type | description | default |
---|---|---|---|
obj | str | A string name of the configuration object to get the path of. | The KXI_SP_CONFIG_PATH environment variable |
Returns:
The location of user mounted configurations.
Usage Guidelines
This is mostly useful in Insights Enterprise deployments to access the path of ConfigMaps and Secrets. Passing the name of the ConfigMap or Secret to this function will return the path where it was mounted.
Changing the mount path
The mount path can be changed by setting KXI_SP_CONFIG_PATH
to a directory on the target deployment.
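Examples:
A minimal sketch, assuming a ConfigMap named "pipeline-config" has been mounted into the deployment:
cfgDir: .qsp.configPath["pipeline-config"]  / path where the ConfigMap was mounted
key hsym `$cfgDir                           / list the mounted files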
Delete Timer
Deletes one or more timer events.
.qsp.timer.del[ids]
Parameters:
name | type | description |
---|---|---|
ids | symbol or symbol[] | Specifies the timer IDs to delete. |
Returns:
type | description |
---|---|
null |
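Examples:
Delete the timer added in the Add Timer example, or several timers at once (the second set of IDs is illustrative):
.qsp.timer.del[`timerTick]
.qsp.timer.del[`timerA`timerB]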
sp.timer_del(function_ids)
Parameters:
name | type | description |
---|---|---|
function_ids | Union[str, kx.SymbolAtom] | Specifies the timer IDs to delete |
Returns:
type | description |
---|---|
None |
Disable Data Tracing
Disables data tracing in the current pipeline.
.qsp.disableDataTracing[]
sp.disable_data_tracing()
Note
Disables data tracing from the current pipeline. This does not clear any captured trace data.
Data captured during tracing can still be accessed via .qsp.getDataTrace.
See Enable Data Tracing for more details
Enable Data Tracing
Captures data outputs as they flow through a pipeline.
.qsp.enableDataTracing[]
Data tracing captures data that is flowing in the streaming pipeline. This inserts probes that cache the last value emitted by each operator in the pipeline. Writer operators capture the input presented to the writer. If a given operator has an error, the error is also captured and, where the input is synchronous, the captured data is the input to the operator.
Performance Implications
Adding data capture to a pipeline may have an impact on the pipeline performance. Data tracing should be reserved for debugging purposes and not used in production deployments where possible.
Examples:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{ select max price from x }]
.qsp.write.toVariable[`output]
.qsp.enableDataTracing[];
publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getDataTrace[]
| error metadata data
----------------| ------------------------------------------------------------------..
callback_publish| "" (,`)!,:: +`date`sym`price!(2022.06.06 2022.06.06 2022.06.06 ..
map | "" (,`)!,:: +(,`price)!,,95.97684
variable_output | "" (,`)!,:: +(,`price)!,,95.97684
See Get Data Trace for more details
sp.enable_data_tracing()
Data tracing captures data that is flowing in the streaming pipeline. This inserts probes that cache the last value from each operator in the pipeline. If a given operator has an error, the error is also captured and where possible, the data is the input to the operator.
Performance Implications
Adding data capture to a pipeline may have an impact on the pipeline performance. Data tracing should be reserved for debugging purposes and not used in production deployments where possible.
See Get Data Trace for more details
Get Data Trace
Returns the data from a data trace at the time of invocation.
.qsp.getDataTrace[]
When data tracing is enabled, getDataTrace
returns a point in time snapshot of the
last data values emitted by each node in the pipeline.
sp.get_data_trace()
When data tracing is enabled, get_data_trace
returns a point in time snapshot of the
last data values emitted by each node in the pipeline.
Returns:
The returned object is a dictionary of operator IDs to their respective data or errors. If a node has an error message, any data that is captured is the last input to that operator that caused the error.
See Enable Data Tracing for more details
Get Partition Count
Gets count of all partitions.
.qsp.getPartitionCount[]
Return the count of all partitions in the current stream. This can be combined with Get Partitions to subset a stream (see the sketch under Get Partitions below).
Coming Soon
Get Partitions
Gets assigned partitions.
.qsp.getPartitions[]
Return a list of the partitions assigned to this worker. This can be used for subsetting a stream.
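A minimal sketch combining both calls to subset a stream (the variable names are illustrative):
total: .qsp.getPartitionCount[]  / number of partitions in the current stream
mine: .qsp.getPartitions[]       / partitions assigned to this worker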
Coming Soon
Get Record Counts
Gets dataflow information for a pipeline.
.qsp.getRecordCounts[]
Returns:
A dictionary mapping operator IDs to their record counts.
When record counting is enabled, .qsp.getRecordCounts
returns information on the total amount of
data that has flowed through the pipeline since enabled or since the cache was last reset.
sp.get_record_counts()
Returns:
A dictionary mapping operator IDs to their record counts.
When record counting is enabled, sp.get_record_counts
returns information on the total amount of
data that has flowed through the pipeline since enabled or since the cache was last reset.
The returned object is a dictionary of operator IDs to their respective counts, where counts are partitioned into stream keys. The operators tracked depends on the record counting level.
See Set Record Counting for more details.
Get Schema
Loads a schema from a mounted Assembly, returning a schema object.
.qsp.getSchema[name]
Parameters:
name | type | description | default |
---|---|---|---|
name | symbol, string | The table name to read from the assembly. | Required |
Returns:
Coming Soon
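Examples:
A minimal sketch, assuming the mounted assembly defines a trade table:
tradeSchema: .qsp.getSchema[`trade]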
Get Snapshot Cache
Reads the snapshot cache for pipeline operators.
Currently supported for Subscriber writer nodes.
.qsp.getSnapshotCache[opIDs; args]
Parameters:
name | q type | description |
---|---|---|
opIDs | symbol[] or null | List of operators (or operator IDs) to retrieve snapshots from. If null, all snapshots are returned. |
args | dict | Dictionary of arguments to pass to the function. |
Args:
name | q type | description |
---|---|---|
table | symbol | The table to retrieve snapshots from. |
Returns data from the snapshot cache for subscribers.
Examples:
Read the snapshot cache for the 'data' table for the 'subscriber' operator
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toSubscriber[`data; `sym`exch];
publish 2!enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.000000000;100);
.qsp.getSnapshotCache[`subscriber_data;enlist[`table]!enlist[`data]]
sym exch| time pos
--------| ---------------------------------
FD LSE | 2023.06.25D15:00:00.000000000 100
Read the snapshot cache for all operators
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toSubscriber[`data; `sym`exch];
publish 2!enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.000000000;100);
.qsp.getSnapshotCache[`;enlist[`table]!enlist[`data]]
()
(`s#+`sym`exch!(`p#,`FD;,`LSE))!+`time`pos!(,2023.06.25D15:00:00.000000000;,100)
sp.get_snapshot_cache(operatorIDs, args)
name | type | description |
---|---|---|
operatorIDs | string[] or None | List of operators (or operator IDs) to retrieve snapshots from. If None, all snapshots are returned. |
args | dict | Dictionary of arguments to pass to the function. The currently supported argument is table. |
Examples:
Read the snapshot cache for the 'data' table
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> kx.q('upd:{show z}')
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_subscriber('data', 'sym'))
>>> updData = pd.DataFrame({
'sym': ['FDP', 'AAPL', 'MSFT'],
'id': [1, 2, 3],
'size': [100, 150, 200]
})
>>> kx.q('publish', updData)
>>> sp.get_snapshot_cache(None, {'table': 'data'})
Get Timer
Retrieves the properties of one or more timer events.
.qsp.timer.get[ids]
Parameters:
name | type | description | default |
---|---|---|---|
ids | symbol or symbol[] | Specifies the timer IDs to retrieve. If this value is null, all timer events are returned. | null |
Returns:
type | description |
---|---|
table | Relevant entries in .tm.tq table. |
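Examples:
Retrieve the timer added in the Add Timer example:
.qsp.timer.get[`timerTick]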
sp.timer_get(function_ids)
Parameters:
name | type | description | default |
---|---|---|---|
function_ids | Union[str, kx.SymbolAtom] | Specifies the timer IDs to retrieve. If this value is null, all timer events are returned. | Required |
Returns:
type | description |
---|---|
table | Relevant entries in .tm.tq table. |
Push
Publishes data to all downstream operators in the pipeline.
.qsp.push[op;md;data]
Parameters:
name | type | description | default |
---|---|---|---|
op | .qsp.op | The current operator configuration. | Required |
md | .qsp.message.metadata | Metadata for the message being emitted from the current asynchronous operator. | Required |
data | .qsp.message.event or .qsp.message.event[] | The data to be published from this async operator. | Required |
sp.push(operator, metadata, data)
Parameters:
name | type | description | default |
---|---|---|---|
operator | OperatorSpecifier | The operator, specified as a configuration dictionary or name. | Required |
metadata | Metadata | Metadata for the message being emitted from the operator. | Required |
data | Any | The data to be pushed to the downstream pipeline. | Required |
Returns:
Publishes data to all downstream operators in the pipeline, and returns data.
Asynchronous Operators
Should be used from asynchronous operators in a pipeline to continue the flow of data in the pipeline. It is only required for sp.apply. Other operators are synchronous and any data returned will flow to the next operator.
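Examples:
A minimal sketch using .qsp.apply, assuming its function receives the operator configuration, metadata, and data described above; the filter logic is illustrative.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[{[op; md; data]
    if[0 = count data; :(::)];    / drop empty batches without emitting
    .qsp.push[op; md; data]       / continue the flow to downstream operators
    }]
  .qsp.write.toConsole[]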
Reset Data Trace
Resets the current data cache state and clears any data that has been captured during a data tracing session.
.qsp.resetDataTrace[]
sp.reset_data_trace()
See Get Data Trace for more details
Reset Record Counts
Resets the current record counts cache, so subsequent data counts begin from zero.
.qsp.resetRecordCounts[]
sp.reset_record_counts()
See Get Record Counts for more details
Run
Runs a pipeline.
.qsp.run[pipe]
Parameters:
name | type | description |
---|---|---|
pipe | .qsp.pipe | The pipeline to install and run in the current stream processor. |
Multiple Workers
Depending on the deployment configuration, this function can run a pipeline in the current worker process or distribute work across several workers.
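Examples:
A minimal sketch: install and run a pipeline that echoes data published to a callback to the console (the callback name and sample data are illustrative).
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[]
publish ([] sym:`a`b; price:1 2f)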
sp.run(pipelines)
Parameters:
name | type | description |
---|---|---|
pipelines | Pipeline | The pipeline to install and run in the current stream processor. |
Examples:
Example 1: Running a pipeline that reads from a nullary function, then writes to the console.
>>> from kxi import sp
>>> sp.run(sp.read.from_expr(lambda: range(10)) | sp.write.to_console(timestamp='none'))
0 1 2 3 4 5 6 7 8 9
Example 2: Running multiple pipelines.
>>> from kxi import sp
>>> pipeline_a = sp.read.from_expr(lambda: range(10)) | sp.write.to_console('A) ', timestamp='none')
>>> pipeline_b = sp.read.from_expr(lambda: range(20)) | sp.write.to_console('B) ', timestamp='none')
>>> pipeline_c = sp.read.from_expr(lambda: range(30)) | sp.write.to_console('C) ', timestamp='none')
>>> sp.run(pipeline_a, pipeline_b, pipeline_c)
A) 0 1 2 3 4 5 6 7 8 9
B) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
C) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
Example 3: Argument unpacking can be used to run a list/tuple of pipelines.
>>> from kxi import sp
>>> from random import sample
>>> source = sp.read.from_expr(lambda: sample(range(20, 50, 2), 10))
>>> pipelines = (
source | sp.map(lambda x: x * 2) | sp.write.to_console('double ) ', timestamp='none'),
source | sp.write.to_console('original) ', timestamp='none'),
source | sp.map(lambda x: x / 2) | sp.write.to_console('half ) ', timestamp='none'),
)
>>> sp.run(*pipelines)
double ) 68 72 84 52 44 88 96 80 56 48
original) 34 36 42 26 22 44 48 40 28 24
half ) 17 18 21 13 11 22 24 20 14 12
Set Record Counting
Sets the record counting level for tracking dataflow in a pipeline. The level determines which nodes in the pipeline have their data flow counted.
.qsp.setRecordCounting[level]
Parameters:
name | type | description | default |
---|---|---|---|
level | long | Level of record counting. | Required |
Record counting tracks the amount of data flowing through the pipeline. For each operator, the count stores the sum of record counts, partitioned by the records' keys. The operators that perform record counting depend on the record counting level, described below.
Available Levels
- 0: Record counting is disabled for all operators.
- 1: Count records flowing through readers and writers.
- 2: Count records flowing through all operators.
The default is level 1.
Changing levels
Changing levels resets the RecordCounts cache.
Examples:
Example 1: Counting records with the default level.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{ select max price from x }]
.qsp.write.toVariable[`output]
publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getRecordCounts[]
|
--- | --
callback_publish| 10
variable_output | 1
Example 2: Setting record counting to level 2.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{ select max price from x }]
.qsp.write.toVariable[`output]
.qsp.setRecordCounting 2;
publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getRecordCounts[]
|
--- | --
map | 10
callback_publish| 10
variable_output | 1
Example 3: Counting keyed data using the default count level.
streamA: .qsp.read.fromCallback[`publish1; .qsp.use enlist[`key]!enlist `key]
streamB: .qsp.read.fromCallback[`publish2; .qsp.use enlist[`key]!enlist `key]
.qsp.run
streamA .qsp.union[streamB]
.qsp.write.toConsole[]
publish1 `key`data!(`key1;42)
publish2 `key`data!(`key2;42)
.qsp.getRecordCounts[]
callback_publish1| (,`key1)!,2
callback_publish2| (,`key2)!,2
console | `key1`key2!2 2
sp.set_record_counting(level)
Parameters:
name | type | description | default |
---|---|---|---|
level | long | Level of record counting. | Required |
Available Levels
- 0: Record counting is disabled for all operators.
- 1: Count records flowing through readers and writers.
- 2: Count records flowing through all operators.
The default is level 1.
Changing levels
Changing levels resets the RecordCounts cache.
Set Trace
Enables or disables trace logging, and sets its level of verbosity.
.qsp.setTrace[level]
Parameters:
name | type | description | default |
---|---|---|---|
level | long | Level of trace logging to display. | Required |
sp.set_trace(level)
Parameters:
name | type | description | default |
---|---|---|---|
level | long | Level of trace logging to display. | Required |
Available Levels
0: Disable trace logging.
1: Log data that is passed through readers and writers.
2: Log data pushed through buffers.
3: Log operator inputs.
4: Log state operations.
The default is level 0.
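Examples:
A minimal sketch: raise the trace level while debugging, then disable it again (see Clear Trace).
.qsp.setTrace[1]   / log data passing through readers and writers
.qsp.clearTrace[]  / disable tracing and restore the previous log level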
Teardown
Tears down a pipeline, removing all state and timers.
.qsp.teardown[]
sp.teardown()
On Teardown action
Any On Teardown handlers or subscribers to teardown will be called.
Track
Maintains state for a list of in-memory q objects.
.qsp.track[objects]
Parameters:
name | type | description | default |
---|---|---|---|
objects | symbol or symbol[] | The object name(s) to be tracked as state. | Required |
Usage with Checkpoints
When tracking is enabled, at each checkpoint the Stream Processor will persist a copy of the tracked objects. On recovery these objects will be re-initialised before the pipeline begins again at the value corresponding to the last checkpoint.
Examples:
Example 1: Track a single variable.
.qsp.track[`iters]
iters:0
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{iters+:1;x}]
.qsp.write.toConsole[]
Example 2: Track multiple variables.
.qsp.track[`iters`last]
iters:0
last:()
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{iters+:1;last::x;x}]
.qsp.write.toConsole[]
Example 3: Track a namespace.
.qsp.track[`.info]  / Track anything in the .info namespace
.info.iters:0
.info.last:()
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{.info.iters+:1;.info.last::x;x}]
.qsp.write.toConsole[]
sp.track(objects)
This persists their state across application restarts.
Parameters:
name | type | description | default |
---|---|---|---|
objects | Union[List[Union[str, kx.SymbolAtom]], str, kx.SymbolAtom] | List of strings denoting the q object(s) to track in the q memory space, e.g. a single variable name, a list of variables, or a q namespace. | Required |
Usage with Checkpoints
When tracking is enabled, at each checkpoint the Stream Processor will persist a copy of the tracked objects. On recovery these objects will be re-initialised before the pipeline begins again at the value corresponding to the last checkpoint.
Examples:
Example 1: Track a single in-memory q object.
>>> from kxi import sp
>>> import pykx as kx
>>> kx.q['numReadings'] = kx.toq(0, ktype=kx.LongAtom)
>>> def transform(data):
kx.q['numReadings'] += kx.IntAtom(1)
return data
>>> sp.track('numReadings')
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.map(transform)
| sp.write.to_console())
>>> kx.q('publish', kx.Table(data = {
'time':[kx.TimestampAtom('now')],
'sym':kx.random.random(1, '2'),
'reading':kx.random.random(1,10.0)}))
>>> kx.q('numReadings')
1
Example 2: Track multiple in-memory q objects.
>>> from kxi import sp
>>> import pykx as kx
>>> schema = {'time': kx.TimestampAtom, 'sym': kx.SymbolAtom, 'reading': kx.FloatAtom}
>>> reading = kx.schema.builder(schema, key='sym')
>>> kx.q['reading'] = reading
>>> kx.q['numReadings'] = kx.toq(0, ktype=kx.LongAtom)
>>> def transform(data):
kx.q['numReadings'] += kx.IntAtom(1)
kx.q.upsert('reading', data)
return data
>>> sp.track(['reading', 'numReadings'])
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.map(transform)
| sp.write.to_console())
>>> kx.q('publish', kx.Table(data = {
'time':[kx.TimestampAtom('now')],
'sym':kx.random.random(1, '2'),
'reading':kx.random.random(1,10.0)}))
>>> kx.q('(numReadings; reading)')
1
(+(,`sym)!,,`ip)!+`time`reading!(,2024.09.25D13:48:43.677502778;,0.9895414)
Example 3: Track variables in a q namespace.
>>> from kxi import sp
>>> import pykx as kx
>>> schema = {'time': kx.TimestampAtom, 'sym': kx.SymbolAtom, 'reading': kx.FloatAtom}
>>> reading = kx.schema.builder(schema, key='sym')
>>> kx.q['.rd.reading'] = reading
>>> kx.q['.rd.numReadings'] = kx.toq(0, ktype=kx.LongAtom)
>>> def transform(data):
kx.q['.rd.numReadings'] += kx.IntAtom(1)
kx.q.upsert('.rd.reading', data)
return data
>>> sp.track('.rd')
>>> sp.run(sp.read.from_callback('publish', name='callback')
| sp.map(transform)
| sp.write.to_console())
>>> kx.q('publish', kx.Table(data = {
'time':[kx.TimestampAtom('now')],
'sym':kx.random.random(1, '2'),
'reading':kx.random.random(1,10.0)}))
>>> kx.q('(.rd.numReadings; .rd.reading)')
1
(+(,`sym)!,,`ip)!+`time`reading!(,2024.09.25D13:48:43.677502778;,0.9895414)
Trigger Write
Triggers batch pipeline writer(s).
.qsp.triggerWrite[]
.qsp.triggerWrite[opIDs]
Parameters:
name | type | description |
---|---|---|
opIDs | symbol, symbol[] or null | List of operators (or operator IDs) to trigger a write for. If left empty, all writers are triggered. |
For non-streaming writer operators, e.g. the direct write mode of Database Writer, this triggers a write. For streaming writer operators it has no effect.
Returns:
Type | Description |
---|---|
null |
Examples:
Example 1: Create a writer named dbWriter with directWrite enabled.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use `name`directWrite!(`dbWriter;1b)]
Example 2: Publish some data, then trigger a write down of the trade table.
publish[([] sym:`KX`KX; timestamp: 2#.z.p; price:100.1 99.9; size: 100 200)]
.qsp.triggerWrite[]
Example 3: Explicitly trigger a write down for the writer named dbWriter.
.qsp.triggerWrite[enlist `dbWriter]
sp.trigger_write()
Parameters:
name | type | description | default |
---|---|---|---|
operatorIDs | string[] or None | List of operators (or operator IDs) to trigger a write for. If left empty, all writers are triggered. | Not required |
Triggers a write for non-streaming writer operators. For streaming writer operators this has no effect.
Examples:
Example 1: Trigger a write-down using sp.write.to_database v2 in direct write mode. The writer name can be explicitly set using the operator name parameter. To retrieve the name of an existing operator, use the /pipeline/{id}/describe REST API.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_database(table='tableName', database='dbName',
directWrite=True, api_version=2, name='dbWriter'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
# trigger all writers (only db writer here anyway).
>>> sp.trigger_write()
# Explicitly trigger specific writer(s).
>>> sp.trigger_write(["dbWriter"])
Trigger Read
Pull reader(s) can be configured for triggering. Configured readers have their read behavior changed from one-shot reads to reading on a user-provided trigger. The following shows the q and Python trigger APIs. A REST API is also available.
For pull reader operators configured for triggering, this triggers another read. For non-pull readers, or pull readers not configured for triggering, it has no effect.
.qsp.triggerRead[]
Parameters:
name | q type | description |
---|---|---|
opIDs | symbol, symbol[] or null | A list of operators (or operator IDs) to trigger a read for. If left empty, all readers are triggered. |
Returns:
Type |
---|
null |
Example:
This pipeline will not execute anything on startup. Once the trigger API is called, the expression reader executes the expression and pushes data down the pipeline. Each subsequent call executes the expression and pushes again.
.qsp.run
pipe: .qsp.read.fromExpr[{"sym,price\nKX,100\n"}; .qsp.use `trigger`name!(`api; `expr)]
.qsp.decode.csv[]
.qsp.write.toVariable[`.test.cache]
If you then want to execute the expression and push downstream, you can run
// No operators provided as an argument so all readers (configured for triggering) are triggered.
// In this case only a single reader configured for triggering.
.qsp.triggerRead[]
or to explicitly trigger the expression reader call the API with operator ID/name you want triggered:
.qsp.triggerRead[`expr]
sp.trigger_read()
Parameters:
name | type | description | default |
---|---|---|---|
operatorIDs | string[] or None | A list of operators (or operator IDs) to trigger a read for. If left empty, all pull readers are triggered. | Not required |
Returns:
Type |
---|
None |
Example:
Trigger a single read using an expression reader and sp.trigger_read. The reader will not output any data until triggered. When more than one reader is present, specific readers can be triggered individually using the operator's name, which can be explicitly set using the operator's name parameter when it is defined. To retrieve the name of an existing operator, use the /pipeline/{id}/describe REST API.
>>> import pykx as kx
>>> from kxi import sp
>>> expr = '"sym,price\nKX,100\n"'
>>> sp.run(sp.read.from_expr(expr, name='myExpr', trigger='api')
| sp.decode.csv('')
| sp.write.to_variable('.test.cache'))
# trigger all readers (only a single trigger configured reader anyway).
>>> sp.trigger_read()
# Explicitly trigger specific reader(s).
>>> sp.trigger_read(['myExpr'])