Real-time conditional analytics

Introduction

Real-time conditional analytics provides an easily configurable, auto-scalable framework for calculating basic aggregations on filtered real-time data; for example, the average trade price of a symbol for which the traded volume exceeds a threshold.

The framework is designed as a streamlined, pipeline-integrated, use-case-specific version of the real-time UDF framework.

Use cases

Some sample use cases for the framework are:

  • Alerting triggers for threshold breaches
    • Generate signals if a value remains above a threshold for too long
  • Custom data source aggregations
    • For data sources without out-of-the-box support, a summary engine can be created
  • Custom filter aggregations
    • Calculate on a constrained set of data
  • Data subset calculations
    • Only calculate statistics for a small subset of a larger instrument universe

Process structure

The conditional analytics engines present themselves as a pool of processes per pipeline. The number of instances is defined by the pipeline YAML along with the configuration defined below. The engines publish back to the pipeline's Tickerplant, meaning that results are available through the RDB and persisted into the HDB.

Configuration

The framework is set up with a single configuration parameter, .daas.cfg.realtimeConditionalAnalytics, which has per-pipeline overrides. The pipeline to which the config applies is denoted by a colon-separated suffix; e.g. .daas.cfg.realtimeConditionalAnalytics:myPipelineName

You manage this configuration via the Refinery CLI. View the current configuration with

refinery configuration --show --param .daas.cfg.realtimeConditionalAnalytics:myPipelineName 

Import a new file with

refinery configuration --import --param .daas.cfg.realtimeConditionalAnalytics:myPipelineName --file <new file>

Config structure

Parameter | Type | Description | Example
analyticName | symbol | Name to reference the calculation by on subscription/query | myGatedAggregation
table | symbol | Name of the table to aggregate data from | eqTrade
identifiers | symbol list | Identifiers to apply the aggregation to | `VOD.L`BARC.L
analytic | mixed list | kdb parse tree for the aggregation | (avg;`price)
filter | mixed list | kdb parse tree for the filter | (>;`volume;1000)
period | integer | The size of the period over which the aggregation occurs | 1
periodUnit | symbol | The unit for the period over which the aggregation occurs | day
isMovingWindow | boolean | If true, the period is a rolling lookback from the current tick time | 1b
periodStartTime | timestamp | If isMovingWindow is false, sets the start point of the period buckets | 08:00:00.000
procNum | integer | If manually scaling processes, the ID of the process to run the analytic on | 0

Notes:

  • The identifiers parameter may be left blank if there are no identifiers (e.g. in a pivoted data set) or if the calculation is not done on a by-sym basis.

  • The identifiers parameter may be null symbol ` if the aggregation applies to all symbols.

  • The analytic parameter is structured as per the mixed list format of the getStats `analytics parameter.

  • The filter parameter is structured as per the mixed list format of the getTicks `applyFilter parameter.

  • If the source table is in a tall (sym,time,value) format, it must be set up for pivoting before it can be used in the real-time conditional analytics framework.

  • The period cannot be shorter than the Tickerplant batching time.
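The filter and analytic parameters are kdb parse trees such as (>;`volume;1000). As a rough illustration of how such a tree is applied to a tick, here is a minimal Python analogue (the framework itself evaluates these in kdb; the tick layout, `OPS` table and `passes_filter` name are this sketch's own assumptions):

```python
import operator

# Map q-style comparison names onto Python operators (sketch only)
OPS = {">": operator.gt, "<": operator.lt, ">=": operator.ge, "<=": operator.le, "=": operator.eq}

def passes_filter(tick, tree):
    """tree mimics a parse tree (op;column;constant), e.g. (">", "volume", 1000)."""
    op, column, constant = tree
    return OPS[op](tick[column], constant)

tick = {"sym": "VOD.L", "price": 117, "volume": 200}
passes_filter(tick, (">", "volume", 100))   # True: 200 > 100
passes_filter(tick, (">", "volume", 1000))  # False: 200 <= 1000
```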

Examples

Consider the inbound tick data trade:

Time Sym Price Volume
09:59:55 VOD.L 117 200
09:59:56 BARC.L 105 1000
09:59:57 VOD.L 119 25
09:59:58 VOD.L 119 125
09:59:59 VOD.L 120 150
10:00:00 VOD.L 118 10
10:00:01 BARC.L 105 1000
10:00:02 VOD.L 118 200

Basic unfiltered aggregation

First we run with the configuration

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
vodCount trade `VOD.L (count;`sym) () 1 day 0b 00:00:00.000 0

In this configuration, we count the ticks of `VOD.L with no filter. This is set to be a daily aggregation, which would give the output

time analyticName sym value
09:59:55 vodCount VOD.L 1
09:59:57 vodCount VOD.L 2
09:59:58 vodCount VOD.L 3
09:59:59 vodCount VOD.L 4
10:00:00 vodCount VOD.L 5
10:00:02 vodCount VOD.L 6

Basic unfiltered hourly aggregation

Now we change the period and periodUnit settings to be per hour

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
vodCount trade `VOD.L (count;`sym) () 1 hour 0b 00:00:00.000 0

In this setup, we count the ticks of `VOD.L with no filter, resetting the count at each hour boundary, which would give the output

time analyticName sym value
09:59:55 vodCount VOD.L 1
09:59:57 vodCount VOD.L 2
09:59:58 vodCount VOD.L 3
09:59:59 vodCount VOD.L 4
10:00:00 vodCount VOD.L 1
10:00:02 vodCount VOD.L 2
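The hourly reset above can be sketched in a few lines of Python (an illustration of the bucketing behavior only, not the framework's kdb implementation):

```python
def hourly_counts(times):
    """Count ticks, resetting the running count whenever the hour changes."""
    counts, running, bucket = [], 0, None
    for t in times:
        hour = t.split(":")[0]      # hourly bucket key, e.g. "09"
        if hour != bucket:          # boundary crossed: start a new bucket
            bucket, running = hour, 0
        running += 1
        counts.append(running)
    return counts

ticks = ["09:59:55", "09:59:57", "09:59:58", "09:59:59", "10:00:00", "10:00:02"]
hourly_counts(ticks)  # [1, 2, 3, 4, 1, 2] - matches the table above
```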

Filtered hourly aggregation

Now we set the filter parameter to be (>;`volume;100).

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
vodCount trade `VOD.L (count;`sym) (>;`volume;100) 1 hour 0b 00:00:00.000 0

The result set becomes

time analyticName sym value
09:59:55 vodCount VOD.L 1
09:59:58 vodCount VOD.L 2
09:59:59 vodCount VOD.L 3
10:00:02 vodCount VOD.L 1

Here we have ignored ticks that do not satisfy our condition
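Combining the filter with the hourly reset, the result set above can be reproduced with this illustrative Python sketch (the tick representation and function name are assumptions of the sketch, not the framework's API):

```python
def filtered_hourly_count(ticks, sym, min_volume):
    """Count ticks for one symbol where volume > min_volume, resetting hourly."""
    out, running, bucket = [], 0, None
    for time, s, volume in ticks:
        if s != sym or volume <= min_volume:   # filter: wrong sym or volume too low
            continue
        hour = time.split(":")[0]
        if hour != bucket:                     # new hourly bucket: reset
            bucket, running = hour, 0
        running += 1
        out.append((time, running))
    return out

ticks = [
    ("09:59:55", "VOD.L", 200),
    ("09:59:56", "BARC.L", 1000),
    ("09:59:57", "VOD.L", 25),
    ("09:59:58", "VOD.L", 125),
    ("09:59:59", "VOD.L", 150),
    ("10:00:00", "VOD.L", 10),
    ("10:00:01", "BARC.L", 1000),
    ("10:00:02", "VOD.L", 200),
]
filtered_hourly_count(ticks, "VOD.L", 100)
# [('09:59:55', 1), ('09:59:58', 2), ('09:59:59', 3), ('10:00:02', 1)]
```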

Multi-aggregation setup

We now add a second line to the configuration

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
vodCount trade `VOD.L (count;`sym) (>;`volume;100) 1 hour 0b 00:00:00.000 0
sumPrice trade `VOD.L`BARC.L (sum;`price) (>;`volume;100) 2 hour 0b 00:00:00.000 0

This would give the output

time analyticName sym value
09:59:55 sumPrice VOD.L 117
09:59:55 vodCount VOD.L 1
09:59:56 sumPrice BARC.L 105
09:59:58 sumPrice VOD.L 236
09:59:58 vodCount VOD.L 2
09:59:59 sumPrice VOD.L 356
09:59:59 vodCount VOD.L 3
10:00:01 sumPrice BARC.L 105
10:00:02 sumPrice VOD.L 118
10:00:02 vodCount VOD.L 1

Both analytics are calculated, with the sumPrice analytic applying to both the VOD.L and BARC.L symbols. Here our sumPrice analytic operated on a 2-hour basis, but as its buckets count from midnight, 10:00 was also a bucket boundary for it.

Use of periodStartTime

The periodStartTime parameter is used if you wish to capture particular buckets. For example, consider the following configuration

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
dailyVolume trade ` (sum;`volume) (>;`volume;100) 8 hour 0b 0

With no periodStartTime defined, this will default to 00:00:00.000, which means the aggregation intervals will be

00:00:00.000

08:00:00.000

16:00:00.000

However, if you were only interested in aggregating across the trading session that runs 09:00 - 17:00, you would set the configuration to

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
dailyVolume trade ` (sum;`volume) (>;`volume;100) 8 hour 0b 09:00:00.000 0

Giving new buckets at

01:00:00.000

09:00:00.000

17:00:00.000

Note: Setting the periodStartTime to 01:00:00.000 would achieve the same result as the intervals will be applied backwards in time as well as forwards.
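The boundary arithmetic above, including the note that the anchor extends backwards in time, can be sketched as follows (an illustration only, assuming the period in hours divides 24; the function name is this sketch's own):

```python
def bucket_boundaries(start_hour, period_hours):
    """Hours of day at which buckets start, anchored at start_hour.

    Boundaries repeat every period_hours both forwards and backwards from the
    anchor, so the earliest boundary of the day is start_hour % period_hours.
    Assumes period_hours divides 24 evenly.
    """
    anchor = start_hour % period_hours
    return [anchor + k * period_hours for k in range(24 // period_hours)]

bucket_boundaries(0, 8)   # [0, 8, 16]  - the default, midnight-anchored buckets
bucket_boundaries(9, 8)   # [1, 9, 17]  - anchored at 09:00
bucket_boundaries(1, 8)   # [1, 9, 17]  - same buckets, as the note says
```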

Using isMovingWindow parameter

By setting the isMovingWindow parameter to true, the calculations will now aggregate over a rolling window of the last period.

Consider the following:

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
vodCountInterval trade `VOD.L (count;`sym) (>;`volume;100) 1 hour 0b 00:00:00.000 0
vodCountLookback trade `VOD.L (count;`sym) (>;`volume;100) 1 hour 1b 0

Here vodCountInterval will calculate how many prints happen in each hour bucket. vodCountLookback will calculate how many prints have happened in the last hour. For example, if we had a satisfying tick every 15 minutes:

time sym price volume
09:00:00 VOD.L 117 200
09:15:00 VOD.L 105 1000
09:30:00 VOD.L 119 1000
09:45:00 VOD.L 119 1000
10:00:00 VOD.L 120 1000
10:15:00 VOD.L 118 1000
10:30:00 VOD.L 105 1000
10:45:00 VOD.L 118 200
11:00:00 VOD.L 118 200

The output of the conditionalAnalytics table will look like

time analyticName sym value
09:00:00 vodCountInterval VOD.L 1
09:00:00 vodCountLookback VOD.L 1
09:15:00 vodCountInterval VOD.L 2
09:15:00 vodCountLookback VOD.L 2
09:30:00 vodCountInterval VOD.L 3
09:30:00 vodCountLookback VOD.L 3
09:45:00 vodCountInterval VOD.L 4
09:45:00 vodCountLookback VOD.L 4
10:00:00 vodCountInterval VOD.L 1
10:00:00 vodCountLookback VOD.L 4
10:15:00 vodCountInterval VOD.L 2
10:15:00 vodCountLookback VOD.L 4
10:30:00 vodCountInterval VOD.L 3
10:30:00 vodCountLookback VOD.L 4
10:45:00 vodCountInterval VOD.L 4
10:45:00 vodCountLookback VOD.L 4
11:00:00 vodCountInterval VOD.L 1
11:00:00 vodCountLookback VOD.L 4

As vodCountLookback aggregates over a trailing 1-hour window, it remains at 4 once a full hour of data has been collected. vodCountInterval resets at each hourly boundary.
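The two behaviors in the table above can be reproduced with this illustrative Python sketch (not the framework's kdb implementation; the trailing window is assumed half-open, so a tick exactly one hour old drops out of the lookback):

```python
def to_minutes(t):
    h, m, s = map(int, t.split(":"))
    return h * 60 + m

def interval_and_lookback(times, window_min=60):
    """Per tick: (count within the current hour bucket, count in trailing window)."""
    mins = [to_minutes(t) for t in times]
    interval, lookback = [], []
    count, bucket = 0, None
    for i, m in enumerate(mins):
        hour = m // 60
        if hour != bucket:              # hour boundary: reset the interval count
            bucket, count = hour, 0
        count += 1
        interval.append(count)
        # count ticks in the trailing half-open window (m - window_min, m]
        lookback.append(sum(1 for x in mins[: i + 1] if m - window_min < x <= m))
    return interval, lookback

times = ["09:00:00", "09:15:00", "09:30:00", "09:45:00", "10:00:00",
         "10:15:00", "10:30:00", "10:45:00", "11:00:00"]
interval_and_lookback(times)
# ([1, 2, 3, 4, 1, 2, 3, 4, 1], [1, 2, 3, 4, 4, 4, 4, 4, 4])
```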

Condition duration thresholds

These thresholds are useful in alert generation when tracking how long a value is within or exceeds a given range. You configure these thresholds using the same conditional analytics config; however, they behave slightly differently.

Configuration

To configure a duration threshold, set the analytic parameter to `duration. The period, periodUnit and periodStartTime parameters must be null, and isMovingWindow is ignored.

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
price_over_100 trade `VOD.L `duration (>;`price;100) 0b 0

Behavior

The behavior is slightly different from other conditional analytics, as it does not operate over a set period; hence you do not specify one. For each tick on which the filter is true, the duration accumulated since the start of the current run is published. As soon as the filter is false, this accumulation is reset to zero.

This means that the output of the conditional analytic in this case is the time duration of each distinct event in which the threshold condition is satisfied.

Output

To demonstrate this behavior, consider the following tick input

time sym price
12:00:00 VOD.L 80
12:00:01 VOD.L 120
12:00:02 VOD.L 125
12:00:03 VOD.L 130
12:00:04 VOD.L 90
12:00:05 VOD.L 110
12:00:06 VOD.L 120

The conditional analytics output will now be

time analyticName sym duration
12:00:01 price_over_100 VOD.L 00:00:00
12:00:02 price_over_100 VOD.L 00:00:01
12:00:03 price_over_100 VOD.L 00:00:02
12:00:05 price_over_100 VOD.L 00:00:00
12:00:06 price_over_100 VOD.L 00:00:01

In this example, at 12:00:04 the duration was reset to 0 as the tick came in below price 100. If an alert engine were checking for the price being over 100 for 30s, it would check the output of the conditionalAnalytics table for values greater than 00:00:30.
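The accumulate-and-reset behavior above can be sketched in Python (an illustration only; times are plain seconds for simplicity, and the names are this sketch's own):

```python
def durations(ticks, threshold=100):
    """For each tick where price > threshold, emit (time, seconds since run start).

    A tick at or below the threshold ends the run, resetting the accumulation.
    ticks is a list of (time_in_seconds, price) pairs.
    """
    out, run_start = [], None
    for t, price in ticks:
        if price > threshold:
            if run_start is None:   # first matching tick of a new run
                run_start = t
            out.append((t, t - run_start))
        else:                       # condition broken: reset the accumulation
            run_start = None
    return out

ticks = [(0, 80), (1, 120), (2, 125), (3, 130), (4, 90), (5, 110), (6, 120)]
durations(ticks)  # [(1, 0), (2, 1), (3, 2), (5, 0), (6, 1)] - as in the table
```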

Process scaling and the procNum parameter

The distribution of conditional analytics per instance is user defined. The instances number in the pipeline YAML configuration must match the distinct list of procNum values in the accompanying .daas.cfg.realtimeConditionalAnalytics configuration.

For example, if the YAML file has the following configuration

cae:
  instances: 4

Then the procNum values in the realtimeConditionalAnalytics config must be of the set 0 1 2 3
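The constraint above can be expressed as a small check (an illustrative sketch; the names are this sketch's own, not the framework's API):

```python
def valid_scaling(instances, proc_nums):
    """True if the distinct procNum values are exactly {0, ..., instances - 1}."""
    return set(proc_nums) == set(range(instances))

valid_scaling(4, [0, 1, 2, 3, 0, 2])  # True: distinct procNums are 0 1 2 3
valid_scaling(4, [0, 1, 2])           # False: process 3 has no analytics assigned
```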

Accessing historical conditional analytics

Ticks

Previously calculated conditional analytics are available through getTicks and the `conditionalAnalytics dataType. For example, to view all conditional analytics for equities for today you would call

getTicks[`symList`assetClass`dataType`startDate`endDate!(`;`equity;`conditionalAnalytics;.z.d;.z.d)]

Extracting bucketed format results

For aggregations that are configured as buckets, you can view the bucket result in a getStats format. For example, with the real-time conditional analytic configuration

analyticName table identifiers analytic filter period periodUnit isMovingWindow periodStartTime procNum
hundredMinAvg trade ` (avg;`price) (>;`volume;100) 1 minute 0b 00:00:00.000 0

Whilst this configuration will produce an output on every tick, the final tick in each minute bucket represents the aggregation across the whole minute. Therefore, you can retrieve aggregations for each minute bucket with the getStats call and the last operator.

getStats[`symList`assetClass`dataType`startDate`endDate`analytics`granularityUnit`granularity!(`;`equity;`conditionalAnalytics;.z.d;.z.d;(last;`hundredMinAvg);`minute;1)]