Action Tracker client
This module allows any KX Platform process to subscribe to replicated updates
from the Action Tracker cluster. On initialisation, all the Action Tracker tables (.rpl.tables)
can be downloaded by the subscribing process. Any changes to these tables will then be
published to the process. This will include any changes to action tracker items, such as
item transitions, comments and item payload edits. Callbacks can be defined that will be
triggered when an update is made, allowing any custom code to be executed.
Summary of usage:
- Define callbacks for initial download of data, data update and data amend, as well as other optional callbacks. Set these as the handlers using .px.atsub.setHandlers
- Define data structures required on client process, e.g. load required schema
- Call initialisation function to subscribe to a connection group or service class for the desired action tracker cluster, .px.atsub.init
- Client process will connect to the AT cluster and download data from the leader AT process
- The processes in the AT cluster will be stored in the .px.atsub.procs table
- The update and amend callbacks will be triggered whenever there is a change on the leader AT process
- If the connection to the leader AT process is dropped, either the module will attempt to re-subscribe on a timer, or a disconnect callback will be triggered, depending on configuration.
Set handlers
The API to configure the callbacks, .px.atsub.setHandlers, must be called
before the subscription is initialised using .px.atsub.init. Callbacks
can be configured using an INSTANCE_CONFIG override or by passing function
names in the dictionary parameter to .px.atsub.setHandlers. In either case,
.px.atsub.setHandlers must still be called first, because it is this call that
registers the callbacks.
Example subscription
An example of subscribing to the dxATItemCurrent table from the ds_action_tracker cluster is as follows.
Load the AT subscription module
.utils.loadfile[`:../shared/q/px.atsub.q]
Load the dxATItemCurrent schema
.tbl.gettable[`dxATItemCurrent]
update alertkey:string[alertkey] from `dxATItemCurrent
Changes to AT behaviour as of KX Platform version 4.6.0
The alertkey column is a string, as of KX Platform version 4.6.0, not a symbol as in the schema entity. This must be cast to a string when the schema is loaded by the client prior to upserting data to the table from the subscription. In addition, there is no end-of-day trigger to the subscribing client - the client needs to handle any end-of-day actions itself.
Define functions to be used for initialisation and update. These could also be analytics loaded by the client process
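/ init callback: d maps table names (plus `.rpl.uid) to their values on the leader
.sol.atsub.init:{[d]
  / store the leader's uid locally
  if[`.rpl.uid in key d;
    .px.atsub.uid:d`.rpl.uid;
    ];
  / load the initial snapshot of the table
  if[`dxATItemCurrent in key d;
    upsert[`dxATItemCurrent;d`dxATItemCurrent]
    ];
  };
/ upd callback: apply replicated updates for the table of interest only
.sol.atsub.upd:{[table;data]
  if[table=`dxATItemCurrent;
    upsert[table;data]
    ];
  };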
Set the defined functions as the callbacks for initialisation and update
.px.atsub.setHandlers[`upd`init!(`.sol.atsub.upd;`.sol.atsub.init)]
These function names could instead be configured using the INSTANCE_CONFIG override for the process, as
the .px.atsub.arg.init and .px.atsub.arg.upd entries in the override. In that case, the API can be
called with a null key and value to set the same handlers
.px.atsub.setHandlers[(enlist `)!enlist `]
Initialise the subscription to the ds_action_tracker cluster
.px.atsub.init[`ds_action_tracker;()!()]
This will initialise the subscription with the default reconnect behaviour, which is to attempt reconnection every 60 seconds
if a connection to a process in the cluster is dropped. This behaviour can be overridden with a false boolean value for a
reconnect key in the dictionary parameter, or with a false .px.atsub.arg.reconnect setting in INSTANCE_CONFIG. The cluster
to connect to can be configured with an INSTANCE_CONFIG setting for .px.atsub.arg.cluster, in which case a null symbol should
be passed into .px.atsub.init as the cluster.
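As a sketch of the two options just described, the calls below are alternatives to the default initialisation; only one would be used in a given process. The second form assumes that .px.atsub.arg.cluster has been set in INSTANCE_CONFIG, so it is left commented out here.

```q
/ Option 1: disable automatic reconnection for this subscription
.px.atsub.init[`ds_action_tracker;(enlist `reconnect)!enlist 0b]

/ Option 2: pass a null symbol so the cluster is taken from the
/ .px.atsub.arg.cluster INSTANCE_CONFIG setting (assumed to be configured)
/ .px.atsub.init[`;()!()]
```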
The leader action tracker process in the ds_action_tracker cluster will call .sol.atsub.init on the client
process, populating the local dxATItemCurrent table. The client process will then receive updates published from the
remote action tracker process whenever changes occur.
.px.atsub.i.amend
Callback triggered when an amend occurs on the leader action tracker process in the
subscribed cluster. There is no default action for this callback. The code of this callback can be
overwritten using an INSTANCE_CONFIG setting for .px.atsub.i.amend
and using .px.atsub.setHandlers
Parameters:
Name | Type | Description |
---|---|---|
apply | | Table name being updated |
variable | symbol | Variable name |
index | | Where to apply the update |
newvalue | | Value to update to |
Example:
.px.atsub.i.amend[@; `.at.reissuemap; `CreditAlert.CreditAlert.EUR/USD`CreditAlert.CreditAlert.EUR/USD; 0Nj]
Example:
.px.atsub.i.amend[.; `.at.infoID; (); 20j]
Example:
.px.atsub.i.amend[@; `.at.iteminfo; 10j; 1j]
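The examples above pass the q amend operators @ and . as the first argument, so one plausible handler simply replays the amend against a local copy of the variable. This is a minimal sketch under that assumption: .sol.atsub.amend is a hypothetical name, and the client is assumed to have already defined the target variables (for example .at.iteminfo) locally.

```q
/ hypothetical amend handler: replay the amend on the local copy of the variable
/ assumes apply is @ or . as in the examples, and that variable exists locally
.sol.atsub.amend:{[apply;variable;index;newvalue]
  apply[variable;index;:;newvalue];
  };

/ register it; callbacks not listed fall back to INSTANCE_CONFIG or the defaults
.px.atsub.setHandlers[(enlist `amend)!enlist `.sol.atsub.amend]
```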
.px.atsub.i.disconnect
Callback triggered when a handle is disconnected. A separate callback will update
the subscriptions table (.px.atsub.procs). This callback allows any custom action to be taken.
There is no default action for this callback. The code of this callback can be overwritten using an
INSTANCE_CONFIG setting for .px.atsub.i.disconnect
and using .px.atsub.setHandlers
Parameter:
Name | Type | Description |
---|---|---|
hnd | int | Handle that has disconnected |
Example:
.px.atsub.i.disconnect[20i]
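Because there is no default action, a client may want at least to record the event. A minimal sketch, in which .sol.atsub.disconnect is a hypothetical handler name and writing to stdout is purely illustrative:

```q
/ hypothetical disconnect handler: write a line to stdout for the dropped handle
.sol.atsub.disconnect:{[hnd]
  -1 "AT subscription: handle ",string[hnd]," disconnected";
  };

/ register it; callbacks not listed fall back to INSTANCE_CONFIG or the defaults
.px.atsub.setHandlers[(enlist `disconnect)!enlist `.sol.atsub.disconnect]
```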
.px.atsub.i.init
Callback triggered when a subscription to an Action Tracker cluster is established. This will
be called by the leader action tracker process in the cluster on the subscribing client. The default action
for this callback sets the client uid number, .px.atsub.uid, equal to the uid of the leader process in the
action tracker cluster, .rpl.uid. The code of this callback can be overwritten using an
INSTANCE_CONFIG setting for .px.atsub.i.init
and using .px.atsub.setHandlers
Parameter:
Name | Type | Description |
---|---|---|
d | dict | With table names as keys and table values as the values. The tables will be the contents of the .rpl.tables list on the leader action tracker process. This will also include the uid number .rpl.uid |
Example:
.px.atsub.i.init[`.rpl.uid`dxATItemCurrent!(1500;+(,`dItemID)!,`u#0 1 2 7 52 77 80 83......)]
.px.atsub.i.newLeader
Callback triggered when there is a new leader process in the subscribed action tracker cluster.
The default action of this callback is to make a call to the new leader process to have it trigger the .px.atsub.i.init
callback. The code of this callback can be overwritten using an INSTANCE_CONFIG setting for .px.atsub.i.newLeader
and using .px.atsub.setHandlers
Parameter:
Name | Type | Description |
---|---|---|
name | symbol | Name of new leader process |
Example:
.px.atsub.i.newLeader[`ds_action_tracker_b]
.px.atsub.i.seqNoGap
Callback triggered when there is an unexpected uid/sequence number gap.
There is no default action for this callback. The code of this callback can be overwritten
using an INSTANCE_CONFIG setting for .px.atsub.i.seqNoGap
and using .px.atsub.setHandlers
Parameters:
Name | Type | Description |
---|---|---|
u | long | uid/sequence number |
cmd | | Message accompanying the out-of-sequence uid |
Example:
.px.atsub.i.seqNoGap[3060; (`rUpdRT; `dxATItemCurrent; +(,`dItemID)!,,270)!+`alertTime`recvTime`alert`alertkey.....)]
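As no default action is defined, nothing happens on a gap unless a handler is supplied. The sketch below only logs the unexpected uid; .sol.atsub.seqNoGap is a hypothetical name, and what recovery (if any) is appropriate depends on the client.

```q
/ hypothetical gap handler: report the out-of-sequence uid on stdout
/ cmd (the accompanying message) is accepted but not inspected here
.sol.atsub.seqNoGap:{[u;cmd]
  -1 "AT subscription: unexpected uid/sequence number ",string u;
  };

/ register it; callbacks not listed fall back to INSTANCE_CONFIG or the defaults
.px.atsub.setHandlers[(enlist `seqNoGap)!enlist `.sol.atsub.seqNoGap]
```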
.px.atsub.i.uidUpdate
Callback triggered when there is a uid/sequence number update outside of the regular increments.
The default action of this callback is to make a call to the leader process in the cluster to have it trigger the .px.atsub.i.init
callback. The code of this callback can be overwritten using an INSTANCE_CONFIG setting for .px.atsub.i.uidUpdate
and using .px.atsub.setHandlers
Parameters:
Name | Type | Description |
---|---|---|
name | symbol | Name of leader process |
uid | long | New uid number |
Example:
.px.atsub.i.uidUpdate[`ds_action_tracker_b; 3050]
.px.atsub.i.upd
Callback triggered when an update occurs on the leader action tracker process in the
subscribed cluster. There is no default action for this callback. The code of this callback can be
overwritten using an INSTANCE_CONFIG setting for .px.atsub.i.upd
and using .px.atsub.setHandlers
Parameters:
Name | Type | Description |
---|---|---|
table | symbol | Table name being updated |
data | | Value of data in update |
Example:
.px.atsub.i.upd[`dxATItemCurrent;+(,`dItemID)!(,,260)!+`alertTime`recvTime`alert`alertkey.....]
.px.atsub.init
Initiates a subscription to an Action Tracker cluster
Parameters:
Name | Type | Description |
---|---|---|
cluster | symbol | Name of cluster; can be a connection group or service class. If null is passed, the INSTANCE_CONFIG setting for .px.atsub.arg.cluster will be used |
arg | dict | With optional key, reconnect, taking a boolean value. If true, for any dropped connections to AT cluster processes, an attempt will be made to re-establish them every 60 seconds. If this flag is absent the INSTANCE_CONFIG setting for .px.atsub.arg.reconnect will be used, which defaults to true |
Returns:
Type | Description |
---|---|
 | Nothing returned |
Example:
.px.atsub.init[`ds_action_tracker;(enlist `reconnect)!enlist 0b]
Example:
.px.atsub.init[`ds_at;()!()]
Example:
.px.atsub.init[`;()!()]
.px.atsub.setHandlers
Sets callback functions. This must be called before .px.atsub.init. Takes a dictionary
parameter with the callbacks as keys (e.g. init, upd, amend etc.) and function names as values. The function
names could be analytics loaded on the process or functions defined in an instruction, for example.
If a callback is not present in the input dictionary the INSTANCE_CONFIG setting for the callback
will be used. If there is also no INSTANCE_CONFIG setting configured for a callback then the default
callback function will be used. This API must still be called to set callbacks configured using INSTANCE_CONFIG.
If configuring using INSTANCE_CONFIG, the parameter names to define the callbacks are:
.px.atsub.arg.upd
.px.atsub.arg.amend
.px.atsub.arg.init
.px.atsub.arg.disconnect
.px.atsub.arg.newLeader
.px.atsub.arg.uidUpdate
.px.atsub.arg.seqNoGap
Parameter:
Name | Type | Description |
---|---|---|
arg | dict | Possible keys (all optional) init, upd, amend, disconnect, newLeader, uidUpdate, seqNoGap. Values should be the callback function names |
Example:
.px.atsub.setHandlers[`init`upd!`ATSubInitAnalytic`ATSubUpdAnalytic]
Example:
.px.atsub.setHandlers[(enlist `)!enlist `]