Streaming tickerplant
Functionality for generating and managing streaming infrastructure with PyKX. Fully described here, it allows users to ingest, persist and query large volumes of real-time and historical data in a unified data format.
STREAMING
STREAMING(
port=5010, *, process_logs=False, libraries=None, apis=None, init_args=None
)
The STREAMING class is the base class for the TICK, RTP, HDB and GATEWAY classes. Each child class inherits the logic of this parent and may modify it. Methods such as libraries and register_api have the same definition for, and are available to, all process types. Unless a class provides its own definition, as every class does for start, a user should assume that a method such as register_api behaves consistently across process types.
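The parent/child relationship described above can be pictured with a small plain-Python sketch. The class and method names mirror this page, but the bodies are illustrative assumptions and not PyKX's implementation.

```python
# Illustrative toy model only; this is not how PyKX implements these classes.
class Streaming:
    def __init__(self, port=5010):
        self.port = port
        self.apis = {}

    def register_api(self, api_name, function):
        # Defined once on the parent and shared by every process type.
        self.apis[api_name] = function

    def start(self, config=None):
        return f"generic start on port {self.port}"


class Tick(Streaming):
    def start(self, config=None):
        # Each child supplies its own start logic.
        return f"tickerplant start on port {self.port}"


class Hdb(Streaming):
    def start(self, database=None, config=None):
        return f"hdb start on port {self.port} with database {database}"


tick, hdb = Tick(port=5030), Hdb(port=5031)
for proc in (tick, hdb):
    proc.register_api("double", lambda x: 2 * x)
```

Here register_api is inherited unchanged, while start differs per class, which is the pattern the paragraph above describes.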
__call__
__call__(*args)
Execute a synchronous call against the connected process
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | | Pass supplied arguments to the | () |
Returns:
Type | Description |
---|---|
k.K | The result of the executed call on the connection object |
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
>>> tick('1+1').py()
2
start
start(config=None, print_init=True, custom_start='')
Start/initialise processing of messages on the associated sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
print_init | bool | A boolean indicating if during initialisation we should print a message stating that the process is being initialised successfully. | True |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
stop
stop()
Stop processing on the sub-process and kill the process. This allows the port on which the process is deployed to be reclaimed and the process to be restarted if appropriate.
Example:
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.stop()
Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030
libraries
libraries(libs=None)
Specify and load the Python libraries which should be available on a process. The libraries should be supplied as a dictionary mapping the alias used for calling the library to the library name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
libs | dict | A dictionary mapping the alias by which a Python library will be referred to the name of the library | None |
In the following example we denote that the process should have access to the Python libraries numpy and pykx, which when called by a user will be referred to as np and kx respectively.
>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.libraries({'np': 'numpy', 'kx': 'pykx'})
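To make the alias-to-library mapping concrete, here is a minimal plain-Python sketch of what such a dictionary expresses. The helper load_libraries is hypothetical and uses standard-library modules so that it runs anywhere; it is an illustration of the idea rather than the logic PyKX runs on the process.

```python
import importlib


def load_libraries(libs):
    """Import each named module and bind it to its alias (hypothetical helper)."""
    return {alias: importlib.import_module(name) for alias, name in libs.items()}


# Keys are the aliases used when calling the library, values are module names.
namespace = load_libraries({"js": "json", "m": "math"})
print(namespace["js"].dumps({"a": 1}))  # prints {"a": 1}
print(namespace["m"].sqrt(16.0))        # prints 4.0
```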
register_api
register_api(api_name, function)
Define a registered API to be callable by name on a process. This API can be a Python function or a PyKX lambda/projection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_name | str | The name by which the provided function will be called on the process | required |
function | Callable | The function which is to be defined as a callable API on the process. In the case of a Python function this must be a single independent function which is callable using libraries available on the process | required |
>>> import pykx as kx
>>> def custom_func(num_vals, added_value):
...     return added_value + kx.q.til(num_vals)
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.libraries({'kx': 'pykx'})
>>> hdb.register_api('custom_api', custom_func)
>>> hdb('custom_api', 5, 10)
pykx.LongVector(pykx.q('10 11 12 13 14'))
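The call-by-name behaviour can be sketched without a q process. The ApiRegistry class below is a hypothetical stand-in for a streaming process, and a Python list stands in for the q vector returned in the example above.

```python
class ApiRegistry:
    """Hypothetical stand-in for a process that exposes named APIs."""

    def __init__(self):
        self._apis = {}

    def register_api(self, api_name, function):
        if not callable(function):
            raise TypeError("function must be callable")
        self._apis[api_name] = function

    def __call__(self, api_name, *args):
        # Look the API up by name and apply it to the supplied arguments.
        return self._apis[api_name](*args)


def custom_func(num_vals, added_value):
    # Pure-Python analogue of added_value + til num_vals
    return [added_value + i for i in range(num_vals)]


proc = ApiRegistry()
proc.register_api("custom_api", custom_func)
result = proc("custom_api", 5, 10)
```

Because the function depends only on its arguments, it satisfies the "single independent function" requirement noted in the parameter table.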
set_timer
set_timer(timer=1000)
Set a timer on the connected process. This allows users to configure, for example, the interval at which data is published.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timer | int | The interval at which the timer is triggered in milliseconds. | 1000 |
Returns:
Type | Description |
---|---|
None | On successful execution this will return None |
set_tables
set_tables(tables, tick=False)
Define the tables to be available to the process being initialized.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | dict | A dictionary mapping the name of a table to be defined on the process to the table schema | required |
tick | bool | Is the process you are setting the table on a tickerplant? | False |
Returns:
Type | Description |
---|---|
None | On a process persist the table schema as the supplied name |
Set a table 'trade' with a supplied schema on a tickerplant process
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> tick.set_tables({'trade': trade})
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
TICK
TICK(
port=5010,
*,
process_logs=True,
tables=None,
log_directory=None,
hard_reset=False,
chained=False,
init_args=None
)
Bases: STREAMING
start
start(config=None)
Start/initialise processing of messages on the associated tickerplant sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
restart
restart()
Restart and re-initialise the Tickerplant. This will start the process on the expected port with all tables defined.
Example:
Restart a Tickerplant validating that the expected tables are appropriately defined
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>> tick.restart()
Restarting Tickerplant on port 5030
Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
Tickerplant on port 5030 successfully restarted
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
set_tables
set_tables(tables)
Define the tables to be used for consuming and serving messages on the tickerplant process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | dict | A dictionary mapping the name of a table to be defined on the process to the table schema | required |
Returns:
Type | Description |
---|---|
None | On the tickerplant persist the table schema as the supplied name |
Set a table 'trade' with a supplied schema on a tickerplant process
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> tick.set_tables({'trade': trade})
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
set_snap
set_snap(snap_function)
Define a 'snap' function used by KX Dashboards UI to manage the data presented to a Dashboard process when subscribing to data from a Tickerplant process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
snap_function | Callable | A Python function or callable PyKX Lambda which takes a single argument and returns the expected tabular dataset for display | required |
Returns:
Type | Description |
---|---|
None | On successful execution will set the streaming function |
Implement a ring-buffer to provide the most recent 1,000 datapoints
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> def buffer_ring(x):
...     if len(kx.q['trade']) > 1000:
...         return kx.q['trade'][-1000:]
...     else:
...         return kx.q['trade']
>>> tick.set_
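The ring-buffer idea, keeping only the most recent 1,000 datapoints, can be sketched in plain Python with collections.deque. The row layout and the snap name are assumptions for illustration; a snap function registered with set_snap would return tabular PyKX data instead of a list.

```python
from collections import deque

# A deque with maxlen discards the oldest entry once it is full.
buffer = deque(maxlen=1000)

for i in range(2500):
    buffer.append({"sym": "AAPL", "px": float(i)})


def snap(_):
    # Single-argument function returning the rows currently held.
    return list(buffer)


rows = snap(None)
print(len(rows))      # prints 1000
print(rows[0]["px"])  # prints 1500.0
```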
RTP
RTP(
port=5011,
*,
process_logs=True,
libraries=None,
subscriptions=None,
apis=None,
vanilla=True,
pre_processor=None,
post_processor=None,
init_args=None,
tables=None
)
Bases: STREAMING
start
start(config=None)
Start/initialise processing of messages on the Real-Time Processor. This splits the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | A dictionary passed to the sub-process which is used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> rdb = kx.tick.RTP(port=5032,
... subscriptions = ['trade', 'quote']
... )
>>> rdb.start({
... 'tickerplant': 'localhost:5030',
... 'hdb': 'localhost:5031',
... 'database': 'db'})
restart
restart()
Restart and re-initialise the Real-Time Processor. This will start the process with all subscriptions, processing functions etc. as defined in the initial configuration of the process.
Example:
Restart an RTP process, validating that defined APIs are available on the restarted process
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>>
>>> def query_api(table):
...     return kx.q.qsql.select(table)
>>> rdb = kx.tick.RTP(
...     port=5032,
...     process_logs='test.log',
...     libraries = {'kx': 'pykx'},
...     apis={'custom_query': query_api}
...     )
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
>>> rdb.start({'tickerplant': 'localhost:5030'})
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
>>> rdb.restart()
Restarting Real-time processor on port 5032
Real-time processor process on port 5032 being stopped
Real-time processor successfully shutdown on port 5032
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
Real-time processor on port 5032 successfully restarted
>>> rdb('tab:([]5?1f;5?1f)')
>>> rdb('custom_query', 'tab')
pykx.Table(pykx.q('
x x1
-------------------
0.3017723 0.3927524
0.785033 0.5170911
0.5347096 0.5159796
0.7111716 0.4066642
0.411597 0.1780839
'))
pre_processor
pre_processor(function)
Define a pre-processing function on the RTP process which is called prior to inserting data into the Real-Time Database.
This function must take two parameters:
- table: The name of the table to which data will be inserted
- message: The data which is to be inserted into the table
If this function returns None or kx.q('::') then data processing will not continue for that message and it will not be inserted into the database.
The pre-processing function should return
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A callable function or PyKX Lambda taking 2 arguments the name of the table as a | required |
Returns:
Type | Description |
---|---|
None | On successful execution of this method the data pre-processing function defined on the RTP server will be updated |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def preprocess(table, message):
...     if table in ['trade', 'quote']:
...         return message
...     else:
...         return None
>>> rte = kx.tick.RTP(port=5034,
...                   libraries = {'kx': 'pykx'},
...                   subscriptions = ['trade', 'quote'],
...                   vanilla=False)
>>> rte.pre_processor(preprocess)
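The contract for a pre-processing function (return the message to continue, return None to drop it) can be modelled in plain Python. The handle function and the dictionary-backed database below are hypothetical stand-ins for the RTP internals, shown only to illustrate the control flow.

```python
def preprocess(table, message):
    # Keep messages for known tables; signal "drop" with None otherwise.
    if table in ("trade", "quote"):
        return message
    return None


def handle(database, table, message, pre=preprocess):
    """Hypothetical model of the insert step that follows pre-processing."""
    processed = pre(table, message)
    if processed is None:
        return False  # processing stops, nothing is inserted
    database.setdefault(table, []).append(processed)
    return True


db = {}
accepted = handle(db, "trade", {"sym": "AAPL", "px": 101.5})
rejected = handle(db, "orders", {"sym": "AAPL", "qty": 10})
```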
post_processor
post_processor(function)
Define a post-processing function on the RTP process which is called after inserting data into the Real-Time Database.
This function must take two parameters:
- table: The name of the table into which data was inserted
- message: The data which was inserted into the table
This function can have side effects and is not expected to return a value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A callable function or PyKX Lambda taking 2 arguments the name of the table as a | required |
Returns:
Type | Description |
---|---|
None | On successful execution of this method the data post-processing function defined on the RTP server will be updated |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def postprocess(table, message):
...     tradeagg = kx.q.qsql.select('trade',
...                                 columns={
...                                     'trdvol': 'sum px*sz',
...                                     'maxpx': 'max px',
...                                     'minpx': 'min px'},
...                                 by='sym')
...     quoteagg = kx.q.qsql.select('quote',
...                                 columns={
...                                     'maxbpx': 'max bid',
...                                     'minapx': 'min ask',
...                                     'baspread': 'max[bid]-min[ask]'},
...                                 by='sym')
...     # Join the two aggregates on sym and stamp the result with the current time
...     tab = tradeagg.merge(quoteagg, how='left', q_join=True).reset_index()
...     tab['time'] = kx.TimespanAtom('now')
...     kx.q['aggregate'] = kx.q.xcols(['time', 'sym'], tab)
...     return None
>>> rte = kx.tick.RTP(port=5034,
...                   libraries = {'kx': 'pykx'},
...                   subscriptions = ['trade', 'quote'],
...                   vanilla=False)
>>> rte.post_processor(postprocess)
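A post-processing function runs after the insert and works through side effects. The plain-Python sketch below maintains per-symbol aggregates similar in spirit to the trade aggregation above; the trades list, the aggregate dictionary and the message layout are assumptions made for illustration.

```python
trades = []     # stand-in for the real-time table
aggregate = {}  # side-effect target updated by the post-processor


def postprocess(table, message):
    if table != "trade":
        return None
    px, sz = message["px"], message["sz"]
    stats = aggregate.setdefault(
        message["sym"], {"trdvol": 0.0, "maxpx": px, "minpx": px}
    )
    stats["trdvol"] += px * sz
    stats["maxpx"] = max(stats["maxpx"], px)
    stats["minpx"] = min(stats["minpx"], px)
    return None  # the return value is not used


messages = [
    {"sym": "AAPL", "px": 10.0, "sz": 2},
    {"sym": "AAPL", "px": 12.0, "sz": 1},
    {"sym": "MSFT", "px": 5.0, "sz": 4},
]
for msg in messages:
    trades.append(msg)         # the insert happens first
    postprocess("trade", msg)  # then the post-processor is called
```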
set_tables
set_tables(tables)
Define tables to be available on the RTP process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | dict | A dictionary mapping the name of a table to be defined on the process to the table schema | required |
Returns:
Type | Description |
---|---|
None | On the RTP persist the table schemas as the supplied name |
Set a table 'prices' with a supplied schema on an RTP process
>>> import pykx as kx
>>> prices = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> rte = kx.tick.RTP(port=5034,
... subscriptions = ['trade', 'quote'],
... vanilla=False)
>>> rte.set_tables({'prices': prices})
>>> rte('prices')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
HDB
HDB(
port=5012,
*,
process_logs=True,
libraries=None,
apis=None,
init_args=None,
tables=None
)
Bases: STREAMING
start
start(database=None, config=None)
Start the Historical Database (HDB) process for analytic/query availability. This command allows for the loading of the Database to be used by the process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
database | str | The path to the database which is to be loaded on the process. | None |
config | dict | A dictionary passed to the sub-process which can be used by the function | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None and load the specified database, otherwise will raise an error. |
>>> import pykx as kx
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.start(database='/tmp/db')
restart
restart()
Restart and re-initialise the HDB process. This will start the process with validation and API functions etc. as defined in the initial configuration of the process.
Example:
Restart an HDB process, validating that defined APIs are available on the restarted process
>>> import pykx as kx
>>> def hdb_api(value):
...     return kx.q(value)
>>> hdb = kx.tick.HDB(
...     port=5035,
...     libraries={'kx': 'pykx'},
...     apis={'custom_api': hdb_api})
Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> hdb.restart()
Restarting HDB on port 5035
HDB process on port 5035 being stopped
HDB successfully shutdown on port 5035
Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035
HDB process on port 5035 successfully restarted
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
set_tables
set_tables(tables)
Define tables to be available on the HDB process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tables | dict | A dictionary mapping the name of a table to be defined on the process to the table schema | required |
Returns:
Type | Description |
---|---|
None | On the HDB persist the table schemas as the supplied name |
Set a table 'prices' with a supplied schema on an HDB process
>>> import pykx as kx
>>> prices = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> hdb = kx.tick.HDB(port=5035)
Initialising HDB process on port: 5035
HDB process initialised successfully on port: 5035
>>> hdb.set_tables({'prices': prices})
>>> hdb('prices')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))
GATEWAY
GATEWAY(
port=5010,
*,
process_logs=False,
libraries=None,
apis=None,
connections=None,
connection_validator=None,
init_args=None
)
Bases: STREAMING
start
start(config=None)
Start the gateway process's connections to external processes. The supplied configuration will be used to create 'named' inter-process connections with remote processes which can be called by users in their gateway functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | UNUSED | None |
Returns:
Type | Description |
---|---|
None | On successful start this functionality will return None, otherwise will raise an error |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     return gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
...     port=5031,
...     connections={'tp': 'localhost:5030'},
...     apis={'custom_api': gateway_api})
>>> gw.start()
add_connection
add_connection(connections=None)
Add additional callable named connections to a gateway process. This functionality is additive to any connections established when configuring a GATEWAY process. If the same name is used for two connections, the last added connection will be used in function execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connections | dict | A dictionary which maps a key denoting the 'name' to be assigned to a process with the connection string containing the host/port information as follows: | None |
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     return gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
...     port=5031,
...     apis={'custom_api': gateway_api})
>>> gw.add_connection({'tp': 'localhost:5030'})
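The additive, last-one-wins behaviour described for named connections matches ordinary dictionary update semantics. The sketch below models that in plain Python; the helper names and the 'host:port' parsing are assumptions based on the connection strings used in the examples on this page.

```python
connections = {"tp": "localhost:5030"}  # as if supplied at construction


def add_connection(new_connections):
    # Additive: existing names are kept, repeated names are overwritten.
    connections.update(new_connections)


def parse_connection(conn):
    """Split an assumed 'host:port' string into its parts."""
    host, _, port = conn.rpartition(":")
    return host, int(port)


add_connection({"hdb": "localhost:5031"})
add_connection({"tp": "localhost:5040"})  # reuses the name 'tp'

print(parse_connection(connections["tp"]))  # prints ('localhost', 5040)
```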
restart
restart()
Restart and re-initialise the Gateway process. This will start the process with validation and API functions etc. as defined in the initial configuration of the process.
Example:
Restart a Gateway process, validating that defined APIs are available on the restarted process
>>> import pykx as kx
>>> def gateway_api(value):
...     return kx.q(value)
>>> gateway = kx.tick.GATEWAY(
...     port=5035,
...     libraries={'kx': 'pykx'},
...     apis={'custom_api': gateway_api})
Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035
>>> gateway.start()
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> gateway.restart()
Restarting Gateway on port 5035
Gateway process on port 5035 being stopped
Gateway successfully shutdown on port 5035
Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035
Gateway process on port 5035 successfully restarted
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
connection_validation
connection_validation(function)
Define a function to be used on the Gateway process which validates users connecting to the process. This function should take two inputs, username and password, and determine whether the connecting user is allowed to connect. It should return True if a user is permitted to establish a connection and False if they are not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Callable | A function taking two parameters (username and password) which validates that a user connecting to the process is permitted or not to establish a callable connection. | required |
Define a function on the gateway process to only accept users with the name 'new_user'.
>>> import pykx as kx
>>> def custom_validation(username, password):
...     if username != 'new_user':
...         return False
...     else:
...         return True
>>> gateway = kx.tick.GATEWAY(port=5034, connection_validator=custom_validation)
>>> with kx.SyncQConnection(port=5034, username='user') as q:
...     q('1+1')
QError: access
>>> with kx.SyncQConnection(port=5034, username='new_user') as q:
...     q('1+1')
pykx.LongAtom(pykx.q('2'))
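A validator may also check the password. The following is a minimal sketch, assuming both arguments arrive as Python strings; the ALLOWED table and its credentials are hypothetical, and a real deployment would consult a proper credential store rather than a literal dictionary.

```python
import hmac

# Hypothetical credentials, for illustration only.
ALLOWED = {"new_user": "s3cret"}


def custom_validation(username, password):
    expected = ALLOWED.get(username)
    if expected is None:
        return False  # unknown user
    # Constant-time comparison avoids leaking how much of the password matched.
    return hmac.compare_digest(expected, password)


print(custom_validation("new_user", "s3cret"))  # prints True
print(custom_validation("user", "s3cret"))      # prints False
```

The function keeps the two-argument shape and boolean return described above, so it could be supplied wherever custom_validation is used in the example.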
BASIC
BASIC(
tables,
*,
log_directory=".",
hard_reset=False,
database=None,
ports=_default_ports
)
start
start()
Start a basic streaming architecture configured using kx.tick.BASIC.
With this basic infrastructure in place, users can then add functionality to increase the overall complexity of their system.
Returns:
Type | Description |
---|---|
None | On successful initialisation will start the Tickerplant, RDB and HDB processes, setting appropriate configuration |
Examples:
Configure and start a Tickerplant and RDB process using default parameters
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(tables={'trade': trade})
>>> basic.start()
Configure and start a Tickerplant, RDB and HDB process architecture loading a database at the location '/tmp/db' and persisting the tickerplant logs to the folder logs
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
Configure and start a Tickerplant, RDB and HDB process setting these processes on the ports 5030, 5031 and 5032 respectively
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
...     tables={'trade': trade},
...     ports={'tickerplant': 5030, 'rdb': 5031, 'hdb': 5032})
>>> basic.start()
stop
stop()
Stop processing and kill all processes within the streaming workflow. This allows the ports on which the processes are deployed to be reclaimed and the processes to be restarted if appropriate.
Example:
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
>>> basic.stop()
restart
restart()
Restart and re-initialise a Basic streaming infrastructure. This will start the processes with the configuration initially supplied.
Example:
>>> import pykx as kx
>>> trade = kx.schema.builder({
... 'time': kx.TimespanAtom , 'sym': kx.SymbolAtom,
... 'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
... 'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
... tables={'trade': trade},
... database='/tmp/db',
... log_directory='logs')
>>> basic.start()
>>> basic.restart()