
KX Stream Java API

The KX Stream Java API offers programmatic access, via the Java programming language, to core features of the KX Platform. It extends the core KX Java Fusion interface, providing structured access to core services that are available only through the KX Platform. These services are described under Available services below.

Developing with the KX Stream Java API

To build your own Java application using the KX Stream API, you require the base deltaj libraries and a number of third-party libraries, all available in the DeltaJAPI.tgz package. This package is available to KX Stream users; please contact KX for download details. The sample test client source and JavaDoc are also included in the install and illustrate the main features of the KX Stream Java API.

To integrate with the KX Stream Java API, add the libraries from DeltaJAPIEval_<version>.tgz to your Java application's project classpath.

Using Eclipse as an example, the steps to set up an application from scratch would be:

  1. Create a new Eclipse project.

  2. Create a lib folder, copy in the dependency jar files from DeltaJAPIEval.tgz, and add them to the classpath. Libraries are located under DeltaJAPIEval_<version>/app/deltaj-testclient-<version>/lib

  3. Add the Stream API resources to the project.

    1. These include the Stream API JavaDoc and a Stream Test Client library with source.
    2. Libraries are located under DeltaJAPIEval_<version>/app/deltaj-testclient-<version>/deltaStreamAPIResources
    3. Copy these files to the lib directory and link them in the project.



Creating a KX Stream instance

The main entry point to the Java KX Stream API is an instance of DeltaStream. As the features available may extend beyond what the application requires, configuring this instance correctly is important for optimal performance. Full details of initialization and parameterization are available in the JavaDoc packaged with the DeltaJAPIEval.tgz.

A single DeltaStream instance can be created as follows:

List<HostAndPort> hostAndPorts = new ArrayList<HostAndPort>();
hostAndPorts.add(new HostAndPort(primaryHost, primaryPort, useTLS));
hostAndPorts.add(new HostAndPort(secondaryHost, secondaryPort, useTLS));

DeltaStream stream = DeltaStreamFactory.newDeltaStream();
stream.initialiseDeltaControl(hostAndPorts, instanceName, instanceId, exclusive,
    loginString, retryMaxTimeOutPeriod, retryTimeoutStep, encrypted);

Initialization errors and disconnections

The KX Stream API can be configured to throw back to the client when initialization fails within a given timeout. When so configured, the initialiseDeltaControl method throws a DeltaStreamException if initialization fails, and connectivity issues that occur after initialization are reported through the provided IBreakableServiceListener callback method:

IBreakableServiceListener<DeltaStream> eventListener = new IBreakableServiceListener<DeltaStream>() {
    public void accept(Event<IBreakableService.EventType, DeltaStream> event) {
        if (event.getEventType() == IBreakableService.EventType.BROKEN) {
            LOG.error("Disconnected from KX Platform");
            // Disconnected handling logic
        }
    }
};

List<HostAndPort> hostAndPorts = new ArrayList<HostAndPort>();
hostAndPorts.add(new HostAndPort(primaryHost, primaryPort, useTLS));
hostAndPorts.add(new HostAndPort(secondaryHost, secondaryPort, useTLS));

DeltaStream stream = DeltaStreamFactory.newDeltaStream();

try {
    // failConnectTimeout and eventListener enable the BROKEN callback
    stream.initialiseDeltaControl(hostAndPorts, instanceName, instanceId, exclusive, loginString,
        retryMaxTimeOutPeriod, retryTimeoutStep, encrypted, failConnectTimeout, eventListener);
} catch (IOException ioe) {
    LOG.error("Failed to initialise KX Stream API", ioe);
}

Pass the listener as the eventListener parameter of initialiseDeltaControl(); its accept method is invoked when the BROKEN event occurs. The BROKEN event means the KX Platform is unexpectedly unavailable: for example, connectivity to the server has been lost and attempts to recover have failed within the specified failConnectTimeout. In this event, the close method is invoked automatically; the listener need only implement any application logic to handle this state.

Currently, BROKEN is the only supported event type within the KX Stream API.
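
Because the listener is a callback, an application may prefer to record the BROKEN event and react to it on its own thread. The following is a minimal sketch of that pattern in plain Java. BrokenSignal, its EventType enum and its Listener interface are hypothetical stand-ins used so that the example is self-contained; in a real application the callback would be the accept method of IBreakableServiceListener shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that turns a BROKEN callback into a signal another thread can wait on. */
final class BrokenSignal {

    /** Stand-in for IBreakableService.EventType; only BROKEN is documented. */
    enum EventType { BROKEN }

    /** Stand-in for the shape of the listener callback. */
    interface Listener {
        void accept(EventType type);
    }

    private final CountDownLatch broken = new CountDownLatch(1);

    /** Returns a listener that records the BROKEN event and returns immediately. */
    Listener listener() {
        return type -> {
            if (type == EventType.BROKEN) {
                broken.countDown();
            }
        };
    }

    /** Waits up to timeoutMs milliseconds; returns true if BROKEN has been observed. */
    boolean awaitBroken(long timeoutMs) {
        try {
            return broken.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller and report "not observed".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A supervising thread could call awaitBroken in a loop and, once it returns true, run whatever recovery logic the application needs.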

Initialization parameter details

parameter              description
hostAndPorts           List of KX Control host connection details
instanceName           Instance name; should relate to the application
instanceId             Instance ID; if the application was started from Control, apply that ID here
exclusive              Controls whether duplicate instances of this application can run
loginString            Authentication details in username:password format
retryMaxTimeOutPeriod  Time in milliseconds before the next reconnect attempt if the previous
                       connection attempt failed
retryTimeoutStep       Deprecated; not used
encrypted              Whether the loginString should be encrypted
failConnectTimeout     Timeout in seconds for connecting and reconnecting; 0 means no timeout,
                       and the BROKEN event is never triggered
eventListener          Listener for the BROKEN event; cannot be null when failConnectTimeout
                       is greater than 0
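
Two of the rules in the table above can be checked before calling initialiseDeltaControl. The sketch below is a hypothetical pre-flight helper, not part of the KX Stream API. It assumes an unencrypted loginString of the form username:password, and it also rejects a negative failConnectTimeout, which is an assumption rather than a documented rule.

```java
/** Hypothetical pre-flight checks for the initialization parameters; not part of the KX Stream API. */
final class InitParamChecks {

    private InitParamChecks() { }

    /** Returns true if loginString looks like "username:password" with both parts non-empty. */
    static boolean isValidLoginString(String loginString) {
        if (loginString == null) {
            return false;
        }
        // Split on the first colon so that passwords containing ':' are still accepted.
        int sep = loginString.indexOf(':');
        return sep > 0 && sep < loginString.length() - 1;
    }

    /** Mirrors the documented rule: a listener is mandatory when failConnectTimeout is greater than 0. */
    static void checkTimeoutAndListener(int failConnectTimeoutSec, Object eventListener) {
        if (failConnectTimeoutSec < 0) {
            // Assumption: negative timeouts are treated as a configuration error.
            throw new IllegalArgumentException("failConnectTimeout must not be negative");
        }
        if (failConnectTimeoutSec > 0 && eventListener == null) {
            throw new IllegalArgumentException("eventListener is required when failConnectTimeout > 0");
        }
    }
}
```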

Closing a KX Stream instance

Once a KX Stream API instance is no longer required, call the close method to close connections and release resources. The services made available by the KX Stream API are closed as well.

stream.close();

Calling close() on an instance that is already closed is a no-op.

Available services

Once an instance of DeltaStream has been created, a number of services are available to interact with the KX Platform deployment. Each has a demo class within the Stream Test Client as an example of the available functionality.

service                               demo class                         description
Configuration Management              DeltaStreamDemo                    Allows interaction with Control configuration data
Tickerplant interface                 DeltaStreamDemoTPPublish           Publish data from a Java application to a running tickerplant
Query Router Service                  DeltaStreamDemoQR                  Execute async queries against a running KX Platform environment
Public API Service                    DeltaStreamDemoAPI                 Execute public API functions via the Java Stream API
Streaming Analytics API               DeltaStreamDemoStreamingAnalytics  Receive streaming analytic updates from the KX Platform
Messaging and data discovery service  DeltaStreamDemoDMS                 Integrate with the KX Platform Messaging Service

Once the instance of DeltaStream has been closed, expectedly or unexpectedly, all these services are also closed and unavailable.

Configuration management

An instance of DeltaStream has a ConfigurationManager associated with it. The ConfigurationManager offers a wide array of functionality, which is fully detailed in the JavaDoc. Below are a few examples of what is available.

// Retrieves a config parameter.
// getFlip(...) is also available and returns the data as a c.Flip entity
List<Map<String, Object>> configParam =
    stream.getConfigurationManager().getMapList("<configName>", "<overrideName>");

// Retrieves all overrides for a parameter
String[] overrides = stream.getConfigurationManager().getOverrides("<configname>");

// Returns the connection details for a connection entity
DeltaQConnectionDetails connection =

// Returns the schema details by schema tab
DeltaSchema schema = stream.getConfigurationManager().getSchema("<schemaName>");

// Returns the list of schemas that are part of a schema group
String[] schemas =

// Returns the Process Template associated with an Instance by name

// Returns the instance parameter values associated with a Process Instance
Map<String, Object> instanceParameters =

// Looks up task info for a given task name
Map<String, Object> taskInfo =

The Stream API framework ConfigurationManager can also be used to listen for configuration updates from KX Platform, allowing the implementing application to dynamically update behaviors based on config data.

// Listen to updates on Configuration from Control
        "DASHconfigBundle", "DEFAULT", new ConfigurationUpdateListener()
            public void onConfigUpdateMapList
                String configName,
                String override,
                Object>> data
                    "onConfigUpdateMapList: " + configName + ":" +
                    override + " updated"
            public void onConfigUpdatePropList
                String configName, String override, List<Properties> data
            public void onConfigUpdate
                String configName, String override, String[] data
            public void onConfigUpdate
                String configName, String override, Properties data
            public void onConfigUpdate
                String configName, String override, Flip data
            public void onConfigUpdate
                String configName, String override

The UpdateMode allows different data structures to be provided to the listener for ease of use within the application. See JavaDoc for more information.

Tickerplant interface

The Java Stream API allows data to be published directly to tickerplants defined in the application. Data can be published in various formats through the stream instance.

DeltaStreamTickerPlantDetails tpDetails =
    new DeltaStreamTickerPlantDetails(
        pub_instance_name, pub_host, pub_port, pub_login, tlsConnection,
        pub_schema_group, pub_cmd, pub_data_format, pub_timeout_intv,
        pub_timeout_intv_step, pub_recovery_file, pub_recovery_intv,
        pub_recovery_batchsize, pub_recovery_clearOnStartup,
        pub_recovery_maxkb, pub_heatbeat_type, pub_heartbeat_intv,
        sub_schema, sub_cmd);

A DeltaStreamTickerPlantDetails instance has numerous parameters that control the available connectivity and recovery options:

pub_instance_name            Tickerplant instance name
pub_host                     Tickerplant hostname
pub_port                     Tickerplant port
pub_login                    Tickerplant user authentication details
tlsConnection                Tickerplant TLS connection on/off/both
pub_schema_group             Group containing schemas to load into publisher
pub_cmd                      Command used to publish data
                             Should use DeltaStreamConnection.DSTR_UPD_ARRAY
pub_data_format              Format of data to publish to tickerplant
pub_timeout_intv             Maximum time in ms to wait between reconnect attempts
pub_timeout_intv_step        Step growth size in ms to each retry attempt,
                             up to pub_timeout_intv
pub_recovery_file            Filename for recovery files
pub_recovery_intv            Delay between executions of the recovery process
pub_recovery_batchsize       Max records sent by recovery process
                             to tickerplant in a single cycle
pub_recovery_clearOnStartup  If true recovery process will ignore
                             all pre-existing files
pub_recovery_maxkb           Max size recovery file will be allowed to grow (KB)
pub_heatbeat_type            Type of heartbeating
pub_heartbeat_intv           How often to heartbeat
sub_schema                   List of tables the tickerplant should
                             listen for updates on
sub_cmd                      Command used to register for incoming messages
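
The relationship between pub_timeout_intv and pub_timeout_intv_step can be illustrated with a small calculation. The sketch below assumes that the wait before each retry grows linearly by one step per attempt until it reaches the maximum; the source states only that the step grows each retry up to pub_timeout_intv, so the exact schedule used by the library may differ. ReconnectDelay is a hypothetical name.

```java
/** Hypothetical illustration of a stepped reconnect delay; the library's actual schedule may differ. */
final class ReconnectDelay {

    private ReconnectDelay() { }

    /**
     * Delay in ms before the given retry attempt (1-based), assuming linear growth
     * by stepMs per attempt, capped at maxMs.
     */
    static long delayMs(int attempt, long stepMs, long maxMs) {
        if (attempt < 1 || stepMs < 0 || maxMs < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and intervals non-negative");
        }
        // Cap before multiplying so that attempt * stepMs cannot overflow.
        if (stepMs != 0 && attempt > maxMs / stepMs) {
            return maxMs;
        }
        return Math.min(attempt * stepMs, maxMs);
    }
}
```

Under this assumption, a step of 1000 ms and a maximum of 5000 ms give waits of 1000, 2000, 3000, 4000 and then 5000 ms for every later attempt.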

Publish formats

format  Java accessor                          data published as
Flip    DeltaQConnection.PUBLISH_FORMAT_FLIP   Flip
Array   DeltaQConnection.PUBLISH_FORMAT_ARRAY  Array
Dict    DeltaQConnection.PUBLISH_FORMAT_DICT   Dict

Heartbeat socket types

heartbeat         Java accessor                     description
DATA_SOCKET       HeartBeatSocket.DATA_SOCKET       Send heartbeats over the existing command socket connection
HEARTBEAT_SOCKET  HeartBeatSocket.HEARTBEAT_SOCKET  Send heartbeats over a dedicated connection

A number of customizations are available for how the tickerplant publish service operates. Full details are in the JavaDoc; some useful ones are described here. Any changes to the tickerplant configuration made after initialization are ignored.


When publishing to a defined schema, the Stream API will null all columns for the schema being published to. This is to ensure compatibility with the tickerplant. However, it is often desirable to block the publication of a time column, allowing the tickerplant to populate this column with the time at which it received the message. In the Java Stream API this is achieved by calling the setTPTimeColBlocked method:



If the application wants to receive updates when a tickerplant disconnects and reconnects, a listener is available:

deltaStream.addAPIListener(new DeltaStreamAPIListener() {
    public void onTPDisconnected(String source) {
        System.out.println("TP has disconnected " + source);
    }

    public void onTPConnected(String source) {
        System.out.println("TP has connected " + source);
    }

    public Object onIncomingSync(String source, Object data) {
        return null;
    }

    public void onIncomingAsync(String source, Object data) {
        System.out.println("Incoming async " + source);
    }
});

Query router service

The Query Router allows for advanced query execution within the platform. If set up, it can load-balance queries by routing them across data warehouse clusters, and it can provide Quality of Service (QoS) based query execution, giving different priorities to different queries. Please review the KX Platform documentation for further information on the Query Router.

The Query Router can be initialized within the KX Stream API through the DeltaStream instance. For example:

    (deltaQueryRouter, name, userPass, connectionListener);

parameter           required  description
deltaQueryRouter    no        Allows the user to pass in an already created instance of an
                              IDeltaQueryRouter. If null, one will be created based on
                              configuration in KX Control.
name                yes       Unique name for the QR Service, used to identify query sources in QR logs.
userPass            yes       Authentication details for connecting to the Query Router.
connectionListener  no        Allows the application to provide an IQRConnectionListener instance to
                              be informed of connection/disconnection events from the Query Router.
                              Note that in the case of a sharded QR, the listener receives events
                              for the DEFAULT shard only.

Once initialized, queries can be executed against the Query Router. There are a number of query-execution APIs available to the Query Router service.

Request/response query

A request to execute a query can be submitted to the Query Router in the format

    (queryObj, user, database, optParams, messageHandler);

parameter       required  description
queryObj        yes       The query object which is being executed.
user            yes       User executing the query. The user who authenticated the connections
                          does not have to be the same user executing the query.
database        yes       The connection group where the query should be executed. (This may be
                          overridden in the case of routed queries.)
optParams       yes       A Dictionary containing optional parameters for the query. Can be empty.
messageHandler  yes       The IQRMessageHandler instance which will handle the results/exception.
                          An IQRBinaryMessageHandler is also available. This returns the data in
                          native IPC binary format, allowing the application to process the
                          response manually.

This executes the query asynchronously; the result is delivered to the messageHandler. The format of the queryObj is as defined in the KX Fusion interface. For example:

Object queryObj = "2+2";
queryObj = new Object[] { '+', 2, 2 };

KX Platform adds additional secure-handling checks depending on the executing user and the configuration of the environment. In such cases, string queries, or functions for which the calling user is not permissioned, will fail with a suitable error.

Polled queries

The Query Router also supports polled queries: a query is submitted to the Query Router, which executes it on a timer and publishes results back until the client disconnects or a stopPolledQuery is requested.

String runId = deltaStream.getDeltaQueryRouterService().startPolledQuery
    (queryObj, user, frequency, database, optParams, messageHandler);

parameter       required  description
queryObj        yes       The query object which is being executed.
user            yes       User executing the query. The user who authenticated the connections
                          does not have to be the same user who is executing the query.
frequency       yes       How often the QR should execute the query, in milliseconds.
database        yes       The connection or connection group where the query should be executed.
                          (This may be overridden in the case of routed queries.)
optParams       yes       A Dictionary containing optional parameters for the query. Can be empty.
messageHandler  yes       The IQRMessageHandler instance which will handle the results/exception.
                          This will return the data in native IPC binary format, allowing the
                          application to process the response manually.

Polled Queries are stopped via the API.

    (runIds, user, messageHandler);

parameter       required  description
runIds          yes       A String[] of runIds to stop
user            yes       User requesting the stop
messageHandler  yes       The IQRStopPollMessageHandler instance which will handle the return of the request

Public query API

KX Stream API provides a public API service, which can be defined and configured in your Platform environment. These are analytic functions that define input parameters and optional return parameters, and execute functions based on the analytic definition through the Query Router service. Unlike queries submitted directly to the Query Router service, these calls are synchronous rather than handled through an asynchronous message-handler callback.

The Query API can be initialized within the Stream API via DeltaStream instance:

stream.initialiseDeltaAPIService("<username:password>", null);

As the Public API service uses the Query Router, its initialization also initializes the Query Router service if that has not already been done. The second parameter to the initialize call is a reference to an IQRConnectionListener providing callbacks for QRConnection and QRDisconnection events. This can be null if custom callbacks are not required for these events. Once initialized, the available APIs can be retrieved by calling:

Map<String, IDeltaAPI> apis = stream.getDeltaAPIService().getAPI();

This returns a Map<String, IDeltaAPI> of the available and permissioned APIs in your environment. The key is the name of the API, which corresponds to the alias of the analytic within your Platform environment, and the value is an IDeltaAPI object representing the API that can be executed. The IDeltaAPI object is then used to build an IDeltaQuery via the IDeltaAPIService:

IDeltaAPI api = apis.get("<aliasName>");
IDeltaQuery query = stream.getDeltaAPIService().createDeltaQuery(api);

At this point you have an IDeltaQuery framework object containing the API definition. Next, apply parameters to the IDeltaQuery object. These depend on what the API expects; the IDeltaAPI details the expected parameters, as does the analytic definition in the Platform environment. For example, to apply Dict input parameters:

Map<String, Object> params = new HashMap<String, Object>();
params.put("symList", new String[] { "EUR/USD" });

Alternatively, if the expected input is a Table, the parameters would be applied:

Map<String, Object> params = new HashMap<String, Object>();
params.put("col1", 1);
params.put("col2", new Integer[] {1, 2, 3});
params = new HashMap<String, Object>();
params.put("col1", 10);
params.put("col2", new Integer[] {99, 62, 33});

Once the IDeltaQuery object has been created, it can be executed with the runQuery method. This call blocks until the query execution finishes or the call times out. The data returned is as received from the q process, typically a Dict or a Flip object.

Object result = stream.getDeltaAPIService().runQuery(query, username, optParams);

parameter  required  description
query      yes       IDeltaQuery being executed
username   yes       User to run the query as
optParams  yes       Map of optional parameters. Can be null

The optional parameters provided to the runQuery method allow customized processing within the KX Platform query framework.

name        type       description
timeout     int        Override the default timeout for this request (in seconds)
clientTime  timestamp  Timestamp assigned by the client when entering the request. Used in QR
                       timestamp logging
logCorr     char[]     Client-assigned logging correlator
page        int        Number of rows to return (same behavior as sublist)
page        int[]      Two-element list of start and number of rows (same as sublist). If
                       specified in this format, results are returned in the Control paged format
cache       boolean    Whether to cache paged results
sort        boolean    Sort results as part of paging

When including optional parameters, provide a Map object as part of the method call:

Map<String, Object> optParams = 
    new HashMap<String, Object>();
optParams.put(IDeltaQueryRouter.OPTPARAM_QUERYTIMEOUT_KEY, 10);
Object result = 
    stream.getDeltaAPIService().runQuery(query, "<username>", optParams);
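
As a further illustration, the sketch below builds an optional-parameter map that combines a timeout override with the two-element form of page. It uses the literal names from the table (timeout, page) as map keys, which is an assumption: where the API provides constants such as IDeltaQueryRouter.OPTPARAM_QUERYTIMEOUT_KEY, those should be preferred. OptParams is a hypothetical helper name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical builder for the optional-parameter map; key names are assumed from the table. */
final class OptParams {

    private OptParams() { }

    /** Requests rowCount rows beginning at start, with the timeout overridden in seconds. */
    static Map<String, Object> paged(int timeoutSec, int start, int rowCount) {
        if (timeoutSec <= 0 || start < 0 || rowCount <= 0) {
            throw new IllegalArgumentException("timeout and rowCount must be positive, start non-negative");
        }
        Map<String, Object> optParams = new HashMap<>();
        optParams.put("timeout", timeoutSec);
        // Two-element form: start index and number of rows.
        optParams.put("page", new int[] { start, rowCount });
        return optParams;
    }
}
```

The resulting map would be passed as the third argument to runQuery in the same way as optParams above.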

For full details of the available optional parameters please refer to the Query Router section of the Process Template API documentation.

The available API can be refreshed programmatically by calling


Streaming Analytics API

The KX Stream Java API provides access to a Streaming Analytics service, which can be used to subscribe to and receive updates from a Streaming Analytic. The service is obtained from the DeltaStream instance as follows:

IStreamingAnalyticsService streamingService = stream.getStreamingAnalyticsService();

To start streaming, call startStreaming:

Subscription s = streamingService.startStreaming
    ("analyticName", parameters, "database", messageHandler);

A number of parameters are required.

parameter       description
analyticName    Name of the streaming analytic as defined in the Platform
parameters      Parameters as defined in the analytic within the Platform. The type of this object
                depends on the parameter types expected by the analytic.
database        Database where the Streaming Analytic should be run
messageHandler  Handles incoming messages. Instance of ISubscriptionMessageListener. An
                IBinarySubscriptionMessageListener is also available to return data as a native
                kdb+ IPC byte array, which allows the data to be deserialized within the application.

To stop streaming, call the removeSubscription method.


Messaging and data-discovery service

Messaging Server (DMS) support is built into the Java Stream API. This allows the user to publish data to Platform processes without having to set up destinations or point-to-point links: the application need only connect to the DMS service, register what it will publish, and begin publishing. Likewise, a Java Stream API application can subscribe to data from its Platform environment by registering interest in what is being published.

For both publishing and subscribing, it is necessary to initialize the DeltaStream instance, and then set up the DMS instance.

DeltaMessagingServer dms = 
    new DeltaMessagingServer(stream, "dmsConfigName", "userPass");

parameter      description
stream         Instance of DeltaStream
dmsConfigName  Name of the Messaging Server configuration. Stream default is DS_MESSAGING_SERVER:DS
userPass       Access details, ':' separated


To publish via DMS, create a class implementing the DeltaMessagingPublisher interface; this allows it to listen for incoming subscription (and unsubscription) requests.

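DeltaMessagingPublisher dmsPublisher = new DeltaMessagingPublisher() {
    List<DeltaMessagingSubscription> subscriptions = new ArrayList<>();

    public void subscriptionRemoved
        (String table, DeltaMessagingSubscription subscription) {
        // Stop publishing to this subscription
        subscriptions.remove(subscription);
    }

    public void subscriptionAdded
        (String table, DeltaMessagingSubscription subscription) {
        // Track the subscription so that it can be published to
        subscriptions.add(subscription);
    }
};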

Then register this publisher with a channel, a topic and, if applicable, any filters that are required. The channel and topic are used to match subscribers in your environment that are interested in the data.

HashMap<String, String> filterCols = new HashMap<String, String>();
filterCols.put("src", "GS.GS");
dms.registerPublisher(dmsPublisher, "channelName","topicName", filterCols);

Publishing is then done by iterating over the stored DeltaMessagingSubscription instances and publishing to each.

// Publish a Dict
Object[] record = {
    new java.sql.Date(currenttime.getTime()), 
Dict data = new Dict(cols, record);

for (DeltaMessagingSubscription s : subscriptions) {
    s.publish(topic, data);
}

// Publishing a Flip
Object[] records = {
    new String[] { "EUR/USD", "EUR/USD" },
    new String[] { "GS.GS", "GS.GS" },
    new String[] { "0", "1" },
    new Timestamp[] { currentTS, currentTS },
    new Double[] { Double.valueOf(1.3689), Double.valueOf(1.36339) },
    new Double[] { Double.valueOf(1000), Double.valueOf(2000) },
    new String[] { NA, NA },
    new String[] { NA, NA },
    new Timestamp[] { currentTS, currentTS },
    new String[] { NA, NA },
    new Object[] { 'A', 'B' },
    new Double[] { Double.valueOf(0), Double.valueOf(0) },
    new String[] { "EUR", "GBP" },
    new java.sql.Date[] {
        new java.sql.Date(currenttime.getTime()),
        new java.sql.Date(currenttime.getTime())
    },
    new Timestamp[] { currentTS, currentTS }
};
Flip fData = new Flip(new Dict(cols, records));
// Publish the Flip built above to every tracked subscription
for (DeltaMessagingSubscription s : subscriptions) {
    s.publish(topic, fData);
}
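
The Flip example above is column-oriented: it supplies one array per column, each holding that column's value for every row. When application data is held row by row, it needs to be pivoted first. The following sketch shows that step in plain Java. RowsToColumns is a hypothetical helper, and it produces untyped Object[] columns, whereas the example above uses typed arrays such as String[] and Double[]; converting to the typed arrays the schema expects is left to the caller.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: pivots row-oriented records into per-column arrays. */
final class RowsToColumns {

    private RowsToColumns() { }

    /**
     * Returns one Object[] per column, in the order given by cols.
     * Each inner array holds that column's value for every row; missing keys become null.
     */
    static Object[] pivot(String[] cols, List<Map<String, Object>> rows) {
        Object[] columns = new Object[cols.length];
        for (int c = 0; c < cols.length; c++) {
            Object[] values = new Object[rows.size()];
            for (int r = 0; r < rows.size(); r++) {
                values[r] = rows.get(r).get(cols[c]);
            }
            columns[c] = values;
        }
        return columns;
    }
}
```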


To subscribe via DMS, create a class implementing DeltaMessagingSubscriber. This requires an upd method to accept the incoming data for processing; it receives the topic and a Flip of the data. As with publication, the subscriber should be registered with the DMS instance with a channel, a topic and, if applicable, any filters.

dms.registerSubscriber(new DeltaMessagingSubscriber() {

    public void upd(String baseTopic, Flip data) {
        // do work
    }
}, channel, topic, filterCols);

Once registered, the upd method of your DeltaMessagingSubscriber receives any updates that you have subscribed to and that are being published.

System configuration options

When running the application, it is possible to parameterize the Java Stream API via system properties to help tune performance and memory utilization.

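These should be passed to the JVM like any system property. JVM options such as -D must appear before -jar; anything after the jar file name is passed to the application as a program argument. For example:

java -Dmessaging.remotesub.queuesize=50000 -jar JavaStreamClient.jar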
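
To confirm that a property has reached the JVM, an application can read it back with the standard Integer.getInteger method. The sketch below is illustrative only: QueueSizeProperty is a hypothetical class, and the fallback of Integer.MAX_VALUE is an assumption based on the documented default for this property.

```java
/** Illustrates reading a -D system property such as messaging.remotesub.queuesize. */
final class QueueSizeProperty {

    static final String KEY = "messaging.remotesub.queuesize";

    private QueueSizeProperty() { }

    /** Returns the configured queue size, or Integer.MAX_VALUE if the property is unset or not an integer. */
    static int queueSize() {
        // Integer.getInteger returns the supplied default when the property is missing or malformed.
        return Integer.getInteger(KEY, Integer.MAX_VALUE);
    }
}
```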

Messaging service configuration

property                              default      description
messaging.remotesub.queuesize         Integer.MAX  Size of the Remote Subscriber connection publish queue. Entries above
                                                   this length will block the publishing thread until room becomes
                                                   available in the queue
messaging.remotesub.queuemode         BLOCK        Queuing mode. BLOCK blocks the publish thread until the message is
                                                   successfully queued; TIMEOUT attempts to queue the message for
                                                   publication and times out according to the
                                                   messaging.remotesub.queuetimeoutus system property
messaging.remotesub.queuetimeoutus    0            Queue timeout when in TIMEOUT queue mode. Defaults to 0, which will
                                                   BLOCK until queued
messaging.remoteconn.reconnectratems  30000        Time in milliseconds to wait between reconnection attempts for
                                                   Remote Connections (publishers and subscribers)
messaging.remoteconn.tcpnodelay       true         TCP no delay configuration for connections to Remote Connections
                                                   (publishers and subscribers)
messaging.remoteconn.sendbuffersize   JDK default  Socket buffer size for Remote Connections (publishers and subscribers)
messaging.server.reconnectratems      2000         Time in milliseconds to wait between reconnection attempts to the
                                                   Messaging Server
messaging.server.connecttimeoutms     10000        Time in milliseconds after which an attempt to reconnect to the
                                                   Messaging Server times out and failover procedures to a secondary
                                                   Messaging Server are initiated
messaging.server.heartbeatintervals   30           Heartbeat interval in seconds. If there is no response within
                                                   1.5 * interval, the client will disconnect and begin reconnection
                                                   attempts
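
The difference between the BLOCK and TIMEOUT queue modes can be illustrated with a bounded queue from java.util.concurrent. This is a sketch of the semantics described in the table, not the library's implementation: QueueModes and its Mode enum are hypothetical, and the timeout is assumed to be in microseconds because of the "us" suffix of the property name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrates BLOCK versus TIMEOUT queuing using a standard bounded BlockingQueue. */
final class QueueModes {

    /** Stand-in for the values of messaging.remotesub.queuemode. */
    enum Mode { BLOCK, TIMEOUT }

    private QueueModes() { }

    /**
     * Queues a message. BLOCK (or a timeout of 0) waits until there is room;
     * TIMEOUT waits at most timeoutUs microseconds and returns false if the queue stayed full.
     */
    static <T> boolean enqueue(BlockingQueue<T> queue, T message, Mode mode, long timeoutUs) {
        try {
            if (mode == Mode.BLOCK || timeoutUs == 0) {
                queue.put(message);
                return true;
            }
            return queue.offer(message, timeoutUs, TimeUnit.MICROSECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report that the message was not queued.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a queue of capacity 1 that already holds a message, a TIMEOUT enqueue returns false once the timeout elapses, whereas a BLOCK enqueue would wait until a consumer takes the first message.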

Query service configuration

Property                           Default  Description
queryservice.reconnecting.ratesec  30       Rate of reconnection attempts to underlying Query Service processes
queryservice.connectedqpsonly      false    Whether to only allow queries to be assigned to currently running and
                                            connected Query Processors
queryservice.heartbeat.ratesec     10       Heartbeat interval in seconds. If there is no response within
                                            1.5 * interval, the client will disconnect and begin reconnection attempts
queryservice.pubthreadpoolsize     32       Number of publish threads which spawn from the Query Processor threads
                                            to publish to clients
queryservice.parallelqpconnect     false    Initiate the QP (re)connectivity on parallel threads up to the maximum
                                            number of unconnected QPs. This utilises a cached thread pool and thus
                                            works best when dealing with a reasonable number of QP connections.

Query manager configuration

Property                            Default  Description
querymanager.heartbeat.ratesec      10       Heartbeat interval in seconds. If there is no response within
                                             1.5 * interval, the client will disconnect and begin reconnection attempts
querymanager.reconnect.intervalsec  30       Reconnect rate in seconds
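
The heartbeat rule shared by the messaging, query service and query manager properties (no response within 1.5 * interval leads to a disconnect) works out as follows. HeartbeatDeadline is a hypothetical helper for illustration; treating silence of exactly 1.5 * interval as still alive is an assumption.

```java
/** Illustrates the documented heartbeat rule: silence longer than 1.5 * interval means disconnect. */
final class HeartbeatDeadline {

    private HeartbeatDeadline() { }

    /** Maximum tolerated silence in milliseconds for a heartbeat interval given in seconds. */
    static long maxSilenceMs(int intervalSec) {
        if (intervalSec <= 0) {
            throw new IllegalArgumentException("intervalSec must be positive");
        }
        // 1.5 * interval seconds, expressed in milliseconds.
        return intervalSec * 1500L;
    }

    /** Returns true if more than 1.5 * interval has passed since the last response. */
    static boolean isExpired(long lastResponseMs, long nowMs, int intervalSec) {
        return nowMs - lastResponseMs > maxSilenceMs(intervalSec);
    }
}
```

With the default of 10 seconds the client would tolerate 15 seconds of silence; with the messaging default of 30 seconds, 45 seconds.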