Streaming analytics
Summary
Streaming analytics is a subscription-based method of streaming data to clients in real time. The Query Manager (QM) manages client subscriptions and database connections. Once a subscription is made, data is published to clients on a timer or when triggered by an event.
Setting up the analytic
Subscription analytic
So that it can be used by the KX Connect client sample, write the subscription analytic following the KX Connect conventions.
This analytic registers the subscriber's ID and parameters. It is called once, when the client subscribes.
- Create an analytic within the .monPublic group and name it .monPublic.sub.
- In the Analytic Details, add the Description and a unique Alias. Set Connection and Type to mon_agg and streaming respectively.
- In the Parameters subtab, set param to type Dict and add the following values.
- Set the return value as a table and use the monAvgLoad schema.
- In the Content subtab, copy and paste the following code.
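{[param]
  // make sure syms is a list, even if a single symbol is passed
  syms:(), param`syms;
  // generate the next subscription ID
  .mon.streamingID+:1j;
  // record the subscription ID and its syms
  `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);
  // return the subscription ID
  .mon.streamingID
 }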
- Add Read permission to the kxwUsers user group.
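For readers less familiar with q, the bookkeeping done by the analytic can be modelled roughly as follows. This is an illustrative JavaScript sketch, not KX code: `createRegistry` is a hypothetical name, and it assumes the ID counter starts at 0 and that subscriptions are keyed by ID.

```javascript
// Illustrative model of the bookkeeping in .monPublic.sub (not KX code).
// ASSUMPTIONS: the counter starts at 0 and subscriptions are keyed by ID.
function createRegistry() {
  let streamingID = 0;             // plays the role of .mon.streamingID
  const streamingSubs = new Map(); // plays the role of .mon.streamingSubs

  function sub(param) {
    // Like (), param`syms in q: always end up with a list of syms
    const syms = [].concat(param.syms);
    streamingID += 1;
    streamingSubs.set(streamingID, syms);
    return streamingID; // the ID is returned to the subscriber
  }

  return { sub, streamingSubs };
}

const registry = createRegistry();
const firstId = registry.sub({ syms: 'server_A' });
const secondId = registry.sub({ syms: ['server_B', 'server_C'] });
console.log(firstId, secondId); // prints: 1 2
```

Each call registers one subscription and returns a fresh ID, which is what the client later uses to identify its updates.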
Write an instruction
- Create an instruction and name it .mon.streamingLogic.
- Add the sample code to the Content subtab.
- This includes the publish, unsubscribe and snapshot functions.
Add the analytic to the aggregation RTE
- Use the mon_agg RTE created in the data warehouse section.
- Add the monPublic analytic group to the RTE.
- In the .mon.initAgg function, also load the .mon.streamingLogic instruction.
{[]
  // load instruction
  .al.loadinstruction[`.mon.aggLogic];
  .al.loadinstruction[`.mon.streamingLogic]; // Add this to the existing logic
  // initialize aggregated tables
  .mon.initAggTables[];
 }
- In the .mon.aggLogic instruction, add the line .mon.pubStreaming[data]; inside the .mon.updAvg function, as shown below.
.mon.updAvg: {[]
  // add times column  
  monAvgLoad::update time:.z.p from (select avgCPU: first total%size by sym from aggMonCPU) uj 
    (select avgMemV: first totalV%size, avgMemP: first totalP%size by sym from aggMonMem) uj 
    (select avgDisk: first total%size by sym from aggMonDisk);
  // select columns + remove attribute from sym to match schema
  // 0! to unkey joined tables
  data: select time, sym:`#sym, avgCPU, avgMemV, avgMemP, avgDisk from 0!monAvgLoad;
  // upsert to and publish monAvgLoad 
  upsert[`monAvgLoad;data];
  .d.pub[`monAvgLoad;data];
  //*** Add this line for streaming analytics ***
  .mon.pubStreaming[data];
  // set values of aggregated tables to 0
  .mon.initAggTables[];
 };
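The body of .mon.pubStreaming comes from the sample code for the .mon.streamingLogic instruction and is not reproduced here. As a rough mental model only, a publish step of this kind typically filters each update down to the syms a subscriber asked for. The JavaScript sketch below illustrates that idea; the function signature, the `send` callback and the sample rows are all assumptions made for illustration.

```javascript
// Hypothetical sketch of per-subscriber filtering. The real .mon.pubStreaming
// lives in the .mon.streamingLogic sample code and may work differently.
function pubStreaming(data, subs, send) {
  for (const [id, syms] of subs) {
    const wanted = new Set(syms);
    const rows = data.filter((row) => wanted.has(row.sym));
    if (rows.length > 0) send(id, rows); // nothing to send if no rows match
  }
}

// Made-up subscriptions and rows, for illustration only
const subs = new Map([[1, ['server_A']], [2, ['server_B', 'server_C']]]);
const data = [
  { sym: 'server_A', avgCPU: 12.5 },
  { sym: 'server_C', avgCPU: 40.1 },
  { sym: 'server_D', avgCPU: 7.3 },
];

const sent = [];
pubStreaming(data, subs, (id, rows) => sent.push({ id, syms: rows.map((r) => r.sym) }));
console.log(sent);
```

In this sketch subscriber 1 receives only the server_A row, subscriber 2 receives only the server_C row, and the server_D row is not sent to anyone.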
Calling the analytic
- Run the default QM process, ds_qm_ops_a.1.
- If they are not already running, start the kxw_tp and mon_feed processes.
- Kill and re-run the mon_agg process, as the analytic called upon startup has been changed.
Kdb+ client
// open a handle to the port of QM process
qm:hopen 21058;
qm(`registerDevice;`name;0);
ids:();
getoid:{[] string "j"$.z.p}
addSaSub:{[sa;p;targ;user] -1"Adding SA"; mySubID:getoid[]; neg[abs qm](`saAddSub;mySubID;sa;p;targ;user); ids,:enlist mySubID};
upd:{[ids;x] 0N!(ids; x); };
addSaSub[`.monPublic.sub; enlist  enlist[`syms]!enlist`server_A; `mon_streaming; `Administrator];
KX Connect via websockets
Accessing streaming analytics through the KX Connect API is a three-step process. This example uses Node.js.
KX Connect Document
Details of authentication for KX Connect can be found in your deployed system at http://host:port/connect/authentication.html
- Authenticate the login and get a session ID.
- Use the session ID to make an authorized connection to the websocket.
  - For the initial websocket authentication message, content-MD5 is not required in the StringToSign, so the StringToSign differs between the initial and subsequent messages.
- Call the streaming analytic through the websocket.
  - Upon successful subscription, the client receives streaming updates in the format defined in the analytic.
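The difference between the initial and subsequent StringToSign can be sketched with Node's built-in crypto module. This is a minimal sketch under stated assumptions, not the KX Connect specification: the field values, their order, the newline separator, the position of the content-MD5 and the use of HMAC-SHA1 keyed by the session ID are all placeholders. Take the actual layout from the authentication page of your deployed system.

```javascript
const crypto = require('crypto');

// Base64-encoded MD5 digest of a message body (the usual Content-MD5 encoding).
function contentMd5(body) {
  return crypto.createHash('md5').update(body, 'utf8').digest('base64');
}

// ASSUMPTION: fields, order and separator are placeholders, not the real layout.
function buildStringToSign(fields, body) {
  const parts = [...fields];
  // The initial authentication message has no content-MD5 component
  if (body !== undefined) parts.push(contentMd5(body));
  return parts.join('\n');
}

// ASSUMPTION: HMAC-SHA1 keyed by the session ID is used here for illustration.
function sign(sessionId, stringToSign) {
  return crypto.createHmac('sha1', sessionId).update(stringToSign, 'utf8').digest('base64');
}

const fields = ['hypothetical-user', 'hypothetical-date'];
const initial = buildStringToSign(fields);
const subsequent = buildStringToSign(fields, JSON.stringify({ msg: 'example' }));
console.log(initial !== subsequent); // the two strings differ
console.log(sign('hypothetical-session-id', subsequent));
```

The point of the sketch is only that the signed string for later messages carries one extra component derived from the message body.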
 
Code
- Install the required dependencies beforehand.
- Use the code given below as a sample.
Running code
Run the following commands from the directory where your JS script is located.
- To run with default parameters:
node monStreamingWS.js
- To run with override parameters:
node monStreamingWS.js <host:port> <username> <password> <method>
If working properly, it should maintain a connection and stream data.
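One way a script can support both invocations is to read the positional arguments and fall back to defaults for any that are missing. The sketch below assumes this approach; the default values shown, including using the analytic name as the default method, are hypothetical and should be replaced with values for your deployment.

```javascript
// ASSUMPTION: these defaults are placeholders, not values from the sample script.
const DEFAULTS = {
  hostPort: 'localhost:8080',
  username: 'user',
  password: 'password',
  method: '.monPublic.sub',
};

function parseArgs(argv) {
  // argv[0] is the node binary and argv[1] the script path;
  // the overrides <host:port> <username> <password> <method> start at index 2
  const [hostPort, username, password, method] = argv.slice(2);
  return {
    hostPort: hostPort ?? DEFAULTS.hostPort,
    username: username ?? DEFAULTS.username,
    password: password ?? DEFAULTS.password,
    method: method ?? DEFAULTS.method,
  };
}

const config = parseArgs(process.argv);
console.log(`Connecting to ${config.hostPort} as ${config.username}, method ${config.method}`);
```

Because the arguments are positional, overriding a later parameter such as the method requires supplying all the earlier ones as well.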
Dashboards
In this section, add a data grid of streaming analytics to your Dashboard.
- Drag a data grid from the icon on the left-hand side, and adjust the size.
- To configure the data source, click on the data source in the right-hand side menu.
- A pop-up will appear. Click on New and create a data source named Streaming. Select the Analytic radio button and the .monPublic.sub analytic from the menu. Set the target process to mon_agg.
- As this analytic is defined to take a dictionary as its parameter, select dict. Click on the eye icon to create a View State.
- Click on New and name it symsList. Add the following properties.
- Create another View State named symsDict and add symsList to its properties.
- Set the Subscription to Streaming.
- Now, execute the query to see if you get data back. If successful, you should see a table returned at the bottom.
- Select Apply and Select Item.
- You should see a table displayed where you dragged the data grid.
Execution Order
It is important to execute the data source in this order: Execute > Apply > Select Item.
- Now add some highlighting rules.
- From the right-hand menu, select Highlight rules and add a rule to color the current avgCPU value green when it is lower than the previous value.
- Similarly, add a highlight rule for the opposite condition and color it red.
- Create the same highlighting rule for each of the columns excluding time and sym.
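The intent of these rules can be written out as a small comparison function. The highlight rules themselves are configured in the Dashboards UI, so the JavaScript below is illustrative only; the `highlight` function and the sample rows are hypothetical, while the column names are taken from the monAvgLoad schema used above.

```javascript
// Illustrative only: the real rules are configured in the Dashboards UI.
const VALUE_COLUMNS = ['avgCPU', 'avgMemV', 'avgMemP', 'avgDisk'];

function highlight(previousRow, currentRow) {
  const colors = {};
  for (const col of VALUE_COLUMNS) {
    if (currentRow[col] < previousRow[col]) colors[col] = 'green';    // value fell
    else if (currentRow[col] > previousRow[col]) colors[col] = 'red'; // value rose
    // unchanged values get no highlight
  }
  return colors;
}

// Made-up rows for one sym
const previous = { avgCPU: 50, avgMemV: 10, avgMemP: 30, avgDisk: 70 };
const current = { avgCPU: 42, avgMemV: 10, avgMemP: 35, avgDisk: 70 };
console.log(highlight(previous, current)); // { avgCPU: 'green', avgMemP: 'red' }
```

The time and sym columns are left out of the comparison, matching the instruction to exclude them from the rules.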