Using the kdb Insights Java SDK
The Java SDK allows you to publish data to RT and query data from kdb Insights Enterprise from your own application.
This section details how to get started with the Java SDK. For examples of how to use it to publish data into a kdb Insights RT Microservice or kdb Insights Enterprise, see the documentation on the sample program.
Downloading the Java SDK
You can download the Java SDK from the kdb Insights Maven Nexus registry and a sample program from the kdb Insights package Nexus registry.
Note
The SDKs are only supported on Linux running on x86 architectures. Windows and macOS are not currently supported.
The Java SDK zip file contains everything required to use the Java SDK; download the file and unzip it.
Environment variables
The Java SDK can be configured through environment variables.
Ingest and Query
variable | required | description |
---|---|---|
KXI_CONFIG_URL | Mandatory | The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to |
SDK_CLIENT_UID | Mandatory | An access token that the user can obtain following the steps here |
RT_FILE_LOGLEV | Optional | Sets the level of logging to file (0-6) |
RT_CONSOLE_LOGLEV | Optional | Sets the level of logging to the console (0-6) |
RT_DEBUG | Optional | (Deprecated) Setting this to anything other than "0" is the same as setting RT_FILE_LOGLEV=3. Ignored if RT_FILE_LOGLEV is set. |
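The following is a minimal sketch, assuming Java 11 or later, of a pre-flight check that fails fast when a variable marked Mandatory in the table above is unset. RequiredEnv and missing are hypothetical names and are not part of the Java SDK. Passing a Map, rather than reading System.getenv() inside the check, keeps it easy to test. If you supply a ConfigRequestor in code (see the note below), KXI_CONFIG_URL is not used, so adjust the list to match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; not part of the kdb Insights Java SDK.
class RequiredEnv {
    // Variables marked Mandatory in the Ingest and Query table
    static final List<String> MANDATORY = List.of("KXI_CONFIG_URL", "SDK_CLIENT_UID");

    // Returns the mandatory variables that are unset or blank in the given environment
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : MANDATORY) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (!absent.isEmpty()) {
            System.err.println("Missing mandatory variables: " + absent);
            System.exit(1);
        }
        System.out.println("All mandatory variables are set");
    }
}
```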
Note
Much of this configuration can also be performed through code. For example, if you create an instance of StreamingClientFactory with a ConfigRequestor rather than using the default constructor, KXI_CONFIG_URL will be ignored.
KXI_CONFIG_URL
The KXI_CONFIG_URL variable points to a URL or file. If you are relying on the Information Service to determine the endpoint(s) to connect to, use a URL of the form https://{INSIGHTS_HOSTNAME}/informationservice/details/{SDK_CLIENT_UID}. If you are relying on a file to determine the endpoint(s), provide a JSON file together with the FileConfigRequestor helper class. Examples of both approaches are shown in the Initializing section.
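As a minimal sketch, assuming Java 11 or later, the Information Service URL can be assembled from its two parts as shown here. InfoServiceUrl and build are hypothetical names, not SDK API, and the hostname and client UID in main are example values only.

```java
import java.net.URI;

// Hypothetical helper; not part of the kdb Insights Java SDK.
class InfoServiceUrl {
    // Builds https://{INSIGHTS_HOSTNAME}/informationservice/details/{SDK_CLIENT_UID}
    static String build(String insightsHostname, String clientUid) {
        if (insightsHostname == null || insightsHostname.isBlank()) {
            throw new IllegalArgumentException("hostname is required");
        }
        if (clientUid == null || clientUid.isBlank()) {
            throw new IllegalArgumentException("client UID is required");
        }
        String url = "https://" + insightsHostname
                + "/informationservice/details/" + clientUid;
        URI.create(url); // throws IllegalArgumentException if the result is not a valid URI
        return url;
    }

    public static void main(String[] args) {
        // Example values; substitute your own hostname and SDK_CLIENT_UID
        System.out.println(build("insights.example.com", "my-client-uid"));
    }
}
```

The resulting string is what you would export as KXI_CONFIG_URL.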
Levels of Logging
The logging levels that you can set are as follows:
level | description |
---|---|
0 | NONE |
1 | FATAL |
2 | ERROR |
3 | WARN |
4 | INFO |
5 | DEBUG |
6 | TRACE |
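The precedence between RT_FILE_LOGLEV and the deprecated RT_DEBUG, described in the Ingest and Query table, can be modelled with the level numbers above. This is a sketch under that reading. LogLevel and effectiveFileLevel are hypothetical names, not SDK API. The method returns null when neither variable applies because the SDK's default level is not stated here.

```java
import java.util.Map;

// Hypothetical model of the documented log-level rules; not part of the SDK.
enum LogLevel {
    NONE, FATAL, ERROR, WARN, INFO, DEBUG, TRACE; // ordinal() equals the level number 0-6

    static LogLevel fromNumber(int level) {
        if (level < 0 || level > 6) {
            throw new IllegalArgumentException("log level must be 0-6, got " + level);
        }
        return values()[level];
    }

    // RT_FILE_LOGLEV takes precedence; otherwise RT_DEBUG set to anything other
    // than "0" is treated as level 3 (WARN). Returns null if neither applies.
    static LogLevel effectiveFileLevel(Map<String, String> env) {
        String fileLevel = env.get("RT_FILE_LOGLEV");
        if (fileLevel != null) {
            return fromNumber(Integer.parseInt(fileLevel.trim()));
        }
        String debug = env.get("RT_DEBUG");
        if (debug != null && !debug.equals("0")) {
            return WARN;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("Effective file log level: " + effectiveFileLevel(System.getenv()));
    }
}
```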
Additional variables for Ingest
variable | required | description |
---|---|---|
RT_REP_DIR | Mandatory | Path where the replicator can be extracted to. It must be possible to execute a program from this location. |
RT_LOG_PATH | Mandatory | The location where the RT message streams will be written to |
RT_NO_TRUNCATE | Optional | If this is set to anything other than "0", log truncation will be disabled |
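The following minimal sketch, assuming Java 11 or later, shows how an application might read the ingest variables above before starting a client. IngestSettings, fromEnv and replicatorDirLooksUsable are hypothetical names, not SDK API. The directory check is only an approximation: File.canExecute() on a directory tests search permission, so it cannot detect, for example, a filesystem mounted with noexec.

```java
import java.io.File;
import java.util.Map;

// Hypothetical reader for the ingest variables; not part of the SDK.
class IngestSettings {
    final String replicatorDir;       // RT_REP_DIR
    final String logPath;             // RT_LOG_PATH
    final boolean truncationDisabled; // RT_NO_TRUNCATE set to anything other than "0"

    private IngestSettings(String replicatorDir, String logPath, boolean truncationDisabled) {
        this.replicatorDir = replicatorDir;
        this.logPath = logPath;
        this.truncationDisabled = truncationDisabled;
    }

    static IngestSettings fromEnv(Map<String, String> env) {
        String noTruncate = env.get("RT_NO_TRUNCATE");
        return new IngestSettings(
                require(env, "RT_REP_DIR"),
                require(env, "RT_LOG_PATH"),
                noTruncate != null && !noTruncate.equals("0"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException(name + " must be set");
        }
        return value;
    }

    // Rough check that the replicator can be extracted to and run from dir
    static boolean replicatorDirLooksUsable(String dir) {
        File f = new File(dir);
        return f.isDirectory() && f.canWrite() && f.canExecute();
    }

    public static void main(String[] args) {
        IngestSettings s = fromEnv(System.getenv());
        System.out.println("RT_REP_DIR usable: " + replicatorDirLooksUsable(s.replicatorDir));
        System.out.println("Log truncation disabled: " + s.truncationDisabled);
    }
}
```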
Additional variables for Query
variable | required | description |
---|---|---|
KXI_QUERY_TOKEN | Conditional | A bearer token used for authentication when making queries. If this is set, KXI_QUERY_USER and KXI_QUERY_SECRET are ignored. |
KXI_QUERY_USER | Conditional | The Keycloak client that will be used to generate bearer tokens to log in for query |
KXI_QUERY_SECRET | Conditional | The Keycloak client secret that will be used to generate bearer tokens for query |
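The precedence rule in the table above can be sketched as follows. QueryAuth and fromEnv are hypothetical names, and the code only chooses which credentials to use; it does not contact Keycloak. Two assumptions are labelled in the code: an empty KXI_QUERY_TOKEN counts as unset, and one of the two options must be configured for query.

```java
import java.util.Map;

// Hypothetical model of the query credential precedence; not part of the SDK.
class QueryAuth {
    enum Mode { BEARER_TOKEN, CLIENT_CREDENTIALS }

    final Mode mode;
    final String credential; // the token, or the Keycloak client name
    final String secret;     // null when a bearer token is used

    private QueryAuth(Mode mode, String credential, String secret) {
        this.mode = mode;
        this.credential = credential;
        this.secret = secret;
    }

    static QueryAuth fromEnv(Map<String, String> env) {
        String token = env.get("KXI_QUERY_TOKEN");
        // Assumption: an empty token is treated as unset
        if (token != null && !token.isEmpty()) {
            // When a token is present, KXI_QUERY_USER and KXI_QUERY_SECRET are ignored
            return new QueryAuth(Mode.BEARER_TOKEN, token, null);
        }
        String user = env.get("KXI_QUERY_USER");
        String secret = env.get("KXI_QUERY_SECRET");
        if (user != null && secret != null) {
            return new QueryAuth(Mode.CLIENT_CREDENTIALS, user, secret);
        }
        // Assumption: one of the two options must be configured for query
        throw new IllegalStateException(
                "Set KXI_QUERY_TOKEN, or both KXI_QUERY_USER and KXI_QUERY_SECRET");
    }

    public static void main(String[] args) {
        // Print only the mode so that no credentials end up in logs
        System.out.println("Query auth mode: " + fromEnv(System.getenv()).mode);
    }
}
```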
Pre-requisites
If you wish to use the SDK in the same cluster as kdb Insights, there is no need for an Information Service; a configuration file is used to provide the endpoints and certificates.
- Download the assembly sdk_sample_assembly following the instructions here.
- Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
- Ensure you have an authenticated kdb Insights Enterprise client URL.
- Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.
- Make sure that the kdb Insights RT Microservice ingest endpoints are accessible.
Initializing
Before publishing any data, you need to start a new RtClient object in Java:
You can pass the configuration URL directly into the constructor as follows:
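// Any configuration URL string can be passed here; this example reads it from the environment
RtClient client = new RtClient(System.getenv("KXI_CONFIG_URL"));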
client.start();
Or you can use the StreamingClientFactory and an HttpsConfigRequestor to pull the configuration URL from the KXI_CONFIG_URL
environment variable:
StreamingClientFactory scf = new StreamingClientFactory(new HttpsConfigRequestor());
StreamingClient client = scf.getStreamingClient();
client.start();
You can use a FileConfigRequestor to provide the configuration from a file. When constructing an RtClient directly, this is done as follows:
ConfigParser parser = new ConfigParser();
RtClient client = new RtClient(new FileConfigRequestor("~/config.json", parser));
client.start();
"~/config.json" will need to contain the configuration in the same format as it is passed back from kdb Insights Information Service, but with some important differences:
- The "useSslRt" top level key needs to be set to false
- The "ca", "key" and "cert" are no longer needed and will be ignored if provided.
- You will need to provide the internal hostnames and non-SSL RT port numbers (typically 5002) under the "insert" key
Here is an example of how the config file should look:
{
  "useSslRt": false,
  "name": "dgm3-service",
  "topics": {
    "insert": "sdk-sample-assembly",
    "query": "requests"
  },
  "insert": {
    "insert": [
      ":rt-sdk-sample-assembly-north-0:5002",
      ":rt-sdk-sample-assembly-north-1:5002",
      ":rt-sdk-sample-assembly-north-2:5002"
    ],
    "query": []
  },
  "query": [":34.66.95.116:6050"]
}
Note
The KXI_CONFIG_URL environment variable will be ignored if you run the SDK in this way.
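If you need to generate this file, for example when the RT pod names differ per deployment, a minimal sketch, assuming Java 11 or later, might render it from a list of internal hostnames. InClusterConfig and render are hypothetical names, not SDK API. The key names are copied from the example config above. The quoting is deliberately minimal and assumes values contain no quotes or backslashes. Write the printed output to the file you pass to FileConfigRequestor.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that renders an in-cluster config in the shape of the example above.
class InClusterConfig {
    static String render(String name, String insertTopic, String queryTopic,
                         List<String> rtHosts, int rtPort, List<String> queryEndpoints) {
        // Each RT endpoint becomes ":host:port", using the non-SSL RT port
        String inserts = rtHosts.stream()
                .map(h -> quote(":" + h + ":" + rtPort))
                .collect(Collectors.joining(","));
        String queries = queryEndpoints.stream()
                .map(InClusterConfig::quote)
                .collect(Collectors.joining(","));
        return "{"
                + "\"useSslRt\":false," // must be false when running in the cluster
                + "\"name\":" + quote(name) + ","
                + "\"topics\":{\"insert\":" + quote(insertTopic) + ",\"query\":" + quote(queryTopic) + "},"
                + "\"insert\":{\"insert\":[" + inserts + "],\"query\":[]},"
                + "\"query\":[" + queries + "]"
                + "}";
    }

    // Minimal quoting; assumes values contain no quotes or backslashes
    private static String quote(String s) {
        return "\"" + s + "\"";
    }

    public static void main(String[] args) {
        System.out.println(render("dgm3-service", "sdk-sample-assembly", "requests",
                List.of("rt-sdk-sample-assembly-north-0", "rt-sdk-sample-assembly-north-1",
                        "rt-sdk-sample-assembly-north-2"),
                5002, List.of(":34.66.95.116:6050")));
    }
}
```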
Handling changing hostnames
Each publisher has a session name associated with it, to ensure it is uniquely identifiable by kdb Insights. This is physically represented within the Java SDK as the directory messages are written to before being replicated into kdb Insights.
By default, the Java SDK sets the session name (and hence the message directory) to SDK_CLIENT_UID.streamid.hostname. This ensures that publishers on different hosts that are sending different data to the same kdb Insights stream can be correctly identified.
However, if the SDK is being used in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run), then you need to:
- Ensure that the RT_LOG_PATH, where the message directory will be created, survives reboots.
- Override the hostname to ensure the same directory is used for all instantiations of the publisher. You can do this by calling the setSessionSuffix() method before calling start() on the client.
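To make the effect of the suffix concrete, here is a hypothetical model of the session-name rule described above. The SDK builds this name itself, so SessionName and its methods are illustrative only. The IllegalStateException is an assumption; the SDK's actual exception type is not documented here. The main method shows two container runs with different hostnames resolving to the same session name once a fixed suffix is set.

```java
// Hypothetical model of the session-name rule; the SDK builds this name itself.
class SessionName {
    private final String clientUid;
    private final String streamId;
    private String suffix; // defaults to the hostname
    private boolean started;

    SessionName(String clientUid, String streamId, String hostname) {
        this.clientUid = clientUid;
        this.streamId = streamId;
        this.suffix = hostname;
    }

    // Like setSessionSuffix(), only allowed before start(); the exception type
    // used here is an assumption, not the SDK's documented type
    void setSuffix(String newSuffix) {
        if (started) {
            throw new IllegalStateException("suffix must be set before start()");
        }
        suffix = newSuffix;
    }

    void start() {
        started = true;
    }

    // SDK_CLIENT_UID.streamid.suffix
    String value() {
        return clientUid + "." + streamId + "." + suffix;
    }

    public static void main(String[] args) {
        // Two container runs with different hostnames but the same fixed suffix
        SessionName run1 = new SessionName("uid", "stream1", "container-a1b2");
        SessionName run2 = new SessionName("uid", "stream1", "container-c3d4");
        run1.setSuffix("sessionIdentifier");
        run2.setSuffix("sessionIdentifier");
        System.out.println(run1.value().equals(run2.value())); // prints true
    }
}
```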
setSessionSuffix
This method allows you to override the last portion of the session name, which defaults to the hostname.
client.setSessionSuffix("sessionIdentifier");
client.start();
Note
- This method exists on the RtClient object, so to do this you will either need to create the client using the RtClient constructor, or cast the instance of StreamingClient you get from the StreamingClientFactory to RtClient. (Currently StreamingClientFactory only returns instances of RtClient.)
- If you are using the RtDedupClient, you don't need to do this because the session name is constructed differently.
Warning
- Calling setSessionSuffix() after calling start() will result in an exception.
Publishing data
The simplest way to publish data is to use the BulkLoader class. It handles turning two-dimensional arrays into flip objects (the Java representation of q tables) before publishing. You can make one as follows (example taken from RandomDataSample.java):
String[] columns = new String[] { "Time", "SensorName", "Voltage", "Current" };
BulkLoader tw = new BulkLoader(tableName, client, columns);
You can then pass in 2D arrays, where the first dimension is the row number, and the second is the column number.
Random rand = new Random();
// Every row in this example shares one timestamp
Timestamp now = new Timestamp(System.currentTimeMillis());
for (int i = 0; i < numEntries; i++)
{
    // One row per sensor; each row holds one value per column
    Object[][] tableToSend = new Object[NUM_SENSORS][];
    for (int j = 0; j < NUM_SENSORS; j++)
    {
        tableToSend[j] = new Object[columns.length];
        tableToSend[j][0] = now;                           // Time: java.sql.Timestamp -> q timestamp
        tableToSend[j][1] = String.format("Sensor_%d", j); // SensorName: String -> q symbol
        tableToSend[j][2] = rand.nextFloat() * 240.0f;     // Voltage: Float -> q real
        tableToSend[j][3] = rand.nextFloat() * 10.0f;      // Current: Float -> q real
        System.out.printf("Writing line, %s, %s, %f, %f%n",
            tableToSend[j][0], tableToSend[j][1], tableToSend[j][2], tableToSend[j][3]);
    }
    // Publish the rows built in this iteration
    tw.writeTable(tableToSend);
}
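The row-to-column conversion that BulkLoader performs can be pictured with the following sketch, which turns rows[r][c] into columns[c][r]. RowsToColumns and transpose are hypothetical names, and the SDK's actual conversion is not documented here. In a q table each column is normally a typed vector (for example, a float[] for a real column), whereas this sketch keeps Object[] columns for brevity.

```java
import java.util.Arrays;

// Hypothetical illustration of row-major to column-major conversion; not SDK code.
class RowsToColumns {
    // rows[r][c] -> columns[c][r]; every row must have columnCount values
    static Object[][] transpose(Object[][] rows, int columnCount) {
        Object[][] columns = new Object[columnCount][rows.length];
        for (int r = 0; r < rows.length; r++) {
            if (rows[r].length != columnCount) {
                throw new IllegalArgumentException(
                        "row " + r + " has " + rows[r].length + " values, expected " + columnCount);
            }
            for (int c = 0; c < columnCount; c++) {
                columns[c][r] = rows[r][c];
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        Object[][] rows = { {"Sensor_0", 12.5f}, {"Sensor_1", 7.25f} };
        System.out.println(Arrays.deepToString(transpose(rows, 2)));
    }
}
```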
There are two other loaders available. They both implement the loader interface, so the same basic example applies, but they differ in how they work:
- BatchLoader: this will not send data immediately, but will send the data in batches. You can specify the batch size in the constructor.
- ValidatedLoader: (kdb Insights Enterprise only) this will check that the data you are sending matches the table schema and throw an exception if it does not match. You will need to obtain an instance of the ValidatedLoader using the ValidatedLoaderFactory. ValidatedLoaderFactory will use the kdb Insights Enterprise getMeta API to read the table schemas for the comparison.
Note
The ValidatedLoader assumes that the kdb Insights Stream Processor does not modify the schema of the data before it is written to the table.
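The batching idea behind BatchLoader can be sketched as a size-based buffer that publishes whenever the configured batch size is reached. RowBatcher and addRows are hypothetical names, and the Consumer stands in for the publish step. The real BatchLoader's flush rules (for example, whether it also flushes on a timer) are not documented here, so treat this as an illustration of the concept only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-based batcher, in the spirit of BatchLoader; not SDK code.
class RowBatcher {
    private final int batchSize;
    private final Consumer<Object[][]> publish; // stands in for sending a batch
    private final List<Object[]> pending = new ArrayList<>();

    RowBatcher(int batchSize, Consumer<Object[][]> publish) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.publish = publish;
    }

    // Buffers rows and publishes a batch each time batchSize rows have accumulated
    void addRows(Object[][] rows) {
        for (Object[] row : rows) {
            pending.add(row);
            if (pending.size() == batchSize) {
                flush();
            }
        }
    }

    // Publishes any buffered rows; call before stopping so nothing is left behind
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        publish.accept(pending.toArray(new Object[0][]));
        pending.clear();
    }

    public static void main(String[] args) {
        RowBatcher batcher = new RowBatcher(2,
                batch -> System.out.println("publishing " + batch.length + " rows"));
        batcher.addRows(new Object[][] { {"Sensor_0", 1.5f}, {"Sensor_1", 2.5f}, {"Sensor_2", 3.5f} });
        batcher.flush(); // publishes the final partial batch of 1 row
    }
}
```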
Terminating
When you have finished publishing data, call stop() on the RtClient object.
client.stop();
Deduplication
Taking advantage of the deduplication capabilities of RT requires some changes to the above process.
All publishers that send data from the same origin must share the same deduplication ID. Messages are deduplicated on the assumption that each message has a message ID that is unique for the origin and whose distance from zero increases by one with each message. For example, 1 -2 3 4 is a valid sequence of IDs, but 1 2 -2 4 is not.
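The ID rule can be checked with a few lines of code if you read "distance from zero" as the absolute value. MessageIdSequence and isValid are hypothetical names. The sketch does not require the sequence to start at 1, since the text does not say so.

```java
// Hypothetical checker for the message ID rule; not part of the SDK.
class MessageIdSequence {
    // Each ID's absolute value must be exactly one more than the previous ID's
    static boolean isValid(long[] ids) {
        for (int i = 1; i < ids.length; i++) {
            if (Math.abs(ids[i]) != Math.abs(ids[i - 1]) + 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValid(new long[] {1, -2, 3, 4})); // prints true
        System.out.println(isValid(new long[] {1, 2, -2, 4})); // prints false
    }
}
```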
For each deduplication ID, RT keeps track of the high watermark of the message ID. When RT sees a message with a lower identifier than the watermark, that message is discarded.
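RT applies this filtering itself, so publishers do not need to implement it. The following hypothetical model is only meant to make the watermark behaviour concrete. WatermarkFilter and accept are illustrative names. Two assumptions are labelled in the code: IDs are compared by distance from zero (matching the sequence rule above), and an ID equal to the watermark is treated as a replay and discarded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-deduplication-ID watermark filtering; RT does this server side.
class WatermarkFilter {
    private final Map<String, Long> highWatermarks = new HashMap<>();

    // Returns true if the message is kept, false if it is discarded
    boolean accept(String dedupId, long messageId) {
        long distance = Math.abs(messageId); // assumption: compare by distance from zero
        Long watermark = highWatermarks.get(dedupId);
        // Assumption: an ID equal to the watermark is a replay and is discarded too
        if (watermark != null && distance <= watermark) {
            return false;
        }
        highWatermarks.put(dedupId, distance);
        return true;
    }

    public static void main(String[] args) {
        WatermarkFilter f = new WatermarkFilter();
        long[] ids = {1, -2, 3, 2, 4};
        for (long id : ids) {
            System.out.println(id + " kept: " + f.accept("dedupId", id));
        }
    }
}
```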
To use this facility, you need to create an RtDedupClient and pass in your chosen deduplication ID.
RtDedupClient rtDedupClient = new RtDedupClient(new HttpsConfigRequestor(), "dedupId");
rtDedupClient.start();
Unlike in the Publishing data example, there are no loaders, so you need to send data directly using pasync.
rtDedupClient.pasync("streamid", data, messageid);
Here, data is an array of values of the types listed in the Supported data types section below, and messageid is the message ID used for deduplication.
When you have finished publishing data, call stop() on the rtDedupClient as you would a regular RtClient.
rtDedupClient.stop();
Supported data types
The following data types are supported by the Java SDK:
q type name | q type number | java type | schema notation | example |
---|---|---|---|---|
boolean | -1 | Boolean | 'boolean' | true |
guid | -2 | UUID | 'guid' | 0f14d0ab-9605-4a62-a9e4-5ed26688389b |
byte | -4 | Byte | 'byte' | 0xF4 |
short | -5 | Short | 'short' | 5 |
int | -6 | Integer | 'int' or 'integer' | 56789 |
long | -7 | Long | 'long' | 12345678 |
real | -8 | Float | 'real' | 2.5 |
float | -9 | Double | 'float' | 89.3 |
char | -10 | Character | 'char' | d |
symbol | -11 | String | 'symbol' | LLOY |
timestamp | -12 | java.sql.Timestamp | 'timestamp' or 'ts' | 2000.01.01D00:00:00.200000000 |
month | -13 | kx.c.Month | 'month' | 2002.02m |
date | -14 | java.sql.Date | 'date' | 1999.12.31 |
datetime | -15 | java.util.Date | 'datetime' | 1999.12.31T23:59:59.999 |
timespan | -16 | kx.c.Timespan | 'timespan' | 00:00:00.000000000 |
minute | -17 | kx.c.Minute | 'minute' | 00:00 |
second | -18 | kx.c.Second | 'second' | 00:00:00 |
time | -19 | java.sql.Time | 'time' | 00:00:00 |
string | 10 | char[] | 'string' | "this is a string" |
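A client-side check loosely in the spirit of ValidatedLoader can be built from the table above by mapping each schema notation to its Java class. SchemaTypes and matches are hypothetical names and are not SDK API. Only JDK types are included; the kx.c types (Month, Timespan, Minute, Second) would need kx's c.java on the classpath. The check uses exact class equality on purpose, because java.sql.Timestamp and java.sql.Date both extend java.util.Date, and an instanceof test would wrongly accept them for 'datetime'. The map also highlights a common pitfall: 'real' expects Float and 'float' expects Double.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical schema-notation to Java-class check; not part of the SDK.
class SchemaTypes {
    static final Map<String, Class<?>> JAVA_TYPE = new HashMap<>();

    static {
        JAVA_TYPE.put("boolean", Boolean.class);
        JAVA_TYPE.put("guid", UUID.class);
        JAVA_TYPE.put("byte", Byte.class);
        JAVA_TYPE.put("short", Short.class);
        JAVA_TYPE.put("int", Integer.class);
        JAVA_TYPE.put("integer", Integer.class);
        JAVA_TYPE.put("long", Long.class);
        JAVA_TYPE.put("real", Float.class);   // q real is a Java Float
        JAVA_TYPE.put("float", Double.class); // q float is a Java Double
        JAVA_TYPE.put("char", Character.class);
        JAVA_TYPE.put("symbol", String.class);
        JAVA_TYPE.put("timestamp", Timestamp.class);
        JAVA_TYPE.put("ts", Timestamp.class);
        JAVA_TYPE.put("date", Date.class);    // java.sql.Date
        JAVA_TYPE.put("datetime", java.util.Date.class);
        JAVA_TYPE.put("time", Time.class);
        JAVA_TYPE.put("string", char[].class);
    }

    // True if value's class is exactly the one listed for the schema type
    static boolean matches(String schemaType, Object value) {
        Class<?> expected = JAVA_TYPE.get(schemaType);
        if (expected == null) {
            throw new IllegalArgumentException("unknown schema type: " + schemaType);
        }
        return value != null && value.getClass() == expected;
    }

    public static void main(String[] args) {
        Object[] row = { new Timestamp(System.currentTimeMillis()), "Sensor_0", 12.5f };
        String[] schema = { "timestamp", "symbol", "real" };
        for (int i = 0; i < row.length; i++) {
            System.out.println(schema[i] + ": " + matches(schema[i], row[i]));
        }
    }
}
```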
symbol type
Only make text columns into symbols when the fields will be drawn from a small, stable domain and there is significant repetition in their use. When in doubt, start with a string column. It is easier to convert a string column to symbols than it is to remove symbols from the sym list.
time type
Because java.sql.Time does not have milliseconds, some precision is lost if this type is used.