Custom query API development

This page outlines how you can augment your streaming process with accessible named query APIs.

Disclaimer

The functionality outlined below provides the necessary tools for users to build complex streaming infrastructures. The generation and management of such workflows rest solely with the users. KX supports only individual elements used to create these workflows, not the end-to-end applications.

Adding custom query APIs is often crucial for making your data accessible to users. Users call these APIs either over a direct IPC connection to your process or through a querying Gateway process. You can place custom query APIs on any process type discussed in the basic, analysis and subscription sections.

In each case, you can add a query API by calling the register_api method on the process, or by supplying it during the configuration of an RTP, HDB or GATEWAY process in your system. Gateway processes are covered in the section that follows this one, here. The examples below add query APIs to the historical database created when configuring the basic infrastructure and to the RTP that processes the aggregate dataset.

Configure an API for your Real-Time Processor

You can add APIs to your process at configuration time or while the process is running, which allows for iterative development. The following sections show both approaches, using a Python function that takes two parameters:

  1. The table which is being queried
  2. The symbol which a user is interested in

And returns the number of instances of that symbol:

import pykx as kx

def custom_api(table, symbol):
    # Count the rows of `table` whose sym column matches `symbol`
    return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]
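
Because custom_api interpolates the table argument directly into the query text, the caller controls part of the SQL string. The following is a minimal sketch of one way to guard against unexpected table names, assuming an allow-list approach. ALLOWED_TABLES, validate_table and build_query are hypothetical names introduced for illustration and are not part of PyKX:

```python
# Hypothetical allow-list of tables the API may query (an assumption,
# adjust it to the tables that exist on your process).
ALLOWED_TABLES = frozenset({'trade', 'quote', 'aggregate'})

def validate_table(table, allowed=ALLOWED_TABLES):
    """Return the table name unchanged if it is allow-listed, else raise."""
    if not isinstance(table, str) or table not in allowed:
        raise ValueError(f'unknown table: {table!r}')
    return table

def build_query(table):
    # Only a validated table name is interpolated; the symbol stays a
    # bound parameter ($1), as in custom_api.
    return f'select count(*) from {validate_table(table)} where sym like $1'

print(build_query('trade'))
```

A check of this kind could run at the top of custom_api before the query is issued, so that an unknown table name fails early with a clear error.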

Add an API to an existing RTP and HDB

Now that you have the function definition, use the register_api function to augment the rtp class created here.

rtp.register_api('symbol_count', custom_api)

Similarly, you can add the equivalent API to your HDB process generated here by accessing the hdb class as follows:

basic.hdb.register_api('symbol_count', custom_api)
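
As a conceptual aid, registering an API can be pictured as adding an entry to a name-to-function mapping that the process consults when a caller asks for that name. The sketch below is a plain-Python toy model under that assumption. ApiRegistry, make_symbol_count and the sample rows are hypothetical, and the model says nothing about how PyKX implements register_api internally:

```python
class ApiRegistry:
    """Toy stand-in for a process that exposes named query APIs."""

    def __init__(self):
        self._apis = {}

    def register_api(self, name, function):
        if not callable(function):
            raise TypeError('function must be callable')
        self._apis[name] = function

    def __call__(self, name, *args):
        # Look the API up by name and forward the remaining arguments
        return self._apis[name](*args)

# Plain-Python rows standing in for the data held by two processes
rtp_rows = [{'sym': 'AAPL'}, {'sym': 'MSFT'}, {'sym': 'AAPL'}]
hdb_rows = [{'sym': 'AAPL'}]

def make_symbol_count(rows):
    def symbol_count(table, symbol):
        # `table` is ignored here: each toy process holds a single table
        return sum(1 for row in rows if row['sym'] == symbol)
    return symbol_count

toy_rtp, toy_hdb = ApiRegistry(), ApiRegistry()
toy_rtp.register_api('symbol_count', make_symbol_count(rtp_rows))
toy_hdb.register_api('symbol_count', make_symbol_count(hdb_rows))

print(toy_rtp('symbol_count', 'trade', 'AAPL'))
print(toy_hdb('symbol_count', 'trade', 'AAPL'))
```

The same API name resolves independently on each process, which is why one name can return real-time counts from one process and historical counts from another.
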

Add an API when configuring your system

In the previous section you added custom APIs to a running system. To make APIs available on restart, add them when you configure the processes. For instance, let's modify the example here to include an API.

When you add an API at configuration time, supply it as a dictionary mapping the name of the API to the function that implements it:

def preprocessor(table, data):
    if table == 'trade':
        return data
    else:
        return None

def postprocessor(table, data):
    agg = kx.q[table].select(
        columns = {'min_px':'min price',
                   'max_px': 'max price',
                   'spread_px': 'max[price] - min price'},
        by = {'symbol': 'symbol'})
    kx.q['agg'] = agg # Make the table accessible from q
    with kx.SyncQConnection(port=5010, wait=False, no_ctx=True) as q:
        q('.u.upd', 'aggregate', agg._values)
    return None

def custom_api(table, symbol):
    return kx.q.sql(f'select count(*) from {table} where sym like $1', symbol)['xcol'][0]

rtp = kx.tick.RTP(port=5014,
                  subscriptions = ['trade'],
                  libraries={'kx': 'pykx'},
                  pre_processor=preprocessor,
                  post_processor=postprocessor,
                  apis={'symbol_count': custom_api},
                  vanilla=False)
rtp.start({'tickerplant': 'localhost:5013'})
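
The pre-processor in the configuration above acts as a filter on incoming messages. The sketch below exercises the same function on plain Python values so that its behaviour can be checked without a running system. It assumes that a None return signals that a message should be skipped; the sample messages are hypothetical:

```python
def preprocessor(table, data):
    # Same logic as the pre-processor above: pass 'trade' through, drop the rest
    if table == 'trade':
        return data
    else:
        return None

# Hypothetical messages as (table name, payload) pairs; payloads are plain lists
messages = [('trade', [1, 2]), ('quote', [3]), ('trade', [4])]

# Keep only the messages that the pre-processor does not reject
kept = [(table, data) for table, data in messages
        if preprocessor(table, data) is not None]
print(kept)
```

Testing the filter in isolation like this can make it easier to confirm which tables reach the post-processor and the registered APIs.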

Currently we don't support the addition of APIs to the components of the basic infrastructure at startup. To configure a historical database at startup with more fine-grained control, configure it manually as outlined here.

Test an API

The registration above means that users call this function by the name symbol_count. Once the API is registered, you can test it directly as follows:

rtp('symbol_count', 'trade', 'AAPL')

Alternatively, you can test this using IPC:

with kx.SyncQConnection(port=5014, no_ctx=True) as q:
    q('symbol_count', 'trade', 'AAPL')
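
Both calls above pass the API name first, followed by its arguments, so a small helper can exercise the API for several symbols through either route. This is a minimal sketch: count_symbols is a hypothetical helper, and fake_call is a stand-in for rtp or an open connection q so that the example runs without a live process:

```python
def count_symbols(call, table, symbols):
    """Call the 'symbol_count' API once per symbol and collect the results."""
    return {symbol: call('symbol_count', table, symbol) for symbol in symbols}

def fake_call(name, table, symbol):
    # Stand-in for `rtp` or a connection `q`; returns canned counts
    if name != 'symbol_count':
        raise KeyError(name)
    canned = {'AAPL': 3, 'MSFT': 1}
    return canned.get(symbol, 0)

results = count_symbols(fake_call, 'trade', ['AAPL', 'MSFT', 'GOOG'])
print(results)
```

With a live system, passing rtp or the connection q in place of fake_call should follow the same calling pattern shown in the two examples above.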

Next steps

Now that you have data being published to your system you may be interested in the following:

  • Generate a query routing gateway to allow queries across multiple processes here.
  • Manually configure the basic infrastructure as outlined here.

For some further reading, here are some related topics:

  • Learn more about Interprocess Communication (IPC) here.
  • Learn more about how you can query your data here.