IPC
pykx.ipc
PyKX q IPC interface.
The IPC communication module provided here works differently than users familiar with the KX IPC interfaces for Java and C# may expect. Unlike those interfaces, it does not directly convert the encoded data received over the q IPC protocol to an analogous type in Python, but rather stores the object within q memory space as a pykx.K object for deferred conversion.
This has major benefits with regard to the flexibility of the interface. In particular, the pykx.K conversion methods (i.e. py, np, pd, and pa) use the same logic as they do when converting pykx.K objects that were created by an embedded q instance.
The IPC interface works when running with or without a q license. Refer to the modes of operation documentation for more details.
The IPC interface is split between two classes, pykx.AsyncQConnection and pykx.SyncQConnection, both of which extend the base QConnection class. Instantiating a QConnection directly remains possible for backward compatibility, but now returns an instance of pykx.SyncQConnection.
Execution Contexts
Functions pulled in over IPC execute locally within PyKX by default using embedded q. Symbolic functions can be used to execute in a different context instead, such as over IPC in the q instance where the function was originally defined. The context interface provides symbolic functions for all functions accessed through it by default.
In the following example, q is a pykx.QConnection instance.
The following call to the q function save executes locally using embedded q, because q('save') returns a regular pykx.Function object.
q('save')('t') # Executes locally
When save is accessed through the context interface, it is a pykx.SymbolicFunction object instead, which means it is simultaneously an instance of pykx.Function and pykx.SymbolAtom. When it is executed, the function is retrieved within its execution context using its symbol value, and so it is executed in the q server where save is defined.
q.save('t') # Executes in the q server over IPC
Alternatively, one can simply access and use the function by name manually within a single query. This differs from the first case because the query includes the argument for save, and so what is returned is the result of calling save with the argument t, rather than the save function itself.
q('save', 't') # Executes in the q server over IPC
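The difference between the calls above can be illustrated with a toy model that does not use PyKX at all. The classes `Context`, `PlainFunction`, and `SymbolicFunction` below are hypothetical stand-ins rather than PyKX types. The sketch only assumes that a plain function value runs wherever the caller is, while a symbolic function is looked up by name in its own context each time it is called.

```python
class Context:
    """Hypothetical stand-in for a place where named functions live and run."""

    def __init__(self, label, functions=None):
        self.label = label
        self.functions = dict(functions or {})

    def get(self, name):
        # Hand the function body itself back to the caller.
        return self.functions[name]

    def run(self, name, *args):
        # Look the function up by name and execute it inside this context.
        return self.functions[name](self.label, *args)


class PlainFunction:
    """Models a function value pulled out of its context; it runs in the caller's context."""

    def __init__(self, body, local_context):
        self.body = body
        self.local_context = local_context

    def __call__(self, *args):
        return self.body(self.local_context.label, *args)


class SymbolicFunction:
    """Models a function known only by name; it runs in the context that defines it."""

    def __init__(self, name, context):
        self.name = name
        self.context = context

    def __call__(self, *args):
        return self.context.run(self.name, *args)


# The function body reports where it was executed (illustrative only).
server = Context("server", {"save": lambda where, table: f"{table} saved in {where}"})
embedded = Context("embedded")

by_value = PlainFunction(server.get("save"), embedded)  # like q('save')
by_name = SymbolicFunction("save", server)              # like q.save

print(by_value("t"))
print(by_name("t"))
```

In this model the same function body produces different results depending only on how it was obtained, which mirrors the local versus over-IPC behaviour described above.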
Asynchronous Execution
To make asynchronous queries to q with PyKX, an AsyncQConnection must be used. When an instance of an AsyncQConnection is called, the query is sent to the q server and control is immediately handed back to the running Python program. The __call__ function returns a QFuture instance that can later be awaited to block until a result has been received.
If you are using an AsyncQConnection to make q queries that respond in a deferred manner, you must make the call using the reuse=False parameter. With that parameter, the query is made over a dedicated AsyncQConnection instance that is closed once the result has been received.
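Because each call returns immediately, several queries can be sent before any result is awaited. The sketch below shows that pattern with a hypothetical `FakeAsyncConnection` stand-in so that it runs without a q server. The helper name `run_all` is also an assumption made for illustration, and the stand-in returns plain `asyncio` futures rather than QFuture objects.

```python
import asyncio


async def run_all(q, queries):
    """Send every query first, then await the results in order."""
    futures = [q(query) for query in queries]  # each call returns without blocking
    return [await future for future in futures]


class FakeAsyncConnection:
    """Hypothetical stand-in for an asynchronous connection (not a PyKX class)."""

    def __call__(self, query):
        # Return an already-completed future so the example is deterministic.
        future = asyncio.get_running_loop().create_future()
        future.set_result(f"result of {query}")
        return future


results = asyncio.run(run_all(FakeAsyncConnection(), ["til 10", "1+1"]))
print(results)
```

With PyKX, `q` would instead be the connection obtained from `async with pykx.AsyncQConnection(...) as q:`, as in the QFuture example in this document.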
QFuture
QFuture(q_connection, timeout)
A Future object to be returned by calls to q from an instance of pykx.AsyncQConnection.
This object can be awaited to receive the resulting value.
Examples:
Awaiting an instance of this class to receive the return value of an AsyncQConnection call.
async with pykx.AsyncQConnection('localhost', 5001) as q:
    q_future = q('til 10') # returns a QFuture object
    q_result = await q_future
__await__
__await__()
Await the result of the QFuture.
Returns:
Type | Description |
---|---|
Any | The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled | This QFuture instance has been cancelled and cannot be awaited. |
BaseException | If the future has an exception set it will be raised upon awaiting it. |
set_result
set_result(val)
Set the result of the QFuture and mark it as done.
If there are functions in the callback list they will be called using this QFuture instance as the only parameter after the result is set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
val | Any | The value to set as the result of the | required |
set_exception
set_exception(err)
Set the exception of the QFuture and mark it as done.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err | Exception | The exception to set as the exception of the | required |
result
result()
Get the result of the QFuture.
Returns:
Type | Description |
---|---|
Any | The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled | This QFuture instance has been cancelled and cannot be awaited. |
NoResults | The result is not ready. |
cancel
cancel(msg='')
Cancel the QFuture.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | str | An optional message to append to the end of the | '' |
exception
exception()
Get the exception of the QFuture.
Returns:
Type | Description |
---|---|
None | The exception of the |
add_done_callback
add_done_callback(callback)
Add a callback function to be run when the QFuture is done.
The callback should expect one parameter that is the current instance of this class.
The functions are called when the result of the future is set and therefore can use the result and modify it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable | The callback function to be called when the result is set. | required |
remove_done_callback
remove_done_callback(callback)
Remove a callback from the list of callbacks contained within the class.
All matching callbacks will be removed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable | The callback function to be removed from the list of callback functions to call. | required |
Returns:
Type | Description |
---|---|
int | The number of functions removed. |
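The callback behaviour described for `set_result`, `add_done_callback`, and `remove_done_callback` can be modelled in a few lines. `SketchFuture` below is a hypothetical, simplified class written for illustration. It is not PyKX's QFuture implementation, and it raises a plain `RuntimeError` where the documentation above lists `NoResults`.

```python
class SketchFuture:
    """Hypothetical minimal future that mirrors the documented callback behaviour."""

    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def remove_done_callback(self, callback):
        # Remove every matching callback and report how many were dropped.
        before = len(self._callbacks)
        self._callbacks = [cb for cb in self._callbacks if cb != callback]
        return before - len(self._callbacks)

    def set_result(self, val):
        self._result = val
        self._done = True
        # Callbacks receive the future itself as their only parameter.
        for callback in self._callbacks:
            callback(self)

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready")
        return self._result


seen = []


def record(future):
    seen.append(future.result())


def noisy(future):
    seen.append("noisy")


fut = SketchFuture()
fut.add_done_callback(record)
fut.add_done_callback(noisy)
fut.add_done_callback(noisy)
removed = fut.remove_done_callback(noisy)  # both matching entries are removed
fut.set_result(42)
print(removed, seen)
```

Only `record` runs here, because every registration of `noisy` was removed before the result was set.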
get_loop
get_loop()
Raises:
Type | Description |
---|---|
PyKXException | QFutures do not rely on an event loop to drive them, and therefore do not have one. |
QConnection
QConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
sync=None,
wait=True,
lock=None
)
Bases: Q
Creating an instance of this class returns an instance of pykx.SyncQConnection.
Directly instantiating an instance of pykx.SyncQConnection is recommended, but this behaviour will remain for backwards compatibility.
Interface with a q process using the q IPC protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host | Union[str, bytes] | The host name to which a connection is to be established. | 'localhost' |
port | int | The port to which a connection is to be established. | None |
username | Union[str, bytes] | Username for q connection authorization. | '' |
password | Union[str, bytes] | Password for q connection authorization. | '' |
timeout | float | Timeout for blocking socket operations in seconds. If set to | 0.0 |
large_messages | bool | Whether support for messages >2GB should be enabled. | True |
tls | bool | Whether TLS should be used. | False |
unix | bool | Whether a Unix domain socket should be used instead of TCP. If set to | False |
sync | bool | This parameter is deprecated - use | None |
wait | bool | Whether the q server should send a response to the query (which this connection will wait to receive). Can be overridden on a per-call basis. If | True |
The username and password parameters are not required.
The username and password parameters are only required if the q server requires authorization. Refer to ssl documentation for more information.
The timeout argument may not always be enforced when making successive queries.
When making successive queries, if one query times out the next query will wait until a response has been received from the previous query before starting the timer for its own timeout. This can be avoided by using a separate QConnection instance for each query.
Raises:
Type | Description |
---|---|
PyKXException | Using both tls and unix is not possible with a QConnection. |
SyncQConnection
SyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
sync=None,
wait=True,
lock=None
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages synchronously or asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host | Union[str, bytes] | The host name to which a connection is to be established. | 'localhost' |
port | int | The port to which a connection is to be established. | None |
username | Union[str, bytes] | Username for q connection authorization. | '' |
password | Union[str, bytes] | Password for q connection authorization. | '' |
timeout | float | Timeout for blocking socket operations in seconds. If set to | 0.0 |
large_messages | bool | Whether support for messages >2GB should be enabled. | True |
tls | bool | Whether TLS should be used. | False |
unix | bool | Whether a Unix domain socket should be used instead of TCP. If set to | False |
sync | Optional[bool] | This parameter is deprecated - use | None |
wait | bool | Whether the q server should send a response to the query (which this connection will wait to receive). Can be overridden on a per-call basis. If | True |
The username and password parameters are not required.
The username and password parameters are only required if the q server requires authorization. Refer to ssl documentation for more information.
The timeout argument may not always be enforced when making successive queries.
When making successive queries, if one query times out the next query will wait until a response has been received from the previous query before starting the timer for its own timeout. This can be avoided by using a separate SyncQConnection instance for each query.
Raises:
Type | Description |
---|---|
PyKXException | Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
pykx.SyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.1, on port 5001, with a timeout of 2 seconds and TLS enabled.
pykx.SyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
pykx.SyncQConnection(port=5001, unix=True)
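The timeout caveat noted above suggests opening a separate connection for each query. A minimal sketch of that pattern follows. The helper `query_on_fresh_connections` and the `FakeConnection` class are hypothetical and exist only so that the example runs without a q server. The sketch assumes the connection object can be used in a with-statement and called with a query string, as the examples in this document show.

```python
def query_on_fresh_connections(connect, queries):
    """Run each query on its own connection so one timeout cannot delay the next timer."""
    results = []
    for query in queries:
        with connect() as q:  # the connection is closed when the block exits
            results.append(q(query))
    return results


class FakeConnection:
    """Hypothetical stand-in for a synchronous connection; counts how many were opened."""

    opened = 0

    def __init__(self):
        FakeConnection.opened += 1

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False  # do not suppress exceptions

    def __call__(self, query):
        return f"result of {query}"


results = query_on_fresh_connections(FakeConnection, ["til 10", "1+1"])
print(results, FakeConnection.opened)
```

With PyKX, `connect` could be something like `lambda: pykx.SyncQConnection('localhost', 5001, timeout=2.0)`, using the constructor signature documented above. Opening a connection per query trades extra connection overhead for independent timeouts.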
__call__
__call__(query, *args, wait=None, sync=None)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, CharVector] | A q expression to be evaluated. | required |
*args | Any | Arguments to the q query. Each argument will be converted into a | () |
sync | Optional[bool] | This parameter is deprecated - use | None |
wait | Optional[bool] | Whether the q server should execute the query before responding. If | None |
Raises:
Type | Description |
---|---|
RuntimeError | A closed IPC connection was used. |
QError | Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError | Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError | Attempted to send a Python function over IPC. |
Examples:
q = pykx.SyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
q('{y+til x}', 10, 5)
Execute a q query with no parameters
q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = pykx.SyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
q('{x set y+til z}', 'async_query', 10, 5, wait=True)
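The interaction between the connection-level `wait` setting and the per-call `wait` argument in the examples above can be summarised by a small rule. The function `resolve_wait` is a hypothetical helper that models the documented behaviour, not a PyKX function. It assumes that a per-call value of `None` means "use the connection default".

```python
def resolve_wait(connection_default, per_call=None):
    """Return the effective wait flag: the per-call value wins unless it is None."""
    return connection_default if per_call is None else per_call


# Connection created with wait=False, call made without an explicit wait
print(resolve_wait(False))
# Same connection, but the call overrides the default with wait=True
print(resolve_wait(False, True))
```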
close
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = pykx.SyncQConnection('localhost', 5001)
q.close()
Using this class with a with-statement should be preferred:
with pykx.SyncQConnection('localhost', 5001) as q:
    # do stuff with q
    pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.
AsyncQConnection
AsyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
event_loop=None
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host | Union[str, bytes] | The host name to which a connection is to be established. | 'localhost' |
port | int | The port to which a connection is to be established. | None |
username | Union[str, bytes] | Username for q connection authorization. | '' |
password | Union[str, bytes] | Password for q connection authorization. | '' |
timeout | float | Timeout for blocking socket operations in seconds. If set to | 0.0 |
large_messages | bool | Whether support for messages >2GB should be enabled. | True |
tls | bool | Whether TLS should be used. | False |
unix | bool | Whether a Unix domain socket should be used instead of TCP. If set to | False |
wait | bool | Whether the q server should send a response to the query (which this connection will wait to receive). Can be overridden on a per-call basis. If | True |
event_loop | Optional[asyncio.AbstractEventLoop] | If running an event loop that supports the | None |
The username and password parameters are not required.
The username and password parameters are only required if the q server requires authorization. Refer to ssl documentation for more information.
The timeout argument may not always be enforced when making successive queries.
When making successive queries, if one query times out the next query will wait until a response has been received from the previous query before starting the timer for its own timeout. This can be avoided by using a separate QConnection instance for each query.
Raises:
Type | Description |
---|---|
PyKXException | Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
await pykx.AsyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.1, on port 5001, with a timeout of 2 seconds and TLS enabled.
await pykx.AsyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
await pykx.AsyncQConnection(port=5001, unix=True)
_initobj
async
_initobj()
Crutch used for __await__ after spawning.
__call__
__call__(query, *args, wait=True, reuse=True)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | Union[str, bytes, CharVector] | A q expression to be evaluated. | required |
*args | Any | Arguments to the q query. Each argument will be converted into a | () |
wait | bool | Whether the q server should execute the query before responding. If | True |
reuse | bool | Whether the AsyncQConnection instance should be reused for subsequent queries, if using q queries that respond in a deferred/asynchronous manner this should be set to | True |
Returns:
Type | Description |
---|---|
QFuture | A QFuture object that can be awaited on to get the result of the query. |
Raises:
Type | Description |
---|---|
RuntimeError | A closed IPC connection was used. |
QError | Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError | Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError | Attempted to send a Python function over IPC. |
Examples:
q = await pykx.AsyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
await q('{y+til x}', 10, 5)
Execute a q query with no parameters
await q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
await q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = await pykx.AsyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
await q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
await q('{x set y+til z}', 'async_query', 10, 5, wait=True)
close
async
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = await pykx.AsyncQConnection('localhost', 5001)
await q.close()
Using this class with an async with-statement should be preferred:
async with pykx.AsyncQConnection('localhost', 5001) as q:
    # do stuff with q
    pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.