Using the kdb Insights C interface
The C interface allows you to publish data to RT and subscribe to data from RT in your own application.
This section describes how to get started with the C interface. For examples of publishing data to, and subscribing to data from, kdb Insights Reliable Transport (RT) or kdb Insights Enterprise, see the sample program.
Downloading the C interface
You can download the C interface from the kdb Insights Nexus registry.
The kxi-c-sdk-${VERSION}.zip file contains everything required to use the interface. No installation is required: download the zip file and unzip it.
Supported Operating Systems
The C interface is supported on the following operating systems:
- CentOS 8 or later
 - Red Hat Enterprise Linux (RHEL) 8 or later
 - Ubuntu 18.04 or later
 - Windows
 
Note
The interface is only supported on Linux and Windows running on x64 architectures.
Files extracted
├── include
│   └── rt_helper
│       ├── error_codes.h
│       ├── kdb
│       │   ├── k.h
│       │   └── kk.h
│       ├── ksvc_direct_c.h
│       ├── ksvcrt_c.h
│       ├── rt1_helper.h
│       ├── rt1_sub.h
│       ├── rt1_sub_flt.h
│       ├── rt1_sub_kcb.h
│       └── rt_params.h
├── librt_helper.so
├── pull_client_static
├── push_client_static
├── replicator_static.so
└── rt_helper_app
Prerequisites
Before building a program that uses the C interface you need:
- C interface
- cmake package
Using the C interface in your own application
To publish data to kdb Insights Enterprise using the C interface as part of your own application, use the rt_helper. This reads all the information needed to connect to kdb Insights Enterprise from the variables provided and publishes the data into the system.
Do one of the following:
- Extract the details from the sample program
 - Follow the sections below that describe how to publish and subscribe to data
 
Sample illustration
An example usage of the C interface to publish and subscribe to the sdk_sample_assembly is as follows:
Pre-requisites
- Download the sdk_sample_assembly assembly, following the instructions here.
- Make sure that the assembly is deployed in your kdb Insights Enterprise instance.
- Ensure you have an authenticated kdb Insights Enterprise client URL.
- Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the KXI_CONFIG_URL) are accessible.
Initializing
- Start the rt_helper, passing the KXI_CONFIG_URL and logging level as parameters. A list of all parameters is available here.
- Define the schema. For the sdk_sample_assembly you must define the trace table schema as "sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte".
#include <rt_helper/kdb/kk.h>
#include <rt_helper/ksvcrt_c.h>
#include <rt_helper/rt_params.h>
RT_Params params = {.configUrl = "{KXI_CONFIG_URL}", .logLevel = "info"};
void *h = ksvcrtc_start(&params);
//Define the schema
Schema *schema = ParseSchema("sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte");
Parameters
The following parameters are available:
| parameter | required | default | description | 
|---|---|---|---|
| configUrl | Mandatory | none | The URL or file that this program calls or reads to find the endpoint(s) it needs to connect to. | 
| configMaxAge | Optional | 300000 | Maximum age of configuration details in milliseconds. After this amount of time, if still unable to fetch a new configuration, the connection is considered to be broken and any subsequent attempts to send data to kdb Insights Enterprise will fail with a Not connected error. This is only needed when using the kdb Insights Enterprise Information Service to obtain the configuration details. | 
| consoleLogLevel | Optional | err | Console log level (info/warn/err/off). | 
| replFileLogLevel | Optional | off | Replicator file log level (trace/debug/info/warn/err/off). | 
| replConsoleLogLevel | Optional | err | Replicator console log level (trace/debug/info/warn/err/off). | 
| fetchConfigSleep | Optional | 5000 | Time in ms to sleep between reads of the configuration details. | 
| localPersistencePeriod | Optional | 600000 | Local persistence period (in milliseconds). | 
| logLevel | Optional | info | Log level (info/warn/err/off). | 
| rtDir | Optional | /tmp/rt | RT directory. | 
| dedupId | Optional | none | Id to dedup multiple publishers (typically publishing to same stream). | 
| queryTimeout | Optional | 2000 | Milliseconds to wait for the connection to be established. | 
| CAInfoFile | Optional | none | File that contains the CA cert bundle. This option is provided to overcome curl CA error on Windows. | 
| restPortStart | Optional | 0 | If set, REST servers start to listen on this port and subsequent ports with the requests being tunneled through the connection. | 
| truncateArchived | Optional | 1 | Flag indicating whether archived files should be auto-truncated. | 
| rtFlushTimeoutSecs | Optional | No timeout | The timeout for trying to flush the local log files to the remote server on closing the SDK. | 
| logPosition | Optional | 0 | Subscriber start position. | 
| pauseThreshold | Optional | 0 | Subscriber threshold in GB of destination disk usage that triggers pause or resume of replication. | 
| exchangeArchived | Optional | 1 | Subscriber flag indicating whether archived events should be sent back to the server. | 
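The configMaxAge rule in the table can be pictured as a simple staleness check. The sketch below is not the SDK's implementation: `config_is_stale` and its arguments are hypothetical names, and only the 300000 ms default comes from the table.

```c
#include <stdbool.h>
#include <stdint.h>

/* Default from the parameter table: 300000 ms (5 minutes). */
#define DEFAULT_CONFIG_MAX_AGE_MS 300000

/* Hypothetical helper (not part of the SDK): returns true when the last
 * successfully fetched configuration is older than max_age_ms.
 * All arguments are in milliseconds. */
bool config_is_stale(int64_t now_ms, int64_t last_fetch_ms, int64_t max_age_ms)
{
    return (now_ms - last_fetch_ms) > max_age_ms;
}
```

Under this reading, a configuration fetched exactly configMaxAge milliseconds ago is still usable, and one older than that is treated as a broken connection. Whether the SDK uses a strict or inclusive comparison is an assumption.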
configUrl
The configUrl parameter can be set to:
- `KXI_CONFIG_URL` to access _kdb Insights Enterprise_ from [outside the cluster](getting-started-sdks.md#from-outside-the-cluster)
- `file:///location/config_file_name` to use a local file to access _kdb Insights Enterprise_ or a _kdb Insights RT Microservice_ from [inside the cluster](getting-started-sdks.md#from-inside-the-cluster)
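If your application accepts the configUrl value from its own configuration, it can be useful to tell the two forms apart before starting the rt_helper. The following is a minimal sketch; `classify_config_url` and the `config_source` enum are hypothetical names and are not provided by the SDK.

```c
#include <string.h>

typedef enum { CONFIG_SOURCE_URL, CONFIG_SOURCE_FILE } config_source;

/* Hypothetical helper (not part of the SDK): reports whether a configUrl
 * value uses the file:// form or should be treated as a remote URL. */
config_source classify_config_url(const char *config_url)
{
    static const char prefix[] = "file://";
    /* sizeof prefix - 1 excludes the terminating NUL. */
    return strncmp(config_url, prefix, sizeof prefix - 1) == 0
               ? CONFIG_SOURCE_FILE
               : CONFIG_SOURCE_URL;
}
```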
Environment variables
The following environment variables are available:
| variable | required | default | description | 
|---|---|---|---|
| RT_LOG_PATH | Optional | /tmp/rt | RT directory. | 
| RT_LOGLEVEL_CONSOLE | Optional | err | Console log level (info/warn/err/off) | 
| KXI_C_SDK_APP_LOG_PATH | Optional | /tmp/kxi_c_sdk_logs/ | SDK debug log path. On Windows this will be set to %TEMP%/kxi_c_sdk_logs/ | 
| KXI_C_SDK_BIN_DIR | Optional | path of librt_helper.so | SDK binary location | 
| REPLICATOR_BIN_DIR | Optional | KXI_C_SDK_BIN_DIR | Replicator binary location | 
| KXI_C_SDK_CURL_VERBOSE | Optional | none | Turn on curl verbose output | 
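To report which value of a variable such as RT_LOG_PATH is in effect, your application can read the environment with a fallback to the documented default. This is a sketch only: `env_or_default` is a hypothetical helper, and treating an empty value as unset is an assumption about the desired behaviour, not a statement about what the SDK does.

```c
#include <stdlib.h>

/* Hypothetical helper (not part of the SDK): returns the value of the
 * environment variable `name`, or `fallback` when it is unset or empty. */
const char *env_or_default(const char *name, const char *fallback)
{
    const char *value = getenv(name);
    return (value != NULL && value[0] != '\0') ? value : fallback;
}
```

For example, `env_or_default("RT_LOG_PATH", "/tmp/rt")` yields `/tmp/rt` when the variable is not set.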
Handling changing hostnames
Each publisher has an associated session name that makes it uniquely identifiable. Within the C interface, the session name is also the name of the directory that messages are written to before being replicated into RT.
The C interface sets the session name (and hence the message directory) to SDK_CLIENT_UID.streamid.hostname. This ensures that publishers on different hosts that send different data to the same kdb Insights Stream can be correctly identified.
SDK_CLIENT_UID
SDK_CLIENT_UID is the final path segment of KXI_CONFIG_URL (https://${INSIGHTS_HOSTNAME}/informationservice/details/${SDK_CLIENT_UID}).
- You can obtain these values by following the steps here for access from outside the cluster.
 
Using the C interface where the hostname changes between instantiations (for example, inside a Docker container that is destroyed and re-created between runs) is currently not recommended, because a new session is created for each instantiation.
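The session name layout described above can be sketched as follows. Both functions are hypothetical illustrations and are not part of the SDK; the URL and names in the usage note are placeholders. The sketch assumes the KXI_CONFIG_URL has no trailing slash or query string.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: returns a pointer to the last path segment of the
 * URL, which is the SDK_CLIENT_UID for a KXI_CONFIG_URL of the form shown
 * above. Returns NULL if the string contains no '/'. */
const char *client_uid_from_url(const char *config_url)
{
    const char *slash = strrchr(config_url, '/');
    return slash != NULL ? slash + 1 : NULL;
}

/* Hypothetical helper: writes "<uid>.<streamid>.<hostname>" into out.
 * Returns 0 on success, or -1 if the buffer is too small. */
int build_session_name(char *out, size_t out_size, const char *uid,
                       const char *stream_id, const char *hostname)
{
    int n = snprintf(out, out_size, "%s.%s.%s", uid, stream_id, hostname);
    return (n < 0 || (size_t)n >= out_size) ? -1 : 0;
}
```

With a UID of `abc123`, a stream named `mystream`, and a host named `host-a`, the resulting name is `abc123.mystream.host-a`. Because the hostname is the last component, a container that receives a new hostname on each run produces a different name, and hence a new session, each time.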
Publishing data
To publish data:
- Create the message
 - Publish the message
 
The example below takes a CSV record and converts it to a kdb+ object before publishing it:
//Create a dummy record
char csv_record[] = "10,2021.01.01D00:00:09.000000000,2021.01.01D00:00:09.000000000,97,1,0";
//Convert the CSV record to a kdb+ data type (this function is available in the sample program)
K k_record = convert_csv_record_to_k_record(schema, csv_record);
//Publish the message
ksvcrtc_insert(h, "trace", k_record);
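Before converting a record, it can help to confirm that it has as many fields as the schema has columns. The following is a minimal sketch using only the standard library; `count_fields` and `record_matches_schema` are hypothetical helpers and are not part of the SDK or the sample program. It assumes fields never contain quoted or escaped commas.

```c
#include <stddef.h>

/* Counts comma-separated fields; an empty or NULL string has zero fields. */
size_t count_fields(const char *s)
{
    if (s == NULL || *s == '\0')
        return 0;
    size_t n = 1;
    for (; *s != '\0'; ++s)
        if (*s == ',')
            ++n;
    return n;
}

/* Returns 1 when the CSV record has the same number of fields as the
 * schema string has "name:type" columns, otherwise 0. */
int record_matches_schema(const char *schema, const char *csv_record)
{
    return count_fields(schema) == count_fields(csv_record);
}
```

The trace schema and the dummy record used above both have six fields, so the check passes for them.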
Terminating the publisher
Once you have finished publishing messages to RT you must stop the rt_helper:
//Stop the rt_helper
ksvcrtc_stop(h);
Subscribing to data
To subscribe to data:
- Create the message callbacks in your application
- Subscribe to data from RT using a config file that contains the RT subscribe endpoints
- Optionally, subscribe to the stream from the latest message
 
{
    "_comment": "create this JSON config file as /tmp/sub_config.json",
    "useSslRt":false,
    "name":"test-subscribe",
    "topics":{"subscribe":"<streamid>"},
    "subscribe": {"subscribe":[":<host1:portnum>",":<host2:portnum>",":<host3:portnum>"]}
}
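If the stream id and endpoints are only known at run time, the config file can be generated rather than written by hand. The sketch below writes the same layout as the JSON above, without the `_comment` key. `write_sub_config` is a hypothetical helper and is not part of the SDK; it performs no JSON escaping, so it assumes the inputs contain no quotes or backslashes.

```c
#include <stdio.h>

/* Hypothetical helper: writes a subscriber config to `out` on one line.
 * Each endpoint is given as "host:port"; the leading ':' used in the
 * config format is added here. Returns 0 on success, -1 on write error. */
int write_sub_config(FILE *out, const char *name, const char *stream_id,
                     const char *const *endpoints, size_t count)
{
    if (fprintf(out,
                "{\"useSslRt\":false,\"name\":\"%s\","
                "\"topics\":{\"subscribe\":\"%s\"},"
                "\"subscribe\":{\"subscribe\":[",
                name, stream_id) < 0)
        return -1;
    for (size_t i = 0; i < count; ++i) {
        /* Separate entries with a comma after the first one. */
        if (fprintf(out, "%s\":%s\"", i ? "," : "", endpoints[i]) < 0)
            return -1;
    }
    return fprintf(out, "]}}\n") < 0 ? -1 : 0;
}
```

To produce the file used in this section, open `/tmp/sub_config.json` with `fopen` in write mode, pass the handle as `out`, and close it before subscribing.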
static int message(void* ctx, rt_pos pos[2], K x)
{
    (void)ctx;
    printf("message pos: %li, len: %i\n",pos[0],(I)xn);
    if(is_kxi_msg(x))
        printf("ts:%lli, to:%i, corr:%lli, orig:\"%s\"\n",xK[0]->j,xK[1]->i,xK[2]->j,xK[3]->s);
    r0(x);
    return 0;
}
static void event(void* ctx, rt_pos pos[2], int type)
{
    (void)ctx;
    char* event[] = {"roll","badmsg","badtail","archived","reset"};
    printf("event pos: %li-%li, type: %s\n",pos[0],pos[1],type<0?"user":event[type]);
}
static int filter_table(void* filter_ctx, const kxi_message_preview* preview)
{
    const char* table = filter_ctx;
    printf("#filter params - ts: %li, to: %i, corr: %li, origin: \"%s\", s0: \"%s\", s1: \"%s\"\n",
        preview->timestamp, preview->timeout, preview->correlator, preview->origin,
        preview->payload.s0, preview->payload.s1);
    return !strcmp(table,preview->payload.s1);
}
char configFileName[] = "file:///tmp/sub_config.json";
char tableName[]      = "trace";
void* sub = rt1_subscriber_create();
rt1_callbacks cb =
    rt1_apply_kxi_filter(
        rt1_create_cb((rt1_callbacks_k){
            .message = message,
            .event = event
        }),
        tableName,         // table name
        filter_table,
        0
    );
// pos is the stream position to start subscribing from
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},pos,cb);
To subscribe to the stream from the latest message merged by the merger, subscribe as follows:
long latest_pos = rt1_get_latest_position((rt1_stream_params){.configUrl=configFileName});
rt1_subscribe(sub,(rt1_stream_params){.configUrl=configFileName},latest_pos,cb);
Terminating the subscriber
Once you have finished subscribing to messages from RT, you must stop the subscriber:
//Stop the subscriber
rt1_subscriber_destroy(sub);