Streaming analytics
Summary
Streaming analytics is a subscription-based method of streaming data to clients in real time. The Query Manager (QM) manages client subscriptions and database connections. Once a subscription is made, data is published to clients on a timer or when triggered by an event.
Setting up the analytic
Subscription analytic
To allow the KX Connect client sample to use it, write the subscription analytic following the KX Connect conventions.
This analytic registers the subscriber's identifier and parameters. It is called once when a client subscribes.
- Create an analytic within the .monPublic group and name it .monPublic.sub.
- In the Analytic Details, add the Description and a unique Alias. Set Connection and Type to mon_agg and streaming respectively.
- In the Parameters subtab, set param to type Dict and add the following values.
- Set the return value as a table and use the monAvgLoad schema.
- In the Content subtab, copy and paste the following code.
{[param]
 // ensure the requested syms are a list
 syms:(), param`syms;
 // generate a new subscription id
 .mon.streamingID+:1j;
 // register the subscriber's id and syms
 `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);
 // return the id to the subscriber
 .mon.streamingID
 }
- Add Read permission to the kxwUsers user group.
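Before wiring the analytic into the RTE, you can sanity-check the subscription logic in a plain q session. This is only a sketch: it repeats the analytic body from the step above so that it runs on its own, and it assumes .mon.streamingID and .mon.streamingSubs are initialised by hand (in the deployed system the .mon.streamingLogic instruction takes care of this; the schema of .mon.streamingSubs is not shown in this guide, so a table keyed on id is assumed here).
// sketch only: stand-alone check of the subscription logic
.mon.streamingID:0j;
.mon.streamingSubs:([id:`long$()] syms:());
.monPublic.sub:{[param]
 syms:(), param`syms;
 .mon.streamingID+:1j;
 `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);
 .mon.streamingID
 };
// subscribe for server_A; returns the new subscription id (1)
.monPublic.sub[enlist[`syms]!enlist`server_A]
// the registry now holds one row: id=1, syms=,`server_A
.mon.streamingSubs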
Write an instruction
- Create an instruction and name it .mon.streamingLogic.
- Add the sample code to the Content subtab. This includes the publish, unsubscribe and snapshot functions.
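The actual sample code ships with the product and is not reproduced here. As an illustration only of the shape such logic can take, the sketch below filters each update per subscriber before handing it off; the delivery call pushToSubscriber and the names .mon.unsubStreaming and .mon.snapStreaming are hypothetical placeholders, while .mon.pubStreaming is the name the aggregation logic calls later in this guide.
// illustration only - not the shipped sample code
// publish: filter the update for each registered subscriber and hand the
// slice to the delivery call (pushToSubscriber is a hypothetical placeholder)
.mon.pubStreaming:{[data]
 {[data;sub]
  slice:$[count sub`syms; select from data where sym in sub`syms; data];
  if[count slice; pushToSubscriber[sub`id; slice]];
 }[data] each 0!.mon.streamingSubs;
 };
// unsubscribe (hypothetical name): drop the subscription from the registry
.mon.unsubStreaming:{[subID] delete from `.mon.streamingSubs where id=subID};
// snapshot (hypothetical name): return the current aggregated rows for the requested syms
.mon.snapStreaming:{[param] select from monAvgLoad where sym in ((),param`syms)};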
Add the analytic to the aggregation RTE
- Use the mon_agg RTE created in the data warehouse section.
- Add the monPublic analytic group to the RTE.
- In the .mon.initAgg function, also load the .mon.streamingLogic instruction.
{[]
// load instruction
.al.loadinstruction[`.mon.aggLogic];
.al.loadinstruction[`.mon.streamingLogic]; // Add this to the existing logic
// initialize aggregated tables
.mon.initAggTables[];
}
- In the .mon.aggLogic instruction, add the line .mon.pubStreaming[data]; inside the .mon.updAvg function as shown below.
.mon.updAvg: {[]
// add times column
monAvgLoad::update time:.z.p from (select avgCPU: first total%size by sym from aggMonCPU) uj
(select avgMemV: first totalV%size, avgMemP: first totalP%size by sym from aggMonMem) uj
(select avgDisk: first total%size by sym from aggMonDisk);
// select columns + remove attribute from sym to match schema
// 0! to unkey joined tables
data: select time, sym:`#sym, avgCPU, avgMemV, avgMemP, avgDisk from 0!monAvgLoad;
// upsert to and publish monAvgLoad
upsert[`monAvgLoad;data];
.d.pub[`monAvgLoad;data];
//*** Add this line for streaming analytics ***
.mon.pubStreaming[data];
// set values of aggregated tables to 0
.mon.initAggTables[];
};
Calling the analytic
- Run the default QM process, ds_qm_ops_a.1.
- If they are not already running, run the kxw_tp and mon_feed processes.
- Kill and re-run the mon_agg process, as the analytic called upon startup has been changed.
kdb+ client
// open a handle to the port of the QM process
qm:hopen 21058;
// register this client with the QM
qm(`registerDevice;`name;0);
// list of subscription ids created by this client
ids:();
// generate a unique subscription id from the current timestamp
getoid:{[] string "j"$.z.p}
// register a subscription to a streaming analytic (sa) with parameters p, target process and user
addSaSub:{[sa;p;targ;user] -1"Adding SA"; mySubID:getoid[]; neg[abs qm](`saAddSub;mySubID;sa;p;targ;user); ids,:enlist mySubID};
// update callback: print the subscription id(s) and the published data
upd:{[ids;x] 0N!(ids; x); };
// subscribe to .monPublic.sub for server_A
addSaSub[`.monPublic.sub; enlist enlist[`syms]!enlist`server_A; `mon_streaming; `Administrator];
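With the handle open and the subscription registered, published updates should arrive as asynchronous calls to the upd function defined above, so each update prints the subscription identifier together with the streamed monAvgLoad rows.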
KX Connect via websockets
Accessing streaming analytics through the KX Connect API is a three-step process. This example uses Node JS.
KX Connect Document
Details of authentication for KX Connect can be found in your deployed system at http://host:port/connect/authentication.html
- Authenticate the login and get a session ID.
- Use the session ID to make an authorized connection to the websocket. For the initial websocket authentication message, content-MD5 is not required in the StringToSign, so the StringToSign looks different in the initial and subsequent messages.
- Call the streaming analytic through the websocket. Upon successful subscription, the client will receive streaming updates in the format defined in the analytic.
Code
- Install the required dependencies beforehand.
- Use the code given below as a sample.
Running code
From the directory where your JS script is located:
- To run with default parameters:
node <script>.js
- To run with override parameters:
node monStreamingWS.js <host:port> <username> <password> <method>
If working properly, it should maintain a connection and stream data.
Dashboards
In this section, add a data grid of streaming analytics to your Dashboard.
- Drag a data grid from the icon on the left-hand side, and adjust the size.
- To configure the data source, click on the data source on the right-hand side menu.
- A pop up will appear. Click on New and create a data source named Streaming. Select the Analytic radio button and the .monPublic.sub analytic from the menu. Set the target process to mon_agg.
- As we've defined this analytic to take in a dictionary as the parameter, select dict. Click on the eye icon to create a View State.
- Click on New and name it symsList. Add the following properties.
- Create another View State named symsDict and add symsList to its properties. The dictionary that the data source passes to the analytic as a result is sketched below, after these steps.
- Set the Subscription to Streaming.
- Now, execute the query to see if you get data back. If successful, you should see a table returned on the bottom:
- Select Apply and Select Item.
- You should see a table displayed where you dragged the data grid.
Execution Order
It is important to execute the data source in this order: Execute > Apply > Select Item.
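For reference, here is a sketch of the dictionary the Streaming data source ends up passing to .monPublic.sub, assuming symsList resolves to the symbol server_A as in the kdb+ client example above:
// shape of the param dictionary built from the symsDict and symsList View States
param:enlist[`syms]!enlist`server_A
// the analytic registers the subscription and returns its id
.monPublic.sub[param]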
- Now add some highlighting rules.
- From the right-hand menu, select Highlight rules and add a rule to color the current avgCPU value green when it is lower than the previous value.
- Similarly, add a highlight rule for the opposite condition and color it red.
- Create the same highlighting rule for each of the columns excluding time and sym.