Tickerplant

The tickerplant (TP) template is an important component in the data warehouse architecture. It handles messages into the application, persisting them to disk and distributing to other processes. It usually receives all updates from feedhandlers (FHs). The logging takes the form of an append-only file on-disk and is critical for process recovery after a failure or restart.

Parameters

The below table details the TP template parameters.

name type description
messagingServer Config Messaging server config
publishChannel Symbol Publish channel
subscriptionChanel Symbol Subscription channel
subscriptionTableList Symbol[] Subscription tables
logDirectory Symbol Location of logs
dsLsInstance Instance Name of log streamer
timeOffset Integer Hour offset from local time
pubFreq Integer Batching interval (ms)
eodTime Time Time when EOD signal triggered
initialStateFunct Analytic Analytic to run on startup
logRecoveryFunct Analytic Analytic to recover corrupt logs
includeColumns Boolean Include column names in logs
intradayFreq Integer Intraday roll frequency
intradayTables Symbol[] Intraday table names
logUpdError Boolean Log message to console if error processing
timeType Symbol Timestamping updates with gmt or local
consumerAlertSize Integer Limit on buffered messages before alert
consumerDisconnectSize Integer Limit on buffered messages before disconnect
multiPublish Boolean Publish single message with multiple tables
retryInterval Integer Retry interval (ms) for on-disk failures
bufferEnabled Boolean Enable message buffering
bufferFunct Analytic Function to handle updates during buffering
bufferLimit Integer Buffer file size limit (MB)

Intraday logging

By default the TP only writes one single log per day containing all data for all tables. In large volume environments, this can cause application and hardware issues. To address these, it is possible to split high-volume tables into lower-frequency interval files. These files contain data for sub-intervals of the day; the frequency is determined by the intradayFreq parameter. If set to a value greater than zero, this mode is enabled. The intradayTables parameter specifies which tables should be written to these logs.

Resilience and retries

The role of the TP is important for persistence and recovery. However this counts on the disk being reliable. Functionality is provided to try mitigate temporary disk failures by retrying when these occur. This can be enabled by setting the retryInterval parameter. If set, the TP will attempt I/O operations when they fail by sleeping and retrying.

Buffering

This feature provides support for applications to buffer messages in the TP. The target use-case is for rebalancing data across sharded data nodes. During the rebalance event, the application may not want to process certain data until the rebalance has completed. In this case, the TP will provide a hook for the application to buffer this to a separate file until a later point.

An example would be handling late data. Assume the application starts as a single node system with intraday write-downs. The current hour is stored in the RDB, IDB stores previous hours and HDB stores previous dates. Now if the load increases, the application decides to shard the instruments into two sets; A-M and N-Z. The system now consists of two sets of databases. The RDB data is easily migrated and feeds sharded to send to different locations. The on-disk data will likely take longer. Below is the situation at this point, assuming the current time is between 11:00 and 12:00.

Screenshot

If data older than 11:00 hits the system, it cannot be merged with the on-disk data yet as that has not been migrated. In this case, the application wants the TP to buffer this data to a separate logfile until the rebalance completes. In the diagram below, the application code recognizes a record as late and buffers it for later consumption.

Screenshot

Important notes

  • The buffering logic is implemented using the bufferFunct parameter
  • The hook is injected into the TP upd handler
  • By default, buffering hook isn’t injected to protect performance
  • Buffering events are controlled by the application starting and stopping them

Implementation

The mechanics of a buffering event are described. The application initiates a buffer event with a call to .ds.tp.buff.start. This creates a buffer log and opens a handler to it. It updates the TP update handler to inject the bufferFunct. Each event is identified by an ID provided by the application and this is part of the buffer filename; kx_tp_a.10.buffer. The start event also publishes a mark to subscribers and TP logs so the rest of the application can track the event.

The bufferFunct takes the incoming table name and data, and decides whether to process the update normally or buffer to the logfile. It should return the data it doesn’t want to buffer and call .ds.tp.buff.log for the rest.

When complete, the application can end the event using .ds.tp.buff.end. This closes the buffer log and renames to indicate it is complete; kx_tp_a.10.buffer.complete. The bufferFunct is removed from the TP update handler. Another mark is published to subscribers and logs to indicate the event has finished.

If the TP restarts during a buffer event, it will attempt to recover its state by interrogating any open buffer logs. If an active log is found, the TP will re-initiate the event as active.

APIs

.ds.tp.buff.start

Initiate a buffering event

Syntax: .ds.tp.buff.start[id;args]

Where

  • id is an event ID as a long
  • args is a dictionary which can be used to describe the event to subscribers

a buffering event is initiated.

.ds.tp.buff.start[10j; `source`cutoff!(`kx_dbw_a; .z.p)]

.dm.buff.start

Buffering event start mark

Syntax: .dm.buff.start

This is the mark published at the start of a buffering event. It contains the ID, buffer log path and event meta dictionary.

q)`.dm.buff.start
10
`:/path/to/kx_tp_a.10.buffer
`source`cutoff!(`kx_dbw_a;2020.03.27D17:57:40.767955000)

.ds.tp.buff.log

Log a table record to the buffer log

Syntax: .ds.tp.buff.log[t;d]

Where

  • t is the name of a table as a symbol
  • d is a table of records

logs the records to the buffer log.

.ds.tp.buff.log[`kxData; ([] time:.z.p; sym:2?`2; reading:2?10.)]

.ds.tp.buff.end

End a buffer event

Syntax: .ds.tp.buff.end[id;args]

Where

  • id is a buffer event ID as a long
  • args is a dictionary of additional information

ends the buffer event.

.ds.tp.buff.end[10; `time`status!(.z.p; `complete)];

.dm.buff.end

Buffering event end mark

Syntax: .dm.buff.end

This is the mark published at the end of a buffering event. It contains the ID, buffer log path and event meta dictionary.

Parameter Type
id long
logname symbol
args dict

Example

q)`.dm.buff.end
10
`:/path/to/kx_tp_a.10.buffer.complete
`time`status!(2020.03.27D18:06:38.367615000;`complete)

Subscribers

Subscriber processes can specify handlers for the buffer mark events. This allows them to take actions when buffer events start or end. The handlers are defined using instance parameters: .dm.buff.i.start and .dm.buff.i.end.

If not defined, the hooks will default to stubbed functions that take no action.

Instance parameters