Using the kdb Insights C interface
The C interface allows you to publish data to RT and subscribe to data from RT in your own application.
This section details how to get started with the C interface. For samples showing how to use it to publish data to and subscribe to data from a kdb Insights Reliable Transport or kdb Insights Enterprise see the sample programs.
Downloading the C interfaces
You can download the C interfaces from the KX Downloads Portal.
The following contains everything required to use the interface. No installation is required, you can just download the zip file and unzip.
Interface | Details |
---|---|
kxi-c-l64-${VERSION}.zip file |
APIs for publish and subscribe. There is no topic level filtering with this subscription option |
rt-c-sub-l64-${VERSION}.zip |
APIs for subscribe which include topic level filtering |
Topic Filtering
A RT subscriber can subscribe to a subset of topics within a RT stream. When using topic filtering, you do not have log files replicated from the RT server to the client.
Supported Operating Systems
The C interface is supported on the following operating systems:
- CentOS 8 or later
- Red Hat Enterprise Linux (RHEL) 8 or later
- Ubuntu 18.04 or later
Prerequisites
Before building a program that uses the C interface you need:
- C interface
cmake
package
Using the C interface in your own application
Publish
To publish data to kdb Insights Enterprise using the C interface as part of your own application, use the rt_helper
within kxi-c-l64-${VERSION}.zip file
. This reads all the information needed to connect to kdb Insights Enterprise from the variables provided and publishes the data into the system.
Files extracted
├── include
│ └── rt_helper
│ ├── error_codes.h
│ ├── kdb
│ │ ├── k.h
│ │ └── kk.h
│ ├── ksvc_direct_c.h
│ ├── ksvcrt_c.h
│ ├── rt1_helper.h
│ ├── rt1_sub.h
│ ├── rt1_sub_flt.h
│ ├── rt1_sub_kcb.h
│ └── rt_params.h
├── librt_helper.so
├── pull_client_static
├── push_client_static
├── replicator_static.so
└── rt_helper_curl
Do one of the following:
- Extract the details from the sample program
- Follow the sections below that describe how to publish and subscribe to data
An example usage of the C interface to publish and subscribe to the sdk_sample
is as follows:
Pre-requisites (for publishing to kdb Insights Enterprise)
- Download the package
kxi-db
following the instructions here. - Make sure that the package is deployed in your kdb Insights Enterprise instance.
- Ensure you have an authenticated kdb Insights Enterprise configUrl.
- Make sure that the kdb Insights Enterprise ingest endpoints (as defined by the
configUrl
) are accessible.
Initializing (for publishing to kdb Insights Enterprise)
-
Start the
rt_helper
, passing theconfigUrl
as parameter. A list of all parameters is available here. -
Define the schema. For the
sdk_sample
you must define the trace table schema as "sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte".
#include <rt_helper/kdb/kk.h>
#include <rt_helper/ksvcrt_c.h>
#include <rt_helper/rt_params.h>
// Initialize params using designated initializer, if using C versions >= C99
rt1_stream_params params = {.configUrl = <value of configUrl>, .logLevel = "info"};
// Alternatively, if using C versions below C99, the provided initialization macro can be used
// rt1_stream_params params = RT1_STREAM_PARAMS_INIT;
// params.configUrl = <value of configUrl>
// params.logLevel = "info"
// ...
// ...
int ret = 0;
void *h = ksvcrtc_start(¶ms, &ret);
//Define the schema
Schema *schema = ParseSchema("sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte");
Pre-requisites (for publishing to kdb Insights SDK)
- A kdb Insights SDK deployment running within a Docker compose or Kubernetes environment.
- A publisher host with access to the RT endpoints in the kdb Insights SDK deployment.
Docker compose
and kubernetes
For instructions on deploying kdb Insights SDK using Docker Compose, refer to the Docker Compose Quickstart Guide. For instructions on deploying kdb Insights SDK using Kubernetes, refer to the Kubernetes Quickstart Guide.
Initializing (for publishing to kdb Insights Microservices)
- Start the
rt_helper
, passing theclientName
andstreamName
as parameters. A list of all parameters is available here.
#include <rt_helper/kdb/kk.h>
#include <rt_helper/ksvcrt_c.h>
#include <rt_helper/rt_params.h>
// Initialise params using designated initializer, if using C versions >= C99
rt1_stream_params params = {.clientName = "demo-publisher", .prefix="kxi-", .stream="mystream"};
// Alternatively, if using C versions below C99, the provided initialisation macro can be used
// rt1_stream_params params = RT1_STREAM_PARAMS_INIT;
// params.clientName = "demo-publisher"
// ...
// ...
int ret = 0;
void *h = ksvcrtc_start(¶ms, &ret);
//Define the schema
Schema *schema = ParseSchema("sensorID:int,captureTS:ts,readTS:ts,valFloat:float,qual:byte,alarm:byte");
Parameters
Note
rt1_stream_params variable should be initialized correctly using the designated initializer (for C99 and above) or using the provided initializer macro.
The following parameters are available:
parameter | required | default | description |
---|---|---|---|
configUrl | Optional | none | Specifies the URL or file path that the application uses to identify the necessary endpoint(s) for connection. Note: You must provide either this parameter or both the clientName and streamName parameters. |
clientName | Optional | none | Specifies the name of the client. It can be used in conjunction with prefix and stream as an alternative to configUrl . |
prefix | Optional | none | Spcifies the stream prefix. |
stream | Optional | none | Specifies the name of the stream. It can be used together with prefix and clientName as an alternative to configUrl . |
endPoints | Optional | none | Specifies the RT endpoints. For example, hostname1:port,hostname2:port,hostname3:port . Note: this is only used if configUrl is not provided. |
configMaxAge | Optional | 300000 | Maximum age of configuration details in milliseconds. After this amount of time, if still unable to fetch a new configuration, the connection is considered to be broken and any subsequent attempts to send data to kdb Insights Enterprise will fail with a Not connected error. This is only needed when using the kdb Insights Enterprise Information Service to obtain the configuration details. |
consoleLogLevel | Optional | err | Console log level (info/warn/err/off). |
replFileLogLevel | Optional | off | Replicator file log level (trace/debug/info/warn/err/debug/off). |
replConsoleLogLevel | Optional | err | Replicator console log level (trace/debug/info/warn/err/debug/off). |
fetchConfigSleep | Optional | 5000 | Time in ms to sleep between reads of the configuration details. |
localPersistencePeriod | Optional | 600000 | Local persistence period (in milliseconds). |
logLevel | Optional | info | Log level (info/warn/err/off). |
rtDir | Optional | /tmp/rt | RT directory. |
dedupId | Optional | none | Id to dedup multiple publishers (typically publishing to same stream). |
queryTimeout | Optional | 2000 | Milliseconds to wait for the connection to be established. |
CAInfoFile | Optional | none | File that contains the CA cert bundle. This option is provided to overcome curl CA error on Windows. |
restPortStart | Optional | 0 | If set, REST servers start to listen on this port and subsequent ports with the requests being tunneled through the connection. |
truncateArchived | Optional | 1 | Flag indicating whether archived files should be auto-truncated. |
rtFlushTimeoutSecs | Optional | No timeout | The timeout for trying to flush the local log files to the remote server on closing the SDK. |
logPosition | Optional | 0 | Subscriber start position. |
pauseThreshold | Optional | 0 | Subscriber threshold in GB of destination disk usage that triggers pause or resume of replication. |
exchangeArchived | Optional | 1 | Subscriber flag indicating whether archived events should be sent back to the server. |
Environment variables
The following environment variables are available:
variable | required | default | description |
---|---|---|---|
RT_LOG_PATH | Optional | /tmp/rt | RT directory. |
RT_LOGLEVEL_CONSOLE | Optional | err | Console log level (info/warn/err/off) |
KXI_C_SDK_APP_LOG_PATH | Optional | /tmp/kxi_c_sdk_logs/ |
SDK debug log path. On Windows this will be set to %TEMP%/kxi_c_sdk_logs/ |
KXI_C_SDK_BIN_DIR | Optional | path of librt_helper.so | SDK binary location |
REPLICATOR_BIN_DIR | Optional | KXI_C_SDK_BIN_DIR | Replicator binary location |
KXI_C_SDK_CURL_VERBOSE | Optional | none | Turn on curl verbose output |
Handling changing hostnames
Each publisher has a session name associated with it, to ensure it is uniquely identifiable. This is physically represented within the C interface as the directory messages are written to before being replicated into RT.
The C interface assigns the session name, which also determines the message directory, as clientName.streamname.hostname
. This assignment allows the correct identification of publishers from various hosts sending distinct data to the same kdb Insights Stream.
Currently it is not recommended to use the C interface in a scenario where the hostname changes between each instantiation (for example, inside a Docker container that is destroyed and re-created between each run) as a new session will be created for each instantiation.
Publishing data
To publish data:
- Create the message
- Publish the message
The example below takes a csv record and converts it to a kdb+ object before publishing:
//Create a dummy record
char csv_record[] = "10,2021.01.01D00:00:09.000000000,2021.01.01D00:00:09.000000000,97,1,0";
//Convert the CSV record to a kdb+ data type (this function is available in the sample program)
K k_record = convert_csv_record_to_k_record(schema, csv_record);
//Publish the message
KxiError ret = ksvcrtc_insert(h, "trace", k_record);
Terminating the publisher
Once you have finished publishing messages to RT you must stop the rt_helper
:
//Stop the rt_helper
ksvcrtc_stop(h);
Subscription
To subscribe to kdb Insights Enterprise using the C interface as part of your own application, you can use the rt_helper
within kxi-c-l64-${VERSION}.zip file
or alternatively, you can use the rt-c-sub-l64-${VERSION}.zip
files. These provides all the information needed to connect to kdb Insights SDK and kdb Insights Enterprise from the variables provided and subscribe to updates.
Do one of the following:
- Extract the details from the sample program
- Follow the sections below that describe how to subscribe to data
Subscribing to data
The interface includes APIs to subscribe and unsubscribe. The samples(c-sdk-samples.md) include examples for each.
SubscribeRT
An API to subscribe to RT. You can pass in a stream to connect to, a position in the stream to start receiveing updates and a callback function to execute when your subscriber receives an update There are different ways of defining the endpoints that you wish to connect to. Please refer to the samples for more information.
Topic Filtering
Topic filtering allows you to subscribe to a subset of the data on the RT stream.
UnsubscribeRT
An API to unsubscribe to RT.
To subscribe with the rt_helper
you can connect to RT and receive all of the data in the RT stream. A list of all parameters is available here.
These environment variables and parameters can be set for your subscriber.
To subscribe to data:
- Create the message callbacks in your application
- Subscribe to data from RT using a config file with the subscribe endpoints of RT
- Optionally, you could subscribe to the stream from the latest message:
static int message(void* ctx, rt_pos pos[2], K x)
{
(void)ctx;
printf("message pos: %li, len: %i\n",pos[0],(I)xn);
if(is_kxi_msg(x))
printf("ts:%lli, to:%i, corr:%lli, orig:\"%s\"\n",xK[0]->j,xK[1]->i,xK[2]->j,xK[3]->s);
r0(x);
return 0;
}
static void event(void* ctx, rt_pos pos[2], int type)
{
(void)ctx;
char* event[] = {"roll","badmsg","badtail","archived","reset"};
printf("event pos: %li-%li, type: %s\n",pos[0],pos[1],type<0?"user":event[type]);
}
static int filter_table(void* filter_ctx, const kxi_message_preview* preview)
{
const char* table = filter_ctx;
printf("#filter params - ts: %li, to: %i, corr: %li, origin: \"%s\", s0: \"%s\", s1: \"%s\"\n",
preview->timestamp, preview->timeout, preview->correlator, preview->origin,
preview->payload.s0, preview->payload.s1);
return !strcmp(table,preview->payload.s1);
}
char tableName[] = "trace";
void* sub = rt1_subscriber_create();
rt1_callbacks cb =
rt1_apply_kxi_filter(
rt1_create_cb((rt1_callbacks_k){
.message = message,
.event = event
}),
tableName, // table name
filter_table,
0
);
rt1_subscribe(sub,(rt1_stream_params){.clientName="demo-subscriber", .prefix="kxi-", .stream="mystream"},pos,cb)
If you want to subscribe to the stream from the latest message merged by the merger, you need to subscribe as follows:
long latest_pos = rt1_get_latest_position((rt1_stream_params){.clientName="demo-subscriber", .prefix="kxi-", .stream="mystream"});
rt1_subscribe(sub,(rt1_stream_params){.clientName="demo-subscriber", .prefix="kxi-", .stream="mystream"},latest_pos,cb);
Terminating the subscriber
Once you have finished subscribing to messages from RT, you must stop the subscriber
:
//Stop the subscriber
rt1_subscriber_destroy(sub);
Error Codes
The error codes are defined in the header file error_codes.h
. There is also a utility function (kxi_error_to_string
) that translates them to strings.
Enum | Numeric Value | Description |
---|---|---|
KXI_SUCCESS | 0 | Success |
KXI_EACCESS | 13 | Permission denied |
KXI_UNEXPECTED_FAILURE | -1 | Unexpected failure |
KXI_APPLICATION | -2 | Application error |
KXI_INVALID_CONFIG | -3 | Invalid configuration |
KXI_RUNTIME_ERROR | -4 | Runtime error |
KXI_TIMEOUT | -5 | Operation timed out |
KXI_ALLOC_FAILED | -6 | Memory allocation failed |
KXI_INVALID_ARGUMENTS | -7 | Invalid arguments |
KXI_REST_PROXY_FAILED | -8 | REST proxy failed |
KXI_NETWORK_ERROR | -9 | Network error |
KXI_KERROR | -10 | KDB error |