KX Stream Java API
The KX Stream Java API offers programmable access via the Java programming language to core features available as part of KX Delta Platform. This API extends the core KX Java Fusion interface providing structured access core services only available through the KX Delta Platform. This includes features such as:
- Configuration management
- Tickerplant interface
- Query Router service
- Public Query API service
- Streaming Analytics API
- Messaging and data discovery service
Developing with the KX Stream Java API
To build your own Java application using the KX Stream API, you require the base deltaj
libraries and a number of third-party libraries, all available in the DeltaJAPI.tgz
. This package is available to KX Stream users; please contact KX for download details. The sample test client source and Java doc is also available as part of the install. This will detail and highlight all main features of the KX Stream Java API.
To integrate with the KX Stream Java API, your Java application will have to import the libraries from the DeltaJAPIEval_<version\>.tgz
to the project classpath.
Using Eclipse as an example, the steps to set up an application from scratch would be:
-
New Eclipse project
-
Create
lib
folder, copy in dependencyjar
files fromDeltaJAPIEval.tgz
and add to the classpath. Libraries are located underDeltaJAPIEval_<version\>/app/deltaj-testclient-<version\>/lib
-
Add Stream API resources to project.
- These include Stream API JavaDoc and a Stream Test Client library with source.
- Libraries are located under
DeltaJAPIEval_<version\>/app/deltaj-testclient-<version\>/deltaStreamAPIResources
- Copy these files to the
lib
directory and link as highlighted in the images below.
Creating a KX Stream instance
The main entry point to the Java KX Stream API is an instance of DeltaStream
. As the features available may extend beyond what the application requires, the configuration of this instance will be important to ensure optimal performance. Full details of the initialization and parameterization is available in the JavaDocs packaged with the DeltaJAPIEval.tgz
.
A creation of a single DeltaStream
instance can be achieved by:
List<HostAndPort> hostAndPorts = new ArrayList<HostAndPort>();
hostAndPorts.add(new HostAndPort(primaryHost, primaryPort, useTLS));
hostAndPorts.add(new HostAndPort(secondaryHost, secondaryPort, useTLS));
DeltaStream stream = DeltaStreamFactory.newDeltaStream();
stream.initialiseDeltaControl(hostAndPorts, instanceName, instanceId, exclusive,
loginString, retryMaxTimeOutPeriod, retryTimeoutStep, encrypted);
Initialization errors and disconnections
The KX Stream API can be configured to throw back to the client when initialization fails within a given timeout value. When configured the initialiseDeltaControl
method will throw a DeltaStreamException
and post initialization connectivity issues are handled via the provided IBreakableServiceListener
callback method:
IBreakableServiceListener<DeltaStream> eventListener = new IBreakableServiceListener<DeltaStream>() {
@Override
public void accept(Event<IBreakableService.EventType, DeltaStream> event) {
if (event.getEventType() == IBreakableService.EventType.BROKEN) {
LOG.error("Disconnected from KX Delta Platform");
// Disconnected handling logic
}
}
};
List<HostAndPort> hostAndPorts = new ArrayList<HostAndPort>();
hostAndPorts.add(new HostAndPort(primaryHost, primaryPort, useTLS));
hostAndPorts.add(new HostAndPort(secondaryHost, secondaryPort, useTLS));
DeltaStream stream = DeltaStreamFactory.newDeltaStream();
try {
stream.initialiseDeltaControl(hostAndPorts, instanceName, instanceId, exclusive, loginString,
retryMaxTimeOutPeriod, retryTimeoutStep, encrypted, failConnectTimeout, eventListener);
} catch (IOException ioe) {
LOG.error("Failed to initialise KX Stream API", ioe);
}
The listener should be passed as the parameter eventListener
of initialiseDeltaControl()
, and the accept
method of the listener will be invoked if the BROKEN
event occurs. The BROKEN
event means the KX Delta Platform is unexpectedly unavailable. E.g. we've lost connectivity to the server and attempts to recover have failed within the specified failConnectTimeout
. In this event, the close
method is invoked automatically; the listener need only implement any application logic to handle this state.
Currently, BROKEN
is the only supported event type within the KX Stream API.
Initialization parameter details
parameter | description |
---|---|
hostAndPorts | List of KX Control host connection details |
instanceName | Instance name. Should relate to the application |
instanceId | If started from control; this should be applied here |
exclusive | Can duplicate instances of this run |
loginString | Authentication details username:password formatted |
retryMaxTimeOutPeriod | Time in milliseconds before next reconnect attempt if previous connection attempt failed |
retryTimeoutStep | Deprecated, not used |
encrypted | Whether the loginString should be encrypted or not |
failConnectTimeout | Timeout in seconds for connecting and reconnecting, 0 means no timeout and the BROKEN event is never triggered |
eventListener | Listener for BROKEN event, cannot be null when failConnectTimeout is greater than 0 |
Closing a KX Stream instance
Once a KX Stream API instance is no longer required, call the close
method to close connections and release resources. The services made available by KX Stream API will also be closed.
stream.close();
Calling close()
on an instance that is already closed is a no-op.
Available services
Once an instance of DeltaStream
has been created a number of services are available to interact with KX Delta Platform deploy. Each has a Demo class within the Stream Test Client as an example of available functionality
service | demo class | description |
---|---|---|
Configuration Management | DeltaStreamDemo | Allows interaction with Control configuration data |
Tickerplant interface | DeltaStreamDemoTPPublish | Publish Data from a Java application to a running Tickerplant |
Query Router Service | DeltaStreamDemoQR | Execute async queries aganst a running KX Delta Platform environment |
Public API Service | DeltaStreamDemoAPI | Execute public API functions via the Java Stream API |
Streaming Analytics API | DeltaStreamDemoStreamingAnalytics | Receive Streaming analytic updates from the KX Delta Platform |
Messaging and data discovery service | DeltaStreamDemoDMS | Integrate with the KX Delta Platform Messaging Service |
Once the instance of DeltaStream
has been closed, expectedly or unexpectedly, all these services are also closed and unavailable.
Configuration management
An instance of DeltaStream
will have a ConfigurationManager
associated with it. The ConfigurationManager
has a wide array of available functionality, which is fully detailed in the Java doc. Below are a few examples of what is available.
// Retrieves a config Parameter.
// Also available getFlip(...) to return the data as a c.Flip entity
List<Map<String, Object>> configParam =
stream.getConfigurationManager().getMapList("<configName>", "<overrideName>");
// Retrieves all overrides for a parameter
String[] overrides = stream.getConfigurationManager().getOverrides("<configname>");
// Returns the connection details for a connection entity
DeltaQConnectionDetails connection =
stream.getConfigurationManager().getConnectionDetails("<connectionName>");
// Returns the schema details by schema tab
DeltaSchema schema = stream.getConfigurationManager().getSchema("<schemaName>");
// Returns a list of schema part of a schema group
String[] schemas =
stream.getConfigurationManager().getSchemasForGroup("<schemaGroupName>");
// Returns the Process Template associated with an Instance by name
stream.getConfigurationManager().getProcessTemplateName("<instanceName>");
// Returns the instance parameter values associated with a Process Instance
Map<String, Object> instanceParameters =
stream.getConfigurationManager().getProcessInstanceParamValues("<instanceName>");
// Looks up task info for a given task name
Map<String, Object> taskInfo =
stream.getConfigurationManager().getTaskInfo("<taskName>");
The Stream API framework ConfigurationManager
can also be used to listen for configuration updates from KX Delta Platform, allowing the implementing application to dynamically update behaviors based on config data.
// Listen to updates on Configuration from Control
stream.getConfigurationManager().addConfigurationManagerListener
(
"DASHconfigBundle", "DEFAULT", new ConfigurationUpdateListener()
{
@Override
public void onConfigUpdateMapList
(
String configName,
String override,
List<Map<String,
Object>> data
)
{
System.out.println
(
"onConfigUpdateMapList: " + configName + ":" +
override + " updated"
);
}
@Override
public void onConfigUpdatePropList
(
String configName, String override, List<Properties> data
)
{}
@Override
public void onConfigUpdate
(
String configName, String override, String[] data
)
{}
@Override
public void onConfigUpdate
(
String configName, String override, Properties data
)
{}
@Override
public void onConfigUpdate
(
String configName, String override, Flip data
)
{}
@Override
public void onConfigUpdate
(
String configName, String override
)
{}
},
UpdateMode.MapList
);
The UpdateMode
allows different data structures to be provided to the listener for ease of use within the application. See JavaDoc for more information.
Tickerplant interface
The Java Stream API allows data to be published directly to tickerplants defined in the application. This allows data to be published in various formats through the stream instance
DeltaStreamTickerPlantDetails tpDetails =
new DeltaStreamTickerPlantDetails
(
pub_instance_name, pub_host, pub_port, pub_login, tlsConnection,
pub_schema_group, pub_cmd, pub_data_format, pub_timeout_intv,
pub_timeout_intv_step, pub_recovery_file, pub_recovery_intv,
pub_recovery_batchsize, pub_recovery_clearOnStartup,
pub_recovery_maxkb, pub_heatbeat_type, pub_heartbeat_intv,
sub_schema, sub_cmd
);
stream.addTickerPlantDestination(tpDetails);
A DeltaStreamTickerPlantDetails
instance has numerous parameters which control the connectivity and recovery options available
pub_instance_name Tickerplant instance name
pub_host Tickerplant hostname
pub_port Tickerplant port
pub_login Tickerplant user authentication details
tlsConnection Tickerplant TLS connection on/off/both
pub_schema_group Group containing schemas to load into publisher
pub_cmd Command used to publish data
Should use DeltaStreamConnection.DSTR_UPD_ARRAY
pub_data_format Format of data to publish to tickerplant
pub_timeout_intv Maximum time in ms to wait between reconnect attempts
pub_timeout_intv_step Step growth size in ms to each retry attempt,
up to pub_timeout_intv
pub_recovery_file Filename for recovery files
pub_recovery_intv Delay between executions of the recovery process
pub_recovery_batchsize Max records sent by recovery process
to tickerplant in a single cycle
pub_recovery_clearOnStartup If true recovery process will ignore
all pre-existing files
pub_recovery_maxkb Max size recovery file will be allowed to grow (KB)
pub_heatbeat_type Type of heartbeating
pub_heartbeat_intv How often to heartbeat
sub_schema List of tables the tickerplant should
listen for updates on
sub_cmd Command used to register for incoming messages
Publish formats
format | Java accessor | data published as |
---|---|---|
Flip | DeltaQConnection.PUBLISH_FORMAT_FLIP | Flip |
Array | DeltaQConnection.PUBLISH_FORMAT_ARRAY | Array |
Dict | DeltaQConnection.PUBLISH_FORMAT_DICT | Dict |
Heartbeat socket types
heartbeat | java accessor | description |
---|---|---|
DATA_SOCKET | HeartBeatSocket.DATA_SOCKET | Send heartbeats over existing command socket connection |
HEARTBEAT_SOCKET | HeartBeatSocket.HEARTBEAT_SOCKET | Send heartbeat over dedicated connection |
DISABLED_HEARTBEAT | HeartBeatSocket.DISABLED_HEARTBEAT | Don’t heartbeat |
There are a number of customizations available into how the Tickerplant publish service operates. Full details are in the JavaDoc. A number of useful ones are detailed here. Any changes to the tickerplant configuration made after initialization will be ignored.
setTPTimeColBlocked
When publishing to a defined schema the Stream API will null all columns for a schema being published to. This is to ensure compatibility with the tickerplant. However it is often desirable to block the publication of a time column, allowing the tickerplant to populate this column as the time it received the message. In the Java Stream API this is achieved by calling the setTPTimeColBlocked
method:
deltaStream.setTPTimeColBlocked(true);
addAPIListener
If the application wants to receive updates when a tickerplant disconnects and reconnects there is a listener available to do so.
deltaStream.addAPIListener(new DeltaStreamAPIListener() {
@Override
public void onTPDisconnected(String source) {
System.out.println("TP has disconnected " + source);
}
@Override
public void onTPConnected(String source) {
System.out.println("TP has connected " + source);
}
@Override
public Object onIncomingSync(String source, Object data) {
return null;
}
@Override
public void onIncomingAsync(String source, Object data) {
System.out.println("Incoming async " + source);
}
});
Query router service
The Query Router allows for advanced query execution within KX Delta Platform. If set up, it can be used to load-balance queries through routed queries across data warehouse clusters and provide Quality of Service (QoS) based query execution providing different priority for different queries. Please review KX Delta Platform documentation for further information on the Query Router.
The Query Router can be initialized within the KX Stream API through the DeltaStream instance. For example:
deltaStream.initialiseDeltaQueryRouterService
(deltaQueryRouter, name, userPass, connectionListener);
parameter | required | description |
---|---|---|
deltaQueryRouter | no | Allows the user to pass in an already created instance of a IDeltaQueryRouter . If null, one will be created based on configuration in KX Control. |
name | yes | Unique name for the QR Service, used to identify query sources in QR logs. |
userPass | yes | Authentication details for connecting to the Query Router. |
connectionListener | no | Allows the application to provide an IQRConnectionListener instance to be informed of connection/disconnection events from the Query Router. Note that in the case of a sharded QR, the listener receives events for the DEFAULT shard only. |
Once initialized, queries can be executed against the Query Router. There are a number of query-execution APIs available to the Query Router service.
Request/response query
A request to execute a query can be submitted to the Query Router in the format
deltaStream.getDeltaQueryRouterService().runQuery
(queryObj, user, database, optParams, messageHandler);
parameter | required | description |
---|---|---|
queryObj | yes | The query object which is being executed. |
user | yes | User executing the query. The user who authenticated the connections does not have to be the same user executing the query. |
database | yes | The connection group where the query should be executed. (This may be overridden in the cause of routed queries). |
optParams | yes | A Dictionary containing optional parameters for the query. Can be empty |
messageHandler | yes | The IQRMessageHandler handler instance which will handle the results/exception. A IQRBinaryMessageHandler is also available. This will return the data in native IPC binary format allowing the application to process the response manually |
This will execute the query asynchronously and the result provided by the messageHandler
. The format of the queryObj
is as per defined in the KX Fusion interface. For example
Object queryObj = "2+2";
queryObj = new Object[] { '+', 2, 2 };
KX Delta Platform adds additional secure-handling checks depending on the executing users and configuration of the environment. In such cases string queries, or functions the the calling user is unpermissioned for, will fail with a suitable error.
Polled queries
The Query Router also supports polled queries where a query can be submitted to the Query Router and the Query Router will execute that query on a timer and publish results back until the client disconnects or a stopPolledQuery
is requested.
String runId = deltaStream.getDeltaQueryRouterService().startPolledQuery
(queryObj, user, frequency, database, optParams, messageHandler);
parameter | required | description |
---|---|---|
queryObj | yes | The query object which is being executed. |
user | yes | User executing the query. The User who authenticated the connections does not have to be the same user who is is executing the query. |
frequency | yes | How often to the QR should execute the query in millisecs |
database | yes | The connection; connection group where the query should be executed. (This may be overridden in the cause of routed queries). |
optParams | yes | A Dictionary containing optional parameters for the query. Can be empty |
messageHandler | yes | The IQRMessageHandler handler instance which will handle the results/exception. This will return the data in native IPC binary format allowing the application to process the response manually |
Polled Queries are stopped via the API.
deltaStream.getDeltaQueryRouterService().stopPolledQueries
(runIds, user, messageHandler);
parameter | required | description |
---|---|---|
runIds | yes | A String[] of runIds to stop |
user | yes | User requesting the stop. |
messageHandler | yes | The IQRStopPollMessageHandler handler instance which will handle return of the request |
Public query API
KX Stream API provides a public API service which can be defined and configured in your KX Delta Platform environment. These are analytic functions which define input parameters; optional return parameters, and execute functions based on the analytic definition through the Query Router service. Unlike the Query API interface these queries are synchronous rather than through a callback asynchronous message handler.
The Query API can be initialized within the Stream API via DeltaStream instance:
stream.initialiseDeltaAPIService("<username:password>", null);
As the Public API service uses the Query Router, the initialization includes the initialization of the Query Router Service if not already initialized. The second parameter to the initialize call is a reference to a IQRConnectionListener
providing callbacks for QRConnection and QRDisconnection events. This can be null if custom callbacks are not required for these events. Once initialized the available API can be retrieved by calling
Map<String, IDeltaAPI> apis = stream.getDeltaAPIService().getAPI();
This will return a Map<String, IDeltaAPI>
of available and permissioned API in your environment. The key is the name for the API; associated with the alias
value of the analytics within your KX Delta Platform environment, and the value is an IDeltaAPI
object representing the API which can be executed. The IDeltaAPI
object is then used to build an IDeltaQuery
via the IDeltaAPIService
IDeltaAPI api = apis.get("<aliasName>");
IDeltaQuery query = stream.getDeltaAPIService().createDeltaQuery(api);
At this point you have an IDeltaQuery
framework object containing the API definition. The user must then apply parameterization to the IDeltaQuery
object. This will depend on what the API is expecting as parameters. The IDeltaAPI
will detail the expected parameters; as will the analytic definition in the KX Delta Platform environment.
For example to apply Dict input parameters:
Map<String, Object> params = new HashMap<String, Object>();
params.put("symList", new String[] { "EUR/USD" });
query.addParameterValues(params);
Alternatively, if the expected input is a Table, the parameters would be applied:
Map<String, Object> params = new HashMap<String, Object>();
params.put("col1", 1);
params.put("col2", new Integer[] {1, 2, 3});
query.addParameterValues(params);
params = new HashMap<String, Object>();
params.put("col1", 10);
params.put("col2", new Integer[] {99, 62, 33});
query.addParameterValues(params);
Once the IDeltaQuery
object has been created, it can then be executed with the runQuery
method API. This call will block until the query execution finishes or the call times out. The data returned will be as received from the q process; typically a Dict or a Flip object.
Object result = stream.getDeltaAPIService().runQuery(query, username, optParams);
parameter | required | description |
---|---|---|
query | yes | IDeltaQuery being executed |
username | yes | user to run query as |
optParams | yes | Map of optional parameters. Can be null |
The optional parameters provided to the runQuery
method allow customized processing within the KX Delta Platform query framework.
name | type | description |
---|---|---|
timeout | int | Override the default timeout for this request (in seconds) |
clientTime | timestamp | Timestamp assigned by client when entering request. Used in QR timestamp logging |
logCorr | char[] | Client assigned logging correlator |
page | int | Number of rows to return (same behavior as sublist) |
page | int[] | Two element list of start and number rows (same as sublist). If specified in this format, results will be returned in the Control paged format |
cache | boolean | Whether to cache paged results |
sort | boolean | Sort results as part of paging |
When including optional parameters, provide a Map object as part of the method call:
Map<String, Object> optParams =
new HashMap<String, Object>();
optParams.put
(
IDeltaQueryRouter.OPTPARAM_LOG_CORRELATOR_KEY,
"SimpleQueryAPI".toCharArray()
);
optParams.put(IDeltaQueryRouter.OPTPARAM_QUERYTIMEOUT_KEY, 10);
Object result =
stream.getDeltaAPIService().runQuery(query, "<username>", optParams);
For full details of the available optional parameters please refer to the Query Router section of the Process Template API documentation.
The available API can be refreshed programmatically by calling
stream.getDeltaAPIService().refresh();
Streaming Analytics API
The Stream for KX Java API provides access to a Streaming Analytics service, which can be used to subscribe to and receive updates from a Streaming Analytic. The service can be accessed via a KX Delta Platform deployment as follows:
stream.initialiseStreamingAnalyticsService("accessDetails");
IStreamingAnalyticsService streamingService = stream.getStreamingAnalyticsService();
To start streaming simply call startStreaming
.
Subscription s = streamingService.startStreaming
("analyticName", parameters, "database", "messageHandler");
A number of parameters are required.
parameter | description |
---|---|
analyticName | Name of the streaming analytic as defined in KX Delta Platform |
parameters | Parameters as defined in the analytic within KX Delta Platform. The type of this object will depend on the parameter types expected by the analytic. |
database | Database where the Streaming Analytic should be run |
messageHandler | Handles incoming messages. Instance of ISubscriptionMessageListener . A IBinarySubscriptionMessageListener also available to return data as native kdb+ IPC byte array. This allows the data to be deserialized within the application. |
To stop streaming, call the removeSubscription
method.
streamingService.removeSubscription(s);
Messaging and data-discovery service
Messaging Server (DMS) support has been built into the Java Stream for API. This allows the user to publish data to KX Delta Platform processes without having to set up destinations or point-to-point links. The user must only connect to the DMS service and register what it will publish, and begin publishing. Likewise, the Java Stream API can subscribe to data from their KX Delta Platform environment by registering interest in what is being published.
For both publishing and subscribing, it is necessary to initialize the DeltaStream instance, and then set up the DMS instance.
DeltaMessagingServer dms =
new DeltaMessagingServer(stream, "dmsConfigName", "userPass");
dms.start();
parameter | description |
---|---|
stream | Instance of DeltaStream |
dmsConfigName | Name of the Messaging Server configuration. Stream default is DS_MESSAGING_SERVER:DS |
userPass | Access details ':' separated |
Publishing
To publish via DMS; it is necessary to create a class implementing the DeltaMessagingPublisher
interface; this allows it to listen for incoming subscription (and un-subscription) requests.
DeltaMessagingPublisher dmsPublisher = new DeltaMessagingPublisher()
{
List<DeltaMessagingSubscription> subscriptions = new ArrayList< >();
public void subscriptionRemoved
(String table, DeltaMessagingSubscription subscription)
{subscriptions.add(subscription);}
public void subscriptionAdded
(String table, DeltaMessagingSubscription subscription)
{subscriptions.remove(subscription);}
};
Then, the instance should register this entity publisher with a channel, topic, and if applicable any filters which may be required. The channel and topic would be used to match a subscriber in your environment interested in the data.
HashMap<String, String> filterCols = new HashMap<String, String>();
filterCols.put("src", "GS.GS");
dms.registerPublisher(dmsPublisher, "channelName","topicName", filterCols);
Publishing would be achieved by using the defined DeltaMessageSubscriptions
and publishing to each.
// Publish a Dict
Object[] record = {
"EUR/USD",
"GS.GS",
"0",
currentTS,
Double.valueOf(1.3689),
Double.valueOf(1000),
NA,
NA,
currentTS,
NA,
'A',
Double.valueOf(0),
"EUR",
new java.sql.Date(currenttime.getTime()),
currentTS
};
Dict data = new Dict(cols, record);
for(DeltaMessagingSubscription s : subscriptions){
s.publish(topic, data);
}
// Publishing a Flip
Object[] records = {
new String[] { "EUR/USD", "EUR/USD" },
new String[] { "GS.GS", "GS.GS" },
new String[] { "0", "1" },
new Timestamp[] { currentTS, currentTS },
new Double[] { Double.valueOf(1.3689), Double.valueOf(1.36339) },
new Double[] { Double.valueOf(1000), Double.valueOf(2000) },
new String[] { NA, NA },
new String[] { NA, NA },
new Timestamp[] { currentTS, currentTS },
new String[] { NA, NA },
new Object[] { 'A', 'B' },
new Double[] { Double.valueOf(0), Double.valueOf(0) },
new String[] { "EUR", "GBP" },
new java.sql.Date[] {
new java.sql.Date(currenttime.getTime()),
new java.sql.Date(currenttime.getTime())
},
new Timestamp[] { currentTS, currentTS }
};
Flip fData = new Flip(new Dict(cols, records));
for(DeltaMessagingSubscription s : subscriptions){s.publish(topic, data); }
Subscribing
To subscribe via DMS, you should create a class implementing DeltaMessagingSubscriber
. This enforces the need for a upd
function to accept the incoming data for processing. This will contain the topic and a Flip of the data. Like publication the DMS instance should be registered as a subscriber with a channel, topic and if applicable any filters.
dms.registerSubscriber(new DeltaMessagingSubscriber() {
@Override
public void upd(String baseTopic, Flip data) {
//do Work
}
}, channel, topic, filterCols);
Once done, that’s it, your DeltaMessagingSubscriber
upd
method will receive any updates which you’ve subscribed to and are publishing.
System configuration options
When running the application it is possible to parameterise the Java Stream API to help tune the performance and memory utilization via system properties.
These should be passed in to the JVM like any system property. For example
java -jar JavaStreamClient.jar -Dmessaging.remotesub.queuesize=50000
Messaging service configuration
property | default | description |
---|---|---|
messaging.remotesub.queuesize | Integer.MAX | Size of the Remote Subscriber connection publish queue. Entries above this length will block the publishing thread until room becomes available in the queue |
messaging.remotesub.queuemode | BLOCK | Queuing mode, BLOCK will block the publish thread until successfully queued; TIMEOUT will attempt to queue the message for publication and timeout depending on the messaging.remotesub.queuetimeoutus system property |
messaging.remotesub.queuetimeoutus | 0 | Queue timeout when in TIMEOUT queue mode. Defaults to 0 which will BLOCK until queued |
messaging.remoteconn.reconnectratems | 30000 | Time in millisecs to wait between reconnection attempts for Remote Connections (publishers and subscribers) |
messaging.remoteconn.tcpnodelay | true | TCP no delay configuration for connections to Remote Connections (publishers and subscribers) |
messaging.remoteconn.sendbuffersize | JDK default | Socket buffer size for Remote Connections (publishers and subscribers) |
messaging.server.reconnectratems | 2000 | Time in millisecs to wait between reconnection attempts to the Messaging Server |
messaging.server.connecttimeoutms | 10000 | Time in millisecs to timeout the attempt to reconnect to the Messaging Server and initiate failover proceudres to a secondary Messaging Server |
messaging.server.heartbeatintervals | 30 | Heartbeat interval in seconds. If no response in 1.5 * interval client will doconnect and begin reconnection attempts |
Query service configuration
Property | Default | Description |
---|---|---|
queryservice.reconnecting.ratesec | 30 | Rate of reconnection attempts to underlying Query Service processes |
queryservice.connectedqpsonly | false | Whether to only allow queries to be assigned to currently running and connected Query Processors |
queryservice.heartbeat.ratesec | 10 | Heartbeat interval in seconds. If no response in 1.5 * interval client will disconnect and being reconnection attempts |
queryservice.pubthreadpoolsize | 32 | Number of publish threads which spawn from the Query Processor threads to publish to clients |
queryservice.parallelqpconnect | false | Initiate the QP (re)connectivity on parallel threads up to the maximum number of unconnected QPs. This utilises a cached thread pool and thus works best when dealing with a reasonable number of QP connections. |
Query manager configuration
Property | Default | Description |
---|---|---|
querymanager.heartbeat.ratesec | 10 | Heartbeat interval in seconds. If no response in 1.5 * interval client will disconnect and being reconnection attempts |
querymanager.reconnect.intervalsec | 30 | Reconnect rate in seconds |