Streaming appendix
.mon.streamingLogic
/ Set up subscription tracking table
.mon.streamingSubs:([id:`u#`long$()] syms:())
/ Insert a dummy row to set type of params
`.mon.streamingSubs upsert (-1; "s"$())
/ Initialize streaming ID
.mon.streamingID:0j;
/ Main publish function to be called on every update to a table
.mon.pubStreaming:{[table]
toRun:1_ 0!.mon.streamingSubs;
if[
not count toRun;
:()
];
.mon.query[table] each toRun;
}
/ Defines the function to select subscribed syms
.mon.query:{[table;x]
s:x`syms;
w:();
if[count s;
w:enlist(in; `sym; enlist s)
];
t:?[table; w; 0b; ()];
r:`time xcols 0!t;
if[count r;
.sa.pub[x`id; r]
];
}
/ Define the unsubscribe function
.mon.unsubStreaming:{delete from `.mon.streamingSubs where id=x }
/ Define the "snap" function. This gives the latest state to provide an initial dataset
.mon.snapStreaming:{
if[
not count s:exec from .mon.streamingSubs where id=x;
:()
];
`.mon.query[`monAvgLoad; s];
}
/ Registers the functions as streaming analytics
.sa.registerfuncs[`.monPublic.sub; `.mon.unsubStreaming; `.mon.snapStreaming]
monStreamingWS.js
// Get input parameters
var HOST_PORT = "localhost:8080";
var USERNAME = "Administrator";
var PASSWORD = "password";
var METHOD = "subStreaming";
var input = process.argv.slice(2);
if(input.length > 0){
if(input.length == 4){
HOST_PORT = input[0];
USERNAME = input[1];
PASSWORD = input[2];
METHOD = input[3];
} else {
console.log("Expected 4 parameters: HOST:PORT USERNAME PASSWORD METHOD. Received params: " + input.length);
console.log("Continuing using default parameters");
}
}
// Define variables
const HTTP_HOST = "http://" + HOST_PORT;
const BASE_URL = HTTP_HOST + "/connect/api";
var GROUP = "monPublic";
var PARAMS = {"syms" : ["server_A", "server_C"]};
const CONTENT_TYPE = "application/json";
var id = UUID();
// Define login info
var LOGIN_URL = BASE_URL + "/auth/login";
var LOGIN_DATA = {
"msg" : [{
"username" : USERNAME,
"password" : PASSWORD
}],
"type" : "LoginReq",
"id" : id,
"date" : new Date().toUTCString()
};
// All users accessing Connect API must login before sending further requests
// Call login API
loginRequest(LOGIN_URL, LOGIN_DATA);
function loginRequest(url, data) {
console.log("Making a login request to " + LOGIN_URL + " with user " + USERNAME);
return fetch(url, {
credentials: "same-origin",
method: "POST",
body: JSON.stringify(data),
headers: new Headers({
"Content-Type": CONTENT_TYPE
}),
})
// If response, webSocket Authorization
.then(response => {
authorizeWS(response)
})
.catch(error =>{
console.error(error)
})
}
var authorizeWS = function(response){
// If login is unsuccessful, return status code
if (response.status !== 200) {
console.log("Error. Status Code: " + response.status)
return;
}
// If login is successful, make a websocket connection
response.json().then(function(data) {
// Get session ID from login response
var sessionId = data["msg"][0]["sessionId"];
ws = new WebSocket("ws://coredev1:21010/connect/websocket");
ws.binaryType = 'arraybuffer';
console.log("connecting...");
ws.onopen = function(){
console.log("connected");
var authPath = "/connect/websocket";
var initAuthorization = createAuthorization(authPath, USERNAME, null, CONTENT_TYPE, new Date().toUTCString(), sessionId);
var QUERY_DATA = {
"type" : "WebSocketAuthenticationReq",
"msg" : [
{"authorization": initAuthorization}
],
"id" : id,
"date" : new Date().toUTCString()
}
var data = JSON.stringify(QUERY_DATA);
ws.send(data);
console.log("sent\n " + data);
ws.onmessage=function(e){
console.log("received\n" + e.data);
}
// var CONTENT_DATA = {
// "type" : "HeartbeatReq",
// "msg" : [
// {"userIdentifier": USERNAME + sessionId.substring(sessionId.length - 5)}
// ],
// "id" : UUID(),
// "date" : new Date().toUTCString()
// };
var symList = ["server_A", "server_C"];
var CONTENT_DATA = createContentJson(symList);
var authorization = createAuthorization(authPath, USERNAME, CONTENT_DATA, CONTENT_TYPE, new Date().toUTCString(), sessionId);
var content = createContentJson(symList);
var QUERY_DATA = {
content,
"authHeader" : {
"authorization" : authorization
}
}
var queryData = JSON.stringify(QUERY_DATA);
ws.send(queryData);
console.log("sent " + queryData);
}
ws.onclose=function(e){
console.log("disconnected");
};
ws.onmessage=function(e){
console.log("received\n" + e.data);
}
ws.onerror=function(e){console.log(e.data);}
})
}
function createAuthorization(authPath, username, contentMsg, contentType, date, sessionId){
// For the initial websocket authentication message, Content-MD5 is not required
// For subsequent requests, include the Content-MD5
if (contentMsg !== null) {
var content = JSON.stringify(contentMsg);
// Content message must be hashed using MD5
var contentMD5 = CryptoJS.MD5(content).toString();
var contentType = contentMD5 + "\n" + contentType;
}
// Create signed input string
var stringToSign = authPath + "\n" + username + "\n" + contentType + "\n" + date + "\n" + sessionId;
console.log("stringToSign: " + stringToSign);
// Create signature
var hmacEncStr = CryptoJS.HmacSHA1(stringToSign, sessionId);
console.log("HMAC:" + hmacEncStr);
var signature = hmacEncStr.toString(CryptoJS.enc.Base64);
// Authorization HTTP header = username + last 5 characters of the session Id : signature value
var authorization = username + sessionId.substring(sessionId.length - 5) + ":" + signature;
console.log("Auth:" + authorization + "\n");
return authorization;
}
function createContentJson(symList){
var data = {
"group" : "monPublic",
"method" : "subStreaming",
"request" : {
"type" : "SubStreamingReq",
"msg" : [ {
"syms" : symList
} ],
"id" : id,
"date" : new Date().toUTCString()
}
};
return data;
}