Deferred response

Overview

Ideally, for concurrency, all messaging would be async. However, sync messaging is a convenient paradigm for client apps.

On the server side of a client request, .z.pg handles the sync request, performs the required task, and returns the outcome.

You can redefine the sync callback .z.pg to implement custom logic for a sync request. When you use -30!x, .z.pg no longer needs to return the outcome immediately. Calling -30!(::) inside .z.pg flags that the response will come later, allowing the callback to return without sending anything to the client. Once the result is ready, call -30!(handle;isError;msg) to send the data, or an error, to the waiting client.

This allows kdb+ to process other messages before sending the response.
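
Both forms of -30! can be seen in a single process. The following is a minimal sketch, separate from the gateway example below: the names waiting and todo are hypothetical, and using the timer callback .z.ts to produce the result is an assumption made purely for illustration. .z.pg records the caller's handle and query and defers; the timer later evaluates each query and sends the result, or the error string, to the waiting client.

```q
/ hypothetical queue of (handle;query) pairs awaiting a reply
waiting:()

/ sync handler: remember the request, then defer the response
.z.pg:{[query]
  waiting::waiting,enlist(.z.w;query);
  -30!(::)}

/ timer handler: evaluate each queued query and send the deferred response
.z.ts:{
  todo:waiting;                          / snapshot the queue
  waiting::();                           / and clear it
  {[h;q]
    r:@[{(0b;value x)};q;{(1b;x)}];      / r is (isError;payload)
    -30!(h;r 0;r 1)} ./: todo;}

\t 1000
```

A client that sends a sync query to this process blocks until the next timer tick, while the process stays free to handle other messages in between. Sending to a client that has already disconnected would be expected to signal an error, so a fuller version would trap the call to -30!.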

Example

There are many situations in which deferred responses can be applied. For this example, we will show 2 worker nodes that will perform the requested work for the client. The client will communicate via a gateway.

The gateway uses custom logic built on deferred response to send the request to the two worker nodes and respond later, once all responses have been received. This lets the gateway service other clients while the worker nodes perform their task. Only when both worker nodes have replied is the combined response sent to the client. For simplicity, all instances run on the same machine, but the example can be altered to run on multiple machines.

Workers

Start 2 worker nodes. Each will contain a very small table with different data so that we can easily identify the data received by the client. In practice, this could be an extremely large dataset.

Start one worker node to listen on port 6000, e.g. q -p 6000

q)t:([]a:1 2 3;b:4 5 6)

Start one worker node to listen on port 6001, e.g. q -p 6001

q)t:([]a:11 12 13;b:14 15 16)

Gateway

The following simple script, gateway.q, demonstrates the mechanics of -30!x in a gateway.

workerHandles:hopen each 6000 6001 / open handles to worker processes

pending:()!() / keep track of received results for each clientHandle

/ this example fn joins the results when all are received from the workers
reduceFunction:raze

/ each worker calls this with (0b;result) or (1b;errorString) 
callback:{[clientHandle;result] 
 pending[clientHandle],:enlist result; / store the received result
 / check whether we have all expected results for this client
 if[count[workerHandles]=count pending clientHandle; 
   / test whether any response (0|1b;...) included an error
   isError:0<sum pending[clientHandle][;0]; 
   result:pending[clientHandle][;1]; / grab the error strings or results
   / send the first error or the reduced result
   r:$[isError;{first x where 10h=type each x};reduceFunction]result; 
   -30!(clientHandle;isError;r); 
   pending[clientHandle]:(); / clear the temp results
 ]
 }

.z.pg:{[query]
  remoteFunction:{[clntHandle;query]
    neg[.z.w](`callback;clntHandle;@[(0b;)value@;query;{[errorString](1b;errorString)}])
  };
  neg[workerHandles]@\:(remoteFunction;.z.w;query); / send the query to each worker
  -30!(::); / defer sending a response message i.e. return value of .z.pg is ignored
 }
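
The aggregation step inside callback can be tried in isolation, without any IPC. This is a sketch only: combine, ok and bad are hypothetical names, and the replies are hand-built to have the (isError;payload) shape that each worker sends back. It returns the pair that the gateway would pass to -30! after the client handle.

```q
/ two successful worker replies, each shaped as (isError;payload)
ok:((0b;([]a:1 2 3;b:4 5 6));(0b;([]a:11 12 13;b:14 15 16)))

/ one success and one failure
bad:((0b;([]a:1 2 3;b:4 5 6));(1b;"type"))

/ hypothetical helper mirroring the logic in callback
combine:{[replies]
  isError:any replies[;0];               / did any worker report an error?
  payload:replies[;1];                   / results or error strings
  (isError;$[isError;first payload where 10h=type each payload;raze payload])}
```

combine ok yields 0b with the two tables razed into one; combine bad yields 1b with the first string found among the payloads.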

Run the gateway script, listening on port 5000 for client requests, e.g. q gateway.q -p 5000

Further error checking, .z.pc, timeouts, sequence numbers, load-balancing, etc., are left as an exercise for the reader.
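
As a possible starting point for the .z.pc part of that exercise, the sketch below discards partial results when a handle closes. It assumes the pending dictionary from gateway.q and is deliberately incomplete: a worker reply that arrives after the cleanup starts a new entry for the same handle, which is one reason sequence numbers are also on the list.

```q
/ hypothetical cleanup: forget partial results for a handle that has closed
/ assumes pending is the dictionary defined in gateway.q
.z.pc:{[h] pending::pending _ h}
```

.z.pc is also invoked when a worker connection closes; in that case the handle is not a key of pending and the dictionary is left unchanged.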

Client

Run kdb+ (q) and connect to the gateway. The gateway passes the query to both worker nodes. Only when both worker nodes have replied is the raze function applied to the two result sets to form a single data set, which is then returned to the client. Rows appear in the order in which the workers' replies reached the gateway.

q)h:hopen 5000
q)h"select from t"
a  b
-----
11 14
12 15
13 16
1  4
2  5
3  6
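
Errors follow the same path. When a worker fails to evaluate the query, the gateway sends the first error string with isError set, and the client sees it as a signalled error. A minimal sketch of trapping it on the client, assuming the gateway above is running on port 5000; nosuchtable is a hypothetical name that is not defined on either worker:

```q
h:hopen 5000
/ trap the error signalled via the gateway; the handler receives the error string
r:@[h;"select from nosuchtable";{[e]"gateway error: ",e}]
```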

Blog: Deferred Response