Streaming analytics

Summary

Streaming analytics is a subscription-based method of streaming data to clients in real time. The Query Manager (QM) manages client subscriptions and database connections. Once a subscription is made, data is published to clients either on a timer or when triggered by an event.

Streaming Analytics

Setting up the analytic

Subscription analytic

So that the analytic can be used by the KX Connect client sample, write the subscription analytic following the KX Connect conventions.

This analytic registers the subscriber's identifier (ID) and parameters. It is called once, when the client subscribes.

  • Create an analytic within the .monPublic group and name it .monPublic.sub.

  • In the Analytic Details, add the Description and a unique Alias. Set Connection to mon_agg and Type to streaming.

  • In the Parameters subtab, set param to type Dict and add the following values.

Screenshot

Screenshot

  • Set the return value to a table that uses the monAvgLoad schema.

  • In the Content subtab, copy and paste the following code.

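{[param]
  // make sure syms is a list, even if a single symbol is passed
  syms:(), param`syms;
  // generate the next subscription ID
  .mon.streamingID+:1j;
  // record the subscription ID and its symbols
  `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);
  // return the new subscription ID
  .mon.streamingID
 }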
  • Grant Read permission to the kxwUsers user group.
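For readers less familiar with q, the bookkeeping done by the subscription analytic can be modelled in JavaScript, the language used by the KX Connect sample later on this page. This is only an illustrative sketch: the names streamingSubs, streamingID and sub are stand-ins for the q variables, and a Map keyed by ID is an assumption about how .mon.streamingSubs is organised.

```javascript
// Illustrative model only: the real state lives in the mon_agg q process.
const streamingSubs = new Map(); // subscription ID -> list of syms
let streamingID = 0;

function sub(param) {
  // Like `(), param`syms` in q: wrap a single value so syms is always a list
  const syms = [].concat(param.syms);
  // Generate the next subscription ID
  streamingID += 1;
  // Record the subscription and return its ID to the caller
  streamingSubs.set(streamingID, syms);
  return streamingID;
}

const firstId = sub({ syms: "server_A" });
const secondId = sub({ syms: ["server_A", "server_B"] });
console.log(firstId, secondId, streamingSubs);
```

Each call produces a fresh ID, so a client that subscribes twice with different symbols ends up with two independent entries.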

Write an instruction

  • Create an instruction and name it .mon.streamingLogic.

  • Add the sample code to the Content subtab.

Streaming Logic

  • This includes the publish, unsubscribe and snapshot functions.

Add the analytic to the aggregation RTE

  • Use the mon_agg RTE created in the data warehouse section.

Aggregation Engine

  • Add the monPublic analytic group to the RTE.

  • In the .mon.initAgg function, also load the .mon.streamingLogic instruction.

{[]
  // load instruction
  .al.loadinstruction[`.mon.aggLogic];
  .al.loadinstruction[`.mon.streamingLogic]; // Add this to the existing logic

  // initialize aggregated tables
  .mon.initAggTables[];
 }
  • In the .mon.aggLogic instruction, add the line .mon.pubStreaming[data]; inside the .mon.updAvg function, as shown below.
.mon.updAvg: {[]
  // add time column
  monAvgLoad::update time:.z.p from (select avgCPU: first total%size by sym from aggMonCPU) uj 
    (select avgMemV: first totalV%size, avgMemP: first totalP%size by sym from aggMonMem) uj 
    (select avgDisk: first total%size by sym from aggMonDisk);

  // select columns + remove attribute from sym to match schema
  // 0! to unkey joined tables
  data: select time, sym:`#sym, avgCPU, avgMemV, avgMemP, avgDisk from 0!monAvgLoad;

  // upsert to and publish monAvgLoad 
  upsert[`monAvgLoad;data];
  .d.pub[`monAvgLoad;data];

  //*** Add this line for streaming analytics ***
  .mon.pubStreaming[data];

  // set values of aggregated tables to 0
  .mon.initAggTables[];
 };

Calling the analytic

  • Run the default QM process, ds_qm_ops_a.1.

  • If they are not already running, start the kxw_tp and mon_feed processes.

  • Stop and restart the mon_agg process, because the analytic it calls on startup has changed.

Kdb+ client

Screenshot

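// open a handle to the port of the QM process
qm:hopen 21058;
qm(`registerDevice;`name;0);

// IDs of the subscriptions added so far
ids:();

// generate a subscription ID from the current timestamp
getoid:{[] string "j"$.z.p};
// send an async saAddSub request to the QM and append the ID to the global ids list
addSaSub:{[sa;p;targ;user] -1"Adding SA"; mySubID:getoid[]; neg[abs qm](`saAddSub;mySubID;sa;p;targ;user); ids,::enlist mySubID};
// display whatever is passed to upd
upd:{[ids;x] 0N!(ids; x); };

// subscribe to .monPublic.sub for the symbol server_A
addSaSub[`.monPublic.sub; enlist enlist[`syms]!enlist`server_A; `mon_streaming; `Administrator];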

Screenshot

KX Connect via websockets

Accessing streaming analytics through the KX Connect API is a three-step process. This example uses Node.js.

KX Connect Document

Details of authentication for KX Connect can be found in your deployed system at http://host:port/connect/authentication.html

  1. Authenticate the login and get a session ID.
  2. Use the session ID to make an authorized connection to the websocket.
    • For the initial websocket authentication message, content-MD5 is not required in the StringToSign, so the StringToSign differs between the initial message and subsequent messages.
  3. Call the streaming analytic through the websocket.
    • Upon successful subscription, the client will receive streaming updates in the format defined in the analytic.
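The difference between the initial and subsequent StringToSign values in step 2 can be sketched as follows. This is not the KX Connect format: the field order, the newline separator, the HMAC-SHA1 signature, the use of the session ID as the signing key and the example path are all assumptions made for illustration. The authoritative definition is in the authentication document of your deployed system.

```javascript
const crypto = require("crypto");

// Hypothetical field layout: the real order and separators are defined in
// the KX Connect authentication document, not here.
function buildStringToSign(fields, includeContentMd5) {
  const parts = [fields.method];
  if (includeContentMd5) {
    // MD5 digest of the message body, base64-encoded
    parts.push(crypto.createHash("md5").update(fields.body).digest("base64"));
  }
  parts.push(fields.date, fields.path);
  return parts.join("\n");
}

// Assumed signing scheme: HMAC-SHA1 keyed by the session ID, base64 output.
function sign(stringToSign, sessionId) {
  return crypto.createHmac("sha1", sessionId).update(stringToSign).digest("base64");
}

// Placeholder values, not real endpoints or payloads.
const fields = {
  method: "POST",
  body: '{"example":true}',
  date: "Thu, 01 Jan 2015 00:00:00 GMT",
  path: "/connect/example",
};

const initial = buildStringToSign(fields, false);   // no content-MD5
const subsequent = buildStringToSign(fields, true); // content-MD5 included
console.log(initial.split("\n").length, subsequent.split("\n").length);
```

Because the two strings differ, the same session ID yields different signatures for the initial and subsequent messages, which is why a client needs both code paths.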

Code

  • Install the required dependencies beforehand.

Dependencies

  • Use the code given below as a sample.

Sample Code

Running code

Run the commands below from the directory that contains your JS script.

  • To run with default parameters:

node <script>.js

  • To run with override parameters:

node monStreamingWS.js <host:port> <username> <password> <method>

If it is working properly, the script maintains a connection and streams data.

Screenshot
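The two invocations above differ only in whether positional arguments are supplied. A minimal sketch of how a script could support both is given below. The DEFAULTS values and the parseArgs helper are hypothetical and are not taken from the sample code. Only the argument order follows the command line shown above.

```javascript
// Hypothetical defaults: replace them with the values for your deployment.
const DEFAULTS = {
  hostPort: "localhost:8080",
  username: "exampleUser",
  password: "examplePassword",
  method: "exampleMethod",
};

function parseArgs(argv) {
  // argv[0] is the node binary and argv[1] is the script path
  const [
    hostPort = DEFAULTS.hostPort,
    username = DEFAULTS.username,
    password = DEFAULTS.password,
    method = DEFAULTS.method,
  ] = argv.slice(2);
  return { hostPort, username, password, method };
}

const options = parseArgs(process.argv);
// Avoid echoing the password to the console
console.log("Connecting to", options.hostPort, "as", options.username);
```

Any argument that is omitted falls back to its default, so a call can override just the host and port and leave the rest unchanged.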

Dashboards

This section adds a data grid, fed by the streaming analytic, to your Dashboard.

  • Drag a data grid from the icon on the left-hand side, and adjust the size.

  • To configure the data source, click on the data source on the right-hand side menu.

Screenshot

  • A pop-up appears. Click on New and create a data source named Streaming. Select the Analytic radio button, then select the .monPublic.sub analytic from the menu. Set the target process to mon_agg.

  • Because this analytic is defined to take a dictionary as its parameter, select dict. Click on the eye icon to create a View State.

  • Click on New and name it symsList. Add the following properties.

Screenshot

  • Create another View State named symsDict and add symsList to its properties.

Screenshot

  • Set the Subscription to Streaming.

  • Now execute the query to check that data is returned. If successful, you should see a table at the bottom:

Screenshot

  • Click Apply, then Select Item.

  • A table should now be displayed where you placed the data grid.

Execution Order

It is important to execute the data source in this order: Execute > Apply > Select Item.

  • Now add some highlighting rules.

  • From the right-hand menu, select Highlight rules and add a rule to color the current avgCPU value green when it is lower than the previous value.

Screenshot

  • Similarly, add a highlight rule for the opposite condition and color it red.

Screenshot

  • Create the same highlighting rule for each of the columns excluding time and sym.

Screenshot
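The logic behind these highlight rules can be written out as a short sketch. It only illustrates the comparison and is not Dashboards configuration. The helper names are hypothetical, and treating "the opposite condition" as strictly greater than the previous value is an assumption.

```javascript
// Columns to highlight; time and sym are excluded, as in the steps above.
const HIGHLIGHT_COLUMNS = ["avgCPU", "avgMemV", "avgMemP", "avgDisk"];

function highlightColor(current, previous) {
  if (current < previous) return "green"; // value fell since the last update
  if (current > previous) return "red";   // assumed meaning of "opposite condition"
  return null;                            // unchanged: no highlight
}

function highlightRow(previousRow, currentRow) {
  const colors = {};
  for (const column of HIGHLIGHT_COLUMNS) {
    colors[column] = highlightColor(currentRow[column], previousRow[column]);
  }
  return colors;
}

// Made-up sample values for one sym across two consecutive updates.
const previousRow = { sym: "server_A", avgCPU: 40, avgMemV: 10, avgMemP: 55, avgDisk: 70 };
const currentRow = { sym: "server_A", avgCPU: 35, avgMemV: 12, avgMemP: 55, avgDisk: 70 };
console.log(highlightRow(previousRow, currentRow));
```

Keeping one comparison function and applying it to every numeric column mirrors the instruction to create the same rule for each column.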