
File watcher

KX Stream contains a File Watcher (FW) component for loading batch data in from files. This data can be in a variety of formats, such as JSON, CSV, and fixed-width binary. FW processes are configured with a list of directories and regex file patterns to scan. When a file matching a known pattern is found, it is loaded using custom-defined logic.

Directory and File Name

This section assumes that historical data will be placed under the ${DELTADATA_HOME}/KxWarehouseData/filewatcher directory. CPU data will be saved as CSV, Mem as JSON, and Disk as fixed-width binary. Each file should contain a single day's worth of data for a single table. File names are composed of the raw table name, the date, and the file format, e.g. monCPU_20190910.csv
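
For example, a single day's drop for 2019.09.10 consists of three files, one per raw table (the .txt extension is used for the fixed-width binary file):

  monCPU_20190910.csv
  monMem_20190910.json
  monDisk_20190910.txt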

Create a FW service class and assign the analytics that will read and load historical data from files.


  • Create an analytic group named fwCore.

  • Create an analytic for each of the three raw tables: .mon.fileLoadCPU, .mon.fileLoadMem, and .mon.fileLoadDisk. Add them to the group.

  • Each of these analytics contains logic to read a different file format, save the data to disk, and reload the HDB. They take the same two parameters: directory and file name. This logic could be modified to publish data into the system instead of writing it to disk.



  .log.out[.z.h; "Replaying file"; file];

  // Build the full path from the analytic's two parameters
  // (assumes dir and file are passed as symbols)
  path:.Q.dd[hsym dir; file];

  // Extract date from filename for partitioning
  date:"D"$-4_ last "_" vs string file;

  // Load the CSV and sort on sym so the parted attribute can be applied later
  monCPU:`sym xasc ("PSF"; enlist ",") 0: path;
  hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
  hdbPar:.Q.dd[hdbDir; date, `monCPU, `];

  // Enumerate against the sym file and save to the date partition on disk
  hdbPar set .Q.en[hdbDir; monCPU];
  // Apply the parted attribute to the sym column
  @[hdbPar; `sym; `p#];

  // TODO: load hdb


Create the other two analytics in the same way, adjusting the parsing logic for each file format.
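
As a reference, the Mem (JSON) and Disk (fixed-width binary) loaders could be sketched as below. These are illustrative sketches only, not the exact platform code: the parameter names (dir, file) and the assumed binary row layout (8-byte long time, 15-byte space-padded sym, 8-byte float usage, matching the dummy-data generator later in this section) are assumptions, and the HDB-save steps elided here follow the same pattern as the CPU analytic above.

  // Sketch of the JSON loader
  .mon.fileLoadMem:{[dir; file]
    path:.Q.dd[hsym dir; file];
    date:"D"$-5_ last "_" vs string file;            / drop ".json"

    // .j.k parses JSON; time and sym come back as strings, so cast them
    monMem:`sym xasc select time:"P"$time, sym:`$sym, virtual, physical
      from .j.k raze read0 path;

    // Enumerate, save to the date partition and apply `p#
    // exactly as in the CPU analytic
    };

  // Sketch of the fixed-width binary loader
  .mon.fileLoadDisk:{[dir; file]
    path:.Q.dd[hsym dir; file];
    date:"D"$-4_ last "_" vs string file;            / drop ".txt"

    // Assumed row layout: 8-byte long time, 15-byte char sym, 8-byte float usage
    rows:31 cut read1 path;
    monDisk:`sym xasc ([]
      time:"p"$0x0 sv/: rows[;til 8];
      sym:`$trim each "c"$rows[;8+til 15];
      usage:first (enlist "f"; enlist 8) 1: raze rows[;23+til 8]);

    // Save and reload as in the CPU analytic
    };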



FW config

  • Create a filewatcher config parameter.


  • In the Attribute Values subtab, specify the following:
    • filename regex
    • directory
    • analytic to be called for this filetype
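
As an illustration, the attribute values for the CPU file type might look like the following (the regex is an assumption based on the naming scheme above):

  filename regex   monCPU_[0-9]{8}\.csv
  directory        ${DELTADATA_HOME}/KxWarehouseData/filewatcher
  analytic         .mon.fileLoadCPU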


File watcher process

  • Create a file watcher service class.


  • Add the override parameter to the Service Params subtab:

    parameter            value
    fileListConfigName   DS_FILEWATCHER_MATCHABLE:KxWarehouse
  • Add the fwCore analytic group.

  • Run the service class.

Dummy historical data

In reality, the historical data would contain information that you want to migrate into the system.

For the purpose of this example, the code below creates and saves dummy data as a CSV, JSON and fixed-width binary text to your filewatcher directory.

  • Call this function from any process.

  // Create and save one day of dummy data, `days` days in the past.
  // The function name and host list are illustrative.
  createDummyData:{[days]
    // Create past date
    d:.z.d - days;
    ts:d+.z.n+til 1000;
    date:ssr[string d; "."; ""];

    // Example hosts
    host:`host1`host2`host3;

    // If using the KxWarehouse package
    // fwDir:`$getenv`KXW_FWDIR;
    // Otherwise, the filewatcher directory from the section above
    fwDir:`$getenv[`DELTADATA_HOME],"/KxWarehouseData/filewatcher";

    // Create dummy data
    cpu:([] time:ts; sym:1000?host; usage:1000?80f);
    mem:([] time:ts; sym:1000?host; virtual:1000?80f; physical:1000?80f);
    disk:([] time:ts; sym:1000?host; usage:1000?80f);

    // Change disk data to fixed-width binary:
    // 8-byte long time, 15-byte space-padded sym, 8-byte float usage
    fixedBinaryDisk:update time:0x0 vs/: "j"$time, sym:"x"$15$'string sym, usage:0x0 vs/: usage from disk;
    diskData:raze/[flip fixedBinaryDisk cols fixedBinaryDisk];

    // Create paths
    cpuPath:.Q.dd[hsym fwDir; `$"monCPU_",date,".csv"];
    memPath:.Q.dd[hsym fwDir; `$"monMem_",date,".json"];
    diskPath:.Q.dd[hsym fwDir; `$"monDisk_",date,".txt"];

    // Save to directory
    cpuPath 0: "," 0: cpu;
    memPath 0: enlist .j.j mem;
    diskPath 1: diskData;
    };

  // Call the function above, e.g. for data from one day ago
  createDummyData[1];

  • Once the files are placed, the file watcher process will find them and call the relevant analytics.