IPC
pykx.ipc
PyKX q IPC interface.
The IPC communication module provided here works differently than may be expected for users
familiar with the KX IPC interfaces provided for Java and C#. Unlike these interfaces it does not
directly convert the encoded data received over the q IPC protocol to an analogous type in Python,
but rather stores the object within q memory space as a pykx.K
object for deferred conversion.
This has major benefits with regards to the flexibility of the interface. In particular, the
pykx.K
conversion methods (i.e. py
, np
, pd
, and pa
), use the same logic as they
do when converting pykx.K
objects that were created by an embedded q instance.
The IPC interface works when running with or without a q license. Refer to the modes of operation documentation for more details.
The IPC Interface is split between two classes pykx.AsyncQConnection
and pykx.SyncQConnection
.
Both of which extend the base QConnection
class, instantiating a QConnection
directly remains
possible for backward compatibility but will now return an instance of pykx.SyncQConnection
. There
is also the pykx.RawQConnection
class that is a superset of the pykx.AsyncQConnection
class that
has extra functionality around manually polling the send an receive message queues.
For more examples of usage of the IPC interface you can look at the
interface overview
.
MessageType
QFuture
QFuture(q_connection, timeout, debug, poll_recv=None)
A Future object to be returned by calls to q from an instance of pykx.AsyncQConnection
.
This object can be awaited to receive the resulting value.
Examples:
Awaiting an instance of this class to receive the return value of an AsyncQConnection
call.
async with pykx.AsyncQConnection('localhost', 5001) as q:
q_future = q('til 10') # returns a QFuture object
q_result = await q_future
__await__
__await__()
Await the result of the QFuture
.
Returns:
Type | Description |
---|---|
Any
|
The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled
|
This QFuture instance has been cancelled and cannot be awaited. |
BaseException
|
If the future has an exception set it will be raised upon awaiting it. |
set_result
set_result(val)
Set the result of the QFuture
and mark it as done.
If there are functions in the callback list they will be called using this QFuture
instance as the only parameter after the result is set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
val |
Any
|
The value to set as the result of the |
required |
set_exception
set_exception(err)
Set the exception of the QFuture
and mark it as done.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err |
Exception
|
The exception to set as the exception of the |
required |
result
result()
Get the result of the QFuture
.
Returns:
Type | Description |
---|---|
Any
|
The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled
|
This QFuture instance has been cancelled and cannot be awaited. |
NoResults
|
The result is not ready. |
cancel
cancel(msg='')
Cancel the QFuture
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
str
|
An optional message to append to the end of the |
''
|
exception
exception()
Get the exception of the QFuture
.
Returns:
Type | Description |
---|---|
None
|
The excpetion of the |
add_done_callback
add_done_callback(callback)
The callback should expect one parameter that is the current instance of this class.
The functions are called when the result of the future is set and therefore can use the result and modify it.
Add a callback function to be ran when the QFuture
is done.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback |
Callable
|
The callback function to be called when the result is set. |
required |
remove_done_callback
remove_done_callback(callback)
Remove a callback from the list of callbacks contained within the class.
All matching callbacks will be removed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback |
Callable
|
The callback function to be removed from the list of callback functions to call. |
required |
Returns:
Type | Description |
---|---|
int
|
The number of functions removed. |
get_loop
get_loop()
Raises:
Type | Description |
---|---|
PyKXException
|
QFutures do not rely on an event loop to drive them, and therefore do not have one. |
QConnection
QConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1
)
Bases: Q
Creating an instance of this class returns an instance of pykx.SyncQConnection
.
Directly instantiating an instance of pykx.SyncQConnection
is recommended, but
this behavior will remain for backwards compatibility.
Interface with a q process using the q IPC protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to |
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to reconnect to the server if the connection is lost. The query will be resent if the reconnection is successful. The default is -1 which will not attempt to reconnect, 0 will continuosly attempt to reconnect to the server with no stop and an exponential backoff between successive attempts. Any positive integer will specify the maximum number of tries to reconnect before throwing an error if a connection can not be made. |
-1
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q server requires
authorization. Refer to ssl documentation for more
information.
The timeout
argument may not always be enforced when making successive queries.
When making successive queries if one query times out the next query will wait until a
response has been received from the previous query before starting the timer for its own
timeout. This can be avoided by using a separate QConnection
instance for each query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
file_execute
file_execute(file_path, *, return_all=False)
Functionality for the execution of the content of a local file on a remote server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str
|
Path to the file which is to be executed on a remote server |
required |
return_all |
bool
|
Return the execution result from all lines within the executed script |
False
|
Raises:
Type | Description |
---|---|
PyKXException
|
Will raise error associated with failure to execute the code on the server associated with the given file. |
Examples:
Connect to a q process on localhost and execute a file based on relative path.
conn = pykx.QConnection('localhost', 5000)
conn.file_execute('file.q')
Connect to a q process using an asynchronous QConnection at IP address 127.0.0.1, on port 5000 and execute a file based on absolute path.
conn = pykx.QConnection('127.0.0.1', 5000, wait=False)
conn.file_execute('/User/path/to/file.q')
SyncQConnection
SyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages synchronously or asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to |
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to reconnect to the server if the connection is lost. The query will be resent if the reconnection is successful. The default is -1 which will not attempt to reconnect, 0 will continuosly attempt to reconnect to the server with no stop and an exponential backoff between successive attempts. Any positive integer will specify the maximum number of tries to reconnect before throwing an error if a connection can not be made. |
-1
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q server requires
authorization. Refer to ssl documentation for more
information.
The timeout
argument may not always be enforced when making successive queries.
When making successive queries if one query times out the next query will wait until a
response has been received from the previous query before starting the timer for its own
timeout. This can be avoided by using a separate SyncQConnection
instance for each
query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
pykx.SyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
pykx.SyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
pykx.SyncQConnection(port=5001, unix=True)
Automatically reconnect to a q server after a disconnect.
>>> conn = kx.SyncQConnection(port=5001, reconnection_attempts=0)
>>> conn('til 10')
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
# server connection is lost here
>>> conn('til 10')
WARNING: Connection lost attempting to reconnect.
Failed to reconnect, trying again in 0.5 seconds.
Connection successfully reestablished.
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
__call__
__call__(query, *args, wait=None, debug=False, skip_debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a |
()
|
wait |
Optional[bool]
|
Whether the q server should execute the query before responding. If |
None
|
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = pykx.SyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
q('{y+til x}', 10, 5)
Execute a q query with no parameters
q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = pykx.SyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
q('{x set y+til z}', 'async_query', 10, 5, wait=True)
close
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = pykx.SyncQConnection('localhost', 5001)
q.close()
Using this class with a with-statement should be preferred:
with pykx.SyncQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.
AsyncQConnection
AsyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
event_loop=None,
no_ctx=False,
reconnection_attempts=-1
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to |
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
event_loop |
Optional[asyncio.AbstractEventLoop]
|
If running an event loop that supports the |
None
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to reconnect to the server if the connection is lost. The query will not be resent if the reconnection is successful. The default is -1 which will not attempt to reconnect, 0 will continuosly attempt to reconnect to the server with no stop and an exponential backoff between successive attempts. Any positive integer will specify the maximum number of tries to reconnect before throwing an error if a connection can not be made. |
-1
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q server requires
authorization. Refer to ssl documentation for more
information.
The timeout
argument may not always be enforced when making successive queries.
When making successive queries if one query times out the next query will wait until a
response has been received from the previous query before starting the timer for its own
timeout. This can be avoided by using a separate QConnection
instance for each query.
AsyncQConnections will not resend queries that have not completed on reconnection.
When using the reconnection_attempts
key word argument any queries that were not
complete before the connection was lost will have to be manually sent again after the
automatic reconnection.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
await pykx.AsyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
await pykx.AsyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
await pykx.AsyncQConnection(port=5001, unix=True)
Automatically reconnect to a q server after a disconnect.
async def main():
conn = await kx.AsyncQConnection(
port=5001,
event_loop=asyncio.get_event_loop(),
reconnection_attempts=0
)
print(await conn('til 10'))
# Connection lost here
# All unfinished futures are cancelled on connection loss
print(await conn('til 10')) # First call only causes a reconnection but wont send the query and returns none
print(await conn('til 10')) # Second one completes
print(await conn('til 10'))
asyncio.run(main())
# Outputs
0 1 2 3 4 5 6 7 8 9
WARNING: Connection lost attempting to reconnect.
Connection successfully reestablished.
Connection was lost no result
None
0 1 2 3 4 5 6 7 8 9
_initobj
async
_initobj()
Crutch used for __await__
after spawning.
__call__
__call__(query, *args, wait=True, reuse=True, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a |
()
|
wait |
bool
|
Whether the q server should execute the query before responding. If |
True
|
reuse |
bool
|
Whether the AsyncQConnection instance should be reused for subsequent queries,
if using q queries that respond in a deferred/asynchronous manner this should be set
to |
True
|
Returns:
Type | Description |
---|---|
QFuture
|
A QFuture object that can be awaited on to get the result of the query. |
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = await pykx.AsyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
await q('{y+til x}', 10, 5)
Execute a q query with no parameters
await q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
await q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = await pykx.AsyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
await q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
await q('{x set y+til z}', 'async_query', 10, 5, wait=True)
close
async
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = await pykx.AsyncQConnection('localhost', 5001)
await q.close()
Using this class with a with-statement should be preferred:
async with pykx.AsyncQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.
RawQConnection
RawQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
event_loop=None,
no_ctx=False,
as_server=False,
conn_gc_time=0.0
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages asynchronously by calling it as a function, the send and receive selector queues can also, be polled directly using this class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to |
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
event_loop |
Optional[asyncio.AbstractEventLoop]
|
If running an event loop that supports the |
None
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
as_server |
bool
|
If this parameter is set to True the QConnection will act as a |
False
|
conn_gc_time |
float
|
When running as a server this will determine the number of seconds between
going through the list of opened connections and closing any that the clients have
closed. If not set the default of |
0.0
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q server requires
authorization. Refer to ssl documentation for more
information.
The timeout
argument may not always be enforced when making successive queries.
When making successive queries if one query times out the next query will wait until a
response has been received from the previous query before starting the timer for its own
timeout. This can be avoided by using a separate QConnection
instance for each query.
The overhead of calling clean_open_connections
is large.
When running as a server you should ensure that clean_open_connections
is called
fairly infrequently as the overhead of clearing all the dead connections can be quite
large. It is recommended to have a large delay on successive clears or manage it
manually.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
await pykx.RawQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
await pykx.RawQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
await pykx.RawQConnection(port=5001, unix=True)
_initobj
async
_initobj()
Crutch used for __await__
after spawning.
__call__
__call__(query, *args, wait=True, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a |
()
|
wait |
bool
|
Whether the q server should execute the query before responding. If |
True
|
reuse |
Whether the AsyncQConnection instance should be reused for subsequent queries,
if using q queries that respond in a deferred/asynchronous manner this should be set
to |
required |
Returns:
Type | Description |
---|---|
QFuture
|
A QFuture object that can be awaited on to get the result of the query. |
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Note: When querying KX Insights
the no_ctx=True
keyword argument must be used.
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
await q('{y+til x}', 10, 5)
Execute a q query with no parameters
await q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
await q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = await pykx.RawQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
await q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
await q('{x set y+til z}', 'async_query', 10, 5, wait=True)
poll_send
poll_send(amount=1)
Send queued queries to the process connected to over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
amount |
int
|
The number of send requests to handle, defaults to one, if 0 is used then all currently waiting queries will be sent. |
1
|
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Send a single queued message.
q_fut = q('til 10') # not sent yet
q.poll_send() # 1 message is sent
Send two queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(2) # 2 messages are sent
Send all queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(0) # 2 messages are sent
poll_recv_async
poll_recv_async()
Asynchronously receive a query from the process connected to over IPC.
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
Receive a single queued message.
q = await pykx.RawQConnection(host='localhost', port=5002)
q_fut = q('til 10') # not sent yet
q.poll_send() # message is sent
await q.poll_recv_async() # message response is received
poll_recv
poll_recv(amount=1)
Recieve queries from the process connected to over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
amount |
int
|
The number of receive requests to handle, defaults to one, if 0 is used then all currently waiting responses will be received. |
1
|
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Receive a single queued message.
q_fut = q('til 10') # not sent yet
q.poll_send() # message is sent
q.poll_recv() # message response is received
Receive two queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(2) # messages are sent
q.poll_recv(2) # message responses are received
Receive all queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(0) # all messages are sent
q.poll_recv(0) # all message responses are received
close
async
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = await pykx.RawQConnection('localhost', 5001)
await q.close()
Using this class with a with-statement should be preferred:
async with pykx.RawQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
SecureQConnection
SecureQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages synchronously or asynchronously by calling it as a function. This class is automatically created when using TLS to encrypt your queries.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to |
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to reconnect to the server if the connection is lost. The query will be resent if the reconnection is successful. The default is -1 which will not attempt to reconnect, 0 will continuosly attempt to reconnect to the server with no stop and an exponential backoff between successive attempts. Any positive integer will specify the maximum number of tries to reconnect before throwing an error if a connection can not be made. |
-1
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q server requires
authorization. Refer to ssl documentation for more
information.
The timeout
argument may not always be enforced when making successive queries.
When making successive queries if one query times out the next query will wait until a
response has been received from the previous query before starting the timer for its own
timeout. This can be avoided by using a separate SecureQConnection
instance for each
query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
pykx.SecureQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
__call__
__call__(query, *args, wait=None, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a |
()
|
wait |
Optional[bool]
|
Whether the q server should execute the query before responding. If |
None
|
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = pykx.SecureQConnection(host='localhost', port=5002, tls=True)
Call an anonymous function with 2 parameters
q('{y+til x}', 10, 5)
Execute a q query with no parameters
q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = pykx.SecureQConnection(host='localhost', port=5002, wait=False, tls=True)
# Because `wait=False`, all calls on this q instance are not responded to by default:
q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
q('{x set y+til z}', 'async_query', 10, 5, wait=True)
Automatically reconnect to a q server after a disconnect.
>>> conn = kx.SecureQConnection(port=5001, reconnection_attempts=0)
>>> conn('til 10')
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
>>> conn('til 10')
WARNING: Connection lost attempting to reconnect.
Failed to reconnect, trying again in 0.5 seconds.
Connection successfully reestablished.
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
close
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = pykx.SecureQConnection('localhost', 5001, tls=True)
q.close()
Using this class with a with-statement should be preferred:
with pykx.SecureQConnection('localhost', 5001, tls=True) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.