IPC
pykx.ipc
This page documents the API functions for using q IPC within PyKX.
MessageType
Bases: Enum
The message types available to q.
- 0 = async message
- 1 = sync message
- 2 = response message
QFuture
QFuture(q_connection, timeout, debug, poll_recv=None)
A Future object to be returned by calls to q from an instance of pykx.AsyncQConnection.
This object can be awaited to receive the resulting value.
Examples:
Await an instance of this class to receive the return value of an
AsyncQConnection
call.
async with pykx.AsyncQConnection('localhost', 5001) as q:
q_future = q('til 10') # returns a QFuture object
q_result = await q_future
__await__
__await__()
Await the result of the QFuture
.
Returns:
Type | Description |
---|---|
Any
|
The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled
|
This QFuture instance has been cancelled and cannot be awaited. |
BaseException
|
If the future has an exception set it will be raised upon awaiting it. |
set_result
set_result(val)
Set the result of the QFuture
and mark it as done.
The result is set first, then any functions in the callback list will execute
with this Qfuture
as the only parameter input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
val |
Any
|
The value to set as the result of the |
required |
set_exception
set_exception(err)
Set the exception of the QFuture
and mark it as done.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
err |
Exception
|
The exception to set as the exception of the |
required |
result
result()
Get the result of the QFuture
.
Returns:
Type | Description |
---|---|
Any
|
The result of the |
Raises:
Type | Description |
---|---|
FutureCancelled
|
This |
NoResults
|
The result is not ready. |
cancel
cancel(msg='')
Cancel the QFuture
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
str
|
An optional message to append to the end of the
|
''
|
exception
exception()
Get the exception of the QFuture
.
Returns:
Type | Description |
---|---|
None
|
The excpetion of the |
add_done_callback
add_done_callback(callback)
Add a callback function to the list of callback functions which will be executed after
the QFuture
result is set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback |
Callable
|
A callback function to append to the list of callback functions which will be
executed after the |
required |
The callback parameter must accept one parameter.
When it is executed the callback function will be passed the current instance of this class. The callback function is executed after the result of the future is set, allowing the use and modification of the result itself.
remove_done_callback
remove_done_callback(callback)
Remove a callback from the list of callbacks contained within the class.
All matching callbacks will be removed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback |
Callable
|
The callback function to be removed from the list of callback functions to call. |
required |
Returns:
Type | Description |
---|---|
int
|
The number of functions removed. |
get_loop
get_loop()
Raises:
Type | Description |
---|---|
PyKXException
|
QFutures do not rely on an event loop to drive them, and therefore do not have one. |
QConnection
QConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1,
reconnection_delay=0.5,
reconnection_function=reconnection_function
)
Bases: Q
Interface with a q process using the q IPC protocol.
Users are recommended to instantiate an object of pykx.SyncQConnection instead of using this class directly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The hostname to connect to. |
'localhost'
|
port |
int
|
The port to connect to. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to 0, the socket will be non-blocking. |
0.0
|
large_messages |
bool
|
Flag to enable support for messages >2GB. |
True
|
tls |
bool
|
Flag to enable tls. |
False
|
unix |
bool
|
Flag to enable Unix domain socket instead of TCP socket. If set to
|
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
Flag to disable the context interface. Disabling the context interface will not stop extra q queries being sent, but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
The number of attempts to reconnect to the q server when there is
a disconnect. A negative value will disable reconnect attempts.
A value of 0 indicates no limit on reconnect attempts, with each attempt applying
|
-1
|
reconnection_function |
callable
|
A function to execute on each attempt to reconnect. This function
must take one parameter that must be a |
reconnection_function
|
reconnection_delay |
float
|
A |
0.5
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the
q server requires authorization. Refer to
ssl documentation for more information.
The timeout
argument may not always be enforced.
When making successive queries if one query times out the next query will wait until
a response has been received from the previous query before starting the timer for its
own timeout. This can be avoided by using a separate QConnection
instance
for each query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
upd
upd(table, data)
Execute .u.upd
on a remote q process. This function assumes the definition of
.u.upd
on the remote q process takes the same count and data type of arguments
as the default implementation (q keyword insert
). The data
argument
will be converted to a list if it is a PyKX Table
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
str
|
The name of the global variable on the q process to update. |
required |
data |
Union[list, Table]
|
The contents of the update. |
required |
Returns:
Type | Description |
---|---|
None
|
On successful execution this function will return None |
Successfully execute .u.upd
on connected process
>>> import pykx as kx
>>> with kx.SyncQConnection(port=5050) as q:
... q.upd('trade', [kx.TimespanAtom('now') 'AAPL', 1.0])
>>> trades = kx.Table(data = {
... 'time': kx.TimespanAtom('now'),
... 'sym': kx.random.random(N, ['AAPL', 'MSFT', 'GOOG']),
... 'price': kx.random.random(N, 10.0)})
>>> with kx.SyncQConnection(port=5050) as q:
... q.upd('trade', trades)
file_execute
file_execute(file_path, *, return_all=False)
Functionality for the execution of the content of a local file on a remote server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str
|
Path to the file which is to be executed on a remote server |
required |
return_all |
bool
|
Return the execution result from all lines within the executed script |
False
|
Raises:
Type | Description |
---|---|
PyKXException
|
Will raise error associated with failure to execute the code on the server associated with the given file. |
Examples:
Connect to a q process on localhost and execute a file based on relative path.
conn = pykx.QConnection('localhost', 5000)
conn.file_execute('file.q')
Connect to a q process using an asynchronous QConnection at IP address 127.0.0.1, on port 5000 and execute a file based on absolute path.
conn = pykx.QConnection('127.0.0.1', 5000, wait=False)
conn.file_execute('/User/path/to/file.q')
SyncQConnection
SyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1,
reconnection_delay=0.5,
reconnection_function=reconnection_function
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages synchronously or asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to 0, the socket will be non-blocking. |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to
|
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to
reconnect to the server if the connection is lost. The query will be resent if the
reconnection is successful. The default is -1 which will not attempt to
reconnect, 0 will continuously attempt to reconnect to the server using the backoff
|
-1
|
reconnection_delay |
float
|
This parameter outlines the initial delay between reconnection
attempts, by default this is set to 0.5 seconds and is passed to the function
defined by the |
0.5
|
reconnection_function |
callable
|
This parameter defines the function which is used to modify the
|
reconnection_function
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the
q server requires authorization. Refer to
ssl documentation for more information.
The timeout
argument may not always be enforced when making successive
queries. When making successive queries if one query times out the next query will
wait until a response has been received from the previous query before starting the
timer for its own timeout. This can be avoided by using a separate
SyncQConnection
instance for each query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
pykx.SyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
pykx.SyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
pykx.SyncQConnection(port=5001, unix=True)
Automatically reconnect to a q server after a disconnect.
>>> conn = kx.SyncQConnection(port=5001, reconnection_attempts=0)
>>> conn('til 10')
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
# server connection is lost here
>>> conn('til 10')
WARNING: Connection lost attempting to reconnect.
Failed to reconnect, trying again in 0.5 seconds.
Connection successfully reestablished.
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
__call__
__call__(query, *args, wait=None, debug=False, skip_debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a
|
()
|
wait |
Optional[bool]
|
Whether the q server should execute the query before responding.
If |
None
|
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = pykx.SyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
q('{y+til x}', 10, 5)
Execute a q query with no parameters
q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = pykx.SyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
q('{x set y+til z}', 'async_query', 10, 5, wait=True)
Call a PyKX Operator function with supplied parameters
q(kx.q.sum, [1, 2, 3])
Call a PyKX Keyword function with supplied paramters
q(kx.q.floor, [5.2, 10.4])
close
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = pykx.SyncQConnection('localhost', 5001)
q.close()
Using this class with a with-statement should be preferred:
with pykx.SyncQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.
AsyncQConnection
AsyncQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
event_loop=None,
no_ctx=False,
reconnection_attempts=-1,
reconnection_delay=0.5,
reconnection_function=reconnection_function
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages asynchronously by calling it as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to 0, the socket will be non-blocking. |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to
|
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
event_loop |
Optional[asyncio.AbstractEventLoop]
|
If running an event loop that supports the |
None
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to
reconnect to the server if the connection is lost. The query will be resent if the
reconnection is successful. The default is -1 which will not attempt to reconnect, 0
will continuously attempt to reconnect to the server using the backoff
|
-1
|
reconnection_delay |
float
|
This parameter outlines the initial delay between reconnection
attempts, by default this is set to 0.5 seconds and is passed to the function
defined by the |
0.5
|
reconnection_function |
callable
|
This parameter defines the function which is used to modify the
|
reconnection_function
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if
the q server requires authorization. Refer to
ssl documentation for more information.
The timeout
argument may not always be enforced when making
successive queries. When making successive queries if one query times out the next query
will wait until a response has been received from the previous query before starting the
timer for its own timeout. This can be avoided by using a separate
QConnection
instance for each query.
AsyncQConnections will not resend queries that have not completed on reconnection.
When using the reconnection_attempts
key word argument any queries that were
not complete before the connection was lost will have to be manually sent again after
the automatic reconnection.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
await pykx.AsyncQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
await pykx.AsyncQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
await pykx.AsyncQConnection(port=5001, unix=True)
Automatically reconnect to a q server after a disconnect.
async def main():
conn = await kx.AsyncQConnection(
port=5001,
event_loop=asyncio.get_event_loop(),
reconnection_attempts=0
)
print(await conn('til 10'))
# Connection lost here
# All unfinished futures are cancelled on connection loss
print(await conn('til 10')) # First call only causes a reconnection but wont send the query and returns none
print(await conn('til 10')) # Second one completes
print(await conn('til 10'))
asyncio.run(main())
# Outputs
0 1 2 3 4 5 6 7 8 9
WARNING: Connection lost attempting to reconnect.
Connection successfully reestablished.
Connection was lost no result
None
0 1 2 3 4 5 6 7 8 9
_initobj
async
_initobj()
Crutch used for __await__
after spawning.
__call__
__call__(query, *args, wait=True, reuse=True, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a
|
()
|
wait |
bool
|
Whether the q server should execute the query before responding.
If |
True
|
reuse |
bool
|
Whether the AsyncQConnection instance should be reused for subsequent queries,
if using q queries that respond in a deferred/asynchronous manner this should be set
to |
True
|
Returns:
Type | Description |
---|---|
QFuture
|
A QFuture object that can be awaited on to get the result of the query. |
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = await pykx.AsyncQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
await q('{y+til x}', 10, 5)
Execute a q query with no parameters
await q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
await q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = await pykx.AsyncQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
await q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
await q('{x set y+til z}', 'async_query', 10, 5, wait=True)
Call a PyKX Operator function with supplied parameters
await q(kx.q.sum, [1, 2, 3])
Call a PyKX Keyword function with supplied paramters
await q(kx.q.floor, [5.2, 10.4])
close
async
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = await pykx.AsyncQConnection('localhost', 5001)
await q.close()
Using this class with a with-statement should be preferred:
async with pykx.AsyncQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.
RawQConnection
RawQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
event_loop=None,
no_ctx=False,
as_server=False,
conn_gc_time=0.0
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages asynchronously by calling it as a function, the send and receive selector queues can also, be polled directly using this class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to 0, the socket will be non-blocking. |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to
|
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
event_loop |
Optional[asyncio.AbstractEventLoop]
|
If running an event loop that supports the |
None
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
as_server |
bool
|
If this parameter is set to True the QConnection will act as a q server, that other processes can connect to, and will not create a connection. this functionality is licensed only. |
False
|
conn_gc_time |
float
|
When running as a server this will determine the number of seconds between
going through the list of opened connections and closing any that the clients have
closed. If not set the default of 0.0 will cause any old connections to never be
closed unless |
0.0
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if the q
server requires authorization. Refer to
ssl documentation for more information.
The timeout
argument may not always be enforced when making successive
queries. When making successive queries if one query times out the next query will wait
until a response has been received from the previous query before starting the timer for
its own timeout. This can be avoided by using a separate QConnection
instance
for each query.
The overhead of calling clean_open_connections
is large.
When running as a server you should ensure that clean_open_connections
is
called fairly infrequently as the overhead of clearing all the dead connections can be
quite large. It is recommended to have a large delay on successive clears or manage it
manually.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process on localhost with a required username and password.
await pykx.RawQConnection('localhost', 5001, 'username', 'password')
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
await pykx.RawQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
Connect to a q process via a Unix domain socket on port 5001
await pykx.RawQConnection(port=5001, unix=True)
_initobj
async
_initobj()
Crutch used for __await__
after spawning.
__call__
__call__(query, *args, wait=True, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a
|
()
|
wait |
bool
|
Whether the q server should execute the query before responding. If
|
True
|
Returns:
Type | Description |
---|---|
QFuture
|
A QFuture object that can be awaited on to get the result of the query. |
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Note: When querying KX Insights the no_ctx=True
keyword argument must be used.
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Call an anonymous function with 2 parameters
await q('{y+til x}', 10, 5)
Execute a q query with no parameters
await q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
await q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = await pykx.RawQConnection(host='localhost', port=5002, wait=False)
# Because `wait=False`, all calls on this q instance are not responded to by default:
await q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
await q('{x set y+til z}', 'async_query', 10, 5, wait=True)
poll_send
poll_send(amount=1)
Send queued queries to the process connected to over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
amount |
int
|
The number of send requests to handle, defaults to one, if 0 is used then all currently waiting queries will be sent. |
1
|
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Send a single queued message.
q_fut = q('til 10') # not sent yet
q.poll_send() # 1 message is sent
Send two queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(2) # 2 messages are sent
Send all queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(0) # 2 messages are sent
poll_recv_async
poll_recv_async()
Asynchronously receive a query from the process connected to over IPC.
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
Receive a single queued message.
q = await pykx.RawQConnection(host='localhost', port=5002)
q_fut = q('til 10') # not sent yet
q.poll_send() # message is sent
await q.poll_recv_async() # message response is received
poll_recv
poll_recv(amount=1)
Recieve queries from the process connected to over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
amount |
int
|
The number of receive requests to handle, defaults to one, if 0 is used then all currently waiting responses will be received. |
1
|
Raises:
Type | Description |
---|---|
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
Examples:
q = await pykx.RawQConnection(host='localhost', port=5002)
Receive a single queued message.
q_fut = q('til 10') # not sent yet
q.poll_send() # message is sent
q.poll_recv() # message response is received
Receive two queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(2) # messages are sent
q.poll_recv(2) # message responses are received
Receive all queued messages.
q_fut = q('til 10') # not sent yet
q_fut2 = q('til 10') # not sent yet
q.poll_send(0) # all messages are sent
q.poll_recv(0) # all message responses are received
close
async
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = await pykx.RawQConnection('localhost', 5001)
await q.close()
Using this class with a with-statement should be preferred:
async with pykx.RawQConnection('localhost', 5001) as q:
# do stuff with q
pass
# q is closed automatically
SecureQConnection
SecureQConnection(
host="localhost",
port=None,
*args,
username="",
password="",
timeout=0.0,
large_messages=True,
tls=False,
unix=False,
wait=True,
lock=None,
no_ctx=False,
reconnection_attempts=-1,
reconnection_delay=0.5,
reconnection_function=reconnection_function
)
Bases: QConnection
Interface with a q process using the q IPC protocol.
Instances of this class represent an open connection to a q process, which can be sent messages synchronously or asynchronously by calling it as a function. This class is automatically created when using TLS to encrypt your queries.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
Union[str, bytes]
|
The host name to which a connection is to be established. |
'localhost'
|
port |
int
|
The port to which a connection is to be established. |
None
|
username |
Union[str, bytes]
|
Username for q connection authorization. |
''
|
password |
Union[str, bytes]
|
Password for q connection authorization. |
''
|
timeout |
float
|
Timeout for blocking socket operations in seconds. If set to 0, the socket will be non-blocking. |
0.0
|
large_messages |
bool
|
Whether support for messages >2GB should be enabled. |
True
|
tls |
bool
|
Whether TLS should be used. |
False
|
unix |
bool
|
Whether a Unix domain socket should be used instead of TCP. If set to
|
False
|
wait |
bool
|
Whether the q server should send a response to the query (which this connection
will wait to receive). Can be overridden on a per-call basis. If |
True
|
no_ctx |
bool
|
This parameter determines whether or not the context interface will be disabled. disabling the context interface will stop extra q queries being sent but will disable the extra features around the context interface. |
False
|
reconnection_attempts |
int
|
This parameter specifies how many attempts will be made to
reconnect to the server if the connection is lost. The query will be resent if the
reconnection is successful. The default is -1 which will not attempt to reconnect, 0
will continuously attempt to reconnect to the server using the backoff
|
-1
|
reconnection_delay |
float
|
This parameter outlines the initial delay between reconnection
attempts, by default this is set to 0.5 seconds and is passed to the function
defined by the |
0.5
|
reconnection_function |
callable
|
This parameter defines the function which is used to modify the
|
reconnection_function
|
The username
and password
parameters are not required.
The username
and password
parameters are only required if
the q server requires authorization. Refer to
ssl documentation for more information.
The timeout
argument may not always be enforced when making successive
queries. When making successive queries if one query times out the next query will wait
until a response has been received from the previous query before starting the timer for
its own timeout. This can be avoided by using a separate SecureQConnection
instance for each query.
Raises:
Type | Description |
---|---|
PyKXException
|
Using both tls and unix is not possible with a QConnection. |
Examples:
Connect to a q process at IP address 127.0.0.0, on port 5000 with a timeout of 2 seconds and TLS enabled.
pykx.SecureQConnection('127.0.0.1', 5001, timeout=2.0, tls=True)
__call__
__call__(query, *args, wait=None, debug=False)
Evaluate a query on the connected q process over IPC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
Union[str, bytes, CharVector, K]
|
A q expression to be evaluated. |
required |
*args |
Any
|
Arguments to the q query. Each argument will be converted into a
|
()
|
wait |
Optional[bool]
|
Whether the q server should execute the query before responding. If
|
None
|
Raises:
Type | Description |
---|---|
RuntimeError
|
A closed IPC connection was used. |
QError
|
Query timed out, may be raised if the time taken to make or receive a query goes over the timeout limit. |
TypeError
|
Too many arguments were provided - q queries cannot have more than 8 parameters. |
ValueError
|
Attempted to send a Python function over IPC. |
Examples:
q = pykx.SecureQConnection(host='localhost', port=5002, tls=True)
Call an anonymous function with 2 parameters
q('{y+til x}', 10, 5)
Execute a q query with no parameters
q('til 10')
Call an anonymous function with 3 parameters and don't wait for a response
q('{x set y+til z}', 'async_query', 10, 5, wait=False)
Call an anonymous function with 3 parameters and don't wait for a response by default
q = pykx.SecureQConnection(host='localhost', port=5002, wait=False, tls=True)
# Because `wait=False`, all calls on this q instance are not responded to by default:
q('{x set y+til z}', 'async_query', 10, 5)
# But we can issue calls and wait for results by overriding the `wait` option on a per-call
# basis:
q('{x set y+til z}', 'async_query', 10, 5, wait=True)
Call a PyKX Operator function with supplied parameters
q(kx.q.sum, [1, 2, 3])
Call a PyKX Keyword function with supplied paramters
q(kx.q.floor, [5.2, 10.4])
Automatically reconnect to a q server after a disconnect.
>>> conn = kx.SecureQConnection(port=5001, reconnection_attempts=0)
>>> conn('til 10')
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
>>> conn('til 10')
WARNING: Connection lost attempting to reconnect.
Failed to reconnect, trying again in 0.5 seconds.
Connection successfully reestablished.
pykx.LongVector(pykx.q('0 1 2 3 4 5 6 7 8 9'))
close
close()
Close the connection.
Examples:
Open and subsequently close a connection to a q process on localhost:
q = pykx.SecureQConnection('localhost', 5001, tls=True)
q.close()
Using this class with a with-statement should be preferred:
with pykx.SecureQConnection('localhost', 5001, tls=True) as q:
# do stuff with q
pass
# q is closed automatically
fileno
fileno()
The file descriptor or handle of the connection.