Skip to content

Streaming appendix

.mon.streamingLogic

/ Set up subscription tracking table
.mon.streamingSubs:([id:`u#`long$()] syms:())

/ Insert a dummy row to set type of params
`.mon.streamingSubs upsert (-1; "s"$())

/ Initialize streaming ID
.mon.streamingID:0j;


/ Main publish function to be called on every update to a table
.mon.pubStreaming:{[table]
  toRun:1_ 0!.mon.streamingSubs;
  if[
    not count toRun;
    :()
  ];

  .mon.query[table] each toRun;
 }

/ Defines the function to select subscribed syms
.mon.query:{[table;x]
  s:x`syms;
  w:();
  if[count s;
    w:enlist(in; `sym; enlist s)
  ];

  t:?[table; w; 0b; ()];
  r:`time xcols 0!t;
  if[count r;
    .sa.pub[x`id; r]
  ];
 }


/ Define the unsubscribe function
.mon.unsubStreaming:{delete from `.mon.streamingSubs where id=x }


/ Define the "snap" function. This gives the latest state to provide an initial dataset
.mon.snapStreaming:{
  if[
    not count s:exec from .mon.streamingSubs where id=x;
    :()
  ];

  `.mon.query[`monAvgLoad; s];
 }

/ Registers the functions as streaming analytics 
.sa.registerfuncs[`.monPublic.sub; `.mon.unsubStreaming; `.mon.snapStreaming]

monStreamingWS.js

// Get input parameters
var HOST_PORT = "localhost:8080";
var USERNAME = "Administrator";
var PASSWORD = "password";
var METHOD = "subStreaming";

var input = process.argv.slice(2);

if(input.length > 0){
  if(input.length == 4){
    HOST_PORT = input[0];
    USERNAME = input[1];
    PASSWORD = input[2];
    METHOD = input[3];

  } else {
    console.log("Expected 4 parameters: HOST:PORT USERNAME PASSWORD METHOD. Received params: " + input.length);
    console.log("Continuing using default parameters");
  }
}

// Define variables
const HTTP_HOST = "http://" + HOST_PORT;
const BASE_URL = HTTP_HOST + "/connect/api";
var GROUP = "monPublic";
var PARAMS = {"syms" : ["server_A", "server_C"]};
const CONTENT_TYPE = "application/json";
var id = UUID();

// Define login info
var LOGIN_URL = BASE_URL + "/auth/login";
var LOGIN_DATA = {
  "msg" : [{
    "username" : USERNAME, 
    "password" : PASSWORD 
  }], 
  "type" : "LoginReq", 
  "id" : id, 
  "date" : new Date().toUTCString()
};

// All users accessing Connect API must login before sending further requests
// Call login API
loginRequest(LOGIN_URL, LOGIN_DATA);

function loginRequest(url, data) {
  console.log("Making a login request to " + LOGIN_URL + " with user " + USERNAME);

  return fetch(url, {
    credentials: "same-origin",
    method: "POST",
    body: JSON.stringify(data),
    headers: new Headers({
      "Content-Type": CONTENT_TYPE
    }),
  })
  // If response, webSocket Authorization
  .then(response => {
    authorizeWS(response)
  })
  .catch(error =>{
    console.error(error)
  })
}


var authorizeWS = function(response){
  // If login is unsuccessful, return status code
  if (response.status !== 200) {
        console.log("Error. Status Code: " + response.status)
        return;
  }

  // If login is successful, make a websocket connection
  response.json().then(function(data) {
    // Get session ID from login response
    var sessionId = data["msg"][0]["sessionId"];

    ws = new WebSocket("ws://coredev1:21010/connect/websocket");
    ws.binaryType = 'arraybuffer';

    console.log("connecting...");

    ws.onopen = function(){
      console.log("connected");

      var authPath = "/connect/websocket";

      var initAuthorization = createAuthorization(authPath, USERNAME, null, CONTENT_TYPE, new Date().toUTCString(), sessionId);

      var QUERY_DATA = {
        "type" : "WebSocketAuthenticationReq",
        "msg" : [
          {"authorization": initAuthorization}
        ],
        "id" : id,
        "date" : new Date().toUTCString()
      }

      var data = JSON.stringify(QUERY_DATA);

      ws.send(data);
      console.log("sent\n " + data);

      ws.onmessage=function(e){
        console.log("received\n" + e.data);
      }

      // var CONTENT_DATA = {
      //   "type" : "HeartbeatReq",
      //   "msg" : [
      //     {"userIdentifier": USERNAME + sessionId.substring(sessionId.length - 5)}
      //   ],
      //   "id" : UUID(),
      //   "date" : new Date().toUTCString()
      // };


      var symList = ["server_A", "server_C"];
      var CONTENT_DATA = createContentJson(symList);

      var authorization = createAuthorization(authPath, USERNAME, CONTENT_DATA, CONTENT_TYPE, new Date().toUTCString(), sessionId);

      var content = createContentJson(symList);
      var QUERY_DATA = {
        content,
        "authHeader" : {
          "authorization" : authorization
        }
      }

      var queryData = JSON.stringify(QUERY_DATA);
      ws.send(queryData);
      console.log("sent " + queryData);
    }

    ws.onclose=function(e){
      console.log("disconnected");
    };

    ws.onmessage=function(e){
      console.log("received\n" + e.data);
    }

    ws.onerror=function(e){console.log(e.data);}

  })
}

function createAuthorization(authPath, username, contentMsg, contentType, date, sessionId){
  // For the initial websocket authentication message, Content-MD5 is not required
  // For subsequent requests, include the Content-MD5 
  if (contentMsg !== null) {
    var content = JSON.stringify(contentMsg);
    // Content message must be hashed using MD5
    var contentMD5 = CryptoJS.MD5(content).toString();
    var contentType = contentMD5 + "\n" + contentType;
  }

  // Create signed input string
  var stringToSign = authPath + "\n" + username + "\n" + contentType + "\n" + date + "\n" + sessionId;
  console.log("stringToSign: " + stringToSign);

  // Create signature
  var hmacEncStr = CryptoJS.HmacSHA1(stringToSign, sessionId);
  console.log("HMAC:" + hmacEncStr);
  var signature = hmacEncStr.toString(CryptoJS.enc.Base64);

  // Authorization HTTP header = username + last 5 characters of the session Id : signature value
  var authorization = username + sessionId.substring(sessionId.length - 5) + ":" + signature;
  console.log("Auth:" + authorization + "\n");

  return authorization;
}


function createContentJson(symList){
  var data = {
    "group" : "monPublic",
    "method" : "subStreaming",
    "request" : {
      "type" : "SubStreamingReq",
      "msg" : [ {
        "syms" : symList
      } ],
      "id" : id,
      "date" : new Date().toUTCString()
    }
  };
  return data;
}