PyKX Subscribing to a q Process

This example demonstrates how to use PyKX to set up a Python process as a subscriber to data messages published from a q process.

Pre-requisites

A kdb+ license is required to complete this example. Sign up for a license.

The following Python libraries are required to run this example:

  1. pykx
  2. asyncio (part of the Python standard library)

The source code for this example is available in the examples directory here:

  1. Synchronous subscriber
  2. Asynchronous subscriber
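
To confirm that the libraries listed above can be imported before running either script, a small check along the following lines may help. This is a sketch using only the standard library; the helper name missing_packages is an assumption made for illustration and is not part of PyKX or the example scripts.

```python
import importlib.util


def missing_packages(names):
    """Return the names that Python cannot locate for import."""
    # find_spec returns None when a top-level module is not installed.
    return [name for name in names if importlib.util.find_spec(name) is None]


# The two libraries this example relies on.
missing = missing_packages(["pykx", "asyncio"])
if missing:
    print("Missing packages:", ", ".join(missing))
```

An empty result only shows that the packages can be located; it does not verify that a kdb+ license is installed.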

Summary of steps

Both example scripts for setting up a subscriber follow the same steps:

  1. Start a q process listening on an open port (this example uses 5001; if you choose another port, update port=5001 in the script to match).
  2. Run the Python subscriber by executing the script from the GitHub repository.
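
Because the subscriber connects as soon as it starts, it can be useful to confirm that something is listening on the chosen port first. The following is a minimal sketch using the standard library only; the function name port_is_open is hypothetical, and the check only confirms that a TCP connection succeeds, not that the listener is a q process.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        # create_connection raises OSError if nothing accepts the connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For the setup described here, port_is_open("localhost", 5001) would be expected to return True once q has been started with -p 5001.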

Run the subscriber example

  1. Begin by running a q process with an open port:

    // run q
    $ q -p 5001
    q)

    Next, in a separate terminal, start a Python process running the subscriber script:

    // run the subscriber, which connects automatically
    $ python subscriber.py

    The Python process opens an IPC connection to the q process and, as part of its main function, sets a new global variable on the q process:

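        # Excerpt: kx is pykx (import pykx as kx); table and main_loop are defined elsewhere in the script
        async def main():
            global table
            # Open an IPC connection to the q process listening on port 5001
            async with kx.RawQConnection(port=5001) as q:
                print('===== Initial Table =====')
                print(table)
                print('===== Initial Table =====')
                # .z.w is the handle of this connection on the q side;
                # neg makes it asynchronous, so q does not wait for a reply
                await q('py_server:neg .z.w')
                await main_loop(q)

    Once the Python process has connected, the q process holds the variable py_server, set to the (asynchronous) handle of the Python process.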

  2. On startup, the Python process connects to the q server, sets the py_server variable, and creates and prints the initial table. Once py_server is set, any rows sent from q are appended to this table as they are received.

    // run the subscriber, which automatically connects
    $ python subscriber.py
    ===== Initial Table =====
    a b
    ---
    4 8
    9 1
    2 9
    7 5
    0 4
    1 6
    9 6
    2 1
    1 8
    8 5
    ===== Initial Table =====
    
  3. From the q process, send a new table row (1, 2) to the Python process through the py_server handle:

    q)py_server[1 2]
    
  4. The Python process prints the received row, followed by the updated table:

    Received new table row from q: 1 2
    a b
    ---
    4 8
    9 1
    2 9
    7 5
    0 4
    1 6
    9 6
    2 1
    1 8
    8 5
    1 2

    The new row has been appended to the table.
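
The main_loop coroutine called from main is not shown on this page. As a rough sketch only, a polling-style loop could look like the following. Everything here is an assumption made for illustration: FakeConnection and its poll_recv method are hypothetical stand-ins for the real connection object, the table is a plain Python list rather than a PyKX table, and the loop is bounded so that the sketch terminates.

```python
import asyncio
from collections import deque


class FakeConnection:
    """Hypothetical stand-in for an IPC connection; not part of PyKX."""

    def __init__(self, messages):
        self._pending = deque(messages)

    def poll_recv(self):
        # Return the next pending message, or None when nothing is waiting.
        return self._pending.popleft() if self._pending else None


def is_valid_row(msg):
    # A row for the two-column table (a, b) must hold exactly two integers.
    return (
        isinstance(msg, (list, tuple))
        and len(msg) == 2
        and all(isinstance(x, int) for x in msg)
    )


async def polling_loop(conn, table, max_polls):
    # Poll a fixed number of times; a real subscriber would loop indefinitely.
    for _ in range(max_polls):
        await asyncio.sleep(0)  # yield to other tasks between polls
        msg = conn.poll_recv()
        if msg is None:
            continue
        if is_valid_row(msg):
            table.append(tuple(msg))
            print(f"Received new table row: {msg}")
    return table


initial = [(4, 8), (9, 1)]
conn = FakeConnection([[1, 2], "not a row", [3, 4, 5]])
polled = asyncio.run(polling_loop(conn, initial, max_polls=5))
```

Validating each message before appending keeps malformed input, such as a vector of the wrong length, from corrupting the table.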

Run the asynchronous subscriber example

  1. Begin by running a q process with an open port:

    // run q
    $ q -p 5001
    q)

    Next, in a separate terminal, start a Python process running the asynchronous subscriber script:

    // run the asynchronous subscriber which automatically connects
    $ python subscriber_async.py

    The Python process opens an IPC connection to the q process and, as part of its main function, sets a new global variable on the q process:

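        # Excerpt: kx is pykx (import pykx as kx); table and main_loop are defined elsewhere in the script
        async def main():
            global table
            # Open an IPC connection to the q process listening on port 5001
            async with kx.RawQConnection(port=5001) as q:
                print('===== Initial Table =====')
                print(table)
                print('===== Initial Table =====')
                # .z.w is the handle of this connection on the q side;
                # neg makes it asynchronous, so q does not wait for a reply
                await q('py_server:neg .z.w')
                await main_loop(q)

    Once the Python process has connected, the q process holds the variable py_server, set to the (asynchronous) handle of the Python process.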

  2. On startup, the Python process connects to the q server, sets the py_server variable, and creates and prints the initial table. Once py_server is set, any rows sent from q are appended to this table as they are received.

    // run the subscriber, which automatically connects
    $ python subscriber_async.py
    ===== Initial Table =====
    a b
    ---
    4 8
    9 1
    2 9
    7 5
    0 4
    1 6
    9 6
    2 1
    1 8
    8 5
    ===== Initial Table =====
    
  3. From the q process, send a new table row (1, 2) to the Python process through the py_server handle:

    q)py_server[1 2]
    
  4. The Python process prints the received row, followed by the updated table:

    Received new table row from q: 1 2
    a b
    ---
    4 8
    9 1
    2 9
    7 5
    0 4
    1 6
    9 6
    2 1
    1 8
    8 5
    1 2

    The new row has been appended to the table.
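
One way to picture the asynchronous style is a subscriber that suspends until a message arrives, rather than checking for one on a timer. The sketch below illustrates that idea with an asyncio.Queue from the standard library. It is an assumption-laden illustration, not the code in subscriber_async.py: the publisher coroutine is a hypothetical stand-in for the q process, and the table is a plain Python list.

```python
import asyncio


async def publisher(queue, rows):
    # Hypothetical stand-in for the q process pushing rows to the subscriber.
    for row in rows:
        await queue.put(row)
    await queue.put(None)  # sentinel: no more rows will be sent


async def subscriber(queue, table):
    # Suspend on queue.get() until a message arrives; no timed polling.
    while True:
        row = await queue.get()
        if row is None:
            break
        table.append(tuple(row))
        print(f"Received new table row: {row}")
    return table


async def demo():
    queue = asyncio.Queue()
    table = [(4, 8), (9, 1)]
    # Run both coroutines concurrently and keep the subscriber's result.
    _, updated = await asyncio.gather(
        publisher(queue, [(1, 2), (3, 4)]),
        subscriber(queue, table),
    )
    return updated


updated_table = asyncio.run(demo())
```

Compared with a polling loop, this approach reacts as soon as a row is available and leaves the event loop free for other tasks while it waits.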

Summary

This example has demonstrated how to start a q process, connect a Python subscriber to it, and append rows sent from q to a table held in the Python process, either synchronously or asynchronously.

Next steps

Check out more examples such as:

  • [Real-Time Streaming]
  • [Compression and Encryption]