Real-time conditional analytics¶
Introduction¶
Real-time conditional analytics offer an easily configurable and auto-scalable framework for calculating basic aggregations on filtered real-time data: for example, the average trade price of a symbol whose traded volume exceeds a threshold.
The framework is designed to be a streamlined, pipeline-integrated, use-case-specific version of the realtime udf framework.
Use cases¶
Some sample use cases for the framework are:
- Alerting triggers for threshold breaches
    - Generate signals if a value remains above a threshold for too long
- Custom data source aggregations
    - For non-out-of-the-box data sources, a summary engine can be created
- Custom filter aggregations
    - Calculate over a constrained set of data
- Data subset calculations
    - Only calculate statistics for a small subset of a larger instrument universe
Process structure¶
The conditional analytics engines present themselves as a pool of processes per pipeline. The number of instances is defined by the pipeline YAML along with the configuration defined below. The engines publish back to the pipeline's Tickerplant, meaning that results are available through the RDB and persisted into the HDB.
Configuration¶
The framework is set up with a single configuration parameter, .daas.cfg.realtimeConditionalAnalytics, with per-pipeline overrides. The pipeline to which the config applies is indicated by a colon separator; i.e. .daas.cfg.realtimeConditionalAnalytics:myPipelineName
You manage this configuration via the Refinery CLI. View the current configuration with

```bash
refinery configuration --show --param .daas.cfg.realtimeConditionalAnalytics:myPipelineName
```

Import a new file with

```bash
refinery configuration --import --param .daas.cfg.realtimeConditionalAnalytics:myPipelineName --file <new file>
```
Config structure¶
| Parameter | Type | Description | Example |
|---|---|---|---|
| analyticName | symbol | Name to reference the calculation by on subscription/query | myGatedAggregation |
| table | symbol | Name of table to aggregate data from | eqTrade |
| identifiers | symbol list | Identifiers to apply the aggregation to | `VOD.L`BARC.L |
| analytic | mixed list | kdb parse tree for aggregation | (avg;`price) |
| filter | mixed list | kdb parse tree for filter | (>;`volume;1000) |
| period | integer | The size of the period over which the aggregation occurs | 1 |
| periodUnit | symbol | The unit for the period over which the aggregation occurs | day |
| isMovingWindow | boolean | If true, the period is a rolling lookback from the current tick time | 1b |
| periodStartTime | time | If isMovingWindow is false, sets the start point of the period buckets | 08:00:00.000 |
| procNum | integer | If manually scaling processes, the process ID to run the analytic on | 0 |
Notes¶
- The identifiers parameter may be left blank if there are no identifiers (i.e. in a pivoted data set) or if the calculation is not done on a by-sym basis.
- The identifiers parameter may be the null symbol ` if the aggregation applies to all symbols.
- The analytic parameter is structured as per the mixed list format of the getStats `analytics parameter.
- The filter parameter is structured as per the mixed list format of the getTicks `applyFilter parameter.
- If the source table is in a tall (sym,time,value) format, it must be set up for pivoting to be used in the realtime conditional analytics framework.
- The period cannot be shorter than the Tickerplant batching time.
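As a sketch of how the analytic and filter entries behave: they are ordinary q parse trees, such as `parse` would produce, and can be applied with functional qSQL. The table and column names below are illustrative only, not the framework's internals.

```q
filter:(>;`volume;1000)      / parse tree equivalent to volume>1000, as parse"volume>1000" returns
analytic:(avg;`price)        / parse tree equivalent to avg price

/ illustrative data
trade:([] sym:`VOD.L`BARC.L`VOD.L; price:117 105 119f; volume:200 1000 1500)

/ apply the filter, then the analytic, via functional select:
/ keeps rows where volume>1000, then averages price over them
?[trade; enlist filter; 0b; enlist[`res]!enlist analytic]
```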
Examples¶
Consider the inbound tick data in the trade table:
| Time | Sym | Price | Volume |
|---|---|---|---|
| 09:59:55 | VOD.L | 117 | 200 |
| 09:59:56 | BARC.L | 105 | 1000 |
| 09:59:57 | VOD.L | 119 | 25 |
| 09:59:58 | VOD.L | 119 | 125 |
| 09:59:59 | VOD.L | 120 | 150 |
| 10:00:00 | VOD.L | 118 | 10 |
| 10:00:01 | BARC.L | 105 | 1000 |
| 10:00:02 | VOD.L | 118 | 200 |
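For illustration, this sample data could be constructed in a q session as follows (a minimal sketch with second-precision times; not the framework's internal schema):

```q
trade:([]
  time:09:59:55 09:59:56 09:59:57 09:59:58 09:59:59 10:00:00 10:00:01 10:00:02;
  sym:`VOD.L`BARC.L`VOD.L`VOD.L`VOD.L`VOD.L`BARC.L`VOD.L;
  price:117 105 119 119 120 118 105 118f;
  volume:200 1000 25 125 150 10 1000 200)
```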
Basic unfiltered aggregation¶
First we run with the configuration
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| vodCount | eqTrade | `VOD.L | (count;`sym) | () | 1 | day | 0b | 00:00:00.000 | 0 |
In this configuration, we count the ticks of `VOD.L with no filter. This is set to be a daily aggregation, which would give the output
| time | analyticName | sym | value |
|---|---|---|---|
| 09:59:55 | vodCount | VOD.L | 1 |
| 09:59:57 | vodCount | VOD.L | 2 |
| 09:59:58 | vodCount | VOD.L | 3 |
| 09:59:59 | vodCount | VOD.L | 4 |
| 10:00:00 | vodCount | VOD.L | 5 |
| 10:00:02 | vodCount | VOD.L | 6 |
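The daily running count above can be sketched in plain q against the sample trade table (illustrative only; the framework computes this incrementally per tick rather than over a stored table):

```q
trade:([]
  time:09:59:55 09:59:56 09:59:57 09:59:58 09:59:59 10:00:00 10:00:01 10:00:02;
  sym:`VOD.L`BARC.L`VOD.L`VOD.L`VOD.L`VOD.L`BARC.L`VOD.L)

/ running tick count for the configured identifier, with no filter
select time, analyticName:`vodCount, sym, value:1+til count i from trade where sym=`VOD.L
```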
Basic unfiltered hourly aggregation¶
Now we change the period and periodUnit settings to be per hour
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| vodCount | trade | `VOD.L | (count;`sym) | () | 1 | hour | 0b | 00:00:00.000 | 0 |
In this setup, we count the ticks of `VOD.L with no filter, resetting the count at each hour boundary, which would give the output
| time | analyticName | sym | value |
|---|---|---|---|
| 09:59:55 | vodCount | VOD.L | 1 |
| 09:59:57 | vodCount | VOD.L | 2 |
| 09:59:58 | vodCount | VOD.L | 3 |
| 09:59:59 | vodCount | VOD.L | 4 |
| 10:00:00 | vodCount | VOD.L | 1 |
| 10:00:02 | vodCount | VOD.L | 2 |
Filtered hourly aggregation¶
Now we set the filter parameter to be (>;`volume;100).
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| vodCount | trade | `VOD.L | (count;`sym) | (>;`volume;100) | 1 | hour | 0b | 00:00:00.000 | 0 |
The result set becomes
| time | analyticName | sym | value |
|---|---|---|---|
| 09:59:55 | vodCount | VOD.L | 1 |
| 09:59:58 | vodCount | VOD.L | 2 |
| 09:59:59 | vodCount | VOD.L | 3 |
| 10:00:02 | vodCount | VOD.L | 1 |
Here we have ignored ticks that do not satisfy our condition.
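A plain-q sketch of this filtered, hourly-resetting count (illustrative; assumes the sample trade table with second-typed time values):

```q
trade:([]
  time:09:59:55 09:59:56 09:59:57 09:59:58 09:59:59 10:00:00 10:00:01 10:00:02;
  sym:`VOD.L`BARC.L`VOD.L`VOD.L`VOD.L`VOD.L`BARC.L`VOD.L;
  volume:200 1000 25 125 150 10 1000 200)

/ keep only ticks matching the filter, then restart the running count in each hour
t:select from trade where sym=`VOD.L, volume>100
update value:1+til count i by time.hh from t
```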
Multi-aggregation setup¶
We now add a second line to the configuration
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| vodCount | trade | `VOD.L | (count;`sym) | (>;`volume;100) | 1 | hour | 0b | 00:00:00.000 | 0 |
| sumPrice | trade | `VOD.L`BARC.L | (sum;`price) | (>;`volume;100) | 2 | hour | 0b | 00:00:00.000 | 0 |
This would give the output
| time | analyticName | sym | value |
|---|---|---|---|
| 09:59:55 | sumPrice | VOD.L | 117 |
| 09:59:55 | vodCount | VOD.L | 1 |
| 09:59:56 | sumPrice | BARC.L | 105 |
| 09:59:58 | sumPrice | VOD.L | 236 |
| 09:59:58 | vodCount | VOD.L | 2 |
| 09:59:59 | sumPrice | VOD.L | 356 |
| 09:59:59 | vodCount | VOD.L | 3 |
| 10:00:01 | sumPrice | BARC.L | 105 |
| 10:00:02 | sumPrice | VOD.L | 118 |
| 10:00:02 | vodCount | VOD.L | 1 |
Both analytics are calculated, with the sumPrice analytic applying to both the VOD.L and BARC.L symbols. The sumPrice analytic operates on a 2-hour basis, but as buckets are counted from midnight, a bucket boundary falls on the 10:00 rollover point.
Use of periodStartTime¶
The periodStartTime parameter is used if you wish to capture particular buckets. For example, consider the following configuration
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| dailyVolume | trade | ` | (sum;`volume) | (>;`volume;100) | 8 | hour | 0b |  | 0 |
With no periodStartTime defined, this defaults to 00:00:00.000, which means the aggregation buckets will start at
- 00:00:00.000
- 08:00:00.000
- 16:00:00.000
However, if you were only interested in aggregating across the trading session that runs 09:00 - 17:00, you would set the configuration to
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| dailyVolume | trade | ` | (sum;`volume) | (>;`volume;100) | 8 | hour | 0b | 09:00:00.000 | 0 |
Giving new buckets at
- 01:00:00.000
- 09:00:00.000
- 17:00:00.000
Note: Setting the periodStartTime to 01:00:00.000 would achieve the same result as the intervals will be applied backwards in time as well as forwards.
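The bucket a tick falls into can be sketched as aligning to the configured start time (a simplified model using minutes since midnight for clarity; the framework's internal implementation may differ):

```q
/ bucket start for a tick at minute t, period p minutes, aligned to start minute s
bucket:{[s;p;t] s+p*floor(t-s)%p}

/ 8-hour (480-minute) buckets aligned to 09:00 (minute 540)
bucket[540;480;630]   / a 10:30 tick maps to the 09:00 bucket: 540
bucket[540;480;60]    / a 01:00 tick maps backwards in time to the 01:00 bucket: 60
```

The second call shows why a periodStartTime of 01:00:00.000 is equivalent to 09:00:00.000 here: the alignment extends backwards as well as forwards.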
Using isMovingWindow parameter¶
By setting the isMovingWindow parameter to true, the calculations will now aggregate over a rolling window of the last period.
Consider the following:
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| vodCountInterval | trade | `VOD.L | (count;`sym) | (>;`volume;100) | 1 | hour | 0b | 00:00:00.000 | 0 |
| vodCountLookback | trade | `VOD.L | (count;`sym) | (>;`volume;100) | 1 | hour | 1b |  | 0 |
Here vodCountInterval will calculate how many prints happen in each hour bucket. vodCountLookback will calculate how many prints have happened in the last hour. For example, if we had a satisfying tick every 15 minutes:
| time | sym | price | volume |
|---|---|---|---|
| 09:00:00 | VOD.L | 117 | 200 |
| 09:15:00 | VOD.L | 105 | 1000 |
| 09:30:00 | VOD.L | 119 | 1000 |
| 09:45:00 | VOD.L | 119 | 1000 |
| 10:00:00 | VOD.L | 120 | 1000 |
| 10:15:00 | VOD.L | 118 | 1000 |
| 10:30:00 | VOD.L | 105 | 1000 |
| 10:45:00 | VOD.L | 118 | 200 |
| 11:00:00 | VOD.L | 118 | 200 |
The output of the conditionalAnalytics table will look like
| time | analyticName | sym | value |
|---|---|---|---|
| 09:00:00 | vodCountInterval | VOD.L | 1 |
| 09:00:00 | vodCountLookback | VOD.L | 1 |
| 09:15:00 | vodCountInterval | VOD.L | 2 |
| 09:15:00 | vodCountLookback | VOD.L | 2 |
| 09:30:00 | vodCountInterval | VOD.L | 3 |
| 09:30:00 | vodCountLookback | VOD.L | 3 |
| 09:45:00 | vodCountInterval | VOD.L | 4 |
| 09:45:00 | vodCountLookback | VOD.L | 4 |
| 10:00:00 | vodCountInterval | VOD.L | 1 |
| 10:00:00 | vodCountLookback | VOD.L | 4 |
| 10:15:00 | vodCountInterval | VOD.L | 2 |
| 10:15:00 | vodCountLookback | VOD.L | 4 |
| 10:30:00 | vodCountInterval | VOD.L | 3 |
| 10:30:00 | vodCountLookback | VOD.L | 4 |
| 10:45:00 | vodCountInterval | VOD.L | 4 |
| 10:45:00 | vodCountLookback | VOD.L | 4 |
| 11:00:00 | vodCountInterval | VOD.L | 1 |
| 11:00:00 | vodCountLookback | VOD.L | 4 |
As vodCountLookback aggregates over a trailing 1-hour window, it plateaus at 4 once a full hour of data has been collected. The vodCountInterval analytic resets on each hourly boundary.
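The trailing-window count can be sketched in q (illustrative only; times modelled as minutes since midnight, and every tick here passes the volume filter):

```q
times:540+15*til 9   / ticks at 09:00, 09:15, ..., 11:00

/ for each tick at t, count ticks falling in the trailing window (t-60;t]
counts:{[ts;t] sum(ts>t-60)&ts<=t}[times] each times
counts               / → 1 2 3 4 4 4 4 4 4
```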
Condition duration thresholds¶
These thresholds are useful in alert generation when tracking how long a value is within or exceeds a given range. You configure these thresholds using the same conditional analytics config; however, they behave slightly differently.
Configuration¶
To configure a duration threshold, set the analytic parameter to `duration. The period, periodUnit, and periodStartTime parameters must be null, and isMovingWindow is ignored.
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| price_over_100 | trade | `VOD.L | `duration | (>;`price;100) |  |  | 0b |  | 0 |
Behavior¶
The behavior is slightly different from other conditional analytics, as it does not operate over a set period; hence you do not specify a period. This means that for each occurrence of a true filter match, the duration is accumulated and published. As soon as the filter is false, this accumulation is reset to zero.
This means that the output of the conditional analytic in this case is the time duration of each distinct event in which the threshold condition is satisfied.
Output¶
To demonstrate this behavior, consider the following tick input
| time | sym | price |
|---|---|---|
| 12:00:00 | VOD.L | 80 |
| 12:00:01 | VOD.L | 120 |
| 12:00:02 | VOD.L | 125 |
| 12:00:03 | VOD.L | 130 |
| 12:00:04 | VOD.L | 90 |
| 12:00:05 | VOD.L | 110 |
| 12:00:06 | VOD.L | 120 |
The conditional analytics output will now be
| time | analyticName | sym | duration |
|---|---|---|---|
| 12:00:01 | price_over_100 | VOD.L | 00:00:00 |
| 12:00:02 | price_over_100 | VOD.L | 00:00:01 |
| 12:00:03 | price_over_100 | VOD.L | 00:00:02 |
| 12:00:05 | price_over_100 | VOD.L | 00:00:00 |
| 12:00:06 | price_over_100 | VOD.L | 00:00:01 |
In this example, at 12:00:04 the duration was reset to 0 as the tick came in below price 100. If an alert engine were checking for the price being over 100 for 30 seconds, it would check the output of the conditionalAnalytics table for a value greater than 00:00:30.
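The reset-on-false accumulation can be sketched in q (illustrative; times modelled as seconds since midnight, with the filter applied as a boolean vector):

```q
times:43200+til 7             / 12:00:00 .. 12:00:06 as seconds since midnight
price:80 120 125 130 90 110 120
m:price>100                   / ticks satisfying the filter
s:?[m&not 0b,-1_m;times;0N]   / time at each new true-run start, else null
d:times-fills s               / seconds since the current run began
d where m                     / → 0 1 2 0 1
```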
Process scaling and the procNum parameter¶
The distribution of conditional analytics per instance is user defined. The instances number in the pipeline YAML configuration must match the distinct list of procNum values in the accompanying .daas.cfg.realtimeConditionalAnalytics configuration.
For example, if the YAML file has the following configuration

```yaml
cae:
  instances: 4
```

Then the procNum values in the realtimeConditionalAnalytics config must be drawn from the set 0 1 2 3
Accessing historical conditional analytics¶
Ticks¶
Previously calculated conditional analytics are available through getTicks and the `conditionalAnalytics dataType. For example, to view all conditional analytics for equities for today you would call
```q
getTicks[`symList`assetClass`dataType`startDate`endDate!(`;`equity;`conditionalAnalytics;.z.d;.z.d)]
```
Extracting bucketed format results¶
For aggregations that are configured as buckets, you can view the bucket result in a getStats format. For example, with the real-time conditional analytic configuration
| analyticName | table | identifiers | analytic | filter | period | periodUnit | isMovingWindow | periodStartTime | procNum |
|---|---|---|---|---|---|---|---|---|---|
| hundredMinAvg | trade | ` | (avg;`price) | (>;`volume;100) | 1 | minute | 0b | 00:00:00.000 | 0 |
Whilst this configuration will produce an output on every tick, the final tick in each minute bucket represents the aggregation across the whole minute. Therefore, you can retrieve aggregations for each minute bucket with the getStats call and the last operator.
```q
getStats[`symList`assetClass`dataType`startDate`endDate`analytics`granularityUnit`granularity!(`;`equity;`conditionalAnalytics;.z.d;.z.d;(last;`hundredMinAvg);`minute;1)]
```