Using the kdb Insights C interface
The C interface allows you to publish data to RT and subscribe to data from RT in your own application.
This section details how to get started with the C interface. For samples showing how to use it to publish data to and subscribe to data from a kdb Insights Reliable Transport or kdb Insights Enterprise see the sample program.
Downloading the C interface
You can download the C interface from the kdb Insights Nexus registry.
The kxi-c-sdk-${VERSION}.zip file
contains everything required to use the interface. No installation is required, you can just download the zip file and unzip.
Supported Operating Systems
The C interface is supported on the following operating systems:
- CentOS 8 or later
- Red Hat Enterprise Linux (RHEL) 8 or later
- Ubuntu 18.04 or later
- Windows
Note
The interface is only supported on Linux and Windows running on x64 architectures.
Files extracted
├── include
│ └── rt_helper
│ ├── error_codes.h
│ ├── kdb
│ │ ├── k.h
│ │ └── kk.h
│ ├── ksvc_direct_c.h
│ ├── ksvcrt_c.h
│ ├── rt1_helper.h
│ ├── rt1_sub.h
│ ├── rt1_sub_flt.h
│ ├── rt1_sub_kcb.h
│ └── rt_params.h
├── librt_helper.so
├── pull_client_static
├── push_client_static
├── replicator_static.so
└── rt_helper_app
Prerequisites
Before building a program that uses the C interface you need:
- C interface
cmake
package
Using the C interface in your own application
To publish data to kdb Insights Enterprise using the C interface as part of your own application, use the rt_helper
. This reads all the information needed to connect to kdb Insights Enterprise from the variables provided and publishes the data into the system.
Do one of the following:
- Extract the details from the sample program
- Follow the sections below that describe how to publish and subscribe to data
Sample illustration
An example usage of the C interface to publish and subscribe to the sdk_sample_assembly
is as follows:
Pre-requisites
- Download the assembly
sdk_sample_assembly
following the instructions here. - Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
- Ensure you have an authenticated kdb Insights Enterprise client URL.
- Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the
KXI_CONFIG_URL
) are accessible.
Initializing
-
Start the
rt_helper
, passing theKXI_CONFIG_URL
and logging level as parameters. A list of all parameters is available here. -
Define the schema. For the
sdk_sample_assembly
you must define the trace table schema as "sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte".
#include <rt_helper/kdb/kk.h>
#include <rt_helper/ksvcrt_c.h>
#include <rt_helper/rt_params.h>
RT_Params params = {.configUrl = "{KXI_CONFIG_URL}", .logLevel = "info"};
void *h = ksvcrtc_start(¶ms);
//Define the schema
Schema *schema = ParseSchema("sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte");
Parameters
The following parameters are available:
parameter | required | default | description |
---|---|---|---|
configUrl | Mandatory | none | The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to |
configMaxAge | Optional | 300000 | Maximum age of configuration details in milliseconds. After this amount of time, if still unable to fetch a new configuration, the connection is considered to be broken and any subsequent attempts to send data to kdb Insights Enterprise will fail with a Not connected error. This is only needed when using the kdb Insights Enterprise Information Service to obtain the configuration details. |
consoleLogLevel | Optional | err | Console log level (info/warn/err/off) |
replFileLogLevel | Optional | off | Replicator file log level (trace/debug/info/warn/err/debug/off) |
fetchConfigSleep | Optional | 5000 | Time in ms to sleep between reads of the configuration details |
localPersistencePeriod | Optional | 600000 | Local persistence period (in milliseconds) |
logLevel | Optional | info | Log level (info/warn/err/off) |
rtDir | Optional | /tmp/rt | RT directory |
dedupId | Optional | none | Id to dedup multiple publishers (typically publishing to same stream) |
queryTimeout | Optional | 2000 | Milliseconds to wait for the connection to be established |
CAInfoFile | Optional | none | File that contains the CA cert bundle. This option is provided to overcome curl CA error on Windows. |
configUrl
The configUrl
parameter can be set to:
- `KXI_CONFIG_URL` to access _kdb Insights Enterprise_ from [outside the cluster](getting-started-sdks.md#from-outside-the-cluster)
- `file:///locaton/config_file_name` to use a local file to access _kdb Insights Enterprise_ or a _kdb Insights RT Microservice_ from [inside the cluster](getting-started-sdks.md#from-inside-the-cluster)
Environment variables
The following environment variables are available:
variable | required | default | description |
---|---|---|---|
RT_LOG_PATH | Optional | /tmp/rt | RT directory. |
RT_LOGLEVEL_CONSOLE | Optional | err | Console log level (info/warn/err/off) |
KXI_C_SDK_APP_LOG_PATH | Optional | /tmp/kxi_c_sdk_logs/ |
SDK debug log path. On Windows this will be set to %TEMP%/kxi_c_sdk_logs/ |
KXI_C_SDK_BIN_DIR | Optional | path of librt_helper.so | SDK binary location |
REPLICATOR_BIN_DIR | Optional | KXI_C_SDK_BIN_DIR | Replicator binary location |
KXI_C_SDK_CURL_VERBOSE | Optional | none | Turn on curl verbose output |
Handling changing hostnames
Each publisher has a session name associated with it, to ensure it is uniquely identifiable. This is physically represented within the C interface as the directory messages are written to before being replicated into RT.
The C interface sets the session name (and hence the message directory) to the following: SDK_CLIENT_UID.streamid.hostname
. This ensures that publishers on different hosts, that are sending different data to the same kdb Insights Stream, can be correctly identified.
SDK_CLIENT_UID
SDK_CLIENT_UID is part of KXI_CONFIG_URL (https://${INSIGHTS_HOSTNAME}/informationservice/details/${SDK_CLIENT_UID})
- These can be obtained following the steps here outside the cluster
Currently it is not recommended to use the C interface in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run) as a new session will be created for each instantiation.
Publishing data
To publish data:
- Create the message
- Publish the message
The example below takes a csv record and converts it to a kdb+ object before publishing:
//Create a dummy record
char csv_record[] = "10,2021.01.01D00:00:09.000000000,2021.01.01D00:00:09.000000000,97,1,0";
//Convert the CSV record to a kdb+ data type (this function is available in the sample program)
K k_record = convert_csv_record_to_k_record(schema, csv_record);
//Publish the message
ksvcrtc_insert(h, "trace", k_record);
Terminating the publisher
Once you have finished publishing messages to RT you must stop the rt_helper
:
//Stop the rt_helper
ksvcrtc_stop(h);
Subscribing to data
To subscribe to data:
- Create the message callbacks in your application
- Subscribe to data from RT using a config file with the subscribe endpoints of RT
- Optionally, you could subscribe to the stream from the latest message:
{
"_comment": "create this JSON config file as /tmp/sub_config.json",
"useSslRt":false,
"name":"test-subscribe",
"topics":{"subscribe":"<streamid>"},
"insert": {"subscribe":[":<host1:portnum>",":<host2:portnum>",":<host3:portnum>"]}
}
static int message(void* ctx, rt_pos pos[2], K x)
{
(void)ctx;
printf("message pos: %li, len: %i\n",pos[0],(I)xn);
if(is_kxi_msg(x))
printf("ts:%lli, to:%i, corr:%lli, orig:\"%s\"\n",xK[0]->j,xK[1]->i,xK[2]->j,xK[3]->s);
r0(x);
return 0;
}
static void event(void* ctx, rt_pos pos[2], int type)
{
(void)ctx;
char* event[] = {"roll","badmsg","badtail","archived","reset"};
printf("event pos: %li-%li, type: %s\n",pos[0],pos[1],type<0?"user":event[type]);
}
static int filter_table(void* filter_ctx, const kxi_message_preview* preview)
{
const char* table = filter_ctx;
printf("#filter params - ts: %li, to: %i, corr: %li, origin: \"%s\", s0: \"%s\", s1: \"%s\"\n",
preview->timestamp, preview->timeout, preview->correlator, preview->origin,
preview->payload.s0, preview->payload.s1);
return !strcmp(table,preview->payload.s1);
}
char configFileName[] = "file:///tmp/sub_config.json";
char tableName[] = "trace";
void* sub = rt1_subscriber_create();
rt1_callbacks cb =
rt1_apply_kxi_filter(
rt1_create_cb((rt1_callbacks_k){
.message = message,
.event = event
}),
tableName, // table name
filter_table,
0
);
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},pos,cb)
If you want to subscribe to the stream from the latest message merged by the merger, you need to subscribe as follows:
long latest_pos = rt1_get_latest_position((rt1_stream_params){.configUrl=configFileName});
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},latest_pos,cb);
Terminating the subscriber
Once you have finished subscribing to messages from RT, you must stop the subscriber
:
//Stop the subscriber
rt1_subscriber_destroy(sub);