Skip to content

Communicating via IPC

q IPC connections are often used to connect into a central server / gateway that contains large amounts of historical data. PyKX QConnection instances provide a way to connect into these servers and directly query the data within them. This allows users to access data within a running q process, optionally convert it into a Python object and then locally perform analysis / transformations to the data within python. For licensed users the local object can be used within embedded q, for unlicensed users they will first have to convert it to a python type with one of the helper methods (.py()/.np()/.pd()/.pa()). This allows users to get the best of both worlds where they can harness the power of q as well as the power of other existing python libraries to perform analysis and modifications to q data.

Modalities of use for IPC

Using the IPC module is available to both licensed and unlicensed users. Using a QConnection instance is the only way for an unlicensed user to run q code directly within PyKX. When using a QConnection instance in unlicensed mode you must convert the resulting value back into a python type before it is usable. In licensed mode the resulting value can be directly modified and used within Embedded Q without first converting it. For both licensed and unlicensed users this module can be used to replace the functionality of qPython.

# Licensed mode
with kx.SyncQConnection('localhost', 5001) as q:
    result = q.til(10)
    print(result)
    print(result.py())

0 1 2 3 4 5 6 7 8 9
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Unlicensed mode
with kx.SyncQConnection('localhost', 5001) as q:
    result = q.til(10)
    print(result)
    print(result.py())

kx.LongVector._from_addr(0x7fcab6800b80)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Methods of Instantiating

There are two ways to create each subclass of [pykx.QConnection][], the first is to directly instantiate the connection instance and the second option is to create them in the form of a context interface. Using the context interface method of declaring these [pykx.QConnection][] instances should be preferred as it will ensure that the connection instance is properly closed automatically when leaving the scope of the context.

Manually creating a QConnection

q = kx.SyncQConnection('localhost', 5001) # Directly instantiate a QConnection instance
q(...) # Make some queries
q.close() # Must manually ensure it is closed when no longer needed

Using a context interface to create and manage the QConnection

with kx.SyncQConnection('localhost', 5001) as q:
    q(...) # Make some queries
# QConnection is automatically closed here

Performance Considerations

When querying [pykx.Table][] instances on the remote process you should avoid directly calling the table object as that will result in the entirety of the table being sent over IPC and then loaded within the Python process. You should ensure that when querying tables over IPC that you are applying sufficient filters to your query, so that you limit the amount of data being converted and transfered between processes.

Execution Contexts

Functions pulled in over IPC execute locally within PyKX by default using embedded q. Symbolic functions can be used to execute in a different context instead, such as over IPC in the q instance where the function was originally defined. The context interface provides symbolic functions for all functions accessed through it by default.

In the following example, q is a [pykx.QConnection][] instance.

The following call to the q function save executes locally using embedded q, because q('save') returns a regular [pykx.Function][] object.

with kx.SyncQConnection('localhost', 5001) as q:
    q('save')('t') # Executes locally within Embedded q

When save is accessed through the context interface, it is a [pykx.SymbolicFunction][] object instead, which means it is simultaneously an instance of [pykx.Function][] and [pykx.SymbolAtom][]. When it is executed, the function retrived within its execution context using its symbol value, and so it is executed in the q server where save is defined.

with kx.SyncQConnection('localhost', 5001) as q:
    q.save('t') # Executes in the q server over IPC

Alternatively, one can simply access & use the function by name manually within a single query. This differs from the first case because the query includes the argument for save, and so what is returned is the result of calling save with the argument t, rather than the save function itself.

with kx.SyncQConnection('localhost', 5001) as q:
    q('save', 't') # Executes in the q server over IPC

Asynchronous Execution

In order to make asynchronous queries to q with PyKX a [pykx.AsyncQConnection][] must be used. When an instance of an [pykx.AsyncQConnection][] is called the query will be sent to the q server and control will be immediately handed back to the running Python program. The __call__ function returns a [pykx.QFuture][] instance that can later be awaited on to block until a result has been received.

If you are using a third party library that runs an eventloop to manage asynchronous calls, you must ensure you use the event_loop keyword argument to pass the event loop into the [pykx.AsyncQConnection][] instance. This will allow the eventloop to properly manage the returned [pykx.QFuture][] objects.

async with kx.AsyncQConnection('localhost', 5001, event_loop=asyncio.get_event_loop()) as q:
    fut = q('til 10') # returns a QFuture that can later be awaited on, this future is attached to the event loop
    await fut # await the future object to get the result

If you are using an [pykx.AsyncQConnection][] to make q queries that respond in a deferred manner , you must make the call using the reuse=False parameter. By using this parameter the query will be made over a dedicated [pykx.AsyncQConnection][] instance that is closed upon the result being received.

async with kx.AsyncQConnection('localhost', 5001, event_loop=asyncio.get_event_loop()) as q:
    fut = q('query', wait=False, reuse=False) # query a q process that is going to return a deferred result
    await fut # await the future object to get the result

File Execution

In addition to the ability to execute code remotely using explicit calls to the various [pykx.QConnection][] instances, it is also possible to pass to these instances the name of a file available locally which can be executed on the remote server. This is supported under the condition that the file being executed remotely contains all of the required logic to be executed, or the server contains sufficient libraries and associated files to allow execution to occur.

The following provide and example of the usage of this functionality on both a syncronous and asyncronous use-case.

with kx.SyncQConnection(port = 5000) as q:
    q.file_execute('/absolute/path/to/file.q')
    ret = q('.test.variable.set.in.file.q', return_all=True)
async with kx.AsyncQConnection('localhost', 5001) as q:
    q.file_execute('../relative/path/to/file.q')
    ret = await q('.test.variable.set.in.file.q')

Reconnecting to a kdb+ server

When generating a client-server architecture it is often the case that for short periods of time your server may be inaccessible due to network issues or planned outages. At such times clients connected to these servers will need to reconnect, this may require them to manually 'close' their existing stale connection and reconnect using the same credentials to the now restarted server. From PyKX 2.4+ the ability to manually configure reconnection attempts for clients connecting to servers has been added via the addition of the reconnection_attempts keyword argument. The following example shows the output of when attempting to make use of a connection which has been cancelled and is subsequently re-established:

>>> conn = kx.SyncQconnection(port=5050, reconnection_attempts=5)
>>> conn('1+1')  # after this call the server on port 5050 is shutdown for 2 seconds
pykx.LongAtom(pykx.q('2')
>>> conn('1+2')
WARNING: Connection lost attempting to reconnect.
Failed to reconnect, trying again in 0.5 seconds.
Failed to reconnect, trying again in 1.0 seconds.
Connection successfully reestablished.
pykx.LongAtom(pykx.q('3'))