File watcher

Kx Stream includes a File Watcher (FW) component for loading batch data from files. The data can be in a variety of formats: JSON, CSV, fixed-width binary, etc. FW processes are configured with a list of directories and regex file name patterns to scan for. When a file matching a known pattern is found, it is loaded using custom-defined logic.

Directory and File Name

This section assumes that historical data is placed in the ${DELTADATA_HOME}/KxWarehouseData/filewatcher directory. CPU data is saved as CSV, Mem as JSON, and Disk as fixed-width binary. Each file should contain a single day's worth of data for a single table. File names are composed of the raw table name, the date, and the file format, e.g. monCPU_20190910.csv.
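
The naming convention can be checked with a few lines of q. This is only a minimal sketch: parseFwName is a hypothetical helper, not part of Kx Stream, and it assumes every name has exactly the table_date.format shape, with no underscore in the table name.

```q
/ Hypothetical helper: split a file name such as monCPU_20190910.csv
/ into its raw table name, date and file format
parseFwName:{[file]
  parts:"_" vs string file;        / ("monCPU";"20190910.csv")
  tail:"." vs last parts;          / ("20190910";"csv")
  `table`date`format!(`$first parts; "D"$first tail; `$last tail)
 }

parseFwName `$"monCPU_20190910.csv"
```

The .mon.fileLoadCPU analytic later in this section extracts the date from the file name in a similar way to choose the HDB partition.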

Create a FW service class and assign the analytics that will read and load historical data from files.

Analytics

  • Create an analytic group named fwCore.

  • Create an analytic for each of the three raw tables: .mon.fileLoadCPU, .mon.fileLoadMem, and .mon.fileLoadDisk. Add them to the group.

  • Each of these analytics contains the logic to read a different file type, save the data to disk, and reload the HDB. They all take the same two parameters: directory and file name. The logic could be modified to publish data into the system instead of writing it to disk.

Screenshot

.mon.fileLoadCPU

{[dir;file]
  .log.out[.z.h; "Replaying file"; file];

  // Extract date from filename for partitioning 
  date:"D"$-4_ last "_" vs string file;
  path:.Q.dd[dir;file];

  // Load CSV and sort sym column to apply the parted attribute later
  monCPU:`sym xasc ("PSF"; enlist ",") 0: path;
  hdbDir:hsym .utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/hdblog"];
  hdbPar:.Q.dd[hdbDir; date, `monCPU, `];

  // Save to disk  
  hdbPar set .Q.en[hdbDir; monCPU];
  // Apply parted attribute to the sym column
  @[hdbPar; `sym; `p#];

  // TODO: load hdb
 }

Create the other two analytics, listed below, in the same way.

.mon.fileLoadMem

.mon.fileLoadDisk

FW config

  • Create a filewatcher config parameter.

Screenshot

  • In the Attributes Values subtab, specify the following:
    • filename regex
    • directory
    • analytic to be called for this filetype

Screenshot
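
The relationship between the three attributes can be pictured as a lookup from file name to analytic. The sketch below is illustrative only and is not how the FW process is implemented: fwConfig and matchAnalytic are hypothetical names, the patterns and directory are made-up values, and q's like operator uses glob-style patterns, which stand in here for the regex held in the real config.

```q
/ Hypothetical config rows mirroring the three attributes above.
/ like takes glob-style patterns, used here in place of a regex
fwConfig:([] pattern:("monCPU_*.csv";"monMem_*.json";"monDisk_*.txt");
  directory:3#`:filewatcher;
  analytic:`.mon.fileLoadCPU`.mon.fileLoadMem`.mon.fileLoadDisk);

/ Return the analytic(s) whose pattern matches the file name
matchAnalytic:{[file] exec analytic from fwConfig where (string file) like/: pattern}

matchAnalytic `$"monMem_20190910.json"
```

A file name that matches no pattern returns an empty list, which corresponds to a file the watcher would leave alone.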

File watcher process

  • Create a file watcher service class.

Screenshot

  • Add the following override parameters in the Service Params subtab:

    parameter           value
    messagingServer     DS_MESSAGING_SERVER:DS
    fileListConfigName  DS_FILEWATCHER_MATCHABLE:KxWarehouse

  • Add the fwCore analytic group.

  • Run the service class.

Dummy historical data

In reality, the historical data would contain information that you want to migrate into the system.

For the purpose of this example, the code below creates dummy data and saves it to your filewatcher directory as CSV, JSON, and fixed-width binary files.

  • Call this function from any process.
saveHistData:{[days]

  // Create past date
  d:(.z.d - days);
  ts:d+.z.n + til 1000;
  date:ssr[string d; "."; ""];

  host:`$"server_",/:("A","B","C","D");
  fwDir:.utils.checkForEnvVar["ENV=DELTADATA_HOME=/KxWarehouseData/filewatcher"];
  // If using the KxWarehouse package
  // fwDir:`$getenv`KXW_FWDIR;

  // Create dummy data 
  cpu:([]time:ts; sym:1000?host; usage:1000?80f);
  mem:([]time:ts; sym:1000?host; virtual:1000?80f; physical:1000?80f);
  disk:([]time:ts; sym:1000?host; usage:1000?80f);
  // Change disk data format
  fixedBinaryDisk:update 0x0 vs/: "j"$time, sym:"x"$15$'string sym, 0x0 vs/: usage from disk;
  diskData:raze/[flip fixedBinaryDisk cols fixedBinaryDisk];

  // Create paths
  cpuPath:.Q.dd[hsym fwDir; `$"monCPU_",date,".csv"];
  memPath:.Q.dd[hsym fwDir; `$"monMem_",date,".json"];
  diskPath:.Q.dd[hsym fwDir; `$"monDisk_",date,".txt"];

  // Save to directory
  cpuPath 0: "," 0: cpu; 
  memPath 0: enlist .j.j mem; 
  diskPath 1: diskData;
 }

// Call the function above
saveHistData[20];
  • Once the files are placed, the filewatcher process will find them and call the relevant analytics.
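
The fixed-width layout that saveHistData writes for the disk data can be examined on a single record. The sketch below assumes the same encoding as the function above (0x0 vs for the numeric fields, sym padded to 15 characters), giving 8 + 15 + 8 = 31 bytes per record. The variable names and sample values are illustrative only, and the 8-byte usage field is not decoded here.

```q
/ Build one 31-byte record the way saveHistData does: 8 + 15 + 8 bytes
t:2019.09.10D12:00:00.000000000;
rec:(0x0 vs "j"$t),("x"$15$"server_A"),0x0 vs 42.5;
count rec

/ Decode the first two fields again
decTime:"p"$0x0 sv 8#rec;          / bytes 0-7: timestamp stored as a long
decSym:`$trim "c"$15#8_rec;        / bytes 8-22: sym padded to 15 characters
(decTime;decSym)~(t;`server_A)
```

Because every record has the same width, a loader can split the file into 31-byte chunks and apply the same decoding to each one.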