General
Function | Description |
---|---|
.qsp.track | maintain state for a list of in-memory q objects |
.qsp.configPath | return the path of mounted configuration files |
.qsp.getPartitions | return the current assigned partitions |
.qsp.getPartitionCount | return the count of all partitions |
.qsp.push | publish data to all downstream operators |
.qsp.run | install and run a pipeline |
.qsp.teardown | tear down a pipeline |
.qsp.triggerWrite | trigger a writedown |
.qsp.setTrace | enables trace logging |
.qsp.clearTrace | clears trace logging and resets logging level |
.qsp.enableDataTracing | captures data flowing in a pipeline |
.qsp.disableDataTracing | disables data tracing in a pipeline |
.qsp.resetDataTrace | resets the current trace data cache |
.qsp.clearDataTrace | resets the current trace data cache (deprecated) |
.qsp.getDataTrace | returns a point-in-time data trace capture |
.qsp.getSchema | returns the provided schema from an assembly |
.qsp.timer.add | adds or updates a timer event |
.qsp.timer.add1shot | adds or updates a one-shot timer event |
.qsp.timer.del | deletes one or more timer events |
.qsp.timer.get | retrieves the properties of one or more timer events |
.qsp.setRecordCounting | (Beta) sets the level for tracking dataflow in a pipeline |
.qsp.resetRecordCounts | (Beta) resets the current record counts cache |
.qsp.getRecordCounts | (Beta) returns information on the amount of dataflow |
Stream Processor metadata is exposed across many APIs, such as when managing state or pushing data further along a pipeline within asynchronous nodes. A pipeline operator's metadata is a dictionary containing the following keys.
Metadata Keys
These keys will be extended as new capabilities are added to the Stream Processor engine.
- `key`: An optional partition key for the events in this message. This is used to group related collections of data based on a common attribute. Events that do not share the same key are considered independent and may be processed independently of other keys. (any type)
- `window`: Indicates the start time or index of a window of data. (temporal)
- `offset`: A number or time that represents the current message offset to the stream reader. (int or temporal) This value will be used to recover data if messages are lost downstream. If messages need to be replayed, the message offset will be used to indicate the last successfully processed message.
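As an illustration, a message's metadata dictionary might look like the following (all values here are hypothetical):

```q
/ hypothetical metadata: partition key, window start, and reader offset
md: `key`window`offset!(`sensor1; 2022.06.06D00:00:00; 42)
```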
.qsp.getSchema
Loads a schema from a mounted assembly, returning a schema object.
Parameter:
Name | Type | Description |
---|---|---|
name | symbol | string | The table name to read from the assembly. |
Returns:
Type | Description |
---|---|
.qsp.schema | The schema object for the named table. |
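A short sketch, assuming the mounted assembly defines a table named `trade` and that the `.qsp.transform.schema` operator is available to apply the returned schema object:

```q
schema: .qsp.getSchema[`trade];        / schema object for the assumed trade table
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.schema[schema]        / conform incoming batches to the schema
  .qsp.write.toConsole[]
```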
.qsp.track
Maintain state for a list of in-memory q objects
.qsp.track[objects]
Parameters:
name | q type | description | default |
---|---|---|---|
objects | symbol or symbol[] | The object name(s) to be tracked as state. | Required |
When tracking is enabled, the Stream Processor persists a copy of the tracked objects at each checkpoint. On recovery, these objects are re-initialised to the values from the last checkpoint before the pipeline resumes.
Track a single variable:
```q
.qsp.track[`iters]
iters:0
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{iters+:1;x}]
  .qsp.write.toConsole[]
```
Track multiple variables:
```q
.qsp.track[`iters`last]
iters:0
last:()
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{iters+:1;last::x;x}]
  .qsp.write.toConsole[]
```
Track a namespace:
```q
.qsp.track[`.info] / Track anything in the .info namespace
.info.iters:0
.info.last:()
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{.info.iters+:1;.info.last::x;x}]
  .qsp.write.toConsole[]
```
.qsp.push
Publish data to all downstream operators in the pipeline
.qsp.push[op;md;data]
Parameters:
name | q type | description |
---|---|---|
op | .qsp.op | The current operator configuration. |
md | .qsp.message.metadata | Metadata for the message being emitted from the current asynchronous operator. |
data | .qsp.message.event or .qsp.message.event[] | The data to be published from this async operator. |
Publishes data to all downstream operators in the pipeline, and returns `data`. Should be used from asynchronous operators in a pipeline to continue the flow of data. It is only required for `.qsp.apply`; other operators are synchronous, and any data they return will flow to the next operator.
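A minimal sketch of an asynchronous operator built with `.qsp.apply` that forwards its input downstream using `.qsp.push` (the `publish` callback name is illustrative):

```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[{[op;md;data] .qsp.push[op;md;data]}]   / async pass-through
  .qsp.write.toConsole[]
```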
.qsp.run
Install and run a pipeline
.qsp.run[pipe]
Parameters:
name | q type | description |
---|---|---|
pipe | .qsp.pipe | The pipeline to install and run in the current stream processor. |
Returns generic null.
Depending on the deployment configuration, this function can run a pipeline in the current worker process or distribute work across several workers.
.qsp.teardown
Tear down a pipeline
.qsp.teardown[]
Tears down a pipeline, removing all state and timers; returns generic null.
Any `onTeardown` handlers (registered using `.qsp.onTeardown`) or subscribers to `teardown` will be called.
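For example, a handler registered with `.qsp.onTeardown` (the handler body here is illustrative) runs when the pipeline is torn down:

```q
.qsp.onTeardown[{ -1 "cleaning up before teardown"; }];
.qsp.teardown[]
```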
.qsp.triggerWrite
Trigger batch pipeline writer(s)
.qsp.triggerWrite[]
.qsp.triggerWrite[opIDs]
Parameters:
name | q type | description |
---|---|---|
opIDs | symbol[] or null | List of operator names (or operator IDs) to trigger a write for. If empty or null, all writers are triggered. |
For non-streaming writer operators, such as the direct write mode of .qsp.write.toDatabase, this triggers a write. For streaming writer operators it has no effect.
Run a pipeline with a database writer in direct write mode:

```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.v2.write.toDatabase[`trade; `$":db-one"; .qsp.use `name`directWrite!(`dbWriter;1b)]
```
Publish some data, then trigger a write down of the trade table:
```q
publish[([] sym:`KX`KX; timestamp: 2#.z.p; price:100.1 99.9; size: 100 200)]
.qsp.triggerWrite[]
```

Or, to be explicit:

```q
.qsp.triggerWrite[enlist `dbWriter]
```
Returns:
Type | Description |
---|---|
null | Generic null. |
.qsp.getPartitions
Get assigned partitions
.qsp.getPartitions[]
Return a list of the partitions assigned to this worker. This can be used for subsetting a stream.
.qsp.getPartitionCount
Get count of all partitions
.qsp.getPartitionCount[]
Return the count of all partitions in the current stream. This can be combined with `getPartitions[]` to subset a stream.
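A sketch of subsetting a stream, assuming records carry a numeric `id` column and a simple modulo scheme for mapping keys to partitions (the actual assignment depends on the reader):

```q
parts: .qsp.getPartitions[];      / partitions assigned to this worker
n: .qsp.getPartitionCount[];      / total partitions in the stream
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[parts;n;data] data where ((`long$data`id) mod n) in parts}[parts;n]]
  .qsp.write.toConsole[]
```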
.qsp.configPath
Returns the location of user-mounted configurations
.qsp.configPath[]
.qsp.configPath[object]
Parameters:
name | q type | description | default |
---|---|---|---|
object | string | The configuration object to get the path of. | The KXI_SP_CONFIG_PATH environment variable |
This is mostly useful in Kubernetes deployments to access the path of ConfigMaps and Secrets. Passing the name of the ConfigMap or Secret to this function will return the path where it was mounted.
The mount path can be changed by setting `KXI_SP_CONFIG_PATH` to a directory on the target deployment.
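For example, reading a settings file from a hypothetical ConfigMap named `pipeline-config` (the `settings.json` file name is also illustrative):

```q
path: .qsp.configPath["pipeline-config"];              / mount path of the ConfigMap
settings: .j.k raze read0 hsym `$path,"/settings.json" / parse an assumed JSON file
```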
.qsp.setTrace
Enables program tracing logs
.qsp.setTrace[level]
Parameters:
name | q type | description | default |
---|---|---|---|
level | long | Level of trace logging to display. | Required |
Level of verbosity of trace logging. The following levels are available:

- 0 - Clears trace logging.
- 1 - Log data that is passed through readers and writers.
- 2 - Log data pushed through buffers.
- 3 - Log operator inputs.
- 4 - Log state operations.

The default is level 0.
.qsp.clearTrace
Disable program tracing logs
.qsp.clearTrace[]
.qsp.enableDataTracing
Capture data outputs as they flow through a pipeline
.qsp.enableDataTracing[]
Data tracing captures data flowing through a streaming pipeline. It inserts probes that cache the last value emitted by each operator in the pipeline; writer operators capture the input presented to the writer. If an operator errors, the error is also captured, and where the input is synchronous, the captured data is the input that caused the error.
Performance implications
Adding data capture to a pipeline may impact performance. Data tracing should be reserved for debugging and avoided in production deployments where possible.
Enable data tracing:
```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{ select max price from x }]
  .qsp.write.toVariable[`output]

.qsp.enableDataTracing[];
publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getDataTrace[]
```

```
                | error metadata data
----------------| ------------------------------------------------------------------..
callback_publish| ""    (,`)!,:: +`date`sym`price!(2022.06.06 2022.06.06 2022.06.06 ..
map             | ""    (,`)!,:: +(,`price)!,,95.97684
variable_output | ""    (,`)!,:: +(,`price)!,,95.97684
```
See `.qsp.getDataTrace` for more details.
.qsp.disableDataTracing
Disables data tracing in the current pipeline
.qsp.disableDataTracing[]
Disables data tracing from the current pipeline. This does not clear any captured trace data; data captured during tracing can still be accessed via `.qsp.getDataTrace`. See `.qsp.enableDataTracing` for more details.
.qsp.resetDataTrace
Resets the current data cache state
.qsp.resetDataTrace[]
Clears any data that has been captured during a data tracing session.
See `.qsp.getDataTrace` for more details.
.qsp.clearDataTrace
Deprecated; this function will be removed in a future release. Use `.qsp.resetDataTrace` instead.
.qsp.getDataTrace
Returns the data from data trace
.qsp.getDataTrace[]
When data tracing is enabled, `getDataTrace` returns a point-in-time snapshot of the last data values emitted by each node in the pipeline. The return is a dictionary of operator IDs to their respective data or errors. If a node has an error message, any data that is captured is the last input to that operator that caused the error. See `.qsp.enableDataTracing` for more details.
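A sketch of narrowing the snapshot to operators that captured an error, using the `error` key of each trace entry:

```q
trace: .qsp.getDataTrace[];
errored: where {0 < count x`error} each trace;   / operator IDs with a non-empty error
trace errored                                    / their captured errors and inputs
```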
.qsp.setRecordCounting
Sets the level for tracking dataflow in a pipeline (Beta)
.qsp.setRecordCounting[level]
Parameters:
name | q type | description | default |
---|---|---|---|
level | long | Level of record counting. | Required |
Record counting tracks the amount of data flowing through the pipeline. For each operator, it stores a running sum of record counts, partitioned by the records' keys. Which operators perform record counting depends on the record counting level, described below.
The level of record counting determines which nodes in the pipeline count data flow. The following levels are available:

- 0 - Record counting is disabled for all operators.
- 1 - Count records flowing through readers and writers.
- 2 - Count records flowing through all operators.

The default is level 1.

Note: Changing levels resets the record counts cache.
Counting records with the default level:
```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{ select max price from x }]
  .qsp.write.toVariable[`output]

publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getRecordCounts[]
```

```
callback_publish| 10
variable_output | 1
```
Setting record counting to level 2:
```q
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{ select max price from x }]
  .qsp.write.toVariable[`output]

.qsp.setRecordCounting 2;
publish ([] date: .z.d; sym: 10?3?`3; price:10?100f);
.qsp.getRecordCounts[]
```

```
map             | 10
callback_publish| 10
variable_output | 1
```
Counting keyed data using the default count level:
```q
streamA: .qsp.read.fromCallback[`publish1; .qsp.use enlist[`key]!enlist `key]
streamB: .qsp.read.fromCallback[`publish2; .qsp.use enlist[`key]!enlist `key]
.qsp.run
  streamA .qsp.union[streamB]
  .qsp.write.toConsole[]

publish1 `key`data!(`key1;42)
publish2 `key`data!(`key2;42)
.qsp.getRecordCounts[]
```

```
callback_publish1| (,`key1)!,2
callback_publish2| (,`key2)!,2
console          | `key1`key2!2 2
```
.qsp.resetRecordCounts
.qsp.resetRecordCounts[]
Resets the current record counts cache, so subsequent data counts begin from zero.
See `.qsp.getRecordCounts` for more details.
.qsp.getRecordCounts
.qsp.getRecordCounts[]
When record counting is enabled, `getRecordCounts` returns information on the total amount of data that has flowed through the pipeline since counting was enabled or since the cache was last reset. The return is a dictionary of operator IDs to their respective counts, where counts are partitioned by stream keys. The operators tracked depend on the record counting level. See `.qsp.setRecordCounting` for more details.
.qsp.timer.add
Adds or updates a recurring timer event.
.qsp.timer.add[id;x;per;ofs]
Parameters:
Name | Type | Description |
---|---|---|
id | symbol | Specifies the timer ID. If this ID exists, it is replaced. |
x | list | Specifies the expression to execute (typically a function name followed by its parameters). |
per | int | timespan | Specifies the timer period, as either a value in milliseconds or a timespan. If this value is a 2-element vector, an exponential backoff is applied to repeated invocations up to the maximum period specified by the second element. |
ofs | int | timespan | Specifies the offset to the first run, as either a value in milliseconds or a timespan. If this value is zero, the timer is scheduled for the next timer run ("immediate"). .tm.nextt can be used to synchronize the timer event with a specific point in time. |
Returns:
Type | Description |
---|---|
null | Generic null. |
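A sketch using a hypothetical `heartbeat` function, run every five seconds starting immediately:

```q
heartbeat: {[msg] -1 string[.z.p], " ", msg; };            / hypothetical handler
.qsp.timer.add[`hb; (`heartbeat; "alive"); 0D00:00:05; 0]  / period 5s, offset 0
```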
.qsp.timer.add1shot
Adds or updates a one-shot timer event.
.qsp.timer.add1shot[id;x;ofs]
Parameters:
Name | Type | Description |
---|---|---|
id | symbol | Specifies the timer ID. If this ID exists, it is replaced. |
x | list | Specifies the expression to execute (typically a function name followed by its parameters). |
ofs | int | timespan | Specifies the offset to run start, as either a value in milliseconds or a timespan. .tm.nextt can be used to synchronize the timer event with a specific point in time. |
Returns:
Type | Description |
---|---|
null | Generic null. |
.qsp.timer.del
Deletes one or more timer events.
.qsp.timer.del[ids]
Parameter:
Name | Type | Description |
---|---|---|
ids | symbol | symbol[] | Specifies the timer IDs to delete. |
Returns:
Type | Description |
---|---|
null | Generic null. |
.qsp.timer.get
Retrieves the properties of one or more timer events.
.qsp.timer.get[ids]
Parameter:
Name | Type | Description |
---|---|---|
ids | symbol | symbol[] | null | Specifies the timer IDs to retrieve. If this value is null, all timer events are returned. |
Returns:
Type | Description |
---|---|
table | Relevant entries in .tm.tq table |
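A sketch, assuming a timer was previously registered under the hypothetical ID `hb`:

```q
.qsp.timer.get[`hb]   / inspect its entry from the .tm.tq table
.qsp.timer.del[`hb]   / delete it
.qsp.timer.get[::]    / null argument: return all remaining timer events
```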