Appendix
.mon.feedLogic
// Simulated hosts monitored by the feed: server_A .. server_D
HOST:`$"server_",/:("A","B","C","D");
// per-host baseline usage level seeding each random walk
BASE:HOST!20 35 50 65;
// per-host step magnitude for the random walks
// NOTE(review): 4?rand 0.5f draws 4 values in [0;rand 0.5f); if 4 values
// in [0;0.5) were intended this would be 4?0.5f — confirm
MV:HOST!4?rand 0.5f;
// samples generated per host on every timer tick
n:5;
// last published value per host for the CPU/memory/disk walks; empty until
// the first .mon.genData run
CBase:MBase:DBase:();
// Advance the per-host random walk by n steps.
// b: per-host last values from the previous tick (empty on the very first
//    call, in which case the static BASE levels alone seed the walk)
// returns dict HOST -> n cumulative samples (start level + running sum of steps)
.mon.createUsage:{[b]
// overlay the previous tick's last values (if any) onto the baselines
base:BASE,b;
// per host: n random steps of magnitude MV host, accumulated with sums
// NOTE(review): q evaluates right-to-left, so n?1.<MV host parses as
// n?(1.<MV host); if a boolean vector (n?1.)<MV host was intended, the
// condition needs parentheses — confirm against a running system
usage:HOST!{[host] sums?[n?1.<MV host;neg MV host; MV host]} each HOST;
// shift each host's walk by its starting level
(base HOST) + usage
}
// Generate one batch of simulated CPU, memory and disk readings for every
// host and publish each batch to its monitoring table.
.mon.genData:{[]
// n timestamps, 2 seconds apart, starting at the current time
t: .z.p + 00:00:02 * til n;
// repeat the timestamp vector once per host (4 hosts)
ts:raze 4#enlist t;
// host symbol column aligned row-for-row with ts
hosts:raze flip n#enlist HOST;
// CPU: advance the walk, remember each host's last value for the next
// tick (global CBase), clamp every sample into the [0;100] usage range
CPU:.mon.createUsage[CBase];
CBase::last each CPU;
CUsage:raze {max(0;min(x;100))} each CPU;
// virtual memory: same walk/clamp as CPU
MemV:.mon.createUsage[MBase];
MBase::last each MemV;
MVUsage:raze {max(0;min(x;100))} each MemV;
// physical memory: virtual usage plus one random offset in [0;10), capped at 100
MPUsage:raze {min(x;100)} each (first 1?10f) + MVUsage;
// disk: same walk/clamp as CPU
Disk:.mon.createUsage[DBase];
DBase::last each Disk;
DUsage:raze {max(0;min(x;100))} each Disk;
// assemble one table per metric
CPUData:([] time:ts; sym:hosts; usage:CUsage);
memData:([] time:ts; sym:hosts; virtual:MVUsage; physical:MPUsage);
diskData:([] time:ts; sym:hosts; usage:DUsage);
// publish relevant data to each table
.dm.pub[`monCPU; CPUData];
.dm.pub[`monMem; memData];
.dm.pub[`monDisk; diskData];
}
// set a timer
// run .mon.genData (no argument) every 10000 ms; 1b enables the entry
// NOTE(review): the 0Nz null timestamps presumably mean "no start/end
// bound" — confirm against the .d.prcl.addFunctToTimer documentation
.d.prcl.addFunctToTimer[`.mon.genData; () ; 0Nz; 0Nz; 10000; 1b];
// Entry point run when the feed process starts.
.mon.initFeed
{[]
// load instruction
// pulls in the .mon.feedLogic instruction defined above
.al.loadinstruction[`.mon.feedLogic]
}
.mon.aggLogic
// function to initialize aggregated tables
.mon.initAggTables:{
// each keyed table holds, per sym, the row count (size) and running totals
// consumed by .mon.updAvg; calling this again resets them to empty
`aggMonCPU set ([sym:`$()] size:`int$(); total:`float$());
`aggMonMem set ([sym:`$()] size:`int$(); totalV:`float$(); totalP:`float$());
`aggMonDisk set ([sym:`$()] size:`int$(); total:`float$());
};
// define callback functions
// for each, create a keyed table that sums + counts data as they come in
// Accumulate a monCPU batch into aggMonCPU.
// tab: table name (unused); data: incoming batch of monCPU rows
// per sym: add this batch's row count and usage sum onto the running
// aggregates; 0j^ turns the null lookup into 0 the first time a sym appears
.mon.updAvgCPU: {[tab;data]
`aggMonCPU upsert update size:size + 0j^ aggMonCPU[([] sym:sym); `size], total:total + 0j^ aggMonCPU[([] sym:sym); `total] from select size:count time, total:sum usage by sym from data
};
// Accumulate a monMem batch into aggMonMem.
// tab: table name (unused); data: incoming batch of monMem rows
// per sym: add row count plus virtual/physical sums onto the running
// aggregates; 0j^ turns the null lookup into 0 the first time a sym appears
.mon.updAvgMem: {[tab;data]
`aggMonMem upsert update size:size + 0j^ aggMonMem[([] sym:sym); `size], totalV:totalV + 0j^ aggMonMem[([] sym:sym); `totalV], totalP:totalP + 0j^ aggMonMem[([] sym:sym); `totalP] from select size:count time, totalV:sum virtual, totalP:sum physical by sym from data
};
// Accumulate a monDisk batch into aggMonDisk.
// tab: table name (unused); data: incoming batch of monDisk rows
// per sym: add this batch's row count and usage sum onto the running
// aggregates; 0j^ turns the null lookup into 0 the first time a sym appears
.mon.updAvgDisk: {[tab;data]
`aggMonDisk upsert update size:size + 0j^ aggMonDisk[([] sym:sym); `size], total:total + 0j^ aggMonDisk[([] sym:sym); `total] from select size:count time, total:sum usage by sym from data
};
// add callback functions
// route each monitored table's incoming updates into the matching
// .mon.updAvg* aggregator defined above
.dm.addCallback[`monCPU; `.mon.updAvgCPU];
.dm.addCallback[`monMem; `.mon.updAvgMem];
.dm.addCallback[`monDisk; `.mon.updAvgDisk];
// define a function to aggregate, join and publish data to monAvgLoad table
// Compute per-host averages (total/size) from the three agg tables, join
// them into one row set, persist and publish it, then reset the aggregates.
.mon.updAvg: {[]
// add times column
// select columns + remove attribute from sym to match schema
// 0! to unkey joined tables
data: select time:.z.p, sym:`#sym, avgCPU, avgMemV, avgMemP, avgDisk from
0!(select avgCPU: first total%size by sym from aggMonCPU) uj
(select avgMemV: first totalV%size, avgMemP: first totalP%size by sym from aggMonMem) uj
(select avgDisk: first total%size by sym from aggMonDisk);
// upsert to and publish monAvgLoad
upsert[`monAvgLoad;data];
// NOTE(review): the feed logic publishes via .dm.pub but this uses .d.pub —
// confirm which publish API is correct for this process
.d.pub[`monAvgLoad;data];
// set values of aggregated tables to 0
.mon.initAggTables[];
};
// set the timer to call the .mon.updAvg function every minute
// (60000 ms interval, 1b enables the entry; 0Nz start/end presumably
// means unbounded — confirm against the .d.prcl docs)
.d.prcl.addFunctToTimer[`.mon.updAvg; () ; 0Nz; 0Nz; 60000; 1b];
// Update hook: fan each incoming (tab;data) update out to the callbacks
// registered for tab via .dm.addCallback.
.kxw.updCallbacks
{[tab;data]
.dm.applyCallbacks[tab; data];
}
// Entry point run when the aggregation process starts.
.mon.initAgg
{[]
// load instruction
// pulls in the .mon.aggLogic instruction defined above
.al.loadinstruction[`.mon.aggLogic];
// initialize aggregated tables
.mon.initAggTables[];
}
// Replay one monCPU CSV file into the date-partitioned HDB.
// dir: directory containing the file; file: name ending _YYYY.MM.DD.csv
.mon.fileLoadCPU
{[dir;file]
.log.out[.z.h; "Replaying file"; file];
// Extract date from filename for partitioning
// (last "_"-separated token minus the 4-char ".csv" extension)
date:"D"$-4_ last "_" vs string file;
path:.Q.dd[dir;file];
// Load CSV and sort sym column to apply the parted attribute later
// columns parse as timestamp/symbol/float (time;sym;usage)
monCPU:`sym xasc ("PSF"; enlist ",") 0: path;
hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
// partition path <hdb>/<date>/monCPU/
hdbPar:.Q.dd[hdbDir; date, `monCPU, `];
// Save to disk
// .Q.en enumerates symbol columns against the HDB's sym file
hdbPar set .Q.en[hdbDir; monCPU];
// Set attribute
@[hdbPar; `sym; `p#];
// TODO: load hdb
}
// Replay one monMem JSON file into the date-partitioned HDB.
// dir: directory containing the file; file: name ending _YYYY.MM.DD.json
.mon.fileLoadMem
{[dir;file]
.log.out[.z.h; "Replaying file"; file];
// Extract date from filename for partitioning
// (last "_"-separated token minus the 5-char ".json" extension)
date:"D"$-5_ last "_" vs string file;
path:.Q.dd[dir;file];
// Load JSON and sort sym column to apply the parted attribute later
// .j.k parses the first line of the file; time arrives as an ISO string
// ("Z"$ then "p"$ converts to timestamp) and sym as a string
// fix: this statement was missing its terminating semicolon (every sibling
// loader terminates each statement), which breaks parsing of the body
monMem:`sym xasc update time:"p"$"Z"$time, sym:`$sym from k:.j.k first read0 path;
hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
// partition path <hdb>/<date>/monMem/
hdbPar:.Q.dd[hdbDir; date, `monMem, `];
// Save
hdbPar set .Q.en[hdbDir; monMem];
// Set attribute
@[hdbPar; `sym; `p#];
}
// Replay one monDisk fixed-width file into the date-partitioned HDB.
// dir: directory containing the file; file: name ending _YYYY.MM.DD.<ext>
.mon.fileLoadDisk
{[dir;file]
.log.out[.z.h; "Replaying file"; file];
// Extract date from filename for partitioning
date:"D"$-4_ last "_" vs string file;
path:.Q.dd[dir;file];
// Load fixed-width binary text and sort sym column to apply the parted attribute later
// field widths 8/15/8 with types long/string/float for time/sym/usage
// NOTE(review): KX documents the fixed-width 1: load as (types;widths)1:f;
// here the widths come first — verify the argument order against the 1: reference
monDisk:`sym xasc update "p"$time, `$sym from flip `time`sym`usage!(8 15 8; "j*f") 1:path;
hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
// partition path <hdb>/<date>/monDisk/
hdbPar:.Q.dd[hdbDir; date, `monDisk, `];
// Save
hdbPar set .Q.en[hdbDir; monDisk];
// Set attribute
@[hdbPar; `sym; `p#];
}
./startServicesCSV.sh
#!/bin/bash
# Locate and source delta.profile: prefer DELTA_PROFILE_HOME, then
# DELTABIN_HOME, then the parent directory; abort if none is found.
# fix: path expansions are quoted so directories containing spaces do not
# break the -f tests or the source commands
if [ "${DELTA_PROFILE_HOME}" != "" ]
then
    if [ -f "${DELTA_PROFILE_HOME}/delta.profile" ]
    then
        echo "Sourcing [${DELTA_PROFILE_HOME}/delta.profile]"
        . "${DELTA_PROFILE_HOME}/delta.profile"
    else
        echo "Error: No delta.profile found at \${DELTA_PROFILE_HOME} (${DELTA_PROFILE_HOME}) !"
        exit 1
    fi
elif [ "${DELTABIN_HOME}" != "" ]
then
    if [ -f "${DELTABIN_HOME}/delta.profile" ]
    then
        echo "Sourcing [${DELTABIN_HOME}/delta.profile]"
        . "${DELTABIN_HOME}/delta.profile"
    else
        echo "Error: No delta.profile found at \${DELTABIN_HOME} (${DELTABIN_HOME}) !"
        exit 1
    fi
else
    if [ -f ../delta.profile ]
    then
        echo "Sourcing [../delta.profile]"
        . ../delta.profile
    else
        echo "Error: cannot find delta.profile !"
        exit 1
    fi
fi
# Source delta_user.profile: prefer DELTAUSER_PROFILE_HOME, then HOME,
# then the parent directory (optional in the last case).
if [ "${DELTAUSER_PROFILE_HOME}" != "" ]
then
    if [ -f "${DELTAUSER_PROFILE_HOME}/delta_user.profile" ]
    then
        echo "Sourcing [${DELTAUSER_PROFILE_HOME}/delta_user.profile]"
        . "${DELTAUSER_PROFILE_HOME}/delta_user.profile"
    else
        echo "Error: No delta_user.profile found at \${DELTAUSER_PROFILE_HOME} (${DELTAUSER_PROFILE_HOME}) !"
        exit 1
    fi
elif [ -f "${HOME}/delta_user.profile" ]
then
    echo "Sourcing [${HOME}/delta_user.profile]"
    . "${HOME}/delta_user.profile"
else
    if [ -f ../delta_user.profile ]
    then
        echo "Sourcing [../delta_user.profile]"
        . ../delta_user.profile
    fi
fi
# Build the startService.sh command line for one CSV row and expose it to
# the caller through the global startcmd variable.
#   $1 class name (mandatory)  $2 service name  $3 taskset
#   $4 logfile                 $5 additional arguments
processLine() {
    local cmd="${DELTABIN_HOME}/bin/startService.sh"
    startcmd=""
    # the class name is mandatory — bail out of the whole script without it
    if [ -z "$1" ]
    then
        echo "Class name can't be empty"
        exit 1
    fi
    cmd="${cmd} -i ${1}"
    # remaining fields are optional; append a flag only when the field is set
    [ -n "$2" ] && cmd="${cmd} -n ${2}"
    [ -n "$3" ] && cmd="${cmd} -t \"${3}\""
    [ -n "$4" ] && cmd="${cmd} -l ${4}"
    [ -n "$5" ] && cmd="${cmd} -a ${5}"
    startcmd=${cmd}
}
# Read the service CSV (class,name,taskset,logfile,args — one service per
# row) and start each service with the command built by processLine.
# fix: the CSV path ($1) is now validated and quoted; previously a missing
# or space-containing argument only surfaced as a redirect error mid-script
csvFile="$1"
if [ -z "${csvFile}" ] || [ ! -f "${csvFile}" ]
then
    echo "Usage: $0 <services csv file>"
    exit 1
fi
while IFS=, read -r col1 col2 col3 col4 col5
do
    processLine "$col1" "$col2" "$col3" "$col4" "$col5"
    echo "Running service with command: ${startcmd}"
    eval ${startcmd}
done < "${csvFile}"
# NOTE(review): SCRIPT_DIR is not set anywhere in this script — presumably
# exported by delta.profile; with it unset this cd goes to $HOME. Confirm.
cd ${SCRIPT_DIR}
./stopServicesCSV.sh
#!/bin/bash
# Locate and source delta.profile: prefer DELTA_PROFILE_HOME, then
# DELTABIN_HOME, then the parent directory; abort if none is found.
# fix: path expansions are quoted so directories containing spaces do not
# break the -f tests or the source commands
if [ "${DELTA_PROFILE_HOME}" != "" ]
then
    if [ -f "${DELTA_PROFILE_HOME}/delta.profile" ]
    then
        echo "Sourcing [${DELTA_PROFILE_HOME}/delta.profile]"
        . "${DELTA_PROFILE_HOME}/delta.profile"
    else
        echo "Error: No delta.profile found at \${DELTA_PROFILE_HOME} (${DELTA_PROFILE_HOME}) !"
        exit 1
    fi
elif [ "${DELTABIN_HOME}" != "" ]
then
    if [ -f "${DELTABIN_HOME}/delta.profile" ]
    then
        echo "Sourcing [${DELTABIN_HOME}/delta.profile]"
        . "${DELTABIN_HOME}/delta.profile"
    else
        echo "Error: No delta.profile found at \${DELTABIN_HOME} (${DELTABIN_HOME}) !"
        exit 1
    fi
else
    if [ -f ../delta.profile ]
    then
        echo "Sourcing [../delta.profile]"
        . ../delta.profile
    else
        echo "Error: cannot find delta.profile !"
        exit 1
    fi
fi
# Source delta_user.profile: prefer DELTAUSER_PROFILE_HOME, then HOME,
# then the parent directory (optional in the last case).
if [ "${DELTAUSER_PROFILE_HOME}" != "" ]
then
    if [ -f "${DELTAUSER_PROFILE_HOME}/delta_user.profile" ]
    then
        echo "Sourcing [${DELTAUSER_PROFILE_HOME}/delta_user.profile]"
        . "${DELTAUSER_PROFILE_HOME}/delta_user.profile"
    else
        echo "Error: No delta_user.profile found at \${DELTAUSER_PROFILE_HOME} (${DELTAUSER_PROFILE_HOME}) !"
        exit 1
    fi
elif [ -f "${HOME}/delta_user.profile" ]
then
    echo "Sourcing [${HOME}/delta_user.profile]"
    . "${HOME}/delta_user.profile"
else
    if [ -f ../delta_user.profile ]
    then
        echo "Sourcing [../delta_user.profile]"
        . ../delta_user.profile
    fi
fi
# Build the stopService.sh command line for one CSV row and expose it to
# the caller through the global stopcmd variable.
#   $1 class name (mandatory)  $2 service name  $3 taskset
#   $4 logfile                 $5 additional arguments
processLine() {
    local cmd="${DELTABIN_HOME}/bin/stopService.sh"
    stopcmd=""
    # the class name is mandatory — bail out of the whole script without it
    if [ -z "$1" ]
    then
        echo "Class name can't be empty"
        exit 1
    fi
    cmd="${cmd} -i ${1}"
    # remaining fields are optional; append a flag only when the field is set
    [ -n "$2" ] && cmd="${cmd} -n ${2}"
    [ -n "$3" ] && cmd="${cmd} -t \"${3}\""
    [ -n "$4" ] && cmd="${cmd} -l ${4}"
    [ -n "$5" ] && cmd="${cmd} -a ${5}"
    stopcmd=${cmd}
}
# Read the service CSV (class,name,taskset,logfile,args — one service per
# row) and stop each service with the command built by processLine.
# fix: the CSV path ($1) is now validated and quoted; previously a missing
# or space-containing argument only surfaced as a redirect error mid-script
csvFile="$1"
if [ -z "${csvFile}" ] || [ ! -f "${csvFile}" ]
then
    echo "Usage: $0 <services csv file>"
    exit 1
fi
while IFS=, read -r col1 col2 col3 col4 col5
do
    processLine "$col1" "$col2" "$col3" "$col4" "$col5"
    echo "Stopping service with command: ${stopcmd}"
    eval ${stopcmd}
done < "${csvFile}"
# NOTE(review): SCRIPT_DIR is not set anywhere in this script — presumably
# exported by delta.profile; with it unset this cd goes to $HOME. Confirm.
cd ${SCRIPT_DIR}
system.csv
kxw_tp,kxw_tp_1,,,
mon_feed,mon_feed_1,,,
kxw_rdb,kxw_rdb_1,,,
mon_agg,mon_agg_1,,,
kxw_lr,kxw_lr_1,,,
kxw_ihdb,kxw_ihdb_1,,,
kxw_hdb,kxw_hdb_1,,,