Streaming tickerplant

Functionality for the generation and management of streaming infrastructures using PyKX. This allows users to ingest, persist and query vast amounts of real-time and historical data in a unified data format.

STREAMING

STREAMING(
    port=5010, *, process_logs=False, libraries=None, apis=None, init_args=None
)

The STREAMING class acts as the base parent class for the TICK, RTP, HDB and GATEWAY class objects. Each of these child classes inherits from, and may modify, the logic of this parent. Functions such as libraries and register_api have the same definition in every case and are available to all process types.

Unless a method is given a separate definition, as is the case for start in every class type, users should assume that its logic (for example, that of register_api) is consistent across process types.

__call__

__call__(*args)

Execute a synchronous call against the connected process

Parameters:

Name Type Description Default
*args

Pass supplied arguments to the pykx.SyncQConnection object

()

Returns:

Type Description
k.K

The result of the executed call on the connection object

>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
>>> tick('1+1').py()
2

start

start(config=None, print_init=True, custom_start='')

Start/initialise processing of messages on the associated sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.

Parameters:

Name Type Description Default
config dict

A dictionary passed to the sub-process which is used by the function .tick.init when the process is started. The supported parameters for this function differ depending on process type.

None
print_init bool

A boolean indicating if during initialisation we should print a message stating that the process is being initialised successfully.

True

Returns:

Type Description
None

On successful start this functionality will return None, otherwise will raise an error

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030

stop

stop()

Stop processing on the sub-process and kill the process. This allows the port on which the process is deployed to be reclaimed and the process to be restarted if appropriate.

Example:

>>> import pykx as kx
>>> tick = kx.tick.TICK(port=5030)
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.stop()
Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030

libraries

libraries(libs=None)

Specify and load the Python libraries which should be available on a process. The libraries should be supplied as a dictionary mapping the alias used for calling the library to the library name.

Parameters:

Name Type Description Default
libs dict

A dictionary mapping the alias by which a Python library will be referred to the name of the library

None
  • In the following example the process is given access to the Python libraries numpy and pykx, which are referred to as np and kx respectively when called by a user

    >>> import pykx as kx
    >>> tick = kx.tick.TICK(port=5030)
    Initialising Tickerplant process on port: 5030
    Tickerplant initialised successfully on port: 5030
    >>> tick.libraries({'np': 'numpy', 'kx': 'pykx'})
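
To make the alias-to-library mapping concrete, the following plain-Python sketch shows what such a dictionary expresses. It is not how PyKX loads libraries internally and it does not start any process; the helper name `load_libraries` is hypothetical, and standard-library modules stand in for numpy and pykx so that the snippet runs anywhere.

```python
import importlib


def load_libraries(libs=None):
    """Import each library and return a namespace keyed by alias.

    Hypothetical helper: it mirrors the shape of the dictionary passed to
    `libraries`, which maps alias -> importable module name.
    """
    namespace = {}
    for alias, module_name in (libs or {}).items():
        namespace[alias] = importlib.import_module(module_name)
    return namespace


# Standard-library modules are used here in place of numpy/pykx.
available = load_libraries({'m': 'math', 'js': 'json'})
print(available['m'].sqrt(16.0))
print(available['js'].dumps({'a': 1}))
```

The key point is the direction of the mapping: the dictionary key is the short name used in code, and the value is the name of the module to import.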

register_api

register_api(api_name, function)

Define a registered API to be callable by name on a process. This API can be a Python function or a PyKX lambda/projection.

Parameters:

Name Type Description Default
api_name str

The name by which the provided function will be called on the process

required
function Callable

The function which is to be defined as a callable API on the process. A Python function must be a single independent function which is callable using only the libraries available on the process

required
>>> import pykx as kx
>>> def custom_func(num_vals, added_value):
...    return added_value + kx.q.til(num_vals)
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.libraries({'kx': 'pykx'})
>>> hdb.register_api('custom_api', custom_func)
>>> hdb('custom_api', 5, 10)
pykx.LongVector(pykx.q('10 11 12 13 14'))
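
The call-by-name behaviour shown above can be modelled with a small registry. This is a sketch in plain Python under stated assumptions, not the PyKX implementation: the class `ApiRegistry` is hypothetical, and a list comprehension replaces `kx.q.til` so that no q process is needed.

```python
class ApiRegistry:
    """Hypothetical stand-in for a process that exposes named APIs."""

    def __init__(self):
        self._apis = {}

    def register_api(self, api_name, function):
        # Only callables can be exposed under a name
        if not callable(function):
            raise TypeError('function must be callable')
        self._apis[api_name] = function

    def __call__(self, api_name, *args):
        # Look the API up by name and pass the remaining arguments through
        return self._apis[api_name](*args)


def custom_func(num_vals, added_value):
    # Plain-Python analogue of `added_value + kx.q.til(num_vals)`
    return [added_value + i for i in range(num_vals)]


process = ApiRegistry()
process.register_api('custom_api', custom_func)
print(process('custom_api', 5, 10))
```

Because the function is looked up by name at call time, everything it references must be resolvable where it runs, which is why the documented function has to be independent and rely only on libraries loaded on the process.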

set_timer

set_timer(timer=1000)

Set a timer on the connected process. This allows users to configure, for example, the interval at which data is published.

Parameters:

Name Type Description Default
timer int

The interval at which the timer is triggered in milliseconds.

1000

Returns:

Type Description
None

On successful execution this will return None

set_tables

set_tables(tables, tick=False)

Define the tables to be available to the process being initialized.

Parameters:

Name Type Description Default
tables dict

A dictionary mapping the name of a table to be defined on the process to the table schema

required
tick bool

Is the process you are setting the table on a tickerplant?

False

Returns:

Type Description
None

On a process persist the table schema as the supplied name

Set a table 'trade' with a supplied schema on a tickerplant process

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> tick.set_tables({'trade': trade})
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))

TICK

TICK(
    port=5010,
    *,
    process_logs=True,
    tables=None,
    log_directory=None,
    hard_reset=False,
    chained=False,
    init_args=None
)

Bases: STREAMING

start

start(config=None)

Start/initialise processing of messages on the associated tickerplant sub-process. This allows users to split the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.

Parameters:

Name Type Description Default
config dict

A dictionary passed to the sub-process which is used by the function .tick.init when the process is started. The supported parameters for this function differ depending on process type.

None

Returns:

Type Description
None

On successful start this functionality will return None, otherwise will raise an error

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()

restart

restart()

Restart and re-initialise the Tickerplant. This will start the process on the expected port with all tables defined.

Example:

Restart a Tickerplant validating that the expected tables are appropriately defined

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>> tick.restart()
Restarting Tickerplant on port 5030

Tickerplant process on port 5030 being stopped
Tickerplant successfully shutdown on port 5030

Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030

Tickerplant on port 5030 successfully restarted
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))

set_tables

set_tables(tables)

Define the tables to be used for consuming and serving messages on the tickerplant process.

Parameters:

Name Type Description Default
tables dict

A dictionary mapping the name of a table to be defined on the process to the table schema

required

Returns:

Type Description
None

On the tickerplant persist the table schema as the supplied name

Set a table 'trade' with a supplied schema on a tickerplant process

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
>>> tick.set_tables({'trade': trade})
>>> tick('trade')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))

set_snap

set_snap(snap_function)

Define a 'snap' function used by KX Dashboards UI to manage the data presented to a Dashboard process when subscribing to data from a Tickerplant process.

Parameters:

Name Type Description Default
snap_function Callable

A Python function or callable PyKX Lambda which takes a single argument and returns the expected tabular dataset for display

required

Returns:

Type Description
None

On successful execution will set the streaming function .u.snap and return None

Implement a ring-buffer to provide the most recent 1,000 datapoints

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030)
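>>> def buffer_ring(x):
...     # Return the whole table until it holds more than 1,000 rows,
...     # after which only the most recent 1,000 rows are returned
...     if len(kx.q['trade']) <= 1000:
...         return kx.q['trade']
...     else:
...         return kx.q['trade'][-1000:]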
>>> tick.set_
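
The ring-buffer idea behind such a snap function can be illustrated without a q process. The sketch below is plain Python, not PyKX code: `recent_trades`, `publish` and `snap` are hypothetical names, and a `collections.deque` with `maxlen=1000` plays the role of the table that only ever exposes its most recent 1,000 rows.

```python
from collections import deque

# A deque with maxlen silently discards the oldest entry once it is full
recent_trades = deque(maxlen=1000)


def publish(row):
    """Append one incoming row to the buffer."""
    recent_trades.append(row)


def snap(_):
    """Return the buffered rows; the single argument is unused here."""
    return list(recent_trades)


# Publish more rows than the buffer can hold
for i in range(1500):
    publish({'sym': 'AAPL', 'sz': i})

snapshot = snap(None)
print(len(snapshot), snapshot[0]['sz'], snapshot[-1]['sz'])
```

After 1,500 rows have been published the snapshot holds rows 500 to 1499, so a newly subscribing dashboard would see a bounded, most-recent view rather than the full history.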

RTP

RTP(
    port=5011,
    *,
    process_logs=True,
    libraries=None,
    subscriptions=None,
    apis=None,
    vanilla=True,
    pre_processor=None,
    post_processor=None,
    init_args=None,
    tables=None
)

Bases: STREAMING

start

start(config=None)

Start/initialise processing of messages on the Real-Time Processor. This splits the process initialisation from processing of data to allow additional configuration/setup to be completed before messages begin to be processed.

Parameters:

Name Type Description Default
config dict

A dictionary passed to the sub-process which is used by the function .tick.init when the process is started. The following config options are supported for RTP processes:

1. `tickerplant`: a string denoting the host+port of the
    tickerplant from which messages are received. By default
    port 5010 will be used.
2. `hdb`: a string denoting the host+port of the HDB
    which will be re-loaded at end-of-day.
3. `database`: a string denoting the directory where the current
    day's data will be persisted. This should be the same directory
    as the `database` keyword for your HDB process should it be used.
    By default the location "db" will be used in the directory from
    which PyKX was imported.

None

Returns:

Type Description
None

On successful start this functionality will return None, otherwise will raise an error

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> rdb = kx.tick.RTP(port=5032,
...     subscriptions = ['trade', 'quote']
...     )
>>> rdb.start({
...     'tickerplant': 'localhost:5030',
...     'hdb': 'localhost:5031',
...     'database': 'db'})
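
The way a partial config combines with the documented defaults can be sketched in plain Python. This is an illustration only: the real resolution happens inside .tick.init on the sub-process, the helper `resolve_rtp_config` is hypothetical, and the `localhost` host in the default tickerplant string is an assumption (the text above only states the default port, 5010). No default is documented for `hdb`, so none is supplied here.

```python
# Defaults taken from the option descriptions above; the 'localhost'
# host is an assumption, only the port 5010 is documented.
RTP_DEFAULTS = {
    'tickerplant': 'localhost:5010',
    'database': 'db',
}


def resolve_rtp_config(config=None):
    """Overlay user-supplied options on the assumed defaults."""
    resolved = dict(RTP_DEFAULTS)
    resolved.update(config or {})
    return resolved


print(resolve_rtp_config({'tickerplant': 'localhost:5030',
                          'hdb': 'localhost:5031'}))
```

Options the user supplies replace the defaults key by key, so passing only `tickerplant` still leaves `database` pointing at "db".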

restart

restart()

Restart and re-initialise the Real-Time Processor. This will start the process with all subscriptions, processing functions etc. as defined in its initial configuration.

Example:

Restart an RTP process, validating that the APIs defined before the restart are still defined afterwards

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
Initialising Tickerplant process on port: 5030
Tickerplant initialised successfully on port: 5030
>>> tick.start()
Starting Tickerplant data processing on port: 5030
Tickerplant process successfully started on port: 5030
>>>
>>> def query_api(table):
...     return kx.q.qsql.select(table)
>>> rdb = kx.tick.RTP(
...     port=5032,
...     process_logs='test.log',
...     libraries = {'kx': 'pykx'},
...     apis={'custom_query': query_api}
...     )
Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032
>>> rdb.start({'tickerplant': 'localhost:5030'})
Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032
>>> rdb.restart()
Restarting Real-time processor on port 5032

Real-time processor process on port 5032 being stopped
Real-time processor successfully shutdown on port 5032

Initialising Real-time processor on port: 5032
Registering callable function 'custom_query' on port 5032
Successfully registed callable function 'custom_query' on port 5032
Real-time processor initialised successfully on port: 5032

Starting Real-time processing on port: 5032
Real-time processing successfully started on port: 5032

Real-time processor on port 5032 successfully restarted
>>> rdb('tab:([]5?1f;5?1f)')
>>> rdb('custom_query', 'tab')
pykx.Table(pykx.q('
x         x1
-------------------
0.3017723 0.3927524
0.785033  0.5170911
0.5347096 0.5159796
0.7111716 0.4066642
0.411597  0.1780839
'))

pre_processor

pre_processor(function)

Define a pre-processing function on the RTP process which is called prior to inserting data into the Real-Time Database.

This function must take two parameters:

  1. table: The name of the table to which data will be inserted
  2. message: The data which is to be inserted into the table

If this function returns None or kx.q('::'), data processing will not continue for that message and it will not be inserted into the database.

Otherwise, the pre-processing function should return the message to be inserted, as the example below does when it returns message unchanged.

Parameters:

Name Type Description Default
function Callable

A callable function or PyKX Lambda taking 2 arguments: the name of the table as a str and the message to be processed

required

Returns:

Type Description
None

On successful execution of this method the data pre-processing function defined on the RTP server will be updated

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def preprocess(table, message):
...     if table in ['trade', 'quote']:
...         return message
...     else:
...         return None
>>> rte = kx.tick.RTP(port=5034,
...                   libraries = {'kx': 'pykx'},
...                   subscriptions = ['trade', 'quote'],
...                   vanilla=False)
>>> rte.pre_processor(preprocess)
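
The filtering contract described above (a return value of None drops the message) can be modelled in a few lines of plain Python. This is a simplified sketch, not the RTP implementation: `handle_message` and the dictionary `store` are hypothetical stand-ins for the process's message handling and its in-memory tables.

```python
def preprocess(table, message):
    # Same rule as the documented example: keep trade/quote, drop the rest
    if table in ['trade', 'quote']:
        return message
    return None


def handle_message(table, message, store, pre=None):
    """Hypothetical model: run the pre-processor, then insert if allowed."""
    if pre is not None:
        message = pre(table, message)
        if message is None:
            return False  # dropped: nothing is inserted
    store.setdefault(table, []).append(message)
    return True


store = {}
kept = handle_message('trade', {'sym': 'AAPL', 'px': 1.5}, store, pre=preprocess)
dropped = handle_message('news', {'headline': 'x'}, store, pre=preprocess)
print(kept, dropped, store)
```

Only the trade message reaches the store; the message for the unlisted table is discarded before insertion.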

post_processor

post_processor(function)

Define a post-processing function on the RTP process which is called after inserting data into the Real-Time Database.

This function must take two parameters:

  1. table: The name of the table to which data will be inserted
  2. message: The data which is to be inserted into the table

This function can have side-effects and does not expect a return

Parameters:

Name Type Description Default
function Callable

A callable function or PyKX Lambda taking 2 arguments: the name of the table as a str and the message to be processed

required

Returns:

Type Description
None

On successful execution of this method the data post-processing function defined on the RTP server will be updated

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> tick.start()
>>> def postprocess(table, message):
...     tradeagg = kx.q.qsql.select('trade',
...                                 columns={
...                                   'trdvol': 'sum px*sz',
...                                   'maxpx': 'max px',
...                                   'minpx': 'min px'},
...                                 by='sym')
...     quoteagg = kx.q.qsql.select('quote',
...                                 columns={
...                                   'maxbpx': 'max bid',
...                                   'minapx': 'min ask',
...                                   'baspread': 'max[bid]-min[ask]'},
...                                 by='sym')
...     # Left-join the per-sym quote aggregates onto the trade aggregates
...     kx.q['aggregate'] = kx.q.lj(tradeagg, quoteagg)
...     return None
>>> rte = kx.tick.RTP(port=5034,
...                   libraries = {'kx': 'pykx'},
...                   subscriptions = ['trade', 'quote'],
...                   vanilla=False)
>>> rte.post_processor(postprocess)
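
A post-processor runs purely for its side effects once the data is already in the table. The following plain-Python sketch models that ordering with a per-sym trade aggregate. It is not PyKX code: `store`, `aggregate` and `insert` are hypothetical, and each message is assumed to be a single row held as a dict, whereas a real message may contain many rows.

```python
store = {'trade': []}
aggregate = {}


def postprocess(table, message):
    # Side effect only: refresh traded value, max and min price per sym
    if table != 'trade':
        return None
    rows = [r for r in store['trade'] if r['sym'] == message['sym']]
    aggregate[message['sym']] = {
        'trdvol': sum(r['px'] * r['sz'] for r in rows),
        'maxpx': max(r['px'] for r in rows),
        'minpx': min(r['px'] for r in rows),
    }
    return None


def insert(table, message, post=None):
    """Hypothetical model: insert first, then call the post-processor."""
    store[table].append(message)
    if post is not None:
        post(table, message)


insert('trade', {'sym': 'AAPL', 'px': 10.0, 'sz': 2}, post=postprocess)
insert('trade', {'sym': 'AAPL', 'px': 12.0, 'sz': 1}, post=postprocess)
print(aggregate)
```

Because the post-processor is called after insertion, the aggregate always reflects the row that triggered it.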

set_tables

set_tables(tables)

Define tables to be available on the RTP processes.

Parameters:

Name Type Description Default
tables dict

A dictionary mapping the name of a table to be defined on the process to the table schema

required

Returns:

Type Description
None

On the RTP persist the table schemas as the supplied name

Set a table 'prices' with a supplied schema on an RTP process

>>> import pykx as kx
>>> prices = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> rte = kx.tick.RTP(port=5034,
...                   subscriptions = ['trade', 'quote'],
...                   vanilla=False)
>>> rte.set_tables({'prices': prices})
>>> rte('prices')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))

HDB

HDB(
    port=5012,
    *,
    process_logs=True,
    libraries=None,
    apis=None,
    init_args=None,
    tables=None
)

Bases: STREAMING

start

start(database=None, config=None)

Start the Historical Database (HDB) process for analytic/query availability. This command allows for the loading of the Database to be used by the process.

Parameters:

Name Type Description Default
database str

The path to the database which is to be loaded on the process.

None
config dict

A dictionary passed to the sub-process which can be used by the function .tick.init when the process is started.

None

Returns:

Type Description
None

On successful start this functionality will return None and load the specified database, otherwise will raise an error.

>>> import pykx as kx
>>> hdb = kx.tick.HDB(port=5031)
>>> hdb.start(database='/tmp/db')

restart

restart()

Restart and re-initialise the HDB process. This will start the process with the validation and API functions etc. as defined in its initial configuration.

Example:

Restart an HDB process, validating that the APIs defined before the restart are still defined afterwards

>>> import pykx as kx
>>> def hdb_api(value):
...     return kx.q(value)
>>> hdb = kx.tick.HDB(
...     port=5035,
...     libraries={'kx': 'pykx'},
...     apis={'custom_api': hdb_api})
Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> hdb.restart()
Restarting HDB on port 5035

HDB process on port 5035 being stopped
HDB successfully shutdown on port 5035

Initialising HDB process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
HDB process initialised successfully on port: 5035

HDB process on port 5035 successfully restarted
>>> hdb('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))

set_tables

set_tables(tables)

Define tables to be available on the HDB processes.

Parameters:

Name Type Description Default
tables dict

A dictionary mapping the name of a table to be defined on the process to the table schema

required

Returns:

Type Description
None

On the HDB persist the table schemas as the supplied name

Set a table 'prices' with a supplied schema on a HDB process

>>> import pykx as kx
>>> prices = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> hdb = kx.tick.HDB(port=5035)
Initialising HDB process on port: 5035
HDB process initialised successfully on port: 5035
>>> hdb.set_tables({'prices': prices})
>>> hdb('prices')
pykx.Table(pykx.q('
time sym exchange sz px
-----------------------
'))

GATEWAY

GATEWAY(
    port=5010,
    *,
    process_logs=False,
    libraries=None,
    apis=None,
    connections=None,
    connection_validator=None,
    init_args=None
)

Bases: STREAMING

start

start(config=None)

Start the gateway process's connections to external processes. The supplied configuration is used to create 'named' inter-process connections with remote processes which can be called by users in their gateway functions.

Parameters:

Name Type Description Default
config dict

UNUSED

None

Returns:

Type Description
None

On successful start this functionality will return None, otherwise will raise an error

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     return gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
...     port=5031,
...     connections={'tp': 'localhost:5030'},
...     apis={'custom_api': gateway_api})
>>> gw.start()

add_connection

add_connection(connections=None)

Add additional callable named connections to a gateway process. This functionality is additive to any connections established when configuring the GATEWAY process. If the same name is used for two connections, the last added connection will be used in function execution.

Parameters:

Name Type Description Default
connections dict

A dictionary which maps a key denoting the 'name' to be assigned to a process with the connection string containing the host/port information as follows: <host>:<port>:<username>:<password> where username and password are optional.

None
>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> tick = kx.tick.TICK(port=5030, tables={'trade': trade})
>>> def gateway_api(value):
...     return gw.call('tp', b'{x+1}', value)
>>> gw = kx.tick.GATEWAY(
...     port=5031,
...     apis={'custom_api': gateway_api})
>>> gw.add_connection({'tp': 'localhost:5030'})
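
The connection-string format and the "last added wins" rule described above can be sketched in plain Python. This is an illustration, not the gateway's implementation: `Connection`, `parse_connection` and the module-level `connections` dictionary are hypothetical, and the parser assumes that neither the username nor the password contains a colon.

```python
from typing import NamedTuple, Optional


class Connection(NamedTuple):
    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None


def parse_connection(spec):
    """Split '<host>:<port>[:<username>[:<password>]]' into its fields."""
    parts = spec.split(':')
    if not 2 <= len(parts) <= 4:
        raise ValueError(f'unexpected connection string: {spec!r}')
    host, port, *auth = parts
    username = auth[0] if len(auth) > 0 else None
    password = auth[1] if len(auth) > 1 else None
    return Connection(host, int(port), username, password)


connections = {}


def add_connection(new=None):
    # Additive: existing names are kept unless the same name is re-added
    for name, spec in (new or {}).items():
        connections[name] = parse_connection(spec)


add_connection({'tp': 'localhost:5030'})
add_connection({'tp': 'localhost:5040:user:pass', 'hdb': 'localhost:5031'})
print(connections)
```

After the second call the name 'tp' refers to port 5040, while 'hdb' has been added alongside it.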

restart

restart()

Restart and re-initialise the Gateway process. This will start the process with the validation and API functions etc. as defined in its initial configuration.

Example:

Restart a Gateway process, validating that the APIs defined before the restart are still defined afterwards

>>> import pykx as kx
>>> def gateway_api(value):
...     return kx.q(value)
>>> gateway = kx.tick.GATEWAY(
...     port=5035,
...     libraries={'kx': 'pykx'},
...     apis={'custom_api': gateway_api})
Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035
>>> gateway.start()
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))
>>> gateway.restart()
Restarting Gateway on port 5035

Gateway process on port 5035 being stopped
Gateway successfully shutdown on port 5035

Initialising Gateway process on port: 5035
Registering callable function 'custom_api' on port 5035
Successfully registed callable function 'custom_api' on port 5035
Gateway process initialised successfully on port: 5035

Gateway process on port 5035 successfully restarted
>>> gateway('custom_api', '1+1')
pykx.LongAtom(pykx.q('2'))

connection_validation

connection_validation(function)

Define a function to be used on the Gateway process which validates users connecting to the process. This function should take two inputs, username and password, and validate that a connecting user is allowed to do so.

This function should return True if a user is permitted to establish a connection and False if they are not.

Parameters:

Name Type Description Default
function Callable

A function taking two parameters (username and password) which validates that a user connecting to the process is permitted or not to establish a callable connection.

required

Define a function on the gateway process to only accept users with the name 'new_user'.

>>> import pykx as kx
>>> def custom_validation(username, password):
...     if username != 'new_user':
...         return False
...     else:
...         return True
>>> gateway = kx.tick.GATEWAY(port=5034, connection_validator=custom_validation)
>>> with kx.SyncQConnection(port=5034, username='user') as q:
...     q('1+1')
QError: access
>>> with kx.SyncQConnection(port=5034, username='new_user') as q:
...     q('1+1')
pykx.LongAtom(pykx.q('2'))
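
A validator that also checks the password could look like the sketch below. It is plain Python that can be tested without a gateway. Several things are assumptions rather than documented behaviour: the credential store `_CREDENTIALS` is hypothetical, and both arguments are assumed to arrive as Python str values, so on a live gateway they may need converting first.

```python
import hashlib
import hmac

# Hypothetical credential store: username -> SHA-256 hex digest of password
_CREDENTIALS = {'new_user': hashlib.sha256(b'secret').hexdigest()}


def custom_validation(username, password):
    """Return True only for a known user supplying the matching password."""
    expected = _CREDENTIALS.get(username)
    if expected is None:
        return False
    supplied = hashlib.sha256(password.encode()).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(supplied, expected)


print(custom_validation('new_user', 'secret'))
print(custom_validation('new_user', 'wrong'))
print(custom_validation('user', 'secret'))
```

The function keeps the documented contract of returning True to permit a connection and False to refuse it. A bare SHA-256 digest keeps the example short; a dedicated password-hashing scheme would be preferable for real credentials.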

BASIC

BASIC(
    tables,
    *,
    log_directory=".",
    hard_reset=False,
    database=None,
    ports=_default_ports
)

start

start()

Start a basic streaming architecture configured using kx.tick.BASIC

With this basic infrastructure users can then add functionality to increase overall complexity of their system.

Returns:

Type Description
None

On successful initialisation will start the Tickerplant, RDB and HDB processes, setting appropriate configuration

Examples:

Configure and start a Tickerplant and RDB process using default parameters

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(tables={'trade': trade})
>>> basic.start()

Configure and start a Tickerplant, RDB and HDB process architecture loading a database at the location '/tmp/db' and persisting the tickerplant logs to the folder logs

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
...     tables={'trade': trade},
...     database='/tmp/db',
...     log_directory='logs')
>>> basic.start()

Configure and start a Tickerplant, RDB and HDB process setting these processes on the ports 5030, 5031 and 5032 respectively

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
...     tables={'trade': trade},
...     ports={'tickerplant': 5030, 'rdb': 5031, 'hdb': 5032})
>>> basic.start()

stop

stop()

Stop processing and kill all processes within the streaming workflow. This allows the port on which the process is deployed to be reclaimed and the process to be restarted if appropriate.

Example:

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
...     tables={'trade': trade},
...     database='/tmp/db',
...     log_directory='logs')
>>> basic.start()
>>> basic.stop()

restart

restart()

Restart and re-initialise a Basic streaming infrastructure. This will start the processes with the configuration initially supplied.

Example:

>>> import pykx as kx
>>> trade = kx.schema.builder({
...     'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
...     'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
...     'px': kx.FloatAtom})
>>> basic = kx.tick.BASIC(
...     tables={'trade': trade},
...     database='/tmp/db',
...     log_directory='logs')
>>> basic.start()
>>> basic.restart()