Skip to content

Using the kdb Insights Java interface

The Java interface allows you to publish data to RT and query data from kdb Insights Enterprise from your own application.

This section details how an experienced java developer would get started with the Java interface. For examples on how to use it to publish data into kdb Insights Enterprise or kdb Insights Reliable Transport or see documentation on the sample program.

Downloading the Java interface

The Java Interface is typically installed into a project via maven or gradle, using the following.

Maven:

Maven:

    <repositories>
        ...
        <repository>
            <id>kx-maven</id>
            <url>https://nexus.dl.kx.com/repository/kxi-mvn-public</url>
        </repository>
        ...
    </repositories>
        ...
        </dependencies>
        ...
            <dependency>
            <groupId>com.kx</groupId>
            <artifactId>kxi-java-sdk</artifactId>
            <version>1.6.0</version>
        </dependency>
        ...
    </dependencies>

Gradle:

repositories {
...
    maven {
        url "https://nexus.dl.kx.com/repository/kxi-mvn-public"
    }
...
}

dependencies {
...
    implementation "com.kx:kxi-java-sdk:1.6.0"
...
}

If you need to add the java SDK to to your project manually you can download the Java interface from the kdb Insights Maven Nexus registry. The sample program for the SDK is also available from Nexus.

Note

The interface is only supported on Linux running on x86 architectures. We currently do not support Windows or OSX.

The Java interface zip file contains everything required to use the Java interface, you can download the file and unzip.

Environment variables

The Java interface can be configured through environment variables.

Ingest and Query

variable required description
KXI_CONFIG_URL Mandatory The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to
SDK_CLIENT_UID Mandatory An access token that the user can obtain following the steps here
RT_FILE_LOGLEV Optional Sets the level of logging to file (0-6)
RT_CONSOLE_LOGLEV Optional Sets the level of logging to the console (0-6)
RT_DEBUG Optional (deprecated). Setting this to anything other than "0" is the same as setting RT_FILE_LOGLEV=3. Will be ignored if RT_FILE_LOGLEV is set.

Using code instead of environment variables

Much of this configuration can also be performed through code. For example, if you create an instance of StreamingClientFactory with a ConfigRequestor rather than using the default constructor, KXI_CONFIG_URL will be ignored.

KXI_CONFIG_URL

The KXI_CONFIG_URL variable points to a URL or file:

  • If you are relying on the Information Service to determine the endpoint(s) to connect to, then a URL in the form of https://{INSIGHTS_HOSTNAME}/informationservice/details/{SDK_CLIENT_UID} should be used.
  • If you are relying on a file to determine the endpoint(s), then a helper function FileConfigRequestor plus a json file can be provided.

An example of using an URL or a file are provided here.

Levels of Logging

The logging levels that you can set are as follows:

level description
0 NONE
1 FATAL
2 ERROR
3 WARN
4 INFO
5 DEBUG
6 TRACE

Additional variables for Ingest

variable required description
RT_REP_DIR Mandatory Path where the replicator can be extracted to. It must be possible to execute a program from this location.
RT_LOG_PATH Mandatory The location where the RT message streams will be written to
RT_NO_TRUNCATE Optional If this is set to anything other than "0" log truncation will be disabled

Additional variables for Query

variable required description
KXI_QUERY_TOKEN Conditional A bearer token used for authentication when making queries. If this is set then KXI_QUERY_USER and KXI_QUERY_SECRET are ignored.
KXI_QUERY_USER Conditional The keycloak client that will be used to generate bearer tokens to log in for query
KXI_QUERY_SECRET Conditional The keycloak client secret that will be used to generate bearer tokens for query

Pre-requisites

If you wish to use the interface in the same cluster as kdb Insights Enterprise there is no need for the Information Service and a configuration file is used to provide the endpoints and certificates .

  • Download the assembly sdk_sample_assembly following the instructions here.
  • Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
  • Ensure you have an authenticated kdb Insights Enterprise client URL.
  • Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.
  • Make sure that the _kdb Insights Reliable Transport" ingest endpoints are accessible.

Initializing

Before publishing any data, you need to start a new RtClient object in Java:

You can pass the configuration URL directly into the constructor as follows:

RtClient client = new RtClient("$KXI_CONFIG_URL");
client.start();

Or you can use the StreamingClientFactory and an HttpsConfigRequestor to pull the configuration URL from the KXI_CONFIG_URL environment variable:

StreamingClientFactory scf = new StreamingClientFactory(new HttpsConfigRequestor());
StreamingClient client = scf.getStreamingClient();
client.start()

You can use a FileConfigRequestor to provide the configuration from a file. When using the StreamingClientFactory, this is done as follows:

ConfigParser parser = new ConfigParser();
RtClient client = new RtClient(new FileConfigRequestor("~/config.json", parser));
client.start();

"~/config.json" will need to contain the configuration in the same format as it is passed back from kdb Insights Enterprise Information Service, but with some important differences:

  • The "useSslRt" top level key needs to be set to false
  • The "ca", "key" and "cert" are no longer needed and will be ignored if provided.
  • You will need to provide the internal hostnames and non-ssl RT port numbers (typically 5002) under the "insert" key

Here is an example of how the config file should look:

{
 "useSslRt":false,
 "name":"dgm3-service",
 "topics":{
  "insert":"sdk-sample-assembly",
  "query":"requests"},
 "insert":{
  "insert":[":rt-sdk-sample-assembly-north-0:5002",":rt-sdk-sample-assembly-north-1:5002",":rt-sdk-sample-assembly-north-2:5002"],
  "query":[]},
 "query":[":34.66.95.116:6050"]}

!!!note The KXI_CONFIG_URL environment variable will be ignored if you run the interface in this way.

Handling changing hostnames

Each publisher has a session name associated with it, to ensure it is uniquely identifiable by RT. This is physically represented within the Java interface as the directory messages are written to before being replicated into RT.

By default the Java interface sets the session name (and hence the message directory) to the following: SDK_CLIENT_UID.streamid.hostname. This ensures that publishers on different hosts, that are sending different data to the same RT Stream, can be correctly identified.

However, if the interface is being used in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run) then you need to:

  • Ensure that the RT_LOG_PATH, where the message directory will be created, will survive reboots.

  • Override the host name to ensure the same directory is used for all instantiations of the publisher. This can be done by using the setSessionSuffix() method before calling start() on the client.

setSessionSuffix

This method allows you to overwrite the last portion of the session name.

client.setSessionSuffix("sessionIdentifier");
client.start();

Note

  • This method exists on the RtClient object, so to do this you will either need to create the client using the RtClient constructor, or cast the instance of StreamingClient you get from the StreamingClientFactory to RtClient. (Currently StreamingClientFactory only returns instances of RtClient.)
  • If you are using the RtDedupClient you don't need to do this because the session name is constructed differently.

Warning

  • Calling setSessionSuffix() after calling start() will result in an exception.

Publishing data

The simplest way to publish data is to use the BulkLoader class. It handles turning 2-dimensional arrays into flip objects before publishing. You can make one as follows (example taken from BulkUploadBatches.java):

import kx.insights.streaming.*
...
String[] columns = new String[] { "Time", "SensorName", "Voltage", "Current" };
BulkLoader tw = new BulkLoader(tableName, client, columns);

You can then pass in 2D arrays, where the first dimension is the row number, and the second is the column number.

Random rand = new Random();
Timestamp now = new Timestamp(System.currentTimeMillis());
for (int i = 0; i < numEntries; i++)
 {
 Object[][] tableToSend = new Object[NUM_SENSORS][];
 for (int j = 0; j < NUM_SENSORS; j++)
 {
     tableToSend[j] = new Object[columns.length];
     tableToSend[j][0] = now;
     tableToSend[j][1] = String.format("Sensor_%d", j);
     tableToSend[j][2] = rand.nextFloat() * 240.0f;
     tableToSend[j][3] = rand.nextFloat() * 10.0f;
     System.out.printf("Writing line, %s, %s, %f, %f\n",
     tableToSend[j][0], tableToSend[j][1], tableToSend[j][2], tableToSend[j][3]);
 }

 tw.writeTable(tableToSend);
}

There are two other loaders available. They both implement the loader interface, so this same basic example applies, but have some differences in how they work:

  • BatchLoader: this will not send data immediately, but will send the data in batches. You can specify the batch size in the constructor.

  • ValidatedLoader: (kdb Insights Enterprise only) this will check that the data you are sending matches the table schema and throw an exception if it does not match. You will need to obtain an instance of the ValidatedLoader using the ValidatedLoaderFactory. ValidatedLoaderFactory will use the kdb Insights Enterprise getMeta API to read the table schemas for the comparison.

!!!note The ValidatedLoader assumes that the kdb Insights service, the Stream Processor, does not modify the schema of the data before it is written to the table.

Terminating

When you have finished publishing data, call stop() on the RTClient object.

client.stop();

Deduplication

To take advantage of the deduplication capabilities of RT requires some changes to the above process.

All publishers who are sending data from the same origin must share the same deduplication ID. Messages will be deduplicated assuming that each message has a message id that is unique for the origin, and have a distance from zero that is increasing by one with each message. For example: 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.

For each deduplication ID, RT will keep track of the high watermark of the message id when RT sees a message with a lower identifier than the watermark then that message is discarded.

To use this facility you need to create an RtDedupClient and pass in your chosen deduplication ID.

RtDedupClient rtDedupClient = new RtDedupClient(new HttpsConfigRequestor(), "dedupId");
rtDedupClient.start();

Unlike in the Publishing data example, there are no loaders, so you will need to send data in using pasync directly.

rtDedupClient.pasync("streamid", data, messageid);

Where "data" is an array of the types in the section below.

When you have finished publishing data, call stop() on the rtDedupClient as you would a regular RtClient.

rtDedupClient.stop();

Monitoring the log position and checking the data flow

As of version 1.8.0 of the Java interface, both the RtClient and RtDedupClient support the getMaxMergedPosition() method.

Long logPosition = client.getMaxMergedPosition();

The value returned from this method can be used to check if data is being ingested properly at by RT by checking to see if the value returned increases as more data is ingested.

If using kdb Insights Enterprise, you can also query the data using the DataAccessClient, like this

Timestamp startTime = new Timestamp(System.currentTimeMillis() - 3600000); //1 hour ago in ms
Timestamp endTime = new Timestamp(System.currentTimeMillis());
DataAccessClient dataAccessClient = new DataAccessClient();
dataAccessClient.connect();
Object result = dataAccessClient.getData(tableName, startTime, endTime);

See the SimpleGetData sample in the kxi-java-sdk-samples for more details.

Supported data types

The following data types are supported by the Java interface:

q type name q type number java type schema notation example
boolean -1 Boolean 'boolean' true
guid -2 UUID 'guid' 0f14d0ab-9605-4a62-a9e4-5ed26688389b
byte -4 Byte 'byte' 0xF4
short -5 Short 'short' 5
int -6 Integer 'int' or 'integer' 56789
long -7 Long 'long' 12345678
real -8 Float 'real' 2.5
float -9 Double 'float' 89.3
char -10 Character 'char' d
symbol -11 String 'symbol' LLOY
timestamp -12 java.sql.Timestamp 'timestamp' or 'ts' 2000.01.01D00:00:00.200000000
month -13 kx.c.Month 'month' 2002.02m
date -14 java.sql.Date 'date' 1999.12.31
datetime -15 java.util.Date 'datetime' 12.31,1999.12.31T23:59:59.999
timespan -16 kx.c.Timespan 'timespan' 00:00:00.000000000
minute -17 kc.c.Minute 'minute' 00:00
second -18 kx.c.Second 'second' 00:00:00
time -19 java.sql.Time 'time' 00:00:00
string 10 char[] 'string' "this is a string"

symbol type

Only make text columns into symbols when the fields will be drawn from a small, stable domain and there is significant repetition in their use. When in doubt, start with a string column. It is easier to convert a string column to symbols than it is to remove symbols from the sym list.

time type

Because java.sql.Time does not have milliseconds, some precision is lost if this type is used.