
Using the kdb Insights C interface

The C interface allows you to publish data to RT and subscribe to data from RT in your own application.

This section details how to get started with the C SDK. For samples showing how to publish data to, and subscribe to data from, kdb Insights Reliable Transport or kdb Insights Enterprise, see the sample program.

Downloading the C interface

You can download the C interface from the kdb Insights Nexus registry.

The kxi-c-sdk-${VERSION}.zip file contains everything required to use the C interface. No installation is required: download the zip file and unzip it.

Supported Operating Systems

The C SDK is supported on the following operating systems:

  • CentOS 8 or later
  • Red Hat Enterprise Linux (RHEL) 8 or later
  • Ubuntu 18.04 or later
  • Windows

Note

The interface is supported only on Linux running on x86 architectures and on Windows. macOS is not currently supported.

Files extracted

├── include
│   └── rt_helper
│       ├── error_codes.h
│       ├── kdb
│       │   ├── k.h
│       │   └── kk.h
│       ├── ksvc_direct_c.h
│       ├── ksvcrt_c.h
│       ├── rt1_helper.h
│       ├── rt1_sub.h
│       ├── rt1_sub_flt.h
│       ├── rt1_sub_kcb.h
│       └── rt_params.h
├── librt_helper.so
├── pull_client_static
├── push_client_static
├── replicator_static.so
└── rt_helper_app

Prerequisites

Before building a program that uses the C interface you need:

  • C interface
  • cmake package

Using the C interface in your own application

To publish data to kdb Insights Enterprise using the C SDK as part of your own application, use the rt_helper. This reads all the information needed to connect to kdb Insights Enterprise from the variables provided and publishes the data into the system.

Do one of the following:

  • Extract the details from the sample program
  • Follow the sections below that describe how to publish data

Sample illustration

An example usage of the C interface to publish to the sdk_sample_assembly is as follows:

Prerequisites

  • Download the assembly sdk_sample_assembly following the instructions here.
  • Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
  • Ensure you have an authenticated kdb Insights Enterprise client URL.
  • Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.

Initializing

  1. Start the rt_helper, passing the KXI_CONFIG_URL and logging level as parameters. A list of all parameters is available here.

  2. Define the schema. For the sdk_sample_assembly you must define the trace table schema as "sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte".

#include <rt_helper/kdb/kk.h>
#include <rt_helper/ksvcrt_c.h>
#include <rt_helper/rt_params.h>

RT_Params params = {.configUrl = "{KXI_CONFIG_URL}", .logLevel = "info"};
void *h = ksvcrtc_start(&params);      

// Define the schema
Schema *schema = ParseSchema("sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte");

Parameters

The following parameters are available:

parameter | required | default | description
configUrl | Mandatory | none | The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to
configMaxAge | Optional | 300000 | Maximum age of the configuration details in milliseconds. If a new configuration still cannot be fetched after this time, the connection is considered broken and any subsequent attempt to send data to kdb Insights Enterprise fails with a Not connected error. This is only needed when using the kdb Insights Enterprise Information Service to obtain the configuration details
consoleLogLevel | Optional | err | Console log level (info/warn/err/off)
replFileLogLevel | Optional | off | Replicator file log level (trace/debug/info/warn/err/off)
fetchConfigSleep | Optional | 5000 | Time in milliseconds to sleep between reads of the configuration details
localPersistencePeriod | Optional | 600000 | Local persistence period in milliseconds
logLevel | Optional | info | Log level (info/warn/err/off)
rtDir | Optional | /tmp/rt | RT directory
dedupId | Optional | none | ID used to deduplicate multiple publishers (typically publishing to the same stream)
queryTimeout | Optional | 2000 | Milliseconds to wait for the connection to be established
CAInfoFile | Optional | none | File that contains the CA cert bundle. This option is provided to work around curl CA errors on Windows

configUrl

The configUrl parameter can be set to one of the following:

  • KXI_CONFIG_URL to access kdb Insights Enterprise from outside the cluster
  • file:///location/config_file_name to use a local file to access kdb Insights Enterprise or kdb Insights Reliable Transport from inside the cluster

Environment variables

The following environment variables are available:

variable | required | default | description
RT_LOG_PATH | Optional | /tmp/rt | RT directory
RT_LOGLEVEL_CONSOLE | Optional | err | Console log level (info/warn/err/off)
KXI_C_SDK_APP_LOG_PATH | Optional | /tmp/kxi_c_sdk_logs/ | SDK debug log path. On Windows the default is %TEMP%/kxi_c_sdk_logs/. If this variable is not set and RT_LOG_PATH is set, RT_LOG_PATH is used instead
KXI_C_SDK_BIN_DIR | Optional | path of librt_helper.so | SDK binary location
REPLICATOR_BIN_DIR | Optional | KXI_C_SDK_BIN_DIR | Replicator binary location
KXI_C_SDK_CURL_VERBOSE | Optional | none | Turn on curl verbose output
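
The fallback between KXI_C_SDK_APP_LOG_PATH and RT_LOG_PATH can be mirrored in application code, for example to find where the SDK debug log is likely to be written. The following is only a sketch of that rule, not the SDK's own implementation: pick_sdk_log_dir and sdk_log_dir_from_env are hypothetical names, and only the Linux default is covered.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical helper, not part of the SDK: picks the SDK debug log directory
 * from the two environment values (NULL means "not set"), using the fallback
 * order described above. Only the Linux default is covered. */
static const char *pick_sdk_log_dir(const char *app_log_path, const char *rt_log_path)
{
    if (app_log_path != NULL)
        return app_log_path;            /* KXI_C_SDK_APP_LOG_PATH wins when set */
    if (rt_log_path != NULL)
        return rt_log_path;             /* otherwise fall back to RT_LOG_PATH */
    return "/tmp/kxi_c_sdk_logs/";      /* documented Linux default */
}

/* Reads both variables from the environment and applies the rule above. */
static const char *sdk_log_dir_from_env(void)
{
    const char *dir = pick_sdk_log_dir(getenv("KXI_C_SDK_APP_LOG_PATH"),
                                       getenv("RT_LOG_PATH"));
    assert(dir != NULL);                /* a default is always available */
    return dir;
}
```

Keeping the decision in a function that takes plain strings makes the rule easy to check without changing the process environment.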

Handling changing hostnames

Each publisher has a session name associated with it so that kdb Insights can identify it uniquely. Within the C interface, the session name is physically represented by the directory that messages are written to before they are replicated into kdb Insights.

The C interface sets the session name (and hence the message directory) to the following: SDK_CLIENT_UID.streamid.hostname. This ensures that publishers on different hosts that send different data to the same kdb Insights stream can be correctly identified.
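
To make the naming rule concrete, the sketch below formats a session name from its three parts. It is an illustration only: format_session_name is a hypothetical helper, not an SDK function, and the SDK derives the name internally.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper, not part of the SDK: formats a session name as
 * SDK_CLIENT_UID.streamid.hostname. Returns 0 on success, or -1 if the
 * result does not fit in the buffer. */
static int format_session_name(char *out, size_t size, const char *client_uid,
                               const char *stream_id, const char *hostname)
{
    assert(out != NULL && size > 0);
    int n = snprintf(out, size, "%s.%s.%s", client_uid, stream_id, hostname);
    return (n >= 0 && (size_t)n < size) ? 0 : -1;
}
```

With the same client UID and stream, two publishers on hosts host-a and host-b get names that differ only in the final component, which is why a changing hostname produces a new session.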

SDK_CLIENT_UID

SDK_CLIENT_UID is part of KXI_CONFIG_URL (https://${INSIGHTS_HOSTNAME}/informationservice/details/${SDK_CLIENT_UID}). These can be obtained outside the cluster by following the steps here.
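
In the URL template above, SDK_CLIENT_UID is the last path segment. If an application needs the UID on its own (for logging, say), it could be recovered as in the sketch below. client_uid_from_config_url is a hypothetical helper, not an SDK function, and it assumes the URL follows that template with no trailing slash or query string.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper, not part of the SDK: returns a pointer to the text
 * after the last '/' in the URL, or NULL if the URL contains no '/'.
 * A URL ending in '/' yields an empty string. */
static const char *client_uid_from_config_url(const char *config_url)
{
    assert(config_url != NULL);
    const char *slash = strrchr(config_url, '/');
    return slash != NULL ? slash + 1 : NULL;
}
```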

Currently it is not recommended to use the C interface in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run) as a new session will be created for each instantiation.

Publishing data

To publish data:

  1. Create the message
  2. Publish the message

The example below takes a CSV record and converts it to a kdb+ object before publishing:

//Create a dummy record
char csv_record[] = "10,2021.01.01D00:00:09.000000000,2021.01.01D00:00:09.000000000,97,1,0";

//Convert the CSV record to a kdb+ data type (this function is available in the sample program)
K k_record = convert_csv_record_to_k_record(schema, csv_record);

//Publish the message 
ksvcrtc_insert(h, "trace", k_record);
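
The conversion helper itself lives in the sample program. As a rough idea of what its first step might involve, the sketch below splits a CSV record into fields so that each one can be matched to a schema column. split_csv_fields is a hypothetical name, not an SDK or sample-program function, and the sketch assumes fields contain no quoted or escaped commas.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper, not part of the SDK: splits a comma-separated line in
 * place (each comma is overwritten with '\0') and stores a pointer to every
 * field in fields[]. Returns the number of fields, or -1 if the line holds
 * more than max_fields fields. */
static int split_csv_fields(char *line, char *fields[], int max_fields)
{
    assert(line != NULL && fields != NULL && max_fields > 0);
    int count = 0;
    char *start = line;
    for (;;) {
        if (count == max_fields)
            return -1;                  /* more fields than the caller allows */
        fields[count++] = start;
        char *comma = strchr(start, ',');
        if (comma == NULL)
            return count;               /* last field reached */
        *comma = '\0';                  /* terminate the current field */
        start = comma + 1;
    }
}
```

Applied to the dummy record above, this yields six fields, one per column of the trace schema. The same function can split the schema string into its name:type pairs.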

Terminating the publisher

Once you have finished publishing messages to RT you must stop the rt_helper:

//Stop the rt_helper
ksvcrtc_stop(h);

Subscribing to data

To subscribe to data:

  1. Create the message callbacks in your application
  2. Subscribe to data from RT using a config file with the subscribe endpoints of RT
  3. Optionally, subscribe to the stream starting from the latest message

The following JSON configuration file lists the RT subscribe endpoints:

{
    "_comment": "create this JSON config file as /tmp/sub_config.json",
    "useSslRt":false, 
    "name":"test-subscribe",
    "topics":{"subscribe":"<streamid>"},
    "insert": {"subscribe":[":<host1:portnum>",":<host2:portnum>",":<host3:portnum>"]}
}

The callbacks and the subscription call are then defined in your application:

static int message(void* ctx, rt_pos pos[2], K x)
{
    (void)ctx;
    printf("message pos: %li, len: %i\n",pos[0],(I)xn);
    if(is_kxi_msg(x))
        printf("ts:%lli, to:%i, corr:%lli, orig:\"%s\"\n",xK[0]->j,xK[1]->i,xK[2]->j,xK[3]->s);
    r0(x);
    return 0;
}

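static void event(void* ctx, rt_pos pos[2], int type)
{
    (void)ctx;
    static const char* const names[] = {"roll","badmsg","badtail","archived","reset"};
    const int count = (int)(sizeof names / sizeof names[0]);
    // Negative types are reported as "user"; guard against types beyond the name table
    const char* name = type<0 ? "user" : type<count ? names[type] : "unknown";
    printf("event pos: %li-%li, type: %s\n",pos[0],pos[1],name);
}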

static int filter_table(void* filter_ctx, const kxi_message_preview* preview)
{
    const char* table = filter_ctx;
    printf("#filter params - ts: %li, to: %i, corr: %li, origin: \"%s\", s0: \"%s\", s1: \"%s\"\n", 
        preview->timestamp, preview->timeout, preview->correlator, preview->origin, 
        preview->payload.s0, preview->payload.s1);
    return !strcmp(table,preview->payload.s1);
}

char configFileName[] = "file:///tmp/sub_config.json";
char tableName[]      = "trace";
void* sub = rt1_subscriber_create();
rt1_callbacks cb = 
    rt1_apply_kxi_filter(
        rt1_create_cb((rt1_callbacks_k){
            .message = message,
            .event = event
        }),
        tableName,         // table name
        filter_table,
        0
    );
// pos is the stream position to start reading from; it must be defined by your application
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},pos,cb);

To subscribe to the stream starting from the latest message merged by the merger, get the latest position first and pass it to rt1_subscribe:

long latest_pos = rt1_get_latest_position((rt1_stream_params){.configUrl=configFileName});
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},latest_pos,cb);

Terminating the subscriber

Once you have finished subscribing to messages from RT, you must stop the subscriber:

//Stop the subscriber
rt1_subscriber_destroy(sub);