Using the KX Insights PlatformSDKs
This use case highlights how to deploy the sdk-sample-assembly
Custom Resource (CR) pipeline, publish data into it and query the data, all through secure and authenticated endpoints within KX Insights Platform.
The goal is to take the data pipeline highlighted below and deploy to the KX Insights Platform Base Application to allow data to be ingested, persisted and queried.
Deploying the Assembly CR
You can deploy the assembly on top of the base instance of the KX Insights Platform in your cluster.
To deploy the assembly, download the sdk_sample_assembly.yaml from the release repository and deploy to your cluster with the command.
kubectl apply -f sdk_sample_assembly.yaml
The label insights.kx.com/app
currently gets set to the name of the assembly so this can be used as a filter to view the resources. For example if you deployed an assembly called sdk-sample-assembly
you can see the pods associated with the assembly by running
$ kubectl get pods -l insights.kx.com/app=sdk-sample-assembly
NAME READY STATUS RESTARTS AGE
sdk-sample-assembly-dap-hdb-0 2/2 Running 0 2d15h
sdk-sample-assembly-dap-hdb-1 2/2 Running 0 2d15h
sdk-sample-assembly-dap-hdb-2 2/2 Running 0 2d15h
sdk-sample-assembly-dap-idb-0 2/2 Running 0 2d15h
sdk-sample-assembly-dap-idb-1 2/2 Running 0 2d15h
sdk-sample-assembly-dap-idb-2 2/2 Running 0 2d15h
sdk-sample-assembly-dap-rdb-0 2/2 Running 0 2d15h
sdk-sample-assembly-dap-rdb-1 2/2 Running 0 2d15h
sdk-sample-assembly-dap-rdb-2 2/2 Running 0 2d15h
sdk-sample-assembly-sm-0 5/5 Running 0 2d15h
insights-spctl-sdtransform-v5ma6j9p0o-0 2/2 Running 0 2d15h
insights-spwork-sdtransform-v5ma6j9p0o-1-0 2/2 Running 0 2d15h
rt-sdk-sample-assembly-north-0 1/1 Running 0 2d15h
rt-sdk-sample-assembly-north-1 1/1 Running 0 2d15h
rt-sdk-sample-assembly-north-2 1/1 Running 0 2d15h
rt-sdk-sample-assembly-north-arc-5489dfff9f-88w4h 1/1 Running 0 2d15h
rt-sdk-sample-assembly-south-0 1/1 Running 0 2d15h
rt-sdk-sample-assembly-south-1 1/1 Running 0 2d15h
rt-sdk-sample-assembly-south-2 1/1 Running 0 2d15h
rt-sdk-sample-assembly-south-arc-7d75955d89-8jbq8 1/1 Running 0 2d15h
Client Integration
Now you've deployed the KX Insights Platform along with a sdk-sample-assembly
to ingest data, a client must be enrolled to allow data to be published via the authenticated and secure RT entry-point. Here we'll walk through the registration of a simple client while full details on the client enrollment workflow can be found here
The diagram below highlights the key components that allow the flow of data into the system from external sources. To do this securely it is necessary to utilize the Information Service and the Client Controller to register clients and allow them to understand the endpoints. It must also authorize itself and publish the data to the appropriate location
Key to client integration is the ingress endpoint where external clients are accessing the system. The endpoint will have a hostname dependent on the DNS setup of the cluster deploy. In this deployment and sdk-sample-assembly
CR the hostname is dev.insights.kx.com
and have initialized an ENV INSIGHTS_HOSTNAME
to be utilized throughout the working example
export INSIGHTS_HOSTNAME="dev.insights.kx.com"
Client Enrollment
External clients need to be enrolled in the system. This provisions resources for them and configures their topics.
This step requires an access token that can be retrieved as described here.
curl https://${INSIGHTS_HOSTNAME}/clientcontroller/enrol -H "Authorization: Bearer <access token>" -d '{ "name": "client-1", "topics": { "insert": "sdk-sample-assembly", "query": "requests" }}'
{"message":"success","detail":"Client enrolled","url":"d0c8ae0ae920f4ed78b2b8725dffdb74"}
Note
The topics.insert
field must match the topic used in the assembly.
In this example, it is sdk-sample-assembly
. See the next section for more information.
Matching topics
Topics assigned to clients need to be known by the system. There needs to be an assembly configured to consume that topic otherwise the data will not be published.
To illustrate this the YAML snippet below shows where the topic is configured for an assembly. The sequencer named north
is configured as external and its topic is set as sdk-sample-assembly
.
spec:
elements:
..
sequencer:
..
north:
external: true
topicConfig:
subTopic: "sdk-sample-assembly"
Clients configured with the sdk-sample-assembly
topic will be matched to this assembly.
Publishing data
The KX Insights Platform SDKs can be used to ingest data into the KX Insights Platform over the Reliable Transport (RT) protocol.
There are two SDKs currently available:
- ODBC driver (click here for installation instructions)
- Java SDK (click here for installation instructions)
Sample programs
Separate C and Java sample programs can be downloaded from the same release repository as the sdk_sample_assembly.yaml download.
Sample Java program
This section describes how to use the sample Java program to send the data from the sample.csv file to the KX Insights Platform using the sdk_sample_assembly. The sample Java program uses the Java SDK to upload a csv file, or a random set of data to the KX Insights Platform. The source code can be taken and edited for use inside your applications.
The sample files located on Nexus are as follows:
rtdemo.jar
- shadowJar that can be used to load the CSV filesample.csv
- sample data based on the sdk-sample-assembly schema
Publishing data from the CSV file
The first line of the CSV file contains the schema. This is a comma separated list, with each element in the format "columnName:type".
Run the following.
RT_REP_DIR=<REPLICATOR_LOCATION> KXI_CONFIG_URL=<CONFIG_URL> RT_LOG_PATH=<RT_LOG_PATH> java -jar ./rtdemo-<VERSION>-all.jar --runCsvLoadDemo=<CSV_FILE>
Parameter | Description |
---|---|
REPLICATOR_LOCATION |
Path where the replicator can be extracted to. It must be possible to execute a program from this location. |
CONFIG_URL |
The URL that this program will call to find the endpoint(s) it will need to connect to. This will be in the following form https://\({INSIGHTS_HOSTNAME}/informationservice/details/\) |
RT_LOG_PATH |
The location where the RT messages streams will be written to. |
CSV_FILE |
The csv file you want to load (see the sample.csv file provided in the project as an example). |
Run the following.
java -jar ./rtdemo.jar --runCsvLoadDemo=<CSV_FILE>
Building
If you want to build the shadowJar make sure a Java development environment is installed on your system and run the following in the root of the project.
./gradlew rtdemo:shadowJar
gradlew.bat rtdemo:shadowJar
The resulting jar file will be in
Using the Java SDK in your own application
To publish data to the KX Insights Platform using the Java SDK as part of your own application you need to use the RTClient class. This class reads all the information needed to connect to the KX Insights Platform from the environment and publishes the data into the Platform.
Initializing the RTClient
To start a new RtClient in java, just do the following.
RtClient client = new RtClient("https://someConfigEnpoint.com");
client.start();
If you want to want to pull the configuration url from the registry (on Windows) or the KXI_CONFIG_URL environment variable (On Linux) use the StreamingClientFactory
StreamingClientFactory scf = new StreamingClientFactory(new HttpsConfigRequestor(parser));
StreamingClient client = scf.getStreamingClient();
client.start()
Terminating the RTClient
Once you are done with the client you must call stop() on it.
client.stop()
Publishing data
An example of publishing data can be seen in the BulkLoader.java class.
private void wrapDataAndPushToTopic(Flip row) throws RtClientException, RtConfigException
{
Object[] unwrappedMsg = {RtClient.BULK_LOAD_MSGNAME, tableName, row};
client.pasync("insert", unwrappedMsg);
}
Note
"RtClient.BULK_LOAD_MSGNAME is .b"
Alternatively you can just use the BulkLoader class to publish data. It will handle turning 2-dimensional arrays into Flip objects before sending them into the KX Insights Platform. You can make one like this (example taken from RandomDataSample.java)
String[] columns = new String[] { "Time", "SensorName", "Voltage", "Current" };
BulkLoader tw = new BulkLoader(tableName, client, columns);
Then you can just pass in 2D arrays, where the first dimension is the row number, and the second is the column number.
Random rand = new Random();
Timestamp now = new Timestamp(System.currentTimeMillis());
for (int i = 0; i < numEntries; i++)
{
Object[][] tableToSend = new Object[NUM_SENSORS][];
for (int j = 0; j < NUM_SENSORS; j++)
{
tableToSend[j] = new Object[columns.length];
tableToSend[j][0] = now;
tableToSend[j][1] = String.format("Sensor_%d", j);
tableToSend[j][2] = rand.nextFloat() * 240.0f;
tableToSend[j][3] = rand.nextFloat() * 10.0f;
System.out.printf("Writing line, %s, %s, %f, %f\n",
tableToSend[j][0], tableToSend[j][1], tableToSend[j][2], tableToSend[j][3]);
}
tw.writeTable(tableToSend);
}
Sample ODBC driver program for Linux
This section describes how to build the sample program on Linux and use it to send the data from the sample.csv file to the KX Insights Platform using the SDK. A sample c
program 'csvupload' uses the ODBC driver to upload a csv file to the KX Insights Platform. The source code can be taken and edited for use inside your applications to send data to the KX Insights Platform.
The kodbc-${VERSION}.zip file locate on Nexus contains the following files:
csvupload.c
- csv upload source codeCMakeLists.txt
- cmake filesample.csv
- sample data based on the sdk-sample-assembly schema
Prerequisites
Before building the sample program the following pre-requisites are required:
- KX ODBC driver
unixodbc
packageunixodbc-dev
packagecmake
packagelibcsv-dev
package
Building
Once these are installed:
- Extract the files from the zip file
- Move to the extracted folder
- Build the application:
cmake --clean-first --build . && make
Application parameters
-c str
, ODBC connection string-t str
, Table name-s str
, Schema (e.g., field1:datatype,field2:datatype)
Note
The schema must match that defined in the pipeline assembly. For the SDK, described here, the schema definition is as follows: sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:int,alarm:int
ODBC connection string parameters
The following list includes the recognized ODBC connection string parameters that can be included in the -c parameter:
DRIVER
, path to the driver file.DSN
, data source name.URL
, URL of the configuration file.RTDIR
, path to the RT log directory.TIMEOUT
, operation timeout in ms.CONF_SLEEP
, time in ms to sleep between fetches of the configuration details from the Information Service. Default value: 5000.CONF_MAX_AGE
, maximum age of configuration details in ms. Default value: 60000. After this amount of time, if still unable to fetch a new configuration, the connection is considered to be broken and any subsequent attempts to send data to the KX Insights Platform will fail withNot connected
error.
The URL
is mandatory, as well as one of the following: DRIVER
or DSN
.
Supported data types
The following data types are supported by the sample application:
type | schema notation | example |
---|---|---|
string | string | name |
symbol | symbol | LLOY |
int | int | 1 |
byte | byte | 0 |
long | long | 200000 |
float | float | 1.2345 |
timestamp | ts | 2000.01.01D00:00:00.000000000 |
Publishing data
In the example below the environment variables INSIGHTS_HOSTNAME and INSIGHTS_CLIENT_URL have been initialized.
DSN="DRIVER=/usr/local/lib/kodbc/libkodbc.so;URL=https://${INSIGHTS_HOSTNAME}/informationservice/details/${INSIGHTS_CLIENT_URL}"
Schema="sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte"
Table="trace"
./csvupload -c "$DSN" -t "$Table" -s "$Schema" < sample.csv
Note
The path to the DRIVER
may be different in your environment.
Removing a client
A client can be removed by making a REST request as below.
Removing a client requires an access token that can be retrieved as described here
curl https://${INSIGHTS_HOSTNAME}/clientcontroller/leave \
-H "Authorization: Bearer <access token>" \
-d '{ "name": "client-1" }'
{ "message": "success", "detail": "Client removed" }