Publish Data
This page outlines how you can publish new data to your streaming infrastructure.
Disclaimer
The functionality outlined below provides the necessary tools for users to build complex streaming infrastructures. The generation and management of such workflows rest solely with the users. KX supports only individual elements used to create these workflows, not the end-to-end applications.
To publish data to a PyKX streaming workflow, send messages to a tickerplant process using Interprocess Communication (IPC). The sections below show how to do this with Python and q in a basic streaming infrastructure. In KX literature and whitepapers, processes that publish data to a tickerplant are commonly described as Feedhandlers.
Any message published to a tickerplant is a triplet list with the structure [Function;Table;Data], where:
- Function: The name of the function to be called on all downstream subscribers. In this case, it's .u.upd. This function takes two arguments: table and data.
- Table: The name of the table to be passed as the first argument to the Function above.
- Data: The data to be passed as the second argument to the Function above.
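As a minimal sketch of this structure in Python, the helper below assembles the triplet. The name make_message is hypothetical and not part of PyKX, and the trade values are made up for illustration.

```python
def make_message(table, data, function='.u.upd'):
    """Return the (Function, Table, Data) triplet described above.

    Hypothetical helper for illustration only; not a PyKX function.
    """
    return (function, table, data)


# One illustrative trade row: symbol, price, volume (made-up values).
message = make_message('trade', ['AAPL', 101.5, 200])
print(message)
```

With an open connection q, the triplet can be unpacked into the call as q(*message), which is the same as writing q('.u.upd', 'trade', data).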
Basic examples
The sections below provide examples in Python and q that publish 10 messages to the trade table defined in the basic infrastructure here. Data is randomly generated in each case.
Note
In each example, supplying the timespan object is optional. If it is omitted, data is tagged with its arrival time and persisted by the database using this time information.
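The two message shapes can be sketched as follows. The helper build_trade is hypothetical, the ticker list and value ranges mirror the examples on this page, and PyKX is imported only when a timespan is requested so that the shape logic can be tried without it.

```python
import random

TICKERS = ['AAPL', 'GOOG', 'IBM', 'BRK']


def build_trade(i, with_time=True):
    """Build one trade row, optionally prefixed with the current time.

    Hypothetical helper for illustration; `i` scales price and volume.
    """
    row = [random.choice(TICKERS),      # symbol
           random.random() * 10 * i,    # price
           random.randrange(100) * i]   # volume
    if with_time:
        import pykx as kx  # imported lazily; only needed for the timespan
        row.insert(0, kx.TimespanAtom('now'))
    return row


# Without a timespan the row has three elements; per the note above,
# the data is then tagged with its arrival time.
row = build_trade(3, with_time=False)
print(len(row))
```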
A later section of this page provides a more complex data feed written in Python, which you can use to emulate a live feed in the remaining pages on streaming data.
Python
The following Python code allows you to publish 10 messages to the streaming infrastructure created here:
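import pykx as kx
import numpy as np

ticker_list = ['AAPL', 'GOOG', 'IBM', 'BRK']

# range(1, 11) yields the values 1-10, giving 10 messages
for i in range(1, 11):
    # wait=False sends each message without waiting for a response
    with kx.SyncQConnection(port=5010, wait=False) as q:
        msg = [kx.TimespanAtom('now'),         # current time
               np.random.choice(ticker_list),  # symbol
               np.random.random() * 10 * i,    # price (single value)
               np.random.randint(100) * i]     # volume
        q('.u.upd', 'trade', msg)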
The code above creates a synchronous connection to the tickerplant process on port 5010. Setting wait=False sends each message without waiting for a response. Each message (msg) contains 4 elements:
- The current time as a kx.TimespanAtom object
- The name of the traded symbol (ticker), randomly chosen from a predetermined list
- The price of the stock, randomly generated
- The volume of the stock that was traded
Finally, the message is sent to the tickerplant along with the name of the table (trade) and the function to be called (.u.upd).
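The example above opens a new connection for every message. One possible variation, sketched below on the assumption that a single connection may be reused for several calls, separates message generation from sending. The names publish_trades and send are hypothetical: send is any callable that accepts the function name, table name, and data, such as an open connection. The timespan is omitted here, so rows would be tagged with their arrival time.

```python
import random

TICKERS = ['AAPL', 'GOOG', 'IBM', 'BRK']


def publish_trades(send, count=10):
    """Send `count` randomly generated trade rows through `send`.

    Hypothetical helper: `send` is called as send(function, table, data).
    """
    for i in range(1, count + 1):
        row = [random.choice(TICKERS),      # symbol
               random.random() * 10 * i,    # price
               random.randrange(100) * i]   # volume
        send('.u.upd', 'trade', row)


# Dry run: collect the messages in a list instead of sending them.
sent = []
publish_trades(lambda *message: sent.append(message))
print(len(sent))
```

Against a running tickerplant, the same function could be called as publish_trades(q) inside a with kx.SyncQConnection(port=5010, wait=False) as q: block.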
q
The following q code allows you to publish 10 messages to the streaming infrastructure created here:
h:hopen 5010

// Function for sending updates to trade table
upd_trades:{neg[x](".u.upd";y;z)}[h;`trade]

// Function for generating a sample message
msg:{
  (.z.N;
   rand `AAPL`GOOG`IBM`BRK;
   x*rand 10.0;
   x*rand 100)
  }

// Send 10 messages using the values 1-10 to update the price/volume values
(upd_trades msg@)each 1+til 10
The code above opens a connection to the tickerplant process on port 5010 and sends 10 messages, each created by the function msg and published by upd_trades. Each message contains 4 elements:
- The current time, generated using .z.N
- The name of the traded symbol (ticker), randomly chosen from a predetermined list
- The price of the stock, randomly generated
- The volume of the stock that was traded
Other languages
It's possible to publish data to PyKX streaming infrastructures using other languages, such as C and Java.
Continuous streaming example
The section below presents a script that does the following:
- Takes a parameter at startup indicating how many messages should be published per update.
- Generates random trade messages using kx.random.random.
- Publishes these messages to the basic infrastructure tickerplant on port 5010.
- Repeats until the user stops the data feed.
You can view the script below, or download it and run it using the instructions that follow.
import pykx as kx
import sys

# Optional first argument: number of messages to publish per update
try:
    args = sys.argv[1]
except IndexError:
    args = ''

n = 1 if args == '' else int(args)

print('Starting Data Feed ...')

init = False

def main():
    global init
    symlist = ['AAPL', 'JPM', 'GOOG', 'BRK', 'WPO', 'IBM']
    while True:
        # Column-wise message: n symbols, n prices, n volumes
        trade = [kx.random.random(n, symlist),
                 10 * kx.random.random(n, 10.0),
                 10 * kx.random.random(n, 100)
                 ]
        with kx.SyncQConnection(port=5010, wait=False, no_ctx=True) as q:
            q('.u.upd', 'trade', trade)
        if not init:
            print('First message(s) sent, data-feed publishing ...')
            init = True

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Data feed stopped')
Before you start, ensure you have the basic infrastructure running with default values. To use the above feed.py script, run it as follows:
- Publish one message per update:
    python feed.py
- Publish ten messages per update:
    python feed.py 10
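When more than one message is published per update, each element of the list sent by feed.py holds n values, so the data is arranged by column rather than by row. A minimal sketch of that layout using only the Python standard library follows. The helper build_batch is hypothetical, and the random module stands in for kx.random.random.

```python
import random

SYMBOLS = ['AAPL', 'JPM', 'GOOG', 'BRK', 'WPO', 'IBM']


def build_batch(n):
    """Return [symbols, prices, volumes], each a list of length n.

    Hypothetical helper mirroring the column layout of the feed script.
    """
    symbols = [random.choice(SYMBOLS) for _ in range(n)]
    prices = [10 * random.uniform(0, 10.0) for _ in range(n)]
    volumes = [10 * random.randrange(100) for _ in range(n)]
    return [symbols, prices, volumes]


batch = build_batch(10)
# Three columns, each with one entry per message in the update.
print([len(column) for column in batch])
```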
Next steps
Now that you have data being published to your system you may be interested in the following:
- Subscribe to real-time updates following the instructions here.
- Query your real-time and historical data using custom APIs here.
- Perform complex analysis on your real-time data following the instructions here.
For some further reading, here are some related topics:
- Learn more about Interprocess Communication (IPC) here.