queryserver
Demonstrates a REST Server capable of running asynchronous query jobs
Jobs are run in worker processes that execute arbitrary qSQL and trigger a callback when done. A REST Client is expected to poll for async job results.
Resources for the server are organized as a list of projects containing lists of databases. Databases are organized as a list of tables.
Usage patterns:
GET /v1/hc Simple health check
GET /v1/projects List all projects
GET /v1/projects/{projectID} List one project's attributes
GET /v1/projects/{projectID}/databases List all databases for projectID
GET /v1/projects/{projectID}/databases/{databaseID} List one database's attributes
GET /v1/projects/{projectID}/databases/{databaseID}/tables List all tables for projectID + databaseID
GET /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID} List table attributes
POST /v1/projects Create a new project
POST /v1/projects/{projectID} Update a project (rename/delete)
POST /v1/projects/{projectID}/databases Create a database
POST /v1/projects/{projectID}/databases/{databaseID} Update a database
POST /v1/projects/{projectID}/databases/{databaseID}/tables Create a table
POST /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID} Update a table
GET /v1/projects/{projectID}/jobs List all jobs for a project
POST /v1/projects/{projectID}/jobs Run a new query
GET /v1/projects/{projectID}/jobs/{jobID} Check on the status of a job
GET /v1/projects/{projectID}/jobs/{jobID}/results Get the JSONified query results
// Short alias for the REST library namespace, for convenience.
.rest:.com_kx_rest
// Initialise the REST library with the autoBind option set to 1b.
.rest.init[(enlist `autoBind)!enlist 1b];
// Querystring params
\d .demo
// Parameter definitions shared by the route registrations; the names match
// the {placeholders} used in the registered paths.
// NOTE(review): the .rest.reg.data arguments look like
// (name; q type; required flag; default; description) - confirm against the REST library docs.
param.projectID:.rest.reg.data[`projectID;-7h;1b;0;"Project ID"]
param.databaseID:.rest.reg.data[`databaseID;10h;1b;"";"Database ID"]
param.tableID:.rest.reg.data[`tableID;10h;1b;"";"Table ID"]
param.jobID:.rest.reg.data[`jobID;-7h;1b;0;"Job ID"]
// Body params
// Request-body object definitions (project, database, table, job), each built
// from one or more .rest.reg.data field definitions.
.rest.reg.object[`project;
    .rest.reg.data[`name;-11h;0b;`;"Project name"],
    .rest.reg.data[`dir;10h;0b;"";"Project directory"]]
.rest.reg.object[`database;
    .rest.reg.data[`name;-11h;0b;`;"Database name"]]
.rest.reg.object[`table;
    .rest.reg.data[`name;-11h;0b;`;"Table name"],
    .rest.reg.data[`table;98h;0b;([] x:());"Table content"]]
// A job body carries the target database ID (reusing param.databaseID) and the query text.
.rest.reg.object[`job;
    param.databaseID,
    .rest.reg.data[`query;10h;0b;"";"q query"]]
// Body parameters that refer to the objects registered above by name.
// NOTE(review): the 0b / :: arguments are presumably "required" and "default" - confirm against the REST library docs.
param.project:.rest.reg.body[`project;0b;::;"Project information"]
param.database:.rest.reg.body[`database;0b;::;"Database information"]
param.table:.rest.reg.body[`table;0b;::;"Table information"]
param.job:.rest.reg.body[`job;0b;::;"Job information"]
// GET /v1/hc: health check that always answers "ok"; takes no parameters.
.rest.register[`get;
    "/v1/hc";
    "simple healthcheck";
    {[x] "ok"};
    ()!()];
// GETTER API
// GET /v1/projects: respond with the whole .demo.projects table.
.rest.register[`get;
    "/v1/projects";
    "List all projects";
    {[x] .demo.projects};
    ::]
// GET /v1/projects/{projectID}: first row of .demo.projects whose id equals
// the projectID request argument.
.rest.register[`get;
    "/v1/projects/{projectID}";
    "List one projects attributes";
    {[x]
        pid:x[`arg;`projectID];
        first select from .demo.projects where id = pid
        };
    param.projectID]
// GET /v1/projects/{projectID}/databases
// Lists the entries of the project's directory; databases are created as
// sub-directories of that directory by the POST .../databases route.
// Signals (through .demo.findProject) when the project does not exist.
// The previous handler returned the function .demo.findDB itself rather than a listing.
.rest.register[`get;
    "/v1/projects/{projectID}/databases";
    "List all databases for a project";
    {[x]
        proj:.demo.findProject x;
        // key on a directory handle yields the names of its entries
        key hsym `$proj `dir
        };
    param.projectID]
// GET /v1/projects/{projectID}/databases/{databaseID}
// Responds with a one-entry dictionary: `name mapped to the database's file
// handle as resolved by .demo.findDB (which signals if it is missing).
.rest.register[`get;
    "/v1/projects/{projectID}/databases/{databaseID}";
    "List a databases attributes";
    {[x] (enlist `name)!enlist .demo.findDB[x] `h};
    param.projectID,param.databaseID]
// GET /v1/projects/{projectID}/databases/{databaseID}/tables
// Responds with the entries found directly under the database's directory
// handle, as resolved by .demo.findDB.
.rest.register[`get;
    "/v1/projects/{projectID}/databases/{databaseID}/tables";
    "List all tables in a database";
    {[x] key .demo.findDB[x] `h};
    param.projectID,param.databaseID]
// GET /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}
// Placeholder: always signals `notImplemented.
.rest.register[`get;
    "/v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}";
    "List table attributes";
    {[x] '`notImplemented};
    param.projectID,param.databaseID,param.tableID]
// CREATE APIs
// POST /v1/projects
// Creates the project directory under projRoot, appends a row to
// .demo.projects (id = current row count, created = current timestamp) and
// responds with the new row.
.rest.register[`post;
    "/v1/projects";
    "Create a new project";
    {[x]
        di:projRoot,x[`data;`dir];
        // The directory comes from the request body, so single-quote it for the
        // shell (embedded quotes become '\'') instead of concatenating it raw.
        system "mkdir -p '",ssr[di;"'";"'\\''"],"'";
        .demo.projects,:
            `name`id`dir`created!(x[`data;`name];count .demo.projects;di; .z.p);
        last .demo.projects
        };
    param.project]
// POST /v1/projects/{projectID}
// Placeholder: always signals `notImplemented.
.rest.register[`post;
    "/v1/projects/{projectID}";
    "Update a project";
    {[x] '`notImplemented};
    param.projectID]
// POST /v1/projects/{projectID}/databases
// Creates <project dir>/<name> if it does not exist yet and responds with
// a dictionary `id -> database name.
// Signals (through .demo.findProject) when the project does not exist.
.rest.register[`post;
    "/v1/projects/{projectID}/databases";
    "Add a database to a project";
    {[x]
        proj:.demo.findProject x;
        n:x[`data;`name];
        db:proj[`dir],"/",string n;
        dbh:hsym `$db;
        // The name comes from the request body, so single-quote the path for the
        // shell (embedded quotes become '\'') instead of concatenating it raw.
        if[() ~ key dbh; system "mkdir -p '",ssr[db;"'";"'\\''"],"'"];
        enlist[`id]!enlist n
        };
    param.projectID,param.database]
// POST /v1/projects/{projectID}/databases/{databaseID}
// Placeholder: signals `notImplemented, like the other unimplemented routes,
// instead of returning the text "not implemented" as a normal result.
.rest.register[`post;
    "/v1/projects/{projectID}/databases/{databaseID}";
    "Rename or delete a database";
    {[x] '`notImplemented};
    param.projectID,param.databaseID]
// POST /v1/projects/{projectID}/databases/{databaseID}/tables
// Hands the request to .demo.writePar and responds with its result.
// The lambda wrapper defers the lookup of .demo.writePar until the route is called.
.rest.register[`post;
    "/v1/projects/{projectID}/databases/{databaseID}/tables";
    "Add a table to a database";
    {[x] .demo.writePar x};
    param.projectID,param.databaseID,param.table]
// POST /v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}
// Placeholder: signals `notImplemented, like the other unimplemented routes,
// instead of returning the symbol as a normal result.
.rest.register[`post;
    "/v1/projects/{projectID}/databases/{databaseID}/tables/{tableID}";
    "Add or overwrite dates in a table";
    {[x] '`notImplemented};
    param.projectID,param.databaseID,param.tableID]
// JOB API
// GET /v1/projects/{projectID}/jobs
// Responds with the rows of .demo.jobs that belong to the given project.
.rest.register[`get;
    "/v1/projects/{projectID}/jobs";
    "List all jobs for a project";
    // no trailing ";" after the select: a lambda ending in ";" returns (::), not the rows
    {[x] select from .demo.jobs where projectID = x[`arg; `projectID]};
    param.projectID]
// POST /v1/projects/{projectID}/jobs
// Picks a worker handle that no recorded job refers to, sends it the query
// asynchronously, records the job as `active and responds with the new row.
// Signals when the project does not exist (through .demo.findProject) or
// when every worker is already attached to a job.
.rest.register[`post;
    "/v1/projects/{projectID}/jobs";
    "Submit a query job";
    {[x]
        proj:.demo.findProject x;
        free:.demo.workers except exec worker from .demo.jobs;
        // without this guard the async send below would be attempted on an empty or null handle
        if[0 = count free; '"No free workers available"];
        avail:first free;
        db:proj[`dir],"/",x[`data;`databaseID];
        // neg[handle] makes the call asynchronous
        neg[avail] (`.demo.runQuery; db; x[`data;`query]);
        .demo.jobs,:`id`worker`projectID`status!(count .demo.jobs;
            avail;proj `id;`active);
        last .demo.jobs
        };
    param.projectID,param.job]
// GET /v1/projects/{projectID}/jobs/{jobID}
// Responds with the .demo.jobs rows matching both the project and the job ID.
.rest.register[`get;
    "/v1/projects/{projectID}/jobs/{jobID}";
    "List details of a job";
    {[x] .demo.findJob x};
    param.projectID,param.jobID]
// GET /v1/projects/{projectID}/jobs/{jobID}/results
// Fetches .demo.results synchronously from the worker that ran the job.
// Signals "Job not finished" unless exactly one matching job has status `done.
.rest.register[`get;
    "/v1/projects/{projectID}/jobs/{jobID}/results";
    "Get results for a finished job";
    {[x]
        job:select from .demo.findJob[x] where status=`done;
        if[1 <> count job;'"Job not finished"];
        // job is a one-row table, so its worker column is a one-item list;
        // take the handle atom out of it before using it for the IPC call
        h:first job `worker;
        h ".demo.results"
        };
    param.projectID,param.jobID]
// Directory prefix under which the create-project route makes project directories.
// NOTE(review): system["cd "] presumably yields the current working directory - confirm.
projRoot:system["cd "],"/exampleProjects/"
// Project registry: one row per created project.
projects:([] name:(); id:`long$(); dir:(); created:`timestamp$())
// Look up the project named by the request's projectID argument.
// x: request dictionary; the ID is read from x[`arg;`projectID].
// Returns the matching row of .demo.projects as a dictionary.
// Signals "No such project <id>" when no row matches.
findProject:{[x]
    pid:x[`arg;`projectID];
    proj:select from .demo.projects where id = pid;
    // index at depth (`arg;`projectID) and stringify the ID so the signalled value is a string
    if[0 = count proj; '"No such project ",string pid];
    first proj }
// Resolve the database named by the request's databaseID argument inside
// the request's project.
// x: request dictionary; uses findProject for the project lookup.
// Returns `name`h!(database name; file handle of its directory).
// Signals "Database does not exist: <name>" when key on the handle gives ().
findDB:{[x]
    proj:findProject x;
    n:x[`arg;`databaseID];
    path:proj[`dir],"/",n;
    dbh:hsym `$path;
    if[() ~ key dbh; '"Database does not exist: ",n];
    `name`h!(n;dbh) }
// Rows of .demo.jobs matching both the projectID and jobID request arguments.
// x: request dictionary. Returns a table, which is empty when nothing matches.
findJob:{[x]
    pid:x[`arg;`projectID];
    jid:x[`arg;`jobID];
    select from .demo.jobs where projectID = pid, id = jid }
// Write the table supplied in a request body into the request's database,
// grouped by its date column.
// x: request dictionary; reads x[`data;`name] and x[`data;`table], and
//    resolves the database through findDB (which signals if it is missing).
// Returns the table name.
writePar:{
    db:findDB x;
    name:x[`data;`name];
    t:x[`data;`table];
    // convert the date column with "D"$ (presumably it arrives as text - confirm)
    t[`date]:"D"$t `date;
    // every column other than date
    a:cols[t] except `date;
    // functional select: the non-date columns grouped by date
    kt:?[t;();`date;a!a];
    // pair each group key with its group of column values and save it with
    // set under the path built from (db handle; date; name)
    // NOTE(review): assumes key[kt] yields one date symbol per group - verify the shape of kt
    (`$string key[kt]) {[root;name;date;d]
        (` sv root,date,name,`) set flip d
        }[db `h;name]' value kt;
    :name }
// Mark every job belonging to the calling connection's handle (.z.w) as `done.
// Updates .demo.jobs in place and returns nothing.
done:{[x] update status:`done from `.demo.jobs where worker = .z.w;}
// Longest time the startup timer waits for the workers before the process exits.
maxWait:00:00:05
// Counter incremented by the .z.po handler on each opened connection.
i:0
// Number of open connections the startup timer waits for.
n:10
// Worker handles; filled in by the startup timer once all workers have connected.
workers:()
// Job registry: one row per submitted query job.
jobs:([] id:"j"$(); worker:"I"$(); projectID:"j"$(); status:`$())
\d .
// Connection-open handler: bump the .demo.i counter by one.
.z.po:{[x] .demo.i:.demo.i+1;}
// Startup timer callback, projected onto the timestamp at which this
// definition is evaluated (start); now is the argument of each timer tick.
// - If more than .demo.maxWait has elapsed since start, print an error to
//   stderr and exit with status 1.
// - Once the number of open handles in .z.W equals .demo.n, stop the timer,
//   replace .z.po with a no-op and record the handles as .demo.workers.
.z.ts:{[start;now]
    if [now > start + .demo.maxWait;
        -2 "Took longer than ",string[.demo.maxWait],
            " to start ",string[.demo.n]," workers";
        -2 "Exiting...";
        exit 1];
    // Clear timer and uninstall .z.po
    if[.demo.n = count .z.W;
        system "t 0";
        .z.po:{};
        .demo.workers:key .z.W]
    }[.z.p;]
// Launch the worker processes, passing each one this server's port.
// The count comes from .demo.n, the number of connections the startup timer
// waits for, rather than a separate literal, so the two cannot drift apart.
do[.demo.n; system "q queryworker.q -server ",string system "p"]
// Run the timer callback every 1000 ms.
\t 1000