Skip to content

Appendix

.mon.feedLogic

HOST:`$"server_",/:("A","B","C","D");
BASE:HOST!20 35 50 65;
MV:HOST!4?rand 0.5f;
n:5;
CBase:MBase:DBase:();

.mon.createUsage:{[b]
  base:BASE,b;
  usage:HOST!{[host] sums?[n?1.<MV host;neg MV host; MV host]} each HOST;
  (base HOST) + usage
 }

.mon.genData:{[]
  t: .z.p + 00:00:02 * til n;
  ts:raze 4#enlist t; 
  hosts:raze flip n#enlist HOST;

  CPU:.mon.createUsage[CBase];
  CBase::last each CPU;
  CUsage:raze {max(0;min(x;100))} each CPU;

  MemV:.mon.createUsage[MBase];
  MBase::last each MemV;
  MVUsage:raze {max(0;min(x;100))} each MemV;
  MPUsage:raze {min(x;100)} each (first 1?10f) + MVUsage;

  Disk:.mon.createUsage[DBase];
  DBase::last each Disk;
  DUsage:raze {max(0;min(x;100))} each Disk;

  CPUData:([] time:ts; sym:hosts; usage:CUsage);
  memData:([] time:ts; sym:hosts; virtual:MVUsage; physical:MPUsage);
  diskData:([] time:ts; sym:hosts; usage:DUsage);

  // publish relevant data to each table
  .dm.pub[`monCPU; CPUData];
  .dm.pub[`monMem; memData];
  .dm.pub[`monDisk; diskData];
 }

// set a timer
.d.prcl.addFunctToTimer[`.mon.genData; () ; 0Nz; 0Nz; 10000; 1b];

.mon.initFeed

{[]
// load instruction
.al.loadinstruction[`.mon.feedLogic]
 }

.mon.aggLogic

// function to initialize aggregated tables
.mon.initAggTables:{
  `aggMonCPU set ([sym:`$()] size:`int$(); total:`float$());
  `aggMonMem set ([sym:`$()] size:`int$(); totalV:`float$(); totalP:`float$());
  `aggMonDisk set ([sym:`$()] size:`int$(); total:`float$());
 };


// define callback functions
// for each, create a keyed table that sums + counts data as they come in
.mon.updAvgCPU: {[tab;data]
    `aggMonCPU upsert update size:size + 0j^ aggMonCPU[([] sym:sym); `size], total:total + 0j^ aggMonCPU[([] sym:sym); `total] from select size:count time, total:sum usage by sym from data
 };

.mon.updAvgMem: {[tab;data]
    `aggMonMem upsert update size:size + 0j^ aggMonMem[([] sym:sym); `size], totalV:totalV + 0j^ aggMonMem[([] sym:sym); `totalV], totalP:totalP + 0j^ aggMonMem[([] sym:sym); `totalP]  from select size:count time, totalV:sum virtual, totalP:sum physical by sym from data
 };

.mon.updAvgDisk: {[tab;data]
    `aggMonDisk upsert update size:size + 0j^ aggMonDisk[([] sym:sym); `size], total:total + 0j^ aggMonDisk[([] sym:sym); `total] from select size:count time, total:sum usage by sym from data
 };


// add callback functions
.dm.addCallback[`monCPU; `.mon.updAvgCPU];
.dm.addCallback[`monMem; `.mon.updAvgMem];
.dm.addCallback[`monDisk; `.mon.updAvgDisk];


// define a function to aggregate, join and publish data to monAvgLoad table
.mon.updAvg: {[]
  // add times column
  // select columns + remove attribute from sym to match schema
  // 0! to unkey joined tables
  data: select time:.z.p, sym:`#sym, avgCPU, avgMemV, avgMemP, avgDisk from 
    0!(select avgCPU: first total%size by sym from aggMonCPU) uj 
    (select avgMemV: first totalV%size, avgMemP: first totalP%size by sym from aggMonMem) uj 
    (select avgDisk: first total%size by sym from aggMonDisk);

  // upsert to and publish monAvgLoad 
  upsert[`monAvgLoad;data];
  .d.pub[`monAvgLoad;data];

  // set values of aggregated tables to 0
  .mon.initAggTables[];
 };

  // set the timer to call the .mon.updAvg function every minute
.d.prcl.addFunctToTimer[`.mon.updAvg; () ; 0Nz; 0Nz; 60000; 1b];

.kxw.updCallbacks

{[tab;data]
  .dm.applyCallbacks[tab; data];
 }

.mon.initAgg

{[]
  // load instruction
  .al.loadinstruction[`.mon.aggLogic];

  // initialize aggregated tables
  .mon.initAggTables[];
 }

.mon.fileLoadCPU

{[dir;file]
  .log.out[.z.h; "Replaying file"; file];  

  // Extract date from filename for partitioning 
  date:"D"$-4_ last "_" vs string file;
  path:.Q.dd[dir;file];

  // Load CSV and sort sym column to apply the parted attribute later
  monCPU:`sym xasc ("PSF"; enlist ",") 0: path;
  hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
  hdbPar:.Q.dd[hdbDir; date, `monCPU, `];

  // Save to disk  
  hdbPar set .Q.en[hdbDir; monCPU];
  // Set attribute
  @[hdbPar; `sym; `p#];

  // TODO: load hdb
 }

.mon.fileLoadMem

{[dir;file]
  .log.out[.z.h; "Replaying file"; file];

  // Extract date from filename for partitioning 
  date:"D"$-5_ last "_" vs string file;
  path:.Q.dd[dir;file];

  // Load JSON and sort sym column to apply the parted attribute later
  monMem:`sym xasc update time:"p"$"Z"$time, sym:`$sym from k:.j.k first read0 path
  hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
  hdbPar:.Q.dd[hdbDir; date, `monMem, `];

  // Save
  hdbPar set .Q.en[hdbDir; monMem];
  // Set attribute
  @[hdbPar; `sym; `p#];
 }

.mon.fileLoadDisk

{[dir;file]
  .log.out[.z.h; "Replaying file"; file];

  // Extract date from filename for partitioning 
  date:"D"$-4_ last "_" vs string file;
  path:.Q.dd[dir;file];

  // Load fixed-width binary text and sort sym column to apply the parted attribute later
  monDisk:`sym xasc update "p"$time, `$sym from flip `time`sym`usage!(8 15 8; "j*f") 1:path;
  hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
  hdbPar:.Q.dd[hdbDir; date, `monDisk, `];

  // Save
  hdbPar set .Q.en[hdbDir; monDisk];
  // Set attribute
  @[hdbPar; `sym; `p#];    
 }

./startServicesCSV.sh

#!/bin/bash

# Source delta.profile
if [ "${DELTA_PROFILE_HOME}" != "" ]
then
    if [ -f ${DELTA_PROFILE_HOME}/delta.profile ]
    then
        echo "Sourcing [${DELTA_PROFILE_HOME}/delta.profile]"
        . ${DELTA_PROFILE_HOME}/delta.profile
    else
        echo "Error: No delta.profile found at \${DELTA_PROFILE_HOME} (${DELTA_PROFILE_HOME}) !"
        exit 1
    fi
elif [ "${DELTABIN_HOME}" != "" ]
then
    if [ -f ${DELTABIN_HOME}/delta.profile ]
    then
        echo "Sourcing [${DELTABIN_HOME}/delta.profile]"
        . ${DELTABIN_HOME}/delta.profile
    else
        echo "Error: No delta.profile found at \${DELTABIN_HOME} (${DELTABIN_HOME}) !"
        exit 1
    fi
else
    if [ -f ../delta.profile ]
    then
        echo "Sourcing [../delta.profile]"
        . ../delta.profile
    else
        echo "Error: cannot find delta.profile !"
        exit 1
    fi
fi

# Source delta_user.profile
if [ "${DELTAUSER_PROFILE_HOME}" != "" ]
then
    if [ -f ${DELTAUSER_PROFILE_HOME}/delta_user.profile ]
    then
        echo "Sourcing [${DELTAUSER_PROFILE_HOME}/delta_user.profile]"
        . ${DELTAUSER_PROFILE_HOME}/delta_user.profile
    else
        echo "Error: No delta_user.profile found at \${DELTAUSER_PROFILE_HOME} (${DELTAUSER_PROFILE_HOME}) !"
        exit 1
    fi
elif [ -f ${HOME}/delta_user.profile ]
then
    echo "Sourcing [${HOME}/delta_user.profile]"
    . ${HOME}/delta_user.profile
else
    if [ -f ../delta_user.profile ]
    then
        echo "Sourcing [../delta_user.profile]"
        . ../delta_user.profile
    fi
fi

function processLine {
  local cmd=""${DELTABIN_HOME}/bin/startService.sh"
  startcmd=""

  # class name
  if [ -z "$1" ]
  then
    echo "Class name can't be empty"
    exit 1
  else
    cmd="${cmd} -i ${1}"
  fi

  # service name
  if [ ! -z "$2" ]
  then
    cmd="${cmd} -n ${2}"
  fi

  # taskset
  if [ ! -z "$3" ]
  then
    cmd="${cmd} -t \"${3}\""
  fi

  # logfile
  if [ ! -z "$4" ]
  then
    cmd="${cmd} -l ${4}"
  fi

  # additional args
  if [ ! -z "$5" ]
  then
    cmd="${cmd} -a ${5}"
  fi

  startcmd=${cmd}
}


while IFS=, read -r col1 col2 col3 col4 col5
do
  processLine "$col1" "$col2" "$col3" "$col4" "$col5"
  echo "Running service with command: ${startcmd}"
  eval ${startcmd}
done < $1

cd ${SCRIPT_DIR}

./stopServicesCSV.sh

#!/bin/bash

# Source delta.profile
if [ "${DELTA_PROFILE_HOME}" != "" ]
then
    if [ -f ${DELTA_PROFILE_HOME}/delta.profile ]
    then
        echo "Sourcing [${DELTA_PROFILE_HOME}/delta.profile]"
        . ${DELTA_PROFILE_HOME}/delta.profile
    else
        echo "Error: No delta.profile found at \${DELTA_PROFILE_HOME} (${DELTA_PROFILE_HOME}) !"
        exit 1
    fi
elif [ "${DELTABIN_HOME}" != "" ]
then
    if [ -f ${DELTABIN_HOME}/delta.profile ]
    then
        echo "Sourcing [${DELTABIN_HOME}/delta.profile]"
        . ${DELTABIN_HOME}/delta.profile
    else
        echo "Error: No delta.profile found at \${DELTABIN_HOME} (${DELTABIN_HOME}) !"
        exit 1
    fi
else
    if [ -f ../delta.profile ]
    then
        echo "Sourcing [../delta.profile]"
        . ../delta.profile
    else
        echo "Error: cannot find delta.profile !"
        exit 1
    fi
fi

# Source delta_user.profile
if [ "${DELTAUSER_PROFILE_HOME}" != "" ]
then
    if [ -f ${DELTAUSER_PROFILE_HOME}/delta_user.profile ]
    then
        echo "Sourcing [${DELTAUSER_PROFILE_HOME}/delta_user.profile]"
        . ${DELTAUSER_PROFILE_HOME}/delta_user.profile
    else
        echo "Error: No delta_user.profile found at \${DELTAUSER_PROFILE_HOME} (${DELTAUSER_PROFILE_HOME}) !"
        exit 1
    fi
elif [ -f ${HOME}/delta_user.profile ]
then
    echo "Sourcing [${HOME}/delta_user.profile]"
    . ${HOME}/delta_user.profile
else
    if [ -f ../delta_user.profile ]
    then
        echo "Sourcing [../delta_user.profile]"
        . ../delta_user.profile
    fi
fi


function processLine {
  local cmd="${DELTABIN_HOME}/bin/stopService.sh"
  stopcmd=""

  # class name
  if [ -z "$1" ]
  then
    echo "Class name can't be empty"
    exit 1
  else
    cmd="${cmd} -i ${1}"
  fi

  # service name
  if [ ! -z "$2" ]
  then
    cmd="${cmd} -n ${2}"
  fi

  # taskset
  if [ ! -z "$3" ]
  then
    cmd="${cmd} -t \"${3}\""
  fi

  # logfile
  if [ ! -z "$4" ]
  then
    cmd="${cmd} -l ${4}"
  fi

  # additional args
  if [ ! -z "$5" ]
  then
    cmd="${cmd} -a ${5}"
  fi

  stopcmd=${cmd}
}

while IFS=, read -r col1 col2 col3 col4 col5
do
  processLine "$col1" "$col2" "$col3" "$col4" "$col5"
  echo "Stopping service with command: ${stopcmd}"
  eval ${stopcmd}
done < $1

cd ${SCRIPT_DIR}

system.csv

kxw_tp,kxw_tp_1,,,
mon_feed,mon_feed_1,,,
kxw_rdb,kxw_rdb_1,,,
mon_agg,mon_agg_1,,,
kxw_lr,kxw_lr_1,,,
kxw_ihdb,kxw_ihdb_1,,,
kxw_hdb,kxw_hdb_1,,,