Tickerplant

The tickerplant (TP) template is an important component in the data warehouse architecture. It handles incoming messages for the application, persisting them to disk and distributing them to other processes. It usually receives all updates from feedhandlers (FHs). Messages are logged to an append-only file on disk, which is critical for process recovery after a failure or restart.

Parameters

name                    type      description
bufferEnabled           Boolean   Enable message buffering
bufferFunct             Analytic  Function to handle updates during buffering
bufferLimit             Integer   Buffer file size limit (MB)
consumerAlertSize       Integer   Limit on buffered messages before alert in bytes
consumerDisconnectSize  Integer   Limit on buffered messages before disconnect in bytes
dsLsInstance            Instance  Name of log streamer
eodTime                 Time      Time when EOD signal triggered
EOIOffset               Integer   An integer number of minutes to offset the EOI times. May also be negative
includeColumns          Boolean   Include column names in logs
initialStateFunct       Analytic  Analytic to run on startup
intradayFreq            Integer   Intraday roll frequency
intradayTables          Symbol[]  Intraday table names
logDirectory            Symbol    Location of logs
logRecoveryFunct        Analytic  Analytic to recover corrupt logs
logUpdError             Boolean   Log message to console if error processing
messagingServer         Config    Messaging server config
multiPublish            Boolean   Publish single message with multiple tables
pubFreq                 Integer   Batching interval (ms)
publishChannel          Symbol    Publish channel
retryInterval           Integer   Retry interval (ms) for failures writing to log file
subscriptionChannel     Symbol    Subscription channel
subscriptionTableList   Symbol[]  Subscription tables
timeOffset              Integer   Hour offset from local time
timeType                Symbol    Timestamping updates with gmt or local

Intraday logging

By default the TP writes a single log per day containing all data for all tables. In high-volume environments, this can cause application and hardware issues. To address these, high-volume tables can be split out into interval files, each containing data for a sub-interval of the day. The interval frequency is determined by the intradayFreq parameter; setting it to a value greater than zero enables this mode. The intradayTables parameter specifies which tables are written to these logs.

In a multi-node setup, interval file rolling can be staggered using the EOIOffset parameter. Setting it to a positive or negative number of minutes offsets the interval roll time for that instance. It does not affect the interval size.
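The effect of the offset on roll times can be illustrated with a little timestamp arithmetic. This is a minimal sketch only, not the TP's scheduler: the function name nextRoll is hypothetical, and it assumes intradayFreq is expressed in minutes, which the parameter table does not state.

```q
// Hypothetical helper: the next interval roll strictly after `now`.
// freq - interval length in minutes (assumed unit for intradayFreq)
// off  - EOIOffset in minutes, may be negative
nextRoll:{[now;freq;off]
  f:`timespan$freq*60000000000;   // interval length as a timespan
  o:`timespan$off*60000000000;    // offset as a timespan
  // remove the offset, floor to the interval, step one interval forward, re-apply the offset
  o+f+f xbar now-o }
```

With an hourly interval, two instances given offsets of 0 and 5 would roll five minutes apart while both keeping one-hour intervals.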

Resilience and retries

The TP is central to persistence and recovery, but this depends on the disk being reliable. Functionality is provided to try to mitigate temporary disk failures by retrying when they occur. This is enabled by setting the retryInterval parameter. If set, when a write to the log file fails the TP sleeps for the interval (in milliseconds) and then retries the operation.
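The sleep-and-retry pattern can be sketched in a few lines of q. This is an illustration under stated assumptions, not the TP's internal logic: the function retry and its attempt limit n are hypothetical, and the sleep relies on a Unix-like host providing a sleep command.

```q
// Hypothetical retry wrapper: apply unary f to x; on error wait ms milliseconds
// and try again, giving up after n attempts.
retry:{[f;x;ms;n]
  r:@[{(1b;x y)}[f];x;{(0b;x)}];     // protected evaluation: (success flag; result or error text)
  if[first r; :last r];              // success: return the result
  if[n<=1; 'last r];                 // out of attempts: re-signal the last error
  system "sleep ",string ms%1000;    // assumes a Unix-like host with a sleep command
  .z.s[f;x;ms;n-1] }
```

A bounded number of attempts is a design choice made for the sketch; the source does not say whether the TP retries indefinitely.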

Buffering

This feature provides support for applications to buffer messages in the TP. The target use-case is for rebalancing data across sharded data nodes. During the rebalance event, the application may not want to process certain data until the rebalance has completed. In this case, the TP will provide a hook for the application to buffer this to a separate file until a later point.

An example would be handling late data. Assume the application starts as a single-node system with intraday write-downs. The current hour is stored in the RDB, the IDB stores previous hours and the HDB stores previous dates. Now if the load increases, the application decides to shard the instruments into two sets: A-M and N-Z. The system now consists of two sets of databases. The RDB data is easily migrated and the feeds are sharded to send to different locations. The on-disk data will likely take longer. Below is the situation at this point, assuming the current time is between 11:00 and 12:00.

Screenshot

If data older than 11:00 hits the system, it can’t be merged with the on-disk data yet as that hasn't been migrated. In this case, the application wants the TP to buffer this data to a separate logfile until the rebalance completes. In the diagram below, the application code recognises a record as late and buffers it for later consumption.

Screenshot

Important notes

  • The buffering logic is implemented using the bufferFunct parameter
  • The hook is injected into the TP upd handler
  • The buffering hook isn't injected by default, to protect performance
  • Buffering events are controlled by the application starting and stopping them

Implementation

The mechanics of a buffering event are described below. The application initiates a buffer event with a call to .ds.tp.buff.start. This creates a buffer log and opens a handle to it. It also updates the TP update handler to inject the bufferFunct. Each event is identified by an ID provided by the application, which forms part of the buffer filename, for example kx_tp_a.10.buffer. The start event also publishes a mark to subscribers and TP logs so the rest of the application can track the event.

The bufferFunct takes the incoming table name and data, and decides whether to process the update normally or buffer to the logfile. It should return the data it doesn't want to buffer and call .ds.tp.buff.log for the rest.
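For the late-data case described earlier, such a function might look like the sketch below. It assumes the incoming data is a table with a timestamp column named time, and that the application keeps its cutoff in a global. The names .app.cutoff and .app.bufferLate are hypothetical; only .ds.tp.buff.log comes from the platform.

```q
// Hypothetical application state: records timestamped before this are treated as late.
.app.cutoff:2020.02.04D11:00:00.000000000;

// Hypothetical bufferFunct. t is the table name (symbol), x the incoming data (table).
// Assumes x has a timestamp column named time.
.app.bufferLate:{[t;x]
  late:x[`time]<.app.cutoff;                        // flag the late rows
  if[any late; .ds.tp.buff.log[t; x where late]];   // send late rows to the buffer log
  x where not late }                                // return the rest for normal processing
```

The analytic would then be set as the bufferFunct parameter, so that it is injected only while a buffer event is active.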

When complete, the application can end the event using .ds.tp.buff.end. This closes the buffer log and renames it to indicate it's complete, for example kx_tp_a.10.buffer.complete. The bufferFunct is removed from the TP update handler. Another mark is published to subscribers and logs to indicate the event has finished.

If the TP restarts during a buffer event, it will attempt to recover its state by interrogating any open buffer logs. If an active log is found, the TP will re-initiate the event as active.

.ds.tp.buff.start

Initiate a buffering event

.ds.tp.buff.start[id;args]

Where

  • id (long) is an event ID
  • args is a dictionary of event details

Example:

.ds.tp.buff.start[10j; `source`cutoff!(`kx_dbw_a; .z.p)]

.dm.buff.start

Mark published at the start of a buffering event

.dm.buff.start[id;logname;args]

Where

  • id (long) is an event ID
  • logname (symbol) is a buffer log path
  • args is a dictionary of event metadata

Example:

(`.dm.buff.start; 10; `:/path/to/kx_tp_a.10.buffer; `source`cutoff!(`kx_dbw_a; .z.p))

.ds.tp.buff.log

Log a table record to the buffer log

.ds.tp.buff.log[tablename;data]

Where

  • tablename (symbol) is the name of table
  • data (table) is data to be logged

Example:

.ds.tp.buff.log[`kxData; ([] time:.z.p; sym:2?`2; reading:2?10.)]

.ds.tp.buff.end

End a buffer event

.ds.tp.buff.end[id;args]

Where

  • id (long) is an event ID
  • args (dict) is additional information

Example:

.ds.tp.buff.end[10; `time`status!(.z.p; `complete)]

.dm.buff.end

The mark published at the end of a buffering event

.dm.buff.end[id;logname;args]

Where

  • id (long) is an event ID
  • logname (symbol) is a buffer log path
  • args (dict) is an event meta dictionary

Example:

(`.dm.buff.end; 10; `:/path/to/kx_tp_a.10.buffer.complete; `time`status!(.z.p; `complete))

Subscribers

Subscriber processes can specify handlers for the buffer mark events. This allows them to take actions when buffer events start or end. The handlers are defined using Instance parameters .dm.buff.i.start and .dm.buff.i.end.

If not defined, the hooks will default to stubbed functions that take no action.
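As an illustration, a subscriber that only needs to know which buffer events are in progress could use handlers along these lines. This is a sketch: the handler names and the .app.active list are hypothetical, and the argument list is assumed to mirror the published marks (id, logname, args), which the source does not confirm.

```q
// Hypothetical subscriber state: IDs of buffer events currently in progress.
.app.active:`long$();

// Hypothetical start handler; arguments assumed to match the .dm.buff.start mark.
.app.onBuffStart:{[id;logname;args]
  .app.active::.app.active,id;
  -1 "buffer event ",string[id]," started, log ",string logname; }

// Hypothetical end handler; arguments assumed to match the .dm.buff.end mark.
.app.onBuffEnd:{[id;logname;args]
  .app.active::.app.active except id;
  -1 "buffer event ",string[id]," ended, log ",string logname; }
```

These analytics would then be assigned to the .dm.buff.i.start and .dm.buff.i.end instance parameters respectively.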

Interval triggering

The TP provides a function to manually trigger an interval roll. It takes one optional parameter: the name (symbol) of the calling process. Calling it moves the next interval trigger time to the end of the current second. The interval triggers and the schedule then returns to the previously scheduled times; for example, if rolling every hour and triggered manually at 11:30, the next roll is re-scheduled for 12:00 after the manual trigger.
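The timing can be illustrated with plain timestamp arithmetic. This is only a sketch of the schedule described here, assuming an hourly roll; it does not call or reproduce the TP's scheduler, and the variable names are illustrative.

```q
// Illustrative arithmetic only, not the TP scheduler.
now:2020.02.04D11:30:15.250000000;       // moment of the manual trigger
manual:0D00:00:01+0D00:00:01 xbar now;   // manual roll fires at the end of the current second
regular:0D01+0D01 xbar manual;           // the hourly schedule then resumes at the next boundary
```

Here manual falls on the next whole second after now, and regular on the following hour boundary.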

.ds.tp.triggerInterval

Trigger the interval roll

.ds.tp.triggerInterval inst

Where inst is the name of the calling process as a symbol (or the null symbol), triggers the interval roll.

Example:

.ds.tp.triggerInterval[`]

<->2020.02.04D12:10:29.653 ### normal ### (24501): Manual interval trigger. Scheduled for: 2020.02.04T12:10:30.000 ### ()!()

.ds.tp.triggerIntervalDetails

Trigger the interval roll, log details

.ds.tp.triggerIntervalDetails[inst;details]

Where

  • inst is the name of the calling process as a symbol (or the null symbol)
  • details is a dictionary of details to write in the log

triggers the interval roll and writes details in the log.

Similar to .ds.tp.triggerInterval, but takes an additional dictionary parameter of details to print to the logs.

Example:

.ds.tp.triggerIntervalDetails[`; `a`b`c!til 3];

<->2020.02.04D12:15:39.046 ### normal ### (24501): Manual interval trigger. Scheduled for: 2020.02.04T12:15:40.000 ### `a`b`c!0 1 2