Skip to content

queryserver

Demonstrates a REST Server capable of running asynchronous query jobs

Jobs are run in worker processes that execute arbitrary qSQL and trigger a callback when done. A REST Client is expected to poll for async job results.

Resources for the server are organized as a list of projects containing lists of databases. Databases are organized as a list of tables.

Usage patterns:

GET /v1/hc                                                             Simple health check
GET /v1/projects                                                       List all projects
GET /v1/projects/{projectID}                                           List one project's attributes
GET /v1/projects/{projectID}/databases                                 List all databases for projectID
GET /v1/projects/{projectID}/databases/{databaseID}                    List one database's attributes
GET /v1/projects/{projectID}/databases/{databaseID}/tables             List all tables for projectID + databaseID
GET /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}   List table attributes
POST /v1/projects                                                      Create a new project
POST /v1/projects/{projectID}                                          Update a project (rename/delete)
POST /v1/projects/{projectID}/databases                                Create a database
POST /v1/projects/{projectID}/databases/{databaseID}                   Update a database
POST /v1/projects/{projectID}/databases/{databaseID}/tables            Create a table
POST /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}  Update a table
GET /v1/projects/{projectID}/jobs                                      List all jobs for a project
POST /v1/projects/{projectID}/jobs                                     Run a new query
GET /v1/projects/{projectID}/jobs/{jobID}                              Check on the status of a job
GET /v1/projects/{projectID}/jobs/{jobID}/results                      Get the JSONified query results
.rest:.com_kx_rest / Short alias for the .com_kx_rest namespace
.rest.init[(enlist `autoBind)!enlist 1b];

// URL params: these names appear as {placeholders} in the route paths registered
// below, and the handlers read them from x[`arg;...]
// NOTE(review): the .rest.reg.data arguments look like
// (name; q type; required flag; default; description) - confirm against the REST library docs
\d .demo
// Numeric ids are declared with type -7h (long atom)
param.projectID:.rest.reg.data[`projectID;-7h;1b;0;"Project ID"]
// Database and table ids are declared with type 10h (string)
param.databaseID:.rest.reg.data[`databaseID;10h;1b;"";"Database ID"]
param.tableID:.rest.reg.data[`tableID;10h;1b;"";"Table ID"]
param.jobID:.rest.reg.data[`jobID;-7h;1b;0;"Job ID"]

// Body params
// Each .rest.reg.object call names an object and lists the fields it carries.
// project: a symbol name plus a directory string (the create-project handler
// appends the directory to projRoot)
.rest.reg.object[`project;
  .rest.reg.data[`name;-11h;0b;`;"Project name"],
  .rest.reg.data[`dir;10h;0b;"";"Project directory"]]
// database: just a symbol name
.rest.reg.object[`database;
  .rest.reg.data[`name;-11h;0b;`;"Database name"]]
// table: a symbol name plus the table content (type 98h)
.rest.reg.object[`table;
  .rest.reg.data[`name;-11h;0b;`;"Table name"],
  .rest.reg.data[`table;98h;0b;([] x:());"Table content"]]
// job: reuses the databaseID declaration and adds the query text
.rest.reg.object[`job;
  param.databaseID,
  .rest.reg.data[`query;10h;0b;"";"q query"]]

// Request-body declarations, one per object registered above; handlers read
// the decoded body from x[`data;...]
// NOTE(review): second argument (0b) is presumably the "required" flag - confirm
param.project:.rest.reg.body[`project;0b;::;"Project information"]
param.database:.rest.reg.body[`database;0b;::;"Database information"]
param.table:.rest.reg.body[`table;0b;::;"Table information"]
param.job:.rest.reg.body[`job;0b;::;"Job information"]
// Health check: takes no params and always answers "ok"
.rest.register[`get;
  "/v1/hc";
  "simple healthcheck";
  {[req] "ok"};
  ()!()];

// GETTER API
// Return the whole .demo.projects table; no params are declared
.rest.register[`get;
  "/v1/projects";
  "List all projects";
  {[req] .demo.projects};
  ::]

// Return the first row of .demo.projects whose id equals the projectID in the URL
.rest.register[`get;
  "/v1/projects/{projectID}";
  "List one projects attributes";
  {[req]
    pid:req[`arg;`projectID];
    first select from .demo.projects where id = pid
    };
  param.projectID]

// List the databases of a project.
// Databases are created as sub-directories of the project directory (see the
// "Add a database to a project" handler), so the listing is the content of
// that directory. Signals "No such project ..." via findProject when the
// project id is unknown.
.rest.register[`get;
  "/v1/projects/{projectID}/databases";
  "List all databases for a project";
  {
    proj:.demo.findProject x;
    // key on a directory handle returns the names of its entries
    key hsym `$proj `dir
    };
  param.projectID]

// Resolve the database addressed by the URL (findDB signals when it does not
// exist) and return a one-entry dictionary mapping `name to its file handle
.rest.register[`get;
  "/v1/projects/{projectID}/databases/{databaseID}";
  "List a databases attributes";
  {[req] (enlist `name)!enlist .demo.findDB[req] `h};
  param.projectID,param.databaseID]

// Resolve the database addressed by the URL and return the entries found
// under its directory handle
.rest.register[`get;
  "/v1/projects/{projectID}/databases/{databaseID}/tables";
  "List all tables in a database";
  {[req] key .demo.findDB[req] `h};
  param.projectID,param.databaseID]

// Placeholder route: always signals `notImplemented
.rest.register[`get;
  "/v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}";
  "List table attributes";
  {[req] '`notImplemented};
  param.projectID,param.databaseID,param.tableID]

// CREATE APIs
// Create a project: make its directory under projRoot, append a row to
// .demo.projects (id = current row count) and return the new row.
// NOTE(review): the request-supplied dir is spliced into a shell command
// unescaped - confirm callers are trusted or validate it first.
.rest.register[`post;
  "/v1/projects";
  "Create a new project";
  {[req]
    body:req `data;
    di:projRoot,body `dir;
    system "mkdir -p ",di;
    row:`name`id`dir`created!(body `name;count .demo.projects;di;.z.p);
    .demo.projects,:row;
    :last .demo.projects
  };
  param.project]

// Placeholder route: always signals `notImplemented
.rest.register[`post;
  "/v1/projects/{projectID}";
  "Update a project";
  {[req] '`notImplemented};
  param.projectID]

// Create a database directory inside the project's directory unless it is
// already there, and return the database name under the key `id.
// NOTE(review): the request-supplied name ends up in a shell command
// unescaped - confirm callers are trusted or validate it first.
.rest.register[`post;
  "/v1/projects/{projectID}/databases";
  "Add a database to a project";
  {[req]
    proj:.demo.findProject req;
    n:req[`data;`name];
    db:proj[`dir],"/",string n;
    // key returns () for a path that does not exist
    if[() ~ key hsym `$db; system "mkdir -p ",db];
    :enlist[`id]!enlist n
  };
  param.projectID,param.database]

// Placeholder route: signals `notImplemented like the other unimplemented
// routes in this file, instead of returning a string as a successful result
.rest.register[`post;
  "/v1/projects/{projectID}/databases/{databaseID}";
  "Rename or delete a database";
  {'`notImplemented};
  param.projectID,param.databaseID]

// Hand the request to .demo.writePar and return its result (the table name).
// The lambda wrapper defers the lookup of writePar until the route is called.
.rest.register[`post;
  "/v1/projects/{projectID}/databases/{databaseID}/tables";
  "Add a table to a database";
  {[req] .demo.writePar req};
  param.projectID,param.databaseID,param.table]

// Placeholder route: signals `notImplemented like the other unimplemented
// routes in this file, instead of returning the symbol as a successful result
.rest.register[`post;
    "/v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}";
    "Add or overwrite dates in a table";
    {'`notImplemented};
    param.projectID,param.databaseID,param.tableID]

// JOB API
// Return every row of .demo.jobs that belongs to the project in the URL.
// The select must be the last expression with no trailing semicolon: a lambda
// whose body ends in ";" returns generic null instead of the selected rows.
.rest.register[`get;"/v1/projects/{projectID}/jobs";
  "List all jobs for a project";
  {select from .demo.jobs where projectID = x[`arg; `projectID]};
  param.projectID]

// Submit a query job.
// Picks a worker handle that has no row in .demo.jobs, sends it the query
// asynchronously, records the job as `active and returns the new job row.
// Signals "No such project ..." (via findProject) for an unknown project and
// "No workers available" when every worker already has a job row.
.rest.register[`post;
  "/v1/projects/{projectID}/jobs";
  "Submit a query job";
  {
    proj:findProject x;
    // a worker is skipped once it has any job row, whatever the job status
    free:.demo.workers except exec worker from .demo.jobs;
    // without this guard, first of an empty list would be used as a handle
    if[0 = count free;'"No workers available"];
    avail:first free;
    db:proj[`dir],"/",x[`data;`databaseID];
    // negative handle = asynchronous send; the job is marked done later by .demo.done
    neg[avail] (`.demo.runQuery; db; x[`data;`query]);
    .demo.jobs,:`id`worker`projectID`status!(count .demo.jobs;
      avail;proj `id;`active);
    :last .demo.jobs
  };
  param.projectID,param.job]

// Return the .demo.jobs rows matching the project id and job id in the URL
.rest.register[`get;
  "/v1/projects/{projectID}/jobs/{jobID}";
  "List details of a job";
  {[req] .demo.findJob req};
  param.projectID,param.jobID]

// Fetch the results of a finished job from the worker that ran it.
// Signals "Job not finished" unless exactly one matching job has status `done.
.rest.register[`get;
  "/v1/projects/{projectID}/jobs/{jobID}/results";
  "Get results for a finished job";
  {
    job:select from findJob[x] where status=`done;
    if[1 <> count job;'"Job not finished"];
    // job is a one-row table, so job`worker is a list; take the atom handle
    // before applying it, otherwise the string would index the list
    (first job `worker) ".demo.results"
  };
  param.projectID,param.jobID]

// Root under which project directories are created: the result of
// system "cd " with "/exampleProjects/" appended
projRoot:system["cd "],"/exampleProjects/"
// In-memory project registry; rows are appended by the "Create a new project" handler
projects:([] name:(); id:"j"$(); dir:(); created:"p"$())

// Look up the project addressed by a request.
// x: request dictionary; the project id is read from x[`arg;`projectID]
// Returns the first matching row of .demo.projects as a dictionary.
// Signals "No such project <id>" when no row has that id.
findProject:{
    pid:x[`arg;`projectID];
    proj:select from .demo.projects where id = pid;
    // the id is declared as a long, so stringify it before joining it to the message
    if[0 = count proj; '"No such project ", string pid];
    :first proj; }

// Resolve the database addressed by a request.
// x: request dictionary; reads x[`arg;`databaseID] and, via findProject, the project id
// Returns a dictionary with `name (the database id) and `h (file handle of its directory).
// Signals "Database does not exist: <id>" when the path is missing.
findDB:{
    proj:findProject x;
    n:x[`arg;`databaseID];
    dbh:hsym `$proj[`dir],"/",n;
    // key returns () for a path that does not exist
    if[() ~ key dbh;'"Database does not exist: ", n];
    :`name`h!(n;dbh) }

// Select the rows of .demo.jobs that match both the project id and the job id
// of a request. x: request dictionary (reads x[`arg;`projectID] and x[`arg;`jobID])
findJob:{
    pid:x[`arg;`projectID];
    jid:x[`arg;`jobID];
    select from .demo.jobs where projectID = pid, id = jid }

// Write the table carried in a request body to disk under a database directory,
// grouped by date.
// x: request dictionary; reads x[`data;`name] (table name) and x[`data;`table]
//    (table content) and resolves the target database through findDB
// Returns the table name. Errors signalled by findDB propagate.
writePar:{
    db:findDB x;
    name:x[`data;`name];
    t:x[`data;`table];
    // convert the date column with "D"$ (presumably it arrives as text - confirm)
    t[`date]:"D"$t `date;
    // every column except date
    a:cols[t] except `date;
    // functional select: the non-date columns grouped by date
    kt:?[t;();`date;a!a];
    // pair each group key with its column data and store the data as a table
    // at the path built from <db handle>, <date>, <name> and a trailing empty symbol
    (`$string key[kt]) {[root;name;date;d]
        (` sv root,date,name,`) set flip d
        }[db `h;name]' value kt;
    :name }

// Mark every job owned by the calling connection (.z.w) as `done; updates
// .demo.jobs in place and returns nothing
done:{ update status:`done from `.demo.jobs where worker = .z.w; }
// Longest time the startup timer waits for the workers before exiting
maxWait:00:00:05
// Counter incremented by .z.po on every opened connection
i:0
// Number of worker connections the startup timer waits for
n:10
// Worker handles; filled from the keys of .z.W by the startup timer
workers:()
// Job registry: one row per submitted query job
jobs:([] id:"j"$(); worker:"I"$(); projectID:"j"$(); status:`$())
// Leave the .demo namespace
\d .

// Connection-open callback: count each new connection in .demo.i
.z.po:{[h] .demo.i:.demo.i+1;}
// Startup timer callback.
// The lambda is projected onto .z.p at definition time, so start is fixed to
// the moment this line ran and now is the argument supplied on each timer tick.
// Every tick it either gives up (after .demo.maxWait) or, once .demo.n
// connections are open, stops the timer and records the worker handles.
.z.ts:{[start;now]
  // Give up: report on stderr (-2) and exit with status 1
  if [now > start + .demo.maxWait;
    -2 "Took longer than ",string[.demo.maxWait],
      " to start ",string[.demo.n]," workers";
    -2 "Exiting...";
    exit 1];
  // Clear timer and uninstall .z.po
  // .z.W is keyed by open handle, so its keys become the worker list
  if[.demo.n = count .z.W;
    system "t 0";
    .z.po:{};
    .demo.workers:key .z.W] 
  }[.z.p;]

// Launch the worker processes, passing this server's port so they can connect back.
// The count comes from .demo.n because the startup timer waits for exactly
// .demo.n connections and reports .demo.n in its timeout message.
do[.demo.n; system "q queryworker.q -server ",string system "p"]
// Run the startup timer (.z.ts) every 1000 ms
\t 1000