Using the kdb Insights Java SDK

The Java SDK allows you to publish data to RT and query data from kdb Insights Enterprise from your own application.

This section details how to get started with the Java SDK. For examples of how to use it to publish data into a kdb Insights RT Microservice or kdb Insights Enterprise, see the documentation on the sample program.

Downloading the Java SDK

You can download the Java SDK from the kdb Insights Maven Nexus registry and a sample program from the kdb Insights package Nexus registry.

Note

The SDKs are only supported on Linux running on x86 architectures. Windows and macOS are not currently supported.

The Java SDK zip file contains everything required to use the SDK; download the file and unzip it.

Environment variables

The Java SDK can be configured through environment variables.

Ingest and Query

| variable | required | description |
|---|---|---|
| KXI_CONFIG_URL | Mandatory | The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to |
| SDK_CLIENT_UID | Mandatory | An access token that the user can obtain following the steps here |
| RT_FILE_LOGLEV | Optional | Sets the level of logging to file (0-6) |
| RT_CONSOLE_LOGLEV | Optional | Sets the level of logging to the console (0-6) |
| RT_DEBUG | Optional | Deprecated. Setting this to anything other than "0" is the same as setting RT_FILE_LOGLEV=3. Ignored if RT_FILE_LOGLEV is set. |

Note

Much of this configuration can also be performed through code. For example, if you create an instance of StreamingClientFactory with a ConfigRequestor rather than using the default constructor, KXI_CONFIG_URL will be ignored.

KXI_CONFIG_URL

The KXI_CONFIG_URL variable points to a URL or file. If you rely on the Information Service to determine the endpoint(s) to connect to, use a URL of the form https://{INSIGHTS_HOSTNAME}/informationservice/details/{SDK_CLIENT_UID}. If you rely on a file to determine the endpoint(s), use the helper class FileConfigRequestor together with a JSON file. Examples of using a URL or a file are provided here.
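
For instance, here is a minimal sketch of assembling the Information Service URL in code rather than setting KXI_CONFIG_URL directly. INSIGHTS_HOSTNAME is a hypothetical variable of your own; only SDK_CLIENT_UID comes from the table above:

String host = System.getenv("INSIGHTS_HOSTNAME");   // assumption: your own variable, e.g. insights.example.com
String clientUid = System.getenv("SDK_CLIENT_UID"); // the access token described above
String configUrl = String.format("https://%s/informationservice/details/%s", host, clientUid);
RtClient client = new RtClient(configUrl);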

Levels of Logging

The logging levels that you can set are as follows:

| level | description |
|---|---|
| 0 | NONE |
| 1 | FATAL |
| 2 | ERROR |
| 3 | WARN |
| 4 | INFO |
| 5 | DEBUG |
| 6 | TRACE |

Additional variables for Ingest

| variable | required | description |
|---|---|---|
| RT_REP_DIR | Mandatory | Path where the replicator can be extracted to. It must be possible to execute a program from this location. |
| RT_LOG_PATH | Mandatory | The location where the RT message streams will be written |
| RT_NO_TRUNCATE | Optional | If this is set to anything other than "0", log truncation is disabled |
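
As an illustration, the following sketch launches an ingest publisher with these variables set, using only standard Java; the jar name and paths are hypothetical, and the SDK reads the variables at startup:

ProcessBuilder pb = new ProcessBuilder("java", "-jar", "my-publisher.jar"); // hypothetical publisher
pb.environment().put("KXI_CONFIG_URL", "https://insights.example.com/informationservice/details/abc123");
pb.environment().put("RT_REP_DIR", "/tmp/rt/replicator"); // replicator is extracted and run from here
pb.environment().put("RT_LOG_PATH", "/tmp/rt/logs");      // RT message streams are written here
pb.start();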

Additional variables for Query

| variable | required | description |
|---|---|---|
| KXI_QUERY_TOKEN | Conditional | A bearer token used for authentication when making queries. If this is set, KXI_QUERY_USER and KXI_QUERY_SECRET are ignored. |
| KXI_QUERY_USER | Conditional | The Keycloak client that will be used to generate bearer tokens to log in for query |
| KXI_QUERY_SECRET | Conditional | The Keycloak client secret that will be used to generate bearer tokens for query |

Pre-requisites

If you wish to use the SDK in the same cluster as kdb Insights, there is no need for an Information Service; a configuration file is used to provide the endpoints and certificates.

  • Download the assembly sdk_sample_assembly following the instructions here.
  • Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
  • Ensure you have an authenticated kdb Insights Enterprise client URL.
  • Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.
  • Make sure that the kdb Insights RT Microservice ingest endpoints are accessible.

Initializing

Before publishing any data, you need to create and start an RtClient object in Java.

You can pass the configuration URL directly into the constructor as follows:

// Read the configuration URL from the KXI_CONFIG_URL environment variable
RtClient client = new RtClient(System.getenv("KXI_CONFIG_URL"));
client.start();

Or you can use the StreamingClientFactory and an HttpsConfigRequestor to pull the configuration URL from the KXI_CONFIG_URL environment variable:

StreamingClientFactory scf = new StreamingClientFactory(new HttpsConfigRequestor());
StreamingClient client = scf.getStreamingClient();
client.start();

You can use a FileConfigRequestor to provide the configuration from a file, passing it into the RtClient constructor as follows:

ConfigParser parser = new ConfigParser();
RtClient client = new RtClient(new FileConfigRequestor("~/config.json", parser));
client.start();

"~/config.json" will need to contain the configuration in the same format as it is passed back from kdb Insights Information Service, but with some important differences:

  • The "useSslRt" top level key needs to be set to false
  • The "ca", "key" and "cert" are no longer needed and will be ignored if provided.
  • You will need to provide the internal hostnames and non-ssl RT port numbers (typically 5002) under the "insert" key

Here is an example of how the config file should look:

{
    "useSslRt": false,
    "name": "dgm3-service",
    "topics": {
        "insert": "sdk-sample-assembly",
        "query": "requests"
    },
    "insert": {
        "insert": [
            ":rt-sdk-sample-assembly-north-0:5002",
            ":rt-sdk-sample-assembly-north-1:5002",
            ":rt-sdk-sample-assembly-north-2:5002"
        ],
        "query": []
    },
    "query": [":34.66.95.116:6050"]
}

Note

The KXI_CONFIG_URL environment variable will be ignored if you run the SDK in this way.

Handling changing hostnames

Each publisher has a session name associated with it, to ensure it is uniquely identifiable by kdb Insights. This is physically represented within the Java SDK as the directory messages are written to before being replicated into kdb Insights.

By default, the Java SDK sets the session name (and hence the message directory) to the following: SDK_CLIENT_UID.streamid.hostname. This ensures that publishers on different hosts that are sending different data to the same kdb Insights Stream can be correctly identified.

However, if the SDK is being used in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run) then you need to:

  • Ensure that the RT_LOG_PATH, where the message directory will be created, will survive reboots.

  • Override the host name to ensure the same directory is used for all instantiations of the publisher. This can be done by using the setSessionSuffix() method before calling start() on the client.

setSessionSuffix

This method allows you to overwrite the last portion of the session name.

client.setSessionSuffix("sessionIdentifier");
client.start();

Note

  • This method exists on the RtClient object, so to do this you will either need to create the client using the RtClient constructor, or cast the instance of StreamingClient you get from the StreamingClientFactory to RtClient. (Currently StreamingClientFactory only returns instances of RtClient.)
  • If you are using the RtDedupClient you don't need to do this because the session name is constructed differently.

Warning

  • Calling setSessionSuffix() after calling start() will result in an exception.

Publishing data

The simplest way to publish data is to use the BulkLoader class. It handles turning 2-dimensional arrays into flip objects before publishing. You can make one as follows (example taken from RandomDataSample.java):

String[] columns = new String[] { "Time", "SensorName", "Voltage", "Current" };
BulkLoader tw = new BulkLoader(tableName, client, columns);

You can then pass in 2D arrays, where the first dimension is the row number, and the second is the column number.

Random rand = new Random();
Timestamp now = new Timestamp(System.currentTimeMillis());
for (int i = 0; i < numEntries; i++)
{
    Object[][] tableToSend = new Object[NUM_SENSORS][];
    for (int j = 0; j < NUM_SENSORS; j++)
    {
        tableToSend[j] = new Object[columns.length];
        tableToSend[j][0] = now;
        tableToSend[j][1] = String.format("Sensor_%d", j);
        tableToSend[j][2] = rand.nextFloat() * 240.0f;
        tableToSend[j][3] = rand.nextFloat() * 10.0f;
        System.out.printf("Writing line, %s, %s, %f, %f\n",
            tableToSend[j][0], tableToSend[j][1], tableToSend[j][2], tableToSend[j][3]);
    }

    tw.writeTable(tableToSend);
}

There are two other loaders available. Both implement the same loader interface, so the basic example above applies, but they differ in how they work:

  • BatchLoader: this does not send data immediately, but sends it in batches. You can specify the batch size in the constructor (see the sketch after this list).

  • ValidatedLoader: (kdb Insights Enterprise only) this checks that the data you are sending matches the table schema and throws an exception if it does not. You need to obtain an instance of ValidatedLoader from the ValidatedLoaderFactory, which uses the kdb Insights Enterprise getMeta API to read the table schemas for the comparison.

    Note

    The ValidatedLoader assumes that the kdb Insights Stream Processor does not modify the schema of the data before it is written to the table.
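
For illustration, here is a minimal sketch of using the BatchLoader, assuming its constructor mirrors BulkLoader with an additional batch-size parameter; check the class in your SDK version for the exact signature:

// Assumption: constructor shape mirrors BulkLoader plus a batch size
int batchSize = 100; // illustrative value: rows buffered before a send
BatchLoader bl = new BatchLoader(tableName, client, columns, batchSize);
bl.writeTable(tableToSend); // buffered; sent once the batch fills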

Terminating

When you have finished publishing data, call stop() on the RtClient object.

client.stop();

Deduplication

Taking advantage of the deduplication capabilities of RT requires some changes to the process described above.

All publishers sending data from the same origin must share the same deduplication ID. Messages are deduplicated on the assumption that each message has a message id that is unique for the origin and whose absolute value increases by one with each message. For example, 1 -2 3 4 is a valid sequence of ids, but 1 2 -2 4 is not.

For each deduplication ID, RT keeps track of the high watermark of the message id; when RT sees a message with an identifier lower than the watermark, that message is discarded.

To use this facility you need to create an RtDedupClient and pass in your chosen deduplication ID.

RtDedupClient rtDedupClient = new RtDedupClient(new HttpsConfigRequestor(), "dedupId");
rtDedupClient.start();

Unlike in the Publishing data example, there are no loaders, so you will need to send data in using pasync directly.

rtDedupClient.pasync("streamid", data, messageid);

Where "data" is an array of the types in the section below.

When you have finished publishing data, call stop() on the rtDedupClient as you would a regular RtClient.

rtDedupClient.stop();

Supported data types

The following data types are supported by the Java SDK:

| q type name | q type number | java type | schema notation | example |
|---|---|---|---|---|
| boolean | -1 | Boolean | 'boolean' | true |
| guid | -2 | UUID | 'guid' | 0f14d0ab-9605-4a62-a9e4-5ed26688389b |
| byte | -4 | Byte | 'byte' | 0xF4 |
| short | -5 | Short | 'short' | 5 |
| int | -6 | Integer | 'int' or 'integer' | 56789 |
| long | -7 | Long | 'long' | 12345678 |
| real | -8 | Float | 'real' | 2.5 |
| float | -9 | Double | 'float' | 89.3 |
| char | -10 | Character | 'char' | d |
| symbol | -11 | String | 'symbol' | LLOY |
| timestamp | -12 | java.sql.Timestamp | 'timestamp' or 'ts' | 2000.01.01D00:00:00.200000000 |
| month | -13 | kx.c.Month | 'month' | 2002.02m |
| date | -14 | java.sql.Date | 'date' | 1999.12.31 |
| datetime | -15 | java.util.Date | 'datetime' | 1999.12.31T23:59:59.999 |
| timespan | -16 | kx.c.Timespan | 'timespan' | 00:00:00.000000000 |
| minute | -17 | kx.c.Minute | 'minute' | 00:00 |
| second | -18 | kx.c.Second | 'second' | 00:00:00 |
| time | -19 | java.sql.Time | 'time' | 00:00:00 |
| string | 10 | char[] | 'string' | "this is a string" |
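
As an illustration, here is a sketch of building one row from several of these Java types; the column layout is hypothetical and should match your own table schema:

import java.sql.Timestamp;
import java.util.UUID;

// Hypothetical columns: timestamp, symbol, float, real, guid
Object[] row = new Object[] {
    new Timestamp(System.currentTimeMillis()), // q timestamp (-12)
    "LLOY",                                    // q symbol (-11): a Java String
    89.3d,                                     // q float (-9): a Java Double
    2.5f,                                      // q real (-8): a Java Float
    UUID.randomUUID()                          // q guid (-2)
};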

symbol type

Only make text columns into symbols when the fields will be drawn from a small, stable domain and there is significant repetition in their use. When in doubt, start with a string column. It is easier to convert a string column to symbols than it is to remove symbols from the sym list.

time type

Because java.sql.Time does not represent milliseconds, some precision is lost if this type is used.