
Deferred response

Overview

Ideally, for concurrency, all messaging would be async. However, sync messaging is a convenient paradigm for client apps.

You can use -30!x to ‘suspend’ the processing of a sync message: it tells kdb+ that the response to the currently executing sync message will be sent explicitly later. Other messages can then be processed before the response is sent.

You can call -30!(::) at any point in the execution path of .z.pg, start some work, and let .z.pg complete without sending a response. When the workers complete the task, send the response explicitly with -30!(handle;isError;msg).
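The flow above can be sketched in a single process, with the timer standing in for the workers. This is a minimal illustration, not a recommended design: the names `waiting` and `reply` are hypothetical, and the one-second timer interval is an arbitrary assumption.

```q
/ sketch only: waiting and reply are hypothetical names, not part of kdb+
waiting:()                               / (handle;query) pairs awaiting a reply

.z.pg:{[query]
  waiting::waiting,enlist(.z.w;query);   / remember who asked and what
  -30!(::) }                             / defer: return value of .z.pg is ignored

reply:{[h;query]
  r:@[{(0b;value x)};query;{(1b;x)}];    / (0b;result) or (1b;errorString)
  -30!(h;r 0;r 1) }                      / send the deferred response explicitly

.z.ts:{                                  / timer stands in for "work completed"
  w:waiting; waiting::();                / take the current batch, reset the queue
  reply ./: w; }

/ assumed interval: fire .z.ts once a second
\t 1000
```

A client that sends a sync query stays blocked until the timer fires and `reply` runs; in the meantime the process is free to handle other messages. The sketch does no cleanup if a client disconnects while its query is queued.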

Handle Tracking

kdb+ tracks which handles are expecting a response. If you try to send a response to a handle that is not expecting one, an error is signalled:

q)key .z.W / list of socket handles being monitored by kdb+ main thread
,8i
q)-30!(8i;0b;`hello`world) / try to send a response of (0b;`hello`world)
'Handle 8 was not expecting a response msg
  [0]  -30!(8i;0b;`hello`world)
          ^

If the handle is not a member of .z.W, you’ll get a 'domain error instead.
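Both failure modes can be guarded against before and during the send. The helper below is a hedged sketch: `safeReply` is a hypothetical name, not a kdb+ built-in. It assumes you would rather get a boolean back than have an error signalled inside a callback.

```q
/ safeReply is a hypothetical helper, not a kdb+ built-in
/ returns 1b if the response was sent, 0b otherwise
safeReply:{[h;isError;payload]
  if[not h in key .z.W; :0b];               / closed handle would signal 'domain
  @[{-30!x; 1b};(h;isError;payload);{0b}] } / trap 'not expecting a response'
```

Checking .z.W first avoids the 'domain error for handles that have already closed. The protected evaluation covers the case where the handle is open but has no sync request outstanding.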

Example

Below is a simple script to demonstrate the mechanics of -30!x in a gateway. Further error checking, .z.pc, timeouts, sequence numbers, load-balancing, etc., are left as an exercise for the reader.

workerHandles:hopen each 6000 6001 / open handles to worker processes

pending:()!() / keep track of received results for each clientHandle

/ this example fn joins the results when all are received from the workers
reduceFunction:raze

/ each worker calls this with (0b;result) or (1b;errorString) 
callback:{[clientHandle;result] 
 pending[clientHandle],:enlist result; / store the received result
 / check whether we have all expected results for this client
 if[count[workerHandles]=count pending clientHandle; 
   / test whether any response (0|1b;...) included an error
   isError:0<sum pending[clientHandle][;0]; 
   result:pending[clientHandle][;1]; / grab the error strings or results
   / send the first error or the reduced result
   r:$[isError;first result where pending[clientHandle][;0];reduceFunction result]; 
   -30!(clientHandle;isError;r); 
   pending[clientHandle]:(); / clear the temp results
 ]
 }

.z.pg:{[query]
  remoteFunction:{[clntHandle;query]
    neg[.z.w](`callback;clntHandle;@[(0b;)value@;query;{[errorString](1b;errorString)}])
  };
  neg[workerHandles]@\:(remoteFunction;.z.w;query); / send the query to each worker
  -30!(::); / defer sending a response message i.e. return value of .z.pg is ignored
 }
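From the client's point of view nothing changes: it makes an ordinary sync call and blocks until the gateway issues the deferred response. The sketch below assumes the gateway script above is listening on port 5000 (an arbitrary choice, not stated in the script) and that both workers are up.

```q
/ client-side sketch; gateway port 5000 is an assumption
gw:hopen 5000

/ sync call: blocks until the gateway issues -30!(clientHandle;0b;r)
res:gw"1+1"      / one result per worker, joined by reduceFunction (raze)

/ a failure on any worker comes back as a signalled error; trap it locally
err:@[gw;"1+`a";{"gateway error: ",x}]
```

Because the gateway replies with isError set to 1b when any worker fails, the client sees the error exactly as if it had been signalled by a direct sync call.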

Basics: Internal -30!x
Blog: Deferred Response
Namespace .z