Data recovery for kdb+tick¶
KX freely offers a complete tick-capture product which allows for the processing, analysis and historical storage of huge volumes of tick data in real time. This product, known as kdb+tick, is extremely powerful, lightweight and forms the core of most kdb+ architectures.
The tickerplant lies at the heart of this structure. It is responsible for receiving data from external feedhandlers and publishing to downstream subscribers.
Perhaps the most important aspect of the tickerplant is how it logs every single message it receives to a binary log file. In the event of a subscriber process failing, this log file can be used to restore any missing data.
kdb+tick¶
This paper will primarily consider the relationship between the TP (tick.q) and RDB (r.q) in a kdb+tick architecture, and in particular the use of tickerplant logs to recover lost data in an RDB.
A log file created by a tickerplant is often referred to as a TP log.
Writing a TP log¶
A log file can be created by any kdb+ process to record instructions/data in binary format, which can be later replayed to recover state.
A tickerplant (tick.q) has the option to create a log and record in it the messages sent to its subscribing clients, so that they can recover if they fail. The tickerplant creates and records logs using the methods described here.
Should the TP fail, or be shut down for any period of time, no downstream subscriber will receive any published data for the period of its downtime. This data typically will not be recoverable. Thus it is imperative that the TP remain always running and available.
The tickerplant maintains some key variables which can be requested by subscribers in order to read the current TP log.
# start tickerplant
$ q tick.q sym . -p 5010
q).u.L
`:./sym2014.05.03
q).u.l
376i
q).u.i
0
The tickerplant calls the upd function on each of its subscribing processes.
The tickerplant therefore logs the upd function call and any data passed, so that any subscriber can replay the log to regain state.
//from u.q
upd:{[t;x] ...
//if the handle .u.l exists, write (`upd;t;x) to the TP log;
//increment .u.j by one
if[l; l enlist(`upd;t;x); j+:1]
kdb+ messages and upd function¶
A tickerplant message takes the form of a list.
(functionname;tablename;tabledata)
Here, functionname and tablename are symbols, and tabledata is a row of data to be inserted into tablename.
Updates using trade schema
trade:([]time:`timespan$();sym:`$();side:`char$();size:`long$();price:`float$());
would appear in the tp log as
`upd `trade (0D14:56:01.113310000;`AUDUSD;"S";1000;96.96)
`upd `trade (0D14:56:01.115310000;`SGDUSD;"S";5000;95.45)
`upd `trade (0D14:56:01.119310000;`AUDUSD;"B";1000;95.08)
`upd `trade (0D14:56:01.121310000;`AUDUSD;"B";1000;95.65)
`upd `trade (0D14:56:01.122310000;`SGDUSD;"B";5000;98.14)
Replaying a TP log¶
Replaying a log file and dealing with a corrupt log file are described here.
Clients of a tickerplant that may wish to recover state include an RDB or custom-developed RTEs. It's important to note that the tickerplant does not play back the log file. An individual client of the tickerplant (e.g. an RDB) replays the log file when required.
An example of an RDB that uses the TP log to recover state on a restart is r.q.
On startup, r.q subscribes to a TP and receives the following information:
- message count (.u.i)
- location of the TP log (.u.L)
It then replays this TP log to recover all the data that has passed through the TP up to that point in the day.
This replay is performed within .u.rep, which is executed when the RDB connects to the TP.
//from r.q
.u.rep:{...;-11!y;...};
kdb+ messages were described above in kdb+ messages and upd function.
In a typical RDB, functionname is upd, which performs an insert.
Therefore, executing a single line of the log file is equivalent to insert[`tablename;tabledata].
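The replay mechanism described above can be modelled outside q. The following is a minimal Python sketch, not kdb+ code: the names tp_log, tables, handlers and replay are hypothetical, the log is an in-memory list rather than a binary file, and the optional count argument stands in for the message count the RDB receives on subscription.

```python
# Hypothetical in-memory model of a TP log.
# Each entry mirrors (functionname; tablename; tabledata).
tp_log = [
    ("upd", "trade", ("0D14:56:01.113310000", "AUDUSD", "S", 1000, 96.96)),
    ("upd", "trade", ("0D14:56:01.115310000", "SGDUSD", "S", 5000, 95.45)),
    ("upd", "trade", ("0D14:56:01.119310000", "AUDUSD", "B", 1000, 95.08)),
]

# The subscriber's state: one list of rows per table.
tables = {"trade": []}


def upd(table, row):
    """RDB-style upd: append the row to the named table."""
    tables[table].append(row)


# Replay looks each logged function name up in the subscriber's own definitions.
handlers = {"upd": upd}


def replay(log, handlers, count=None):
    """Apply the first `count` messages (all if None) in log order.

    Returns the number of messages replayed.
    """
    chunk = log if count is None else log[:count]
    for fname, table, data in chunk:
        handlers[fname](table, data)
    return len(chunk)


replayed = replay(tp_log, handlers)
```

Because the log stores only the function name, the behaviour on replay is decided entirely by whatever the subscriber has bound to that name at the time.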
Filtering TP log¶
A TP log will contain all messages published. A client of the tickerplant may have originally been subscribed to a subset of that data e.g. one of many tables published, or may wish to perform alternative logic on the data recovered from the log file.
To filter the data from the TP log, we can set the function(s) originally logged, e.g. upd, to a different value prior to playback and reinstate it after playback completes. With this playback-specific function, logic can be implemented to filter the data provided or to perform alternative logic.
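The swap-and-restore pattern can be sketched as follows. This is an illustrative Python model under stated assumptions rather than q: the quote table, the handlers dictionary and the replay_filtered function are all hypothetical names introduced for the example.

```python
# Subscriber state; the "quote" table is a hypothetical second table.
tables = {"trade": [], "quote": []}


def upd(table, row):
    """Normal upd: insert into whichever table the message names."""
    tables[table].append(row)


handlers = {"upd": upd}

# Hypothetical log containing messages for two tables.
tp_log = [
    ("upd", "trade", ("AUDUSD", "S", 1000, 96.96)),
    ("upd", "quote", ("AUDUSD", 96.95, 96.97)),
    ("upd", "trade", ("SGDUSD", "B", 5000, 98.14)),
]


def replay_filtered(log, handlers, wanted_tables):
    """Replay the log with a temporary upd that ignores unwanted tables."""
    original = handlers["upd"]

    def filtered_upd(table, row):
        if table in wanted_tables:
            original(table, row)

    handlers["upd"] = filtered_upd      # swap in the playback-specific function
    try:
        for fname, table, row in log:
            handlers[fname](table, row)
    finally:
        handlers["upd"] = original      # reinstate the original after playback


replay_filtered(tp_log, handlers, {"trade"})
```

Restoring the original definition in a finally clause means live updates arriving after the replay are handled normally even if the replay itself fails part-way.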
Logging via an RTE/RDB¶
Example¶
Consider a Real-Time Engine designed to keep track of trading account position limits. The limits for each account can be checked against real-time data from the tickerplant, and could be updated in the RTE by account managers.
The account positions will use this schema
accounts:([] time:`timestamp$(); sym:`$(); curr:`$(); action:`$(); limit:`long$());
This could take the form of a keyed table, where sym is an account name.
q) `sym xkey `accounts
`accounts
q)accounts
sym | time curr action limit
------------|----------------------------------------------------
fgAccount | 2014.05.04D10:27:00.288697000 AUDJPY insert 5000000
pbAcc | 2014.05.04D10:27:00.291699000 GBPUSD insert 1000000
ACCOUNT0023 | 2014.05.04D10:27:01.558332000 SGDUSD insert 1000000
If we wanted this keyed table to be recoverable within this process, we would publish any changes to the table via the tickerplant and have a customized upd function defined locally in the RTE to take specific action on changes to the accounts table:
upd:{[t;x]
  $[t~`accounts;
    $[`insert~a:first x`action; [t insert x];
      `update~a; @[`.;t;,;x];
      `delete~a; @[`.;t;:;delete from value[t] where sym=first x`sym];
      '`unknownaction];
    t insert x]; }
Here we have three operations we can perform on the accounts table: insert, update and delete. We wish to record the running of these operations in the TP log, capture them in the RDB (in an unkeyed version of the table), and perform customized actions in the RTE. We create a function to publish the data to the TP. The TP will publish to both the RDB and RTE, and the upd function will then be called locally on each. Assuming the TP is running on the same machine on port 5010, with no user access credentials required, we can define a function named pub on the RTE which will publish data from the RTE to the TP, where it can be logged and subsequently re-published to the RDB and RTE.
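.tp.h:hopen`:localhost:5010
pub:{[t;x]
  neg[.tp.h](`upd;t;x);   / send the update to the TP asynchronously
  .tp.h"" }               / empty sync request: wait until the TP has processed the async message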
Example usage on the RTE:
// - insert new account
q)k:`sym`time`curr`action`limit
q)pub[`accounts; enlist k!(`ACCOUNT0024;.z.p;`SGDUSD;`insert;1000000)]
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
fgAccount | 2014.05.04D10:51:49.288168000 AUDJPY insert 5000000
pbAcc | 2014.05.04D10:51:49.291168000 GBPUSD insert 1000000
ACCOUNT0023| 2014.05.04D10:51:50.950002000 SGDUSD insert 1000000
ACCOUNT0024| 2014.05.04D10:54:41.796915000 SGDUSD insert 1000000
// - update the limit on account ACCOUNT0024
q)pub[`accounts; enlist k!(`ACCOUNT0024;.z.p;`SGDUSD;`update;7000000)]
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
fgAccount | 2014.05.04D10:51:49.288168000 AUDJPY insert 5000000
pbAcc | 2014.05.04D10:51:49.291168000 GBPUSD insert 1000000
ACCOUNT0023| 2014.05.04D10:51:50.950002000 SGDUSD insert 1000000
ACCOUNT0024| 2014.05.04D11:05:30.557228000 SGDUSD update 7000000
// - delete account ACCOUNT0024 from table
q)pub[`accounts; enlist k!(`ACCOUNT0024;.z.p;`SGDUSD;`delete;1000000)]
q)accounts
sym | time curr action limit
------------|----------------------------------------------------
fgAccount | 2014.05.04D10:27:00.288697000 AUDJPY insert 5000000
pbAcc | 2014.05.04D10:27:00.291699000 GBPUSD insert 1000000
ACCOUNT0023 | 2014.05.04D10:27:01.558332000 SGDUSD insert 1000000
Each action will be recorded in an unkeyed table in the RDB, resulting in the following.
q)accounts
sym time curr action limit
----------------------------------------------------------------
fgAccount 2014.05.04D10:27:00.288697000 AUDJPY insert 5000000
pbAcc 2014.05.04D10:27:00.291699000 GBPUSD insert 1000000
ACCOUNT0023 2014.05.04D10:27:01.558332000 SGDUSD insert 1000000
ACCOUNT0024 2014.05.04D10:54:41.796915000 SGDUSD insert 1000000
ACCOUNT0024 2014.05.04D11:05:30.557228000 SGDUSD update 7000000
ACCOUNT0024 2014.05.04D11:05:30.557228000 SGDUSD delete 1000000
The order in which logfile messages are replayed is hugely important in this case. All operations on this table should be made via the tickerplant so that everything is logged and order is maintained. After the three operations above, the TP log will have the following lines appended.
q)get`:TP_2014.05.04
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D10:54:41.796915000;`SGDUSD;`insert;1000000);
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D11:05:30.557228000;`SGDUSD;`update;7000000);
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D11:05:30.557228000;`SGDUSD;`delete;1000000);
Replaying this logfile will recover this table. If we wanted every operation to this table to be recoverable from a logfile, we would need to publish each operation. However, manual user actions that are not recorded in the TP log can cause errors when replaying. Consider the following table on the RTE.
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
// - insert new account
q)k:`sym`time`curr`action`limit
q)pub[`accounts; enlist k!(`ACCOUNT0024;.z.p;`SGDUSD;`insert;1000000)]
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
ACCOUNT0024| 2014.05.04D10:54:41.796915000 SGDUSD insert 1000000
// - delete this entry from the in-memory table without publishing to the TP
q)delete from `accounts where sym=`ACCOUNT0024
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
// - insert new account again
q)pub[`accounts; enlist k!(`ACCOUNT0024;.z.p;`SGDUSD;`insert;1000000)]
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
ACCOUNT0024| 2014.05.04D10:54:41.796915000 SGDUSD insert 1000000
Only two of these three actions were sent to the TP so only these two messages were recorded in the TP log:
q)get`:TP_2014.05.04
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D10:54:41.796915000;`SGDUSD;`insert;1000000);
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D10:54:41.796915000;`SGDUSD;`insert;1000000);
When the TP log is replayed, the kdb+ process tries to insert into a keyed table, but the same key cannot be inserted into the table twice. The delete that was performed manually between these two inserts was never logged, so it is missing from the replay and the second insert fails.
q)-11!`:TP_2014.05.04
'insert
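The failure can be reproduced in a small model. The sketch below is Python rather than q and makes several assumptions: a dictionary keyed on sym stands in for the keyed accounts table, the DuplicateKey exception stands in for the 'insert error, and make_upd and replay are hypothetical helper names.

```python
class DuplicateKey(Exception):
    """Stands in for the 'insert error raised on a duplicate key."""


def make_upd(accounts):
    """Build an upd that applies insert/update/delete to a keyed table (a dict)."""
    def upd(table, row):
        sym, action = row["sym"], row["action"]
        if action == "insert":
            if sym in accounts:
                raise DuplicateKey(sym)
            accounts[sym] = row
        elif action == "update":
            accounts[sym] = row
        elif action == "delete":
            accounts.pop(sym, None)
        else:
            raise ValueError("unknownaction")
    return upd


def msg(action):
    """Hypothetical helper building one logged message for ACCOUNT0024."""
    row = {"sym": "ACCOUNT0024", "curr": "SGDUSD", "action": action, "limit": 1000000}
    return ("upd", "accounts", row)


def replay(log):
    """Replay a log into a fresh keyed table and return the table."""
    accounts = {}
    upd = make_upd(accounts)
    for _, table, row in log:
        upd(table, row)
    return accounts


# What actually happened on the RTE: insert, manual delete, insert.
complete_log = [msg("insert"), msg("delete"), msg("insert")]
# What the TP recorded: the manual delete never reached it.
logged_only = [msg("insert"), msg("insert")]

recovered = replay(complete_log)

try:
    replay(logged_only)
    failed = False
except DuplicateKey:
    failed = True
```

Replaying the complete sequence recovers the table, while replaying only what the TP saw fails on the second insert, which is why every operation on the table should go through the tickerplant.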
Recovering from q errors during replay¶
As seen in the previous section, replaying a TP log can result in an error even if the log file is uncorrupted. We can utilize error trapping to isolate any rows in the TP log which cause an error while transferring the other, error-free rows into a new log file. The problematic TP log lines will be stored in a variable where they can be analyzed to determine the next course of action.
old:`:2014.05.03
new:`:TP_2014.05.03_new / file symbol (leading colon) so that set and hopen act on a file
new set () / write empty list to the new logfile
h:hopen new / open handle to new log file
updOld:upd / save original upd fn
baddata:() / variable to hold msgs that throw errors
upd:{[t;x] / redefine upd with error trapping
  .[{[t;x] updOld[t;x]; / try the original upd fn on the log line
      h enlist(`upd;t;x) }; / if successful, write to new logfile
    (t;x); / input params
    {[t;x;errmsg] / else log error and append to variable
      0N!"error on upd: ",errmsg;
      baddata,:enlist(t;x) }[t;x] ]
  };
Reference: Trap
We now replay the original TP log.
q)-11!old
2
q)accounts
sym | time curr action limit
-----------|----------------------------------------------------
ACCOUNT0024| 2014.05.04D10:54:41.796915000 SGDUSD insert 1000000
Here, both lines of the TP log have been replayed, but only one has actually been executed. The second insert has been appended to the baddata variable.
q)baddata
(`upd;`accounts; +`sym`time`curr`action`limit!
(`ACCOUNT0024; 2014.05.04D10:54:41.796915000;`SGDUSD;`insert;1000000);
We may wish to write the bad data to its own logfile.
(`:tp_2014.05.03_new) set enlist each `upd,/: baddata;
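The error-trapping replay can also be sketched in Python to show the control flow. This is a model under stated assumptions, not the q implementation: replay_with_trap is a hypothetical name, the good list stands in for the new log file, and the bad list plays the role of baddata.

```python
def replay_with_trap(log, upd):
    """Replay every message, separating those that succeed from those that fail.

    Returns (good, bad): `good` holds the messages that would be written to
    the new log file, `bad` holds the (table, row) pairs that raised an error.
    """
    good, bad = [], []
    for fname, table, row in log:
        try:
            upd(table, row)
        except Exception as err:
            print("error on upd:", err)
            bad.append((table, row))
        else:
            good.append((fname, table, row))
    return good, bad


# Keyed accounts table modelled as a dict keyed on sym.
accounts = {}


def upd(table, row):
    """Insert into the keyed table; a duplicate key raises, like 'insert in q."""
    if row["sym"] in accounts:
        raise RuntimeError("insert")
    accounts[row["sym"]] = row


# The two logged inserts for the same key, as in the example above.
insert_msg = ("upd", "accounts",
              {"sym": "ACCOUNT0024", "curr": "SGDUSD", "action": "insert", "limit": 1000000})

good, bad = replay_with_trap([insert_msg, insert_msg], upd)
```

Both messages are visited, one lands in good and one in bad, which mirrors the replay count of 2 alongside a single row in accounts.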