Streaming tickerplant
Functionality for the generation and management of streaming infrastructures using PyKX. Fully described here, this allows users to ingest, persist and query vast amounts of real-time and historical data in a unified data-format.
STREAMING
STREAMING(
port=5010, *, process_logs=False, libraries=None, apis=None, init_args=None
)
The STREAMING class acts as a base parent class for the TICK, RTP, HDB and GATEWAY class objects. Each of these child classes inherits and may modify the logic of this parent. The functions libraries and register_api, for example, have the same definition and are available to all process types. Unless a function is given a separate definition, as is the case for start in all class types, a user should assume that its logic, for example that of register_api, is consistent across process types.
__call__
__call__(*args)
Execute a synchronous call against the connected process
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | | Pass supplied arguments to the | () |
Returns:
Type | Description |
---|---|
k.K | The result of the executed call on the connection object |
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
>>> tick('1+1').py()
2
start
start(config=None, print_init=True, custom_start='')
Start/initialise processing of messages on the associated sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
print_init | bool | A boolean indicating if during initialisation we should print a message stating that the process is being initialised successfully. | True |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
stop
stop()
Stop processing on the sub-process and kill the process. This allows the port on which the process is deployed to be reclaimed and the process to be restarted if appropriate.
Example:
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.stop()
Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030
libraries
libraries(libs=None)
Specify and load the Python libraries which should be available on a process. The libraries should be supplied as a dictionary mapping the alias used for calling the library to the library name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
libs | dict | A dictionary mapping the alias by which a Python library will be referred to the name of the library | None |
In the following example we denote that the process should have access to the Python libraries numpy and pykx which, when called by a user, will be referred to as np and kx respectively.
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.libraries({'np': 'numpy', 'kx': 'pykx'})
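The alias-to-library mapping can be pictured with a small plain-Python sketch. This is not how PyKX loads libraries on the remote process; the helper load_libraries is a hypothetical name, and standard-library modules stand in for numpy and pykx so the snippet runs without extra dependencies.

```python
import importlib


def load_libraries(libs):
    """Import each library by name and bind it to the requested alias.

    Hypothetical helper for illustration only, not part of PyKX.
    """
    namespace = {}
    for alias, name in libs.items():
        namespace[alias] = importlib.import_module(name)
    return namespace


# Standard-library modules stand in for numpy/pykx in this sketch.
ns = load_libraries({'js': 'json', 'm': 'math'})
print(ns['m'].sqrt(16.0))      # the alias 'm' refers to the math module
print(ns['js'].dumps([1, 2]))  # the alias 'js' refers to the json module
```

The key of each entry is the name user code will use, and the value is the importable module name, mirroring the {'np': 'numpy', 'kx': 'pykx'} dictionary above.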
register_api
register_api(api_name, function)
Define a registered API to be callable by name on a process. This API can be a Python function or a PyKX lambda/projection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_name | str | The name by which the provided function will be called on the process | required |
function | Callable | The function which is to be defined as a callable API on the process. In the case of a Python function this must be a single independent function which is callable using libraries available on the process | required |
>>> import pykx as kx
>>> def custom_func(num_vals, added_value):
...     return added_value + kx.q.til(num_vals)
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.libraries({'kx': 'pykx'})
>>> hdb.register_api('custom_api', custom_func)
>>> hdb('custom_api', 5, 10)
pykx.LongVector(pykx.q('10 11 12 13 14'))
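The idea of calling a registered function by name can be sketched without a q process. The ApiRegistry class below is a hypothetical stand-in for a streaming process, not the PyKX implementation, and the list comprehension is a plain-Python analogue of added_value + kx.q.til(num_vals).

```python
from typing import Any, Callable, Dict


class ApiRegistry:
    """Hypothetical stand-in for a process that exposes functions by name."""

    def __init__(self) -> None:
        self._apis: Dict[str, Callable[..., Any]] = {}

    def register_api(self, api_name: str, function: Callable[..., Any]) -> None:
        if not callable(function):
            raise TypeError("function must be callable")
        self._apis[api_name] = function

    def __call__(self, api_name: str, *args: Any) -> Any:
        # Look the API up by name and apply it to the supplied arguments
        return self._apis[api_name](*args)


def custom_func(num_vals, added_value):
    # Plain-Python analogue of `added_value + kx.q.til(num_vals)`
    return [added_value + i for i in range(num_vals)]


proc = ApiRegistry()
proc.register_api('custom_api', custom_func)
result = proc('custom_api', 5, 10)
print(result)
```

Because the function is shipped to another process in the real system, it has to be self-contained, which is why the documentation asks for a single independent function that relies only on libraries loaded on that process.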
set_timer
set_timer(timer=1000)
Set a timer on the connected process. This allows users to configure, for example, the intervals at which data is published.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timer | int | The interval at which the timer is triggered in milliseconds. | 1000 |
Returns:
Type | Description |
---|---|
None | On successful execution this will return None |
TICK
TICK(
port=5010,
*,
process_logs=True,
tables=None,
log_directory=None,
hard_reset=False,
chained=False,
init_args=None
)
Bases: STREAMING
Initialise a tickerplant subprocess establishing a communication connection. This can either be a process which publishes data to subscribing processes only (chained) or a process which logs incoming messages for replay and triggers end-of-day events on subscribing processes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port | | The port on which the tickerplant process will be established | 5010 |
process_logs | | Should the logs of the generated tickerplant process be published to standard-out of the Python process (True), suppressed (False) or published to a supplied file-name | True |
tables | | A dictionary mapping the names of tables to their schemas, denoting the tables which the tickerplant will process | None |
hard_reset | | Reset logfiles for the current date when starting the tickerplant | False |
log_directory | | The location of the directory to which logfiles will be published | None |
chained | | Whether the tickerplant is 'chained'. If chained, the process will not log messages or run end-of-day processing | False |
init_args | | A list of arguments passed to the initialized q process at startup denoting the command line options to be used for the initialized q process; see here for a full breakdown. | None |
Returns:
Type | Description |
---|---|
On successful initialisation will initialise the tickerplant process and set appropriate configuration |
Examples:
Initialise a tickerplant on port 5030, defining a trade table.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
Initialise a chained tickerplant on port 5031 receiving messages from an upstream tickerplant on port 5030. Publish stdout/stderr from the process to a file 'test.log'.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade}, process_logs='test.log')
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>>
>>> tick_chained = kx.tick.TICK(port=5031, chained=True)
Initialising Tickerplant process on port: 5031
Tickerplant initialised successfully on port: 5031
>>> tick_chained.start({'tickerplant': 'localhost:5030'})
Starting Tickerplant data processing on port: 5031
Tickerplant process successfully started on port: 5031
start
start(config=None)
Start/initialise processing of messages on the associated tickerplant sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
restart
restart()
Restart and re-initialise the Tickerplant. This will start the process with all tables defined on the expected port.
Example:
Restart a Tickerplant validating that the expected tables are appropriately defined
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>> tick.restart()
Restarting Tickerplant on port 5030
Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
Tickerplant on port 5030 successfully restarted
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
set_tables
set_tables(tables)
Define the tables to be used for consuming and serving messages on the tickerplant process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | dict | A dictionary mapping the name of a table to be defined on the process to the table schema | required |
Returns:
Type | Description |
---|---|
None | On the tickerplant persist the table schema as the supplied name |
Set a table 'trade' with a supplied schema on a tickerplant process
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> tick.set_tables({'trade': trade})
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
set_snap
set_snap(snap_function)
Define a 'snap' function used by KX Dashboards UI to manage the data presented to a Dashboard process when subscribing to data from a Tickerplant process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
snap_function | Callable | A Python function or callable PyKX Lambda which takes a single argument and returns the expected tabular dataset for display | required |
Returns:
Type | Description |
---|---|
None | On successful execution will set the streaming function |
Implement a ring-buffer to provide the most recent 1,000 datapoints
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> def buffer_ring(x):
...     if len(kx.q['trade']) < 1000:
...         return kx.q['trade']
...     else:
...         return kx.q['trade'][-1000:]
>>> tick.set_snap(buffer_ring)
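The "most recent 1,000 datapoints" behaviour can be checked in isolation with a plain Python list standing in for the trade table. The function name snap_last_n and its n parameter are illustrative assumptions, not PyKX names.

```python
def snap_last_n(rows, n=1000):
    """Return every row while the table is small, otherwise the newest n rows."""
    if len(rows) <= n:
        return rows
    return rows[-n:]


small = list(range(10))    # fewer rows than the buffer size
large = list(range(2500))  # more rows than the buffer size

print(len(snap_last_n(small)), len(snap_last_n(large)))
```

Slicing from the end keeps the newest rows, so a dashboard subscribing through such a snap function never receives more than n rows regardless of how large the underlying table grows.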
RTP
RTP(
port=5011,
*,
process_logs=True,
libraries=None,
subscriptions=None,
apis=None,
vanilla=True,
pre_processor=None,
post_processor=None,
init_args=None
)
Bases: STREAMING
Initialise a Real-Time Processor (RTP), establishing a communication connection to this process. At its most fundamental level an RTP performs the following actions and is known as a 'vanilla' RTP:
- Receives messages from an upstream tickerplant process via subscription.
- Inserts data into an in-memory table which will be written to disk at a defined time interval.
- Triggers end-of-day processing which writes the data to disk and tells connected historical databases to reload if needed.
In a more complex case an RTP will run analytics on data before and after the insert described in the second step above. These analytics can be either Python or q/PyKX functions. Additionally, users can define 'apis' on the server which can be called explicitly by users.
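The message flow described above can be sketched in plain Python. MiniRTP and its upd method are hypothetical stand-ins used only to show the ordering of the pre-processing, insert and post-processing steps; they are not PyKX classes and no q process is involved.

```python
from collections import defaultdict


class MiniRTP:
    """Hypothetical in-memory stand-in for the RTP message flow."""

    def __init__(self, pre_processor=None, post_processor=None):
        self.tables = defaultdict(list)
        self.pre_processor = pre_processor
        self.post_processor = post_processor

    def upd(self, table, message):
        # Step 1: optionally transform or reject the incoming message
        if self.pre_processor is not None:
            message = self.pre_processor(table, message)
            if message is None:
                return  # rejected messages are never inserted
        # Step 2: insert into the in-memory table
        self.tables[table].extend(message)
        # Step 3: optionally derive analytics after the insert
        if self.post_processor is not None:
            self.post_processor(table, message)


def pre_process(table, message):
    return message if table in ('trade', 'quote') else None


counts = {}


def post_process(table, message):
    counts[table] = counts.get(table, 0) + len(message)


rtp = MiniRTP(pre_process, post_process)
rtp.upd('trade', [{'sym': 'AAPL', 'px': 10.0, 'sz': 5}])
rtp.upd('orders', [{'sym': 'AAPL'}])  # dropped by pre_process
print(dict(rtp.tables), counts)
```

A 'vanilla' RTP corresponds to constructing MiniRTP with no processors, in which case only the insert step runs.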
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port | | The port on which the RTP process will be established | 5011 |
process_logs | | Should the logs of the generated RTP process be published to standard-out of the Python process (True), suppressed (False) or published to a supplied file-name | True |
libraries | | A dictionary mapping the alias by which a Python library will be referred to the name of the library | None |
subscriptions | | A list of tables (str) from which to receive updates. If None the RTP will receive updates from all tables | None |
apis | | A dictionary mapping the names to be used by users when calling a defined API to the callable Python functions or PyKX lambdas/projections which will be called. | None |
vanilla | | In the case that the RTP is defined as 'vanilla', data received from an upstream tickerplant will be inserted into an in-memory table. If vanilla is False then a 'pre_processor' and 'post_processor' function can be defined using the below parameters to modify data prior to and post insert. | True |
pre_processor | | A function taking the name of a table and message as parameters. This function should/can modify the message prior to insertion into an in-memory table. If this function returns | None |
post_processor | | A function taking the name of a table and message as parameters. This function can publish data to other processes, update global variables etc. In most examples post_processor functions are used to publish data to a tickerplant or persist derived analytics for use by other users. | None |
init_args | | A list of arguments passed to the initialized q process at startup denoting the command line options to be used for the initialized q process; see here for a full breakdown. | None |
Returns:
Type | Description |
---|---|
On successful initialisation will initialise the RTP process and set appropriate configuration |
Examples:
Initialise a vanilla Real-Time Processor on port 5032 subscribing to all messages from a tickerplant on port 5030.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade}, process_logs='test.log')
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>>
>>> rdb = kx.tick.RTP(port=5032)
Initialising Real-time processor on port: 5032
Real-time processor initialised successfully on port: 5032
>>> rdb.start({'tickerplant': 'localhost:5030'})
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
Initialise a vanilla Real-Time Processor on port 5032 logging process logs to 'test.log', subscribing to a table trade only and defining a query API named custom_query.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>>
>>> def query_api(table):
...     return kx.q.qsql.select(table)
>>> rdb = kx.tick.RTP(
...     port=5032,
...     process_logs='test.log',
...     libraries={'kx': 'pykx'},
...     apis={'custom_query': query_api}
... )
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
>>> rdb.start({'tickerplant': 'localhost:5030'})
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
Initialise a complex Real-Time Processor which includes data pre-processing prior to insertion of data into the Real-Time Database and which contains a post-processing step to derive analytics after data has been inserted into the in-memory table.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def pre_process(table, message):
...     if table in ['trade', 'quote']:
...         return message
...     else:
...         return None
>>> def post_process(table, message):
...     tradeagg = kx.q.qsql.select('trade',
...                                 columns={'trdvol': 'sum px*sz',
...                                          'maxpx': 'max px',
...                                          'minpx': 'min px'},
...                                 by='sym')
...     quoteagg = kx.q.qsql.select('quote',
...                                 columns={'maxbpx': 'max bid',
...                                          'minapx': 'min ask',
...                                          'baspread': 'max[bid]-min[ask]'},
...                                 by='sym')
...     tab = tradeagg.merge(quoteagg, how='left', q_join=True).reset_index()
...     tab['time'] = kx.TimespanAtom('now')
...     kx.q['aggregate'] = kx.q.xcols(['time', 'sym'], tab)
...     return None
>>> rte = kx.tick.RTP(port=5031,
...                   libraries={'kx': 'pykx'},
...                   subscriptions=['trade', 'quote'],
...                   pre_processor=pre_process,
...                   post_processor=post_process,
...                   vanilla=False)
>>> rte.start({'tickerplant': 'localhost:5030'})
start
start(config=None)
Start/initialise processing of messages on the Real-Time Processor. This splits the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> rdb = kx.tick.RTP(port=5032,
... subscriptions = ['trade', 'quote']
... )
>>> rdb.start({
... 'tickerplant': 'localhost:5030',
... 'hdb': 'localhost:5031',
... 'database': 'db'})
restart
restart()
Restart and re-initialise the Real-Time Processor. This will start the process with all subscriptions, processing functions etc. as defined in the initial configuration of the process.
Example:
Restart an RTP process, validating that the defined APIs are appropriately defined in the restarted process
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>>
>>> def query_api(table):
...     return kx.q.qsql.select(table)
>>> rdb = kx.tick.RTP(
...     port=5032,
...     process_logs='test.log',
...     libraries={'kx': 'pykx'},
...     apis={'custom_query': query_api}
... )
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
>>> rdb.start({'tickerplant': 'localhost:5030'})
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
>>> rdb.restart()
Restarting Real-time processor on port 5032
Real-time processor process on port 5032 being stopped
Real-time processor successfully shutdown on port 5032
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
Real-time processor on port 5032 successfully restarted
>>> rdb('tab:([]5?1f;5?1f)')
>>> rdb('custom_query', 'tab')
pykx.Table(pykx.q('
x x1
-------------------
0.3017723 0.3927524
0.785033 0.5170911
0.5347096 0.5159796
0.7111716 0.4066642
0.411597 0.1780839
'))
pre_processor
pre_processor(function)
Define a pre-processing function on the RTP process which is called prior to inserting data into the Real-Time Database.
This function must take two parameters:
- table: The name of the table to which data will be inserted
- message: The data which is to be inserted into the table
If this function returns None or kx.q('::') then data processing will not continue for that message and it will not be inserted into the database.
The pre-processing function should return
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A callable function or PyKX Lambda taking 2 arguments the name of the table as a | required |
Returns:
Type | Description |
---|---|
None | On successful execution of this method the data pre-processing function defined on the RTP server will be updated |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def preprocess(table, message):
...     if table in ['trade', 'quote']:
...         return message
...     else:
...         return None
>>> rte = kx.tick.RTP(port=5034,
... libraries = {'kx': 'pykx'},
... subscriptions = ['trade', 'quote'],
... vanilla=False)
>>> rte.pre_processor(preprocess)
post_processor
post_processor(function)
Define a post-processing function on the RTP process which is called after inserting data into the Real-Time Database.
This function must take two parameters:
- table: The name of the table into which data was inserted
- message: The data which was inserted into the table
This function can have side-effects and is not expected to return a value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A callable function or PyKX Lambda taking 2 arguments the name of the table as a | required |
Returns:
Type | Description |
---|---|
None | On successful execution of this method the data post-processing function defined on the RTP server will be updated |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def postprocess(table, message):
...     tradeagg = kx.q.qsql.select('trade',
...                                 columns={
...                                     'trdvol': 'sum px*sz',
...                                     'maxpx': 'max px',
...                                     'minpx': 'min px'},
...                                 by='sym')
...     quoteagg = kx.q.qsql.select('quote',
...                                 columns={
...                                     'maxbpx': 'max bid',
...                                     'minapx': 'min ask',
...                                     'baspread': 'max[bid]-min[ask]'},
...                                 by='sym')
...     tab = tradeagg.merge(quoteagg, how='left', q_join=True).reset_index()
...     tab['time'] = kx.TimespanAtom('now')
...     kx.q['aggregate'] = kx.q.xcols(['time', 'sym'], tab)
...     return None
>>> rte = kx.tick.RTP(port=5034,
... libraries = {'kx': 'pykx'},
... subscriptions = ['trade', 'quote'],
... vanilla=False)
>>> rte.post_processor(postprocess)
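For readers less familiar with qSQL, the per-symbol trade aggregation used in the post-processing example can be written out in plain Python. This is only an illustrative equivalent operating on a list of dictionaries; the helper name trade_aggregates and the sample rows are assumptions made for the sketch.

```python
from collections import defaultdict


def trade_aggregates(trades):
    """Compute traded value, max price and min price per symbol.

    Mirrors the columns 'sum px*sz', 'max px' and 'min px' grouped by sym.
    """
    grouped = defaultdict(list)
    for row in trades:
        grouped[row['sym']].append(row)
    return {
        sym: {
            'trdvol': sum(r['px'] * r['sz'] for r in rows),
            'maxpx': max(r['px'] for r in rows),
            'minpx': min(r['px'] for r in rows),
        }
        for sym, rows in grouped.items()
    }


# Hypothetical sample rows using the trade schema's column names
trades = [
    {'sym': 'AAPL', 'px': 10.0, 'sz': 5},
    {'sym': 'AAPL', 'px': 12.0, 'sz': 2},
    {'sym': 'MSFT', 'px': 20.0, 'sz': 1},
]
agg = trade_aggregates(trades)
print(agg)
```

In the RTP itself the aggregation runs inside the q process after every insert, so a post-processor of this kind recomputes the derived table each time a message arrives.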
HDB
HDB(port=5012, *, process_logs=True, libraries=None, apis=None, init_args=None)
Bases: STREAMING
Initialise a Historical Database (HDB) subprocess establishing a communication connection. This process may contain a loaded database and APIs used for analytic transformations on historical data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port | | The port on which the HDB process will be established | 5012 |
process_logs | | Should the logs of the generated HDB process be published to standard-out of the Python process (True), suppressed (False) or published to a supplied file-name | True |
libraries | | A dictionary mapping the alias by which a Python library will be referred to the name of the library | None |
apis | | A dictionary mapping the names to be used by users when calling a defined API to the callable Python functions or PyKX lambdas/projections which will be called. | None |
init_args | | A list of arguments passed to the initialized q process at startup denoting the command line options to be used for the initialized q process; see here for a full breakdown. | None |
Returns:
Type | Description |
---|---|
On successful initialisation will initialise the HDB process and set appropriate configuration |
Examples:
Initialise a HDB on port 5035
>>> import pykx as kx
>>> hdb = kx.tick.HDB(port=5035)
Initialising HDB process on port: 5035
HDB initialised successfully on port: 5035
Initialise a HDB on port 5035, defining a custom api on the process and stating that the library pykx must be available.
>>> import pykx as kx
>>> def custom_api(values):
...     return kx.q(values)
>>> hdb = kx.tick.HDB(
... port=5035,
... libraries={'kx': 'pykx'},
... apis={'hdb_query': custom_api}
... )
Initialising HDB process on port: 5035
Registering callable function 'hdb_query' on port 5035
Successfully registed callable function 'hdb_query' on port 5035
HDB initialised successfully on port: 5035
>>> hdb('hdb_query', '1+1')
pykx.LongAtom(pykx.q('2'))
start
start(database=None, config=None)
Start the Historical Database (HDB) process for analytic/query availability. This command allows for the loading of the Database to be used by the process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
database | str | The path to the database which is to be loaded on the process. | None |
config | dict | A dictionary passed to the sub-process which can be used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None and load the specified database, otherwise will raise an error. |
>>> import pykx as kx
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.start(database='/tmp/db')
restart
restart()
Restart and re-initialise the HDB process. This will start the process with validation and api functions etc. as defined in the initial configuration of the process.
Example:
Restart a HDB process, validating that the defined APIs are appropriately defined in the restarted process
>>> import pykx as kx
>>> def hdb_api(value):
...     return kx.q(value)
>>> hdb = kx.tick.HDB(
...     port=5035,
...     libraries={'kx': 'pykx'},
...     apis={'custom_api': hdb_api})
Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> hdb.restart()
Restarting HDB on port 5035
HDB process on port 5035 being stopped
HDB successfully shutdown on port 5035
Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035
HDB process on port 5035 successfully restarted
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
GATEWAY
GATEWAY(
port=5010,
*,
process_logs=False,
libraries=None,
apis=None,
connections=None,
connection_validator=None,
init_args=None
)
Bases: STREAMING
Initialise a Gateway subprocess establishing a communication connection. A gateway provides a central location for external users to query named APIs within a streaming infrastructure; these APIs retrieve data from multiple processes within the infrastructure.
A gateway within this implementation provides helper functions for the application of basic user validation and functionality to allow custom APIs to call named process connections.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port | | The port on which the gateway process will be established | 5010 |
process_logs | | Should the logs of the generated gateway process be published to standard-out of the Python process (True), suppressed (False) or published to a supplied file-name | False |
libraries | | A dictionary mapping the alias by which a Python library will be referred to the name of the library | None |
apis | | A dictionary mapping the names to be used by users when calling a defined API to the callable Python functions or PyKX lambdas/projections which will be called. | None |
connections | | A dictionary which maps a key denoting the 'name' to be assigned to a process with the connection string as follows. | None |
connection_validator | | A function taking username and password which returns | None |
init_args | | A list of arguments passed to the initialized q process at startup denoting the command line options to be used for the initialized q process; see here for a full breakdown. | None |
Returns:
Type | Description |
---|---|
On successful initialisation will initialise the Gateway process and set appropriate configuration |
Examples:
Initialise a Gateway defining a callable API against a HDB and RDB process. This will allow free-form function calls on both processes.
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.start(database='/tmp/db')
>>> rdb = kx.tick.RTP(port=5032)
>>> rdb.start({'tickerplant': 'localhost:5030'})
>>> def gateway_func(x):
...     # The 'module' gateway is a populated class
...     # on the PyKX Gateway processes
...     rdb_data = gateway.call_port('rdb', b'{x+1}', x)
...     hdb_data = gateway.call_port('hdb', b'{x+2}', x)
...     return [rdb_data, hdb_data]
>>> gw = kx.tick.GATEWAY(
...     port=5033,
...     connections={'rdb': 'localhost:5032', 'hdb': 'localhost:5031'},
...     apis={'custom_api': gateway_func}
... )
>>> gw.start()
>>> with kx.SyncQConnection(port=5033) as q:
...     print(q('custom_api', 2))
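The fan-out pattern in the example above, where one gateway API queries several named processes and combines the results, can be mimicked locally. MiniGateway and fake_process are hypothetical stand-ins: named connections are plain Python callables rather than q processes, and Python lambdas replace the q functions b'{x+1}' and b'{x+2}'.

```python
class MiniGateway:
    """Hypothetical stand-in: named connections are plain Python callables."""

    def __init__(self, connections):
        self.connections = dict(connections)

    def call_port(self, name, function, *args):
        # Look up the named connection and run the function against it
        return self.connections[name](function, *args)


def fake_process(function, *args):
    # A real process would evaluate the function remotely; here it runs locally
    return function(*args)


gateway = MiniGateway({'rdb': fake_process, 'hdb': fake_process})


def gateway_func(x):
    rdb_data = gateway.call_port('rdb', lambda v: v + 1, x)
    hdb_data = gateway.call_port('hdb', lambda v: v + 2, x)
    return [rdb_data, hdb_data]


print(gateway_func(2))
```

Referring to processes by name keeps the API independent of host and port details, which live only in the connections mapping.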
start
start(config=None)
Start the gateway process's connections to external processes. The supplied configuration will be used to create 'named' inter-process connections with remote processes which can be called by users in their gateway functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | UNUSED | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
... port=5031,
... connections={'tp': 'localhost:5030'},
... apis={'custom_api': gateway_api})
>>> gw.start()
add_connection
add_connection(connections=None)
Add additional callable named connections to a gateway process. This functionality is additive to the connections (if established) when configuring a GATEWAY process. If the same name is used for two connections, the last added connection will be used in function execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connections | dict | A dictionary which maps a key denoting the 'name' to be assigned to a process with the connection string containing the host/port information as follows: | None |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     return gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
... port=5031,
... apis={'custom_api': gateway_api})
>>> gw.add_connection({'tp': 'localhost:5030'})
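The name-collision rule described above (a later connection replaces an earlier one with the same name) behaves like a dictionary update. The following is a minimal pure-Python model of that rule, not the PyKX implementation; `merge_connections` is a hypothetical helper introduced only for illustration, and the port 5041 is an invented value.

```python
def merge_connections(existing, new):
    """Return a copy of `existing` with `new` applied on top.

    Hypothetical helper: models the documented rule that the last added
    connection is used when two connections share a name.
    """
    merged = dict(existing or {})
    merged.update(new or {})
    return merged


# Connections supplied when the gateway was configured
initial = {'rdb': 'localhost:5032', 'hdb': 'localhost:5031'}
# Connections added later; 'hdb' reuses an existing name
added = {'hdb': 'localhost:5041', 'tp': 'localhost:5030'}

connections = merge_connections(initial, added)
```

Under this model the 'rdb' entry is kept, 'tp' is added, and 'hdb' resolves to the most recently supplied connection string.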
restart
restart()
Restart and re-initialise the Gateway process. This starts the process with the validation and API functions defined in its initial configuration.
Example:
Restart a Gateway process, validating that the APIs defined before the restart are still available in the restarted process
>>> import pykx as kx
>>> def gateway_api(value):
...     return kx.q(value)
>>> gateway = kx.tick.GATEWAY(
... port=5035,
... libraries={'kx': 'pykx'},
... apis={'custom_api': gateway_api})
Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035
>>> gateway.start()
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> gateway.restart()
Restarting Gateway on port 5035
Gateway process on port 5035 being stopped
Gateway successfully shutdown on port 5035
Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035
Gateway process on port 5035 successfully restarted
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
connection_validation
connection_validation(function)
Define a function to be used on the Gateway process to validate users connecting to the process. This function should take two inputs, a username and a password, and determine whether the connecting user is allowed to connect.
This function should return True if a user is permitted to establish a connection and False if they are not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A function taking two parameters (username and password) which determines whether a user connecting to the process is permitted to establish a callable connection. | required |
Define a function on the gateway process to only accept users with the name 'new_user'.
>>> import pykx as kx
>>> def custom_validation(username, password):
...     if username != 'new_user':
...         return False
...     else:
...         return True
>>> gateway = kx.tick.GATEWAY(port=5034, connection_validator=custom_validation)
>>> with kx.SyncQConnection(port=5034, username='user') as q:
...     q('1+1')
QError: access
>>> with kx.SyncQConnection(port=5034, username='new_user') as q:
...     q('1+1')
pykx.LongAtom(pykx.q('2'))
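A validator is an ordinary Python callable taking a username and a password, so it can be exercised locally before it is attached to a gateway. The sketch below checks both values against an in-memory table. The `allowed_users` table and its credentials are hypothetical, and a real deployment would not keep plain-text passwords. Whether module-level names are visible on the gateway process is not stated here, so everything the function needs is kept inside its body.

```python
def custom_validation(username, password):
    """Return True only for a known user with a matching password."""
    import hmac  # imported here so the function body is self-contained

    # Hypothetical credentials used only for this illustration
    allowed_users = {'new_user': 'secret'}

    expected = allowed_users.get(username)
    if expected is None or password is None:
        return False
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(str(password).encode(), expected.encode())


# Local checks that do not require a running gateway
accepted = custom_validation('new_user', 'secret')
rejected = custom_validation('user', 'secret')
```

As in the example above, the function only ever returns True or False, which is the contract the gateway expects from a validator.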
BASIC
BASIC(
tables,
*,
log_directory=".",
hard_reset=False,
database=None,
ports=_default_ports
)
Initialise a configuration for a basic PyKX streaming workflow.
By default, this configuration is used to start the following processes:
- A Tickerplant process on port 5010 to which messages can be published for logging and consumption by down-stream subscribers.
- A Real-Time Database process (RDB) on port 5011 which subscribes to the tickerplant and maintains an in-memory representation of all the data consumed that day.
- If a database is specified at initialisation, a Historical Database (HDB) process which loads the database and makes historical data available to a user.
With this basic infrastructure users can then add functionality to increase overall complexity of their system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | | A dictionary mapping the names of tables to their schemas, denoting the tables which the tickerplant will process | required |
log_directory | | The location of the directory to which logfiles will be published | "." |
database | | The path to the database which is to be loaded on the HDB process and used as the working directory of the RDB process | None |
hard_reset | | Reset logfiles for the current date when starting the tickerplant | False |
ports | | A dictionary mapping the process type to the IPC communication port on which it should be made available. Dictionary values must be supplied as integers denoting the desired port, while keys should be str objects with the value "tickerplant", "rdb" or "hdb". | _default_ports |
Returns:
Type | Description |
---|---|
On successful initialisation, the Tickerplant, RDB and HDB processes are initialised with the appropriate configuration |
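The shape required of the `ports` argument (keys "tickerplant", "rdb" or "hdb", integer values) can be checked before a configuration is created. The following is a minimal sketch of such a check; `validate_ports` and `VALID_PROCESSES` are hypothetical names introduced for illustration and are not part of PyKX, which may perform its own validation.

```python
# Process names accepted as keys, taken from the parameter description
VALID_PROCESSES = {'tickerplant', 'rdb', 'hdb'}


def validate_ports(ports):
    """Return `ports` unchanged if it has the documented shape, else raise."""
    for name, port in ports.items():
        if name not in VALID_PROCESSES:
            raise ValueError(f"unknown process type: {name!r}")
        # bool is a subclass of int, so it is excluded explicitly
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(
                f"port for {name!r} must be an int, got {type(port).__name__}"
            )
    return ports


ports = validate_ports({'tickerplant': 5030, 'rdb': 5031, 'hdb': 5032})
```

The validated dictionary could then be passed as the `ports` keyword when constructing the configuration.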
Examples:
Configure a Tickerplant and RDB process using default parameters
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(tables={'trade': trade})
Configure a Tickerplant, RDB and HDB process architecture, loading a database at the location '/tmp/db' and persisting the tickerplant logs to the folder 'logs'
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
Configure a Tickerplant, RDB and HDB process setting these processes on the ports 5030, 5031 and 5032 respectively
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... ports={'tickerplant': 5030, 'rdb': 5031, 'hdb': 5032}
... )
start
start()
Start a basic streaming architecture configured using kx.tick.BASIC.
With this basic infrastructure users can then add functionality to increase overall complexity of their system.
Returns:
Type | Description |
---|---|
None | On successful initialisation will start the Tickerplant, RDB and HDB processes, setting appropriate configuration |
Examples:
Configure and start a Tickerplant and RDB process using default parameters
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(tables={'trade': trade})
>>> basic.start()
Configure and start a Tickerplant, RDB and HDB process architecture, loading a database at the location '/tmp/db' and persisting the tickerplant logs to the folder 'logs'
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
Configure and start a Tickerplant, RDB and HDB process setting these processes on the ports 5030, 5031 and 5032 respectively
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... ports={'tickerplant': 5030, 'rdb': 5031, 'hdb': 5032}
... )
>>> basic.start()
stop
stop()
Stop processing and kill all processes within the streaming workflow. This allows the ports on which the processes are deployed to be reclaimed and the processes to be restarted if appropriate.
Example:
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
>>> basic.stop()
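Because stopping releases the ports in use, it can be useful to guarantee that `stop()` runs even when the code between `start()` and `stop()` raises. The sketch below wraps the pair in a context manager. `running` is a hypothetical helper, not part of PyKX; it assumes only that the object passed in exposes the `start()` and `stop()` methods documented here. `FakeStack` is a stand-in so the helper can be demonstrated without launching any processes.

```python
from contextlib import contextmanager


@contextmanager
def running(stack):
    """Start `stack` on entry and always stop it on exit."""
    stack.start()
    try:
        yield stack
    finally:
        # Runs on normal exit and when the body raises
        stack.stop()


class FakeStack:
    """Stand-in object that records calls instead of starting processes."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append('start')

    def stop(self):
        self.events.append('stop')


fake = FakeStack()
with running(fake) as stack:
    stack.events.append('work')
```

With a real configuration the same pattern would read `with running(basic): ...`, under the assumption stated above.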
restart
restart()
Restart and re-initialise a Basic streaming infrastructure. This starts the processes with the configuration initially supplied.
Example:
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
>>> basic.restart()