Kx Stream Java API

The Kx Stream Java API offers programmatic access, via the Java programming language, to core features available as part of Kx Platform. This API extends the core Kx Java Fusion interface, providing structured access to core services only available through the Kx Platform. These features are described under Available services below.

Developing with the Kx Stream Java API

To build your own Java application using the Kx Stream API, you require the base deltaj libraries and a number of third-party libraries, all available in the DeltaJAPI.tgz. This package is available to Kx Stream users; please contact Kx for download details. The sample test client source and JavaDoc are also included in the install; together they cover the main features of the Kx Stream Java API.

To integrate with the Kx Stream Java API, your Java application must add the libraries from the DeltaJAPIEval_<version>.tgz to the project classpath.

Using Eclipse as an example, the steps to set up an application from scratch would be:

  1. Create a new Eclipse project.

    Screenshot

  2. Create a lib folder, copy in the dependency jar files from DeltaJAPIEval.tgz and add them to the classpath. The libraries are located under DeltaJAPIEval_<version>/app/deltaj-testclient-<version>/lib

    Screenshot

  3. Add Stream API resources to project.

    1. These include Stream API JavaDoc and a Stream Test Client library with source.
    2. Libraries are located under DeltaJAPIEval_<version>/app/deltaj-testclient-<version>/deltaStreamAPIResources
    3. Copy these files to the lib directory and link as highlighted in the images below.

    Screenshot

    Screenshot

Creating a Kx Stream instance

The main entry point to the Java Kx Stream API is an instance of DeltaStream. As the features available may extend beyond what the application requires, the configuration of this instance is important to ensure optimal performance. Full details of the initialization and parameterization are available in the JavaDocs packaged with the DeltaJAPIEval.tgz.

A single DeltaStream can be created as follows:

List<HostAndPort> hostAndPorts = new ArrayList<HostAndPort>();
hostAndPorts.add(new HostAndPort(primaryHost, primaryPort, useTLS));
hostAndPorts.add(new HostAndPort(secondaryHost, secondaryPort, useTLS));

DeltaStream stream = DeltaStreamFactory.newDeltaStream();
stream.initialiseDeltaControl(hostAndPorts, instanceName, instanceId, exclusive,
    loginString, retryMaxTimeOutPeriod, retryTimeoutStep, encrypted);

This will create a connection to Kx Control and authorize the user and connection where:

parameter              description
hostAndPorts           List of Kx Control host connection details
instanceName           Instance name; should relate to the application
instanceId             If the application was started from Control, the instance ID
                       should be applied here
exclusive              Controls whether duplicate instances of this application can run
loginString            Authentication details, formatted as username:password
retryMaxTimeOutPeriod  Time in milliseconds before the next reconnect attempt if the
                       previous connection attempt failed
retryTimeoutStep       Deprecated, not used
encrypted              Whether the loginString should be encrypted or not
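
The loginString above is described as username:password formatted. As a minimal sketch, an application might assemble and sanity-check that string before passing it to initialiseDeltaControl. The LoginStrings class and its methods are hypothetical helpers, not part of the Kx Stream API, and the rule that a user name may not contain ':' is an assumption made so that the string splits unambiguously.

```java
// Hypothetical helper, not part of the Kx Stream API.
final class LoginStrings {
    private LoginStrings() {}

    /** Joins a user name and password into the "username:password" form. */
    static String of(String username, String password) {
        if (username == null || username.isEmpty()) {
            throw new IllegalArgumentException("username must not be empty");
        }
        // Assumption: ':' is reserved as the separator, so it cannot appear in the user name.
        if (username.indexOf(':') >= 0) {
            throw new IllegalArgumentException("username must not contain ':'");
        }
        if (password == null) {
            throw new IllegalArgumentException("password must not be null");
        }
        return username + ":" + password;
    }

    /** Returns the user part of a login string (the text before the first ':'). */
    static String userOf(String loginString) {
        int separator = loginString.indexOf(':');
        if (separator <= 0) {
            throw new IllegalArgumentException("expected username:password");
        }
        return loginString.substring(0, separator);
    }

    public static void main(String[] args) {
        String login = of("alice", "s3cret");
        System.out.println(userOf(login)); // prints "alice"
    }
}
```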

Available services

Once an instance of DeltaStream has been created, a number of services are available to interact with the Kx Platform deployment. Each has a Demo class within the Stream Test Client as an example of the available functionality.

service                               demo class                         description
Configuration Management              DeltaStreamDemo                    Allows interaction with Control configuration data
Tickerplant interface                 DeltaStreamDemoTPPublish           Publish data from a Java application to a running Tickerplant
Query Router Service                  DeltaStreamDemoQR                  Execute async queries against a running Kx Platform environment
Public API Service                    DeltaStreamDemoAPI                 Execute public API functions via the Java Stream API
Streaming Analytics API               DeltaStreamDemoStreamingAnalytics  Receive Streaming Analytic updates from the Kx Platform
Messaging and data discovery service  DeltaStreamDemoDMS                 Integrate with the Kx Platform Messaging Service

Configuration management

An instance of DeltaStream has a ConfigurationManager associated with it. The ConfigurationManager offers a wide array of functionality, which is fully detailed in the JavaDoc. Below are a few examples of what is available.

// Retrieves a config parameter.
// getFlip(...) is also available, returning the data as a c.Flip entity
List<Map<String, Object>> configParam =
    stream.getConfigurationManager().getMapList("<configName>", "<overrideName>");

// Retrieves all overrides for a parameter
String[] overrides = stream.getConfigurationManager().getOverrides("<configName>");

// Returns the connection details for a connection entity
DeltaQConnectionDetails connection =
    stream.getConfigurationManager().getConnectionDetails("<connectionName>");

// Returns the schema details by schema name
DeltaSchema schema = stream.getConfigurationManager().getSchema("<schemaName>");

// Returns the list of schemas that are part of a schema group
String[] schemas =
    stream.getConfigurationManager().getSchemasForGroup("<schemaGroupName>");

// Returns the Process Template associated with an Instance by name
stream.getConfigurationManager().getProcessTemplateName("<instanceName>");

// Returns the instance parameter values associated with a Process Instance
Map<String, Object> instanceParameters =
    stream.getConfigurationManager().getProcessInstanceParamValues("<instanceName>");

// Looks up task info for a given task name
Map<String, Object> taskInfo =
    stream.getConfigurationManager().getTaskInfo("<taskName>");

The Stream API framework ConfigurationManager can also be used to listen for configuration updates from Kx Platform, allowing the implementing application to dynamically update behaviors based on config data.

// Listen to updates on Configuration from Control
stream.getConfigurationManager().addConfigurationManagerListener
    (
        "DASHconfigBundle", "DEFAULT", new ConfigurationUpdateListener()
        {
            @Override
            public void onConfigUpdateMapList
            (
                String configName,
                String override,
                List<Map<String, Object>> data
            )
            {
                System.out.println
                (
                    "onConfigUpdateMapList: " + configName + ":" +
                    override + " updated"
                );
            }
            @Override
            public void onConfigUpdatePropList
            (
                String configName, String override, List<Properties> data
            )
            {}
            @Override
            public void onConfigUpdate
            (
                String configName, String override, String[] data
            )
            {}
            @Override
            public void onConfigUpdate
            (
                String configName, String override, Properties data
            )
            {}
            @Override
            public void onConfigUpdate
            (
                String configName, String override, Flip data
            )
            {}
            @Override
            public void onConfigUpdate
            (
                String configName, String override
            )
            {}
        },
        UpdateMode.MapList
    );

The UpdateMode allows different data structures to be provided to the listener for ease of use within the application. See JavaDoc for more information.
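
One way an application might act on such updates is to keep the most recent MapList per configuration and override, so other threads can read it without talking to Control. The sketch below is a self-contained illustration of that pattern only. ConfigSnapshotCache and its methods are hypothetical names, not part of the Stream API; the intent is that onUpdate would be called from a listener's onConfigUpdateMapList callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical application-side cache, not part of the Stream API.
final class ConfigSnapshotCache {
    private final Map<String, List<Map<String, Object>>> latest = new ConcurrentHashMap<>();

    /** Stores a read-only copy of the rows delivered for configName:override. */
    void onUpdate(String configName, String override, List<Map<String, Object>> data) {
        latest.put(key(configName, override),
                   Collections.unmodifiableList(new ArrayList<>(data)));
    }

    /** Returns the most recent rows, or an empty list if no update has arrived yet. */
    List<Map<String, Object>> current(String configName, String override) {
        return latest.getOrDefault(key(configName, override), Collections.emptyList());
    }

    private static String key(String configName, String override) {
        return configName + ":" + override;
    }

    public static void main(String[] args) {
        ConfigSnapshotCache cache = new ConfigSnapshotCache();
        // Simulated update; in an application this would come from the listener callback.
        Map<String, Object> row = Collections.singletonMap("refreshMs", 500);
        cache.onUpdate("DASHconfigBundle", "DEFAULT", Collections.singletonList(row));
        System.out.println(cache.current("DASHconfigBundle", "DEFAULT"));
    }
}
```

Copying the incoming list before storing it means later changes to the delivered object cannot alter what readers see.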

Tickerplant interface

The Java Stream API allows data to be published directly to tickerplants defined in the application. Data can be published in various formats through the stream instance.

DeltaStreamTickerPlantDetails tpDetails =
    new DeltaStreamTickerPlantDetails
    (
        pub_instance_name, pub_host, pub_port, pub_login, tlsConnection,
        pub_schema_group, pub_cmd, pub_data_format, pub_timeout_intv,
        pub_timeout_intv_step, pub_recovery_file, pub_recovery_intv,
        pub_recovery_batchsize, pub_recovery_clearOnStartup,
        pub_recovery_maxkb, pub_heatbeat_type, pub_heartbeat_intv,
        sub_schema, sub_cmd
    );
stream.addTickerPlantDestination(tpDetails);

A DeltaStreamTickerPlantDetails instance has numerous parameters which control the connectivity and recovery options available:

pub_instance_name            Tickerplant instance name
pub_host                     Tickerplant hostname
pub_port                     Tickerplant port
pub_login                    Tickerplant user authentication details
tlsConnection                Tickerplant TLS connection on/off/both
pub_schema_group             Group containing schemas to load into publisher
pub_cmd                      Command used to publish data
                             Should use DeltaStreamConnection.DSTR_UPD_ARRAY
pub_data_format              Format of data to publish to tickerplant
pub_timeout_intv             Maximum time in ms to wait between reconnect attempts
pub_timeout_intv_step        Step growth size in ms to each retry attempt,
                             up to pub_timeout_intv
pub_recovery_file            Filename for recovery files
pub_recovery_intv            Delay between executions of the recovery process
pub_recovery_batchsize       Max records sent by recovery process
                             to tickerplant in a single cycle
pub_recovery_clearOnStartup  If true recovery process will ignore
                             all pre-existing files
pub_recovery_maxkb           Max size recovery file will be allowed to grow (KB)
pub_heatbeat_type            Type of heartbeating
pub_heartbeat_intv           How often to heartbeat
sub_schema                   List of tables the tickerplant should
                             listen for updates on
sub_cmd                      Command used to register for incoming messages
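
The relationship between pub_timeout_intv and pub_timeout_intv_step can be illustrated with a small calculation. The sketch below assumes one plausible reading of the table: the wait before retry n is n times the step, capped at the maximum. The ReconnectBackoff class is hypothetical and the exact schedule used by the library may differ; consult the JavaDoc for the authoritative behavior.

```java
// Hypothetical illustration of a stepped reconnect delay; not the library's implementation.
final class ReconnectBackoff {
    private ReconnectBackoff() {}

    /** Delay before the given 1-based retry attempt: attempt * stepMs, capped at maxMs. */
    static long delayMs(int attempt, long stepMs, long maxMs) {
        if (attempt < 1 || stepMs <= 0 || maxMs < 0) {
            throw new IllegalArgumentException("attempt >= 1, stepMs > 0 and maxMs >= 0 required");
        }
        // Compare by division so that attempt * stepMs can never overflow.
        if (attempt > maxMs / stepMs) {
            return maxMs;
        }
        return attempt * stepMs;
    }

    public static void main(String[] args) {
        long stepMs = 2000; // stands in for pub_timeout_intv_step
        long maxMs = 5000;  // stands in for pub_timeout_intv
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMs(attempt, stepMs, maxMs) + " ms");
        }
        // Delays printed: 2000, 4000, 5000, 5000
    }
}
```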

Publish formats

format  Java accessor                          data published as
Flip    DeltaQConnection.PUBLISH_FORMAT_FLIP   Flip
Array   DeltaQConnection.PUBLISH_FORMAT_ARRAY  Array
Dict    DeltaQConnection.PUBLISH_FORMAT_DICT   Dict

Heartbeat socket types

heartbeat           Java accessor                       description
DATA_SOCKET         HeartBeatSocket.DATA_SOCKET         Send heartbeats over the existing command socket connection
HEARTBEAT_SOCKET    HeartBeatSocket.HEARTBEAT_SOCKET    Send heartbeats over a dedicated connection
DISABLED_HEARTBEAT  HeartBeatSocket.DISABLED_HEARTBEAT  Do not heartbeat

There are a number of customizations available for how the Tickerplant publish service operates. Full details are in the JavaDoc; a number of useful ones are detailed here. Any changes to the tickerplant configuration made after initialization will be ignored.

setTPTimeColBlocked

When publishing to a defined schema, the Stream API will null all columns for a schema being published to. This is to ensure compatibility with the tickerplant. However, it is often desirable to block the publication of a time column, allowing the tickerplant to populate this column with the time at which it received the message. In the Java Stream API this is achieved by calling the setTPTimeColBlocked method:

deltaStream.setTPTimeColBlocked(true);

addAPIListener

If the application wants to be notified when a tickerplant disconnects and reconnects, a listener is available for this purpose.

deltaStream.addAPIListener(new DeltaStreamAPIListener() {
    @Override
    public void onTPDisconnected(String source) {
        System.out.println("TP has disconnected " + source);
    }
    @Override
    public void onTPConnected(String source) {
        System.out.println("TP has connected " + source);
    }
    @Override
    public Object onIncomingSync(String source, Object data) {
        return null;
    }
    @Override
    public void onIncomingAsync(String source, Object data) {
        System.out.println("Incoming async " + source);
    }
});

Query Router Service

The Query Router allows for advanced query execution within the platform. If set up, it can load-balance queries by routing them across data warehouse clusters, and provide Quality of Service (QoS) based query execution that gives different queries different priorities. Please review the Kx Platform documentation for further information on the Query Router.

The Query Router can be initialized within the Kx Stream API through the DeltaStream instance. For example:

deltaStream.initialiseDeltaQueryRouterService
    (deltaQueryRouter, name, userPass, connectionListener);

parameter           required  description
deltaQueryRouter    no        Allows the user to pass in an already created instance of
                              an IDeltaQueryRouter. If null, one will be created based
                              on configuration in Kx Control.
name                yes       Unique name for the QR Service, used to identify query
                              sources in QR logs.
userPass            yes       Authentication details for connecting to the Query Router.
connectionListener  no        Allows the application to provide an IQRConnectionListener
                              instance to be informed of connection/disconnection events
                              from the Query Router. Note that in the case of a sharded
                              QR, the listener receives events for the DEFAULT shard only.

Once initialized, queries can be executed against the Query Router. The Query Router service offers a number of query-execution APIs.

Request/response query

A request to execute a query can be submitted to the Query Router in the format

deltaStream.getDeltaQueryRouterService().runQuery
    (queryObj, user, database, optParams, messageHandler);

parameter       required  description
queryObj        yes       The query object which is being executed.
user            yes       User executing the query. The user who authenticated the
                          connections does not have to be the same user executing
                          the query.
database        yes       The connection group where the query should be executed.
                          (This may be overridden in the case of routed queries.)
optParams       yes       A Dictionary containing optional parameters for the query.
                          Can be empty.
messageHandler  yes       The IQRMessageHandler instance which will handle the
                          results/exception. An IQRBinaryMessageHandler is also
                          available. This will return the data in native IPC binary
                          format, allowing the application to process the response
                          manually.

This will execute the query asynchronously, and the result is delivered to the messageHandler. The format of the queryObj is as defined in the Kx Fusion interface. For example:

// As a query string
Object queryObj = "2+2";
// Or as a function followed by its arguments
queryObj = new Object[] { '+', 2, 2 };

Kx Platform adds additional secure-handling checks depending on the executing user and the configuration of the environment. In such cases string queries, or functions for which the calling user is not permissioned, will fail with a suitable error.
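
Because results arrive through a callback, callers that want to wait for a single answer need some way to bridge the two styles. The sketch below shows one common approach using a CompletableFuture. The ResultHandler interface is a hypothetical stand-in with assumed method names; it is not the real IQRMessageHandler, whose methods are documented in the JavaDoc.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of bridging a callback to a future.
final class AsyncResultBridge {
    private AsyncResultBridge() {}

    /** Stand-in for a result/exception callback; NOT the real IQRMessageHandler. */
    interface ResultHandler {
        void onResult(Object result);
        void onError(Exception error);
    }

    /** Returns a handler that completes the given future when a callback fires. */
    static ResultHandler completing(final CompletableFuture<Object> future) {
        return new ResultHandler() {
            @Override
            public void onResult(Object result) {
                future.complete(result);
            }

            @Override
            public void onError(Exception error) {
                future.completeExceptionally(error);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> future = new CompletableFuture<>();
        final ResultHandler handler = completing(future);
        // Simulates a result being delivered on another thread.
        new Thread(() -> handler.onResult(4)).start();
        // Waits up to five seconds for the callback.
        System.out.println(future.get(5, TimeUnit.SECONDS)); // prints 4
    }
}
```

Bounding the wait with a timeout keeps the calling thread from hanging if no response is ever delivered.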

Polled queries

The Query Router also supports polled queries: a query is submitted to the Query Router, which executes it on a timer and publishes results back until the client disconnects or the polled query is stopped.

String runId = deltaStream.getDeltaQueryRouterService().startPolledQuery
    (queryObj, user, frequency, database, optParams, messageHandler);

parameter       required  description
queryObj        yes       The query object which is being executed.
user            yes       User executing the query. The user who authenticated the
                          connections does not have to be the same user who is
                          executing the query.
frequency       yes       How often the QR should execute the query, in milliseconds.
database        yes       The connection or connection group where the query should
                          be executed. (This may be overridden in the case of routed
                          queries.)
optParams       yes       A Dictionary containing optional parameters for the query.
                          Can be empty.
messageHandler  yes       The IQRMessageHandler instance which will handle the
                          results/exception. This will return the data in native IPC
                          binary format, allowing the application to process the
                          response manually.

Polled Queries are stopped via the API.

deltaStream.getDeltaQueryRouterService().stopPolledQueries
    (runIds, user, messageHandler);

parameter       required  description
runIds          yes       A String[] of runIds to stop
user            yes       User requesting the stop
messageHandler  yes       The IQRStopPollMessageHandler instance which will handle
                          the return of the request

Public Query API

Kx Stream API provides a public API service which can be defined and configured in your Platform environment. These are analytic functions that define input parameters and optional return parameters, and that execute through the Query Router service based on the analytic definition. Unlike the Query Router interface, these queries are synchronous rather than handled through an asynchronous callback message handler.

The Public API service can be initialized within the Stream API via the DeltaStream instance:

stream.initialiseDeltaAPIService("<username:password>", null);

As the Public API service uses the Query Router, its initialization also initializes the Query Router Service if that has not already been done. The second parameter to the initialize call is a reference to an IQRConnectionListener providing callbacks for QRConnection and QRDisconnection events. This can be null if custom callbacks are not required for these events. Once initialized, the available APIs can be retrieved by calling:

Map<String, IDeltaAPI> apis = stream.getDeltaAPIService().getAPI();

This will return a Map<String, IDeltaAPI> of the available and permissioned APIs in your environment. The key is the name of the API, which corresponds to the alias value of the analytic within your Platform environment, and the value is an IDeltaAPI object representing the API which can be executed. The IDeltaAPI object is then used to build an IDeltaQuery via the IDeltaAPIService:

IDeltaAPI api = apis.get("<aliasName>");
IDeltaQuery query = stream.getDeltaAPIService().createDeltaQuery(api);

At this point you have an IDeltaQuery framework object containing the API definition. The user must then apply parameterization to the IDeltaQuery object. This will depend on what the API is expecting as parameters. The IDeltaAPI will detail the expected parameters, as will the analytic definition in the Platform environment. For example, to apply Dict input parameters:

Map<String, Object> params = new HashMap<String, Object>();
params.put("symList", new String[] { "EUR/USD" });
query.addParameterValues(params);

Alternatively, if the expected input is a Table, the parameters would be applied:

Map<String, Object> params = new HashMap<String, Object>();
params.put("col1", 1);
params.put("col2", new Integer[] {1, 2, 3});
query.addParameterValues(params);
params = new HashMap<String, Object>();
params.put("col1", 10);
params.put("col2", new Integer[] {99, 62, 33});
query.addParameterValues(params);

Once the IDeltaQuery object has been created, it can be executed with the runQuery method. This call will block until the query execution finishes or the call times out. The data returned will be as received from the q process, typically a Dict or a Flip object.

Object result = stream.getDeltaAPIService().runQuery(query, username, optParams);

parameter  required  description
query      yes       IDeltaQuery being executed
username   yes       User to run the query as
optParams  yes       Map of optional parameters. Can be null

The optional parameters provided to the runQuery method allow customized processing within the Kx Platform query framework.

name        type       description
timeout     int        Override the default timeout for this request (in seconds)
clientTime  timestamp  Timestamp assigned by client when entering request. Used in
                       QR timestamp logging
logCorr     char[]     Client assigned logging correlator
page        int        Number of rows to return (same behavior as sublist)
page        int[]      Two element list of start and number of rows (same as sublist).
                       If specified in this format, results will be returned in the
                       Control paged format
cache       boolean    Whether to cache paged results
sort        boolean    Sort results as part of paging
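
The two forms of the page parameter are described in terms of q's sublist. For readers more familiar with Java, the sketch below mimics those row-selection semantics on a plain List: a single count takes the first rows, and a (start, count) pair takes count rows from a zero-based start, clipped to what is available. PageWindow is a hypothetical illustration only. It does not call the API, and it does not cover negative counts, which sublist treats as counting from the end.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of sublist-style paging on a Java list.
final class PageWindow {
    private PageWindow() {}

    /** First count rows, or fewer if the list is shorter. */
    static <T> List<T> firstRows(List<T> rows, int count) {
        return window(rows, 0, count);
    }

    /** Up to count rows starting at the zero-based index start. */
    static <T> List<T> window(List<T> rows, int start, int count) {
        if (start < 0 || count < 0) {
            throw new IllegalArgumentException("start and count must be non-negative");
        }
        int from = Math.min(start, rows.size());
        int to = (int) Math.min((long) from + count, (long) rows.size());
        return rows.subList(from, to);
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(firstRows(rows, 3)); // [0, 1, 2]
        System.out.println(window(rows, 2, 3)); // [2, 3, 4]
        System.out.println(window(rows, 8, 5)); // [8, 9]
    }
}
```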

When including optional parameters, provide a Map object as part of the method call:

Map<String, Object> optParams = new HashMap<String, Object>();
optParams.put
    (
        IDeltaQueryRouter.OPTPARAM_LOG_CORRELATOR_KEY,
        "SimpleQueryAPI".toCharArray()
    );
optParams.put(IDeltaQueryRouter.OPTPARAM_QUERYTIMEOUT_KEY, 10);
Object result =
    stream.getDeltaAPIService().runQuery(query, "<username>", optParams);

For full details of the available optional parameters please refer to the Query Router section of the Process Template API documentation.

The available API can be refreshed programmatically by calling

stream.getDeltaAPIService().refresh();

Streaming Analytics API

The Kx Stream Java API provides access to a Streaming Analytics service, which can be used to subscribe to and receive updates from a Streaming Analytic. The service can be accessed via a Platform deployment as follows:

stream.initialiseStreamingAnalyticsService("accessDetails");
IStreamingAnalyticsService streamingService = stream.getStreamingAnalyticsService();

To start streaming, call startStreaming.

Subscription s = streamingService.startStreaming
    ("analyticName", parameters, "database", messageHandler);

A number of parameters are required.

parameter       description
analyticName    Name of the streaming analytic as defined in the Platform
parameters      Parameters as defined in the analytic within the Platform. The type of
                this object will depend on the parameter types expected by the analytic.
database        Database where the Streaming Analytic should be run
messageHandler  Handles incoming messages. Instance of ISubscriptionMessageListener.
                An IBinarySubscriptionMessageListener is also available to return data
                as a native kdb+ IPC byte array. This allows the data to be
                deserialized within the application.

To stop streaming, call the removeSubscription method.

streamingService.removeSubscription(s);

Messaging and data-discovery service

Messaging Server (DMS) support has been built into the Java Stream API. This allows the user to publish data to Platform processes without having to set up destinations or point-to-point links. The user need only connect to the DMS service, register what it will publish, and begin publishing. Likewise, the Java Stream API can subscribe to data from the Platform environment by registering interest in what is being published.

For both publishing and subscribing, it is necessary to initialize the DeltaStream instance, and then set up the DMS instance.

DeltaMessagingServer dms =
    new DeltaMessagingServer(stream, "dmsConfigName", "userPass");
dms.start();

parameter      description
stream         Instance of DeltaStream
dmsConfigName  Name of the Messaging Server configuration. Stream default is DS_MESSAGING_SERVER:DS
userPass       Access details, ':' separated

Publishing

To publish via DMS, it is necessary to create a class implementing the DeltaMessagingPublisher interface. This allows it to listen for incoming subscription (and un-subscription) requests.

DeltaMessagingPublisher dmsPublisher = new DeltaMessagingPublisher()
{
    List<DeltaMessagingSubscription> subscriptions = new ArrayList<>();

    // Track each subscription as it is added
    public void subscriptionAdded
        (String table, DeltaMessagingSubscription subscription)
        {subscriptions.add(subscription);}

    // Drop the subscription once it is removed
    public void subscriptionRemoved
        (String table, DeltaMessagingSubscription subscription)
        {subscriptions.remove(subscription);}
};

Then the instance should register this publisher with a channel, a topic and, if applicable, any filters which may be required. The channel and topic are used to match a subscriber in your environment interested in the data.

HashMap<String, String> filterCols = new HashMap<String, String>();
filterCols.put("src", "GS.GS");
dms.registerPublisher(dmsPublisher, "channelName","topicName", filterCols);

Publishing is achieved by publishing to each of the DeltaMessagingSubscription instances that have been added.

// Publish a Dict
Object[] record = {
    "EUR/USD", 
    "GS.GS", 
    "0", 
    currentTS, 
    Double.valueOf(1.3689), 
    Double.valueOf(1000), 
    NA, 
    NA, 
    currentTS, 
    NA, 
    'A', 
    Double.valueOf(0), 
    "EUR", 
    new java.sql.Date(currenttime.getTime()), 
    currentTS 
};
Dict data = new Dict(cols, record);

for(DeltaMessagingSubscription s : subscriptions){
    s.publish(topic, data);
}

// Publishing a Flip
Object[] records = {
    new String[] { "EUR/USD", "EUR/USD" },
    new String[] { "GS.GS", "GS.GS" },
    new String[] { "0", "1" },
    new Timestamp[] { currentTS, currentTS },
    new Double[] { Double.valueOf(1.3689), Double.valueOf(1.36339) },
    new Double[] { Double.valueOf(1000), Double.valueOf(2000) },
    new String[] { NA, NA },
    new String[] { NA, NA },
    new Timestamp[] { currentTS, currentTS },
    new String[] { NA, NA },
    new Object[] { 'A', 'B' },
    new Double[] { Double.valueOf(0), Double.valueOf(0) },
    new String[] { "EUR", "GBP" },
    new java.sql.Date[] { 
        new java.sql.Date(currenttime.getTime()),
        new java.sql.Date(currenttime.getTime()) 
    },
    new Timestamp[] { currentTS, currentTS }
};
Flip fData = new Flip(new Dict(cols, records));
for(DeltaMessagingSubscription s : subscriptions){
    s.publish(topic, fData);
}
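
The publish loops above iterate over a list that the subscription callbacks also modify. If those callbacks can arrive on a different thread from the one publishing (an assumption; the threading model is not stated here), a plain ArrayList could fail with a ConcurrentModificationException. The sketch below shows one way to guard against that with a CopyOnWriteArrayList. SubscriptionRegistry and its Sink interface are hypothetical stand-ins, with Sink playing the role of DeltaMessagingSubscription.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration; Sink stands in for DeltaMessagingSubscription.
final class SubscriptionRegistry {
    /** Stand-in for anything that can receive a published message. */
    interface Sink {
        void publish(String topic, Object data);
    }

    // Safe to iterate while other threads add or remove entries.
    private final List<Sink> sinks = new CopyOnWriteArrayList<>();

    void added(Sink sink) {
        sinks.add(sink);
    }

    void removed(Sink sink) {
        sinks.remove(sink);
    }

    /** Publishes to every current subscriber and returns how many received it. */
    int publishAll(String topic, Object data) {
        int sent = 0;
        for (Sink sink : sinks) {
            sink.publish(topic, data);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Sink printer = (topic, data) -> System.out.println(topic + " <- " + data);
        registry.added(printer);
        System.out.println(registry.publishAll("fxQuote", "EUR/USD")); // prints the message, then 1
        registry.removed(printer);
        System.out.println(registry.publishAll("fxQuote", "EUR/USD")); // prints 0
    }
}
```

CopyOnWriteArrayList suits this case because subscriptions change rarely compared with how often data is published.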

Subscribing

To subscribe via DMS, you should create a class implementing DeltaMessagingSubscriber. This enforces the need for an upd method to accept the incoming data for processing; it receives the topic and a Flip of the data. As with publication, the subscriber should be registered with a channel, a topic and, if applicable, any filters.

dms.registerSubscriber(new DeltaMessagingSubscriber() {

    @Override
    public void upd(String baseTopic, Flip data) {
        //do Work
    }
}, channel, topic, filterCols);

Once registered, the upd method of your DeltaMessagingSubscriber will receive any updates that you have subscribed to and that are being published.

System configuration options

When running the application, it is possible to parameterize the Java Stream API via system properties to help tune performance and memory utilization.

These should be passed to the JVM like any other system property, ahead of the -jar option; anything placed after the jar name is passed to the application as a program argument instead. For example:

java -Dmessaging.remotesub.queuesize=50000 -jar JavaStreamClient.jar
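
To confirm that the JVM actually received a property, an application can read it back with the standard Integer.getInteger call. The sketch below is a minimal check of that kind. The StreamProperties class is hypothetical, and the fallback of Integer.MAX_VALUE simply mirrors the default listed for this property; how the Stream API itself reads the value is not shown here.

```java
// Hypothetical check that a system property reached the JVM.
final class StreamProperties {
    private StreamProperties() {}

    /** Reads messaging.remotesub.queuesize, falling back to Integer.MAX_VALUE when unset. */
    static int queueSize() {
        return Integer.getInteger("messaging.remotesub.queuesize", Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println("messaging.remotesub.queuesize = " + queueSize());
    }
}
```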

Messaging Service configuration

property                              default            description
messaging.remotesub.queuesize         Integer.MAX_VALUE  Size of the Remote Subscriber connection publish queue. Entries
                                                         above this length will block the publishing thread until room
                                                         becomes available in the queue
messaging.remotesub.queuemode         BLOCK              Queuing mode. BLOCK will block the publish thread until
                                                         successfully queued; TIMEOUT will attempt to queue the message
                                                         for publication and time out depending on the
                                                         messaging.remotesub.queuetimeoutus system property
messaging.remotesub.queuetimeoutus    0                  Queue timeout when in TIMEOUT queue mode. Defaults to 0, which
                                                         will BLOCK until queued
messaging.remoteconn.reconnectratems  30000              Time in milliseconds to wait between reconnection attempts for
                                                         Remote Connections (publishers and subscribers)
messaging.server.reconnectratems      2000               Time in milliseconds to wait between reconnection attempts to
                                                         the Messaging Server
messaging.server.connecttimeoutms     10000              Time in milliseconds after which an attempt to reconnect to the
                                                         Messaging Server times out and failover procedures to a
                                                         secondary Messaging Server are initiated
messaging.server.heartbeatintervals   30                 Heartbeat interval in seconds. If there is no response in
                                                         1.5 * interval, the client will disconnect and begin
                                                         reconnection attempts
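
The difference between the BLOCK and TIMEOUT queue modes can be pictured with a standard bounded BlockingQueue: put waits for room, while a timed offer gives up and reports failure. The sketch below illustrates only those semantics. QueueModes is a hypothetical class rather than the library's implementation, and treating the timeout as microseconds is an assumption based on the "us" suffix of the property name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of blocking versus timed queuing.
final class QueueModes {
    private QueueModes() {}

    /**
     * Queues a message. A timeout of 0 or less blocks until there is room;
     * otherwise the call waits at most timeoutUs microseconds and returns
     * false if the message could not be queued in that time.
     */
    static boolean enqueue(BlockingQueue<Object> queue, Object message, long timeoutUs) {
        try {
            if (timeoutUs <= 0) {
                queue.put(message);
                return true;
            }
            return queue.offer(message, timeoutUs, TimeUnit.MICROSECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report the message as not queued.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1); // room for one message
        System.out.println(enqueue(queue, "first", 500));  // true
        System.out.println(enqueue(queue, "second", 500)); // false: queue is full, so the offer times out
        queue.poll();                                      // a consumer frees a slot
        System.out.println(enqueue(queue, "third", 0));    // true
    }
}
```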

Query Service configuration

property                                default  description
queryservice.reconnecting.ratesec       30       Rate of reconnection attempts to underlying Query Service processes
queryservice.connectedqpsonly           false    Whether to only allow queries to be assigned to currently running
                                                 and connected Query Processors
queryservice.heartbeat.ratesec          10       Heartbeat interval in seconds. If there is no response in
                                                 1.5 * interval, the client will disconnect and begin reconnection
                                                 attempts
queryservice.pubthreadpoolsize.ratesec  32       Number of publish threads which spawn from the Query Processor
                                                 threads to publish to clients

Query Manager configuration

property                            default  description
querymanager.heartbeat.ratesec      10       Heartbeat interval in seconds. If there is no response in
                                             1.5 * interval, the client will disconnect and begin reconnection
                                             attempts
querymanager.reconnect.intervalsec  30       Reconnect rate in seconds
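
The heartbeat rule quoted in the tables above (no response within 1.5 * interval) works out to a 15-second deadline for a 10-second interval and 45 seconds for a 30-second interval. The sketch below shows that arithmetic. HeartbeatDeadline is a hypothetical class, and treating the connection as stale only once the deadline is strictly exceeded is an assumption.

```java
// Hypothetical illustration of the 1.5 * interval heartbeat deadline.
final class HeartbeatDeadline {
    private HeartbeatDeadline() {}

    /** Deadline in milliseconds for a heartbeat interval given in seconds. */
    static long deadlineMs(long intervalSec) {
        return intervalSec * 1500L; // 1.5 * interval, converted to milliseconds
    }

    /** True when the time since the last response exceeds the deadline. */
    static boolean isStale(long lastResponseMs, long nowMs, long intervalSec) {
        return nowMs - lastResponseMs > deadlineMs(intervalSec);
    }

    public static void main(String[] args) {
        System.out.println(deadlineMs(10));            // 15000
        System.out.println(deadlineMs(30));            // 45000
        System.out.println(isStale(0L, 16000L, 10));   // true
    }
}
```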