Manage query routing
This page outlines how to provide a central, password-protected query location for users.
Disclaimer
The functionality outlined below provides the necessary tools for users to build complex streaming infrastructures. The generation and management of such workflows rest solely with the users. KX supports only individual elements used to create these workflows, not the end-to-end applications.
When providing users with access to data within your system, you must consider the following priorities:
- How can you provide a central point for users to query your data?
- How do you regulate the users who can query your system?
- How do you route queries to multiple processes containing different datasets and aggregate this data?
A Gateway process can handle each of these. The gateway defines which processes in your system can be queried and, through user-configured logic, regulates what a user must provide to establish a connection to it.
PyKX provides a simple gateway which allows connections to multiple processes and supports synchronous queries against them. Although it does not scale to large numbers of high-traffic use cases, it provides a starting infrastructure suitable for small teams of developers.
Have your say
The usage patterns above provide a basic gateway design but do not cover all cases. If there is functionality that you would like to see, let us know by opening an issue here.
Create a gateway
In the following sections we will generate a Gateway process to help you to complete the following:
- Limit users permitted to query your APIs to those with a username and password provided in a supplied text file.
- Generate a custom function which queries the APIs generated on your RTP and HDB processes here, aggregating the results.
Configure your gateway
Before adding custom gateway APIs and secure login to the process, configure the gateway to operate on port 5015 with established connections against two processes:
- 'rtp': The Real-Time Processor established here on port 5014
- 'hdb': The Historical Database established here on port 5012
gateway = kx.tick.GATEWAY(port=5015, connections={'rtp': 'localhost:5014', 'hdb': 'localhost:5012'})
If you need to add connections after you have initialized the GATEWAY, use the add_connections function as shown below:
gateway.add_connections({'rtp': 'localhost:5014'})
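Before pointing the gateway at its downstream processes, it can help to confirm that each address is accepting TCP connections. The following is a minimal sketch using only the Python standard library. The helper is_listening is hypothetical and not part of PyKX, and it only checks that a port is open, not that a q process is behind it.

```python
import socket


def is_listening(address, timeout=1.0):
    """Return True if a TCP connection to 'host:port' succeeds (hypothetical helper)."""
    host, _, port = address.rpartition(':')
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable
        return False


# Same mapping shape as the connections argument passed to the gateway
connections = {'rtp': 'localhost:5014', 'hdb': 'localhost:5012'}
status = {name: is_listening(addr) for name, addr in connections.items()}
print(status)
```

Any entry reported as False points at a process that is not yet reachable on that port, which is worth resolving before the gateway tries to use it.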
API documentation
The following bullet-points provide links to the various functions used within the above section
Add a custom username/password check
Once you have an initialized Gateway process, define a custom username/password check against which any user connecting to the gateway is validated. In the example below, the validation function checks that a user is named test_user and has a password matching the regex password.*
def validation_function(username, password):
    if username == 'test_user':
        pattern = re.compile("password.*")
        if bool(pattern.match(password)):
            return True
    return False
Now that you have specified the validation function, set this function on the Gateway process. For this to operate, you need to ensure the library re is available:
gateway.libraries({'re': 're'})
gateway.connection_validation(validation_function)
Users attempting to interact with this gateway must now supply the username test_user and a password that matches the regex password.* in order to connect.
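Because the check is an ordinary Python function, one way to confirm which credentials it accepts is to call it locally before setting it on the gateway. The sketch below repeats the function with an explicit import of re so that it runs on its own. The sample credentials are made up for illustration.

```python
import re


def validation_function(username, password):
    if username == 'test_user':
        pattern = re.compile("password.*")
        if bool(pattern.match(password)):
            return True
    return False


# (username, password) pairs to try locally; illustrative values only
attempts = [
    ('test_user', 'password123'),
    ('test_user', 'password'),
    ('test_user', 'mypassword'),
    ('other_user', 'password123'),
]
results = {attempt: validation_function(*attempt) for attempt in attempts}
for (username, password), accepted in results.items():
    print(username, password, accepted)
```

Note that pattern.match anchors only at the start of the string, so the bare password "password" is accepted while "mypassword" is rejected.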
Define a custom API for users to call
After establishing the gateway and defining a validation function for connecting processes, add a Custom Gateway API.
Within the Gateway process, there is a Python class named gateway which contains a function call_port. This function takes the name given to a port when establishing remote connections here, followed by the parameters required for the call.
When we developed our custom query APIs here, we registered an API symbol_count on both the rtp and hdb processes. The following function definition uses the call_port function to invoke these functions for a specified table and symbol combination.
def gateway_function(table, symbol):
    rtp = gateway.call_port('rtp', table, symbol)
    try:
        hdb = gateway.call_port('hdb', table, symbol)
    except BaseException:
        print('Failed to retrieve data from HDB')
        hdb = 0
    return rtp + hdb

gateway.register_api('sum_of_symbols', gateway_function)
Now that your gateway function has been registered, start the gateway:
gateway.start()
Users should now be able to query the sum_of_symbols API on the Gateway process as follows:
with kx.SyncQConnection(port=5015, username='test_user', password='password123') as q:
    ret = q('sum_of_symbols', 'trade', 'AAPL')
ret
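To see how the gateway function above behaves when the HDB cannot be reached, the aggregation logic can be exercised without any running processes. This is a minimal sketch under stated assumptions: StubGateway and make_gateway_function are hypothetical stand-ins written for this illustration, not part of PyKX, and the counts are invented.

```python
class StubGateway:
    """Hypothetical stand-in for the gateway object; not part of PyKX."""

    def __init__(self, counts, unavailable=()):
        self.counts = counts                  # port name -> value that port returns
        self.unavailable = set(unavailable)   # port names that should fail

    def call_port(self, name, *args):
        if name in self.unavailable:
            raise ConnectionError(name + ' is unavailable')
        return self.counts[name]


def make_gateway_function(gateway):
    # Same body as the gateway function above, bound to the supplied stub
    def gateway_function(table, symbol):
        rtp = gateway.call_port('rtp', table, symbol)
        try:
            hdb = gateway.call_port('hdb', table, symbol)
        except BaseException:
            print('Failed to retrieve data from HDB')
            hdb = 0
        return rtp + hdb
    return gateway_function


both_up = make_gateway_function(StubGateway({'rtp': 3, 'hdb': 10}))
hdb_down = make_gateway_function(StubGateway({'rtp': 3, 'hdb': 10}, unavailable=['hdb']))

print(both_up('trade', 'AAPL'))   # 13: both counts are summed
print(hdb_down('trade', 'AAPL'))  # 3: HDB failure falls back to 0
```

Only the HDB call is guarded, so a failure on the rtp port still raises to the caller rather than returning a partial result.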
Run all setup at once
To help with restart and to simplify the configuration of your system, you can complete each of the sections above at configuration time for your initialized class. The following code block contains all the code used to configure the gateway:
def validation_function(username, password):
    if username == 'test_user':
        pattern = re.compile("password.*")
        if bool(pattern.match(password)):
            return True
    return False

def gateway_function(table, symbol):
    rtp = gateway.call_port('rtp', table, symbol)
    try:
        hdb = gateway.call_port('hdb', table, symbol)
    except BaseException:
        print('Failed to retrieve data from HDB')
        hdb = 0
    return rtp + hdb

gateway = kx.tick.GATEWAY(
    port=5015,
    connections={'rtp': 'localhost:5014', 'hdb': 'localhost:5012'},
    libraries={'re': 're'},
    apis={'sum_of_symbols': gateway_function},
    connection_validator=validation_function
)
gateway.start()
The advantage of this approach is that it allows process/workflow restart, for example, in case you lose connection to a downstream process. As all definitions are cached in configuration, you can easily restart them.
gateway.restart()
Next steps
For some further reading, here are some related topics you may find interesting: