Using the kdb Insights C# Interface
This page describes how to get started with the C# interface.
The C# interface allows you to publish data to RT or subscribe to data from RT in your own .NET application.
The C# language interface is a C# wrapper of the C interface to RT.
Downloading the C# interface
You can download the C# interface from the KX Downloads Portal.
Supported operating systems
The C# interface is supported on the following operating systems:
- CentOS 8 or later
- Red Hat Enterprise Linux (RHEL) 8 or later
- Ubuntu 18.04 or later
- Windows
Note
The interface is supported only on x86 architectures. Currently, macOS is not supported.
Prerequisites
The C# interface requires the following system and utility packages:
- libssl
- libcurl
- .NET 8 SDK or later
- CSharpKDB 1.3.0 or later
Installation
You can install the C# interface using NuGet.
- Install the packages:
    dotnet add package CSharpKDB --version 1.3.0
    dotnet add package RTHelper
Using the C# interface in your own application for publishing data
To publish data to an RT stream using the C# interface as part of your own application, use the RtConnection class from the RtHelperNs namespace. This class contains the functionality to connect to the RT stream and publish data into it.
Authentication and Connectivity
kdb Insights Enterprise provides an Information Service which returns a client URL (KXI_CONFIG_URL).
This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise, you can populate a local JSON file with the connectivity details. Further information on this is available in the Getting Started Guide.
Initializing
- Create an Rt1StreamParams configuration object. Review the following sample parameter objects:
    // Option 1: read the RT configuration from a local JSON file
    var parameters = new Rt1StreamParams
    {
        ConfigUrl = "file:///tmp/rt_config.json",
        ClientName = "TestPublisher1"
    };

    // Option 2: identify the stream by prefix and stream name
    var parameters = new Rt1StreamParams
    {
        Prefix = "rt-",
        Stream = "data",
        ClientName = "TestPublisher2"
    };
Refer to the Parameters section for the full list of configuration parameters.
ConfigUrl
The ConfigUrl can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. Refer to the Getting Started Guide for more details.
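When the configuration lives in a local file, the path has to be written as a file URI. The following is a minimal sketch, using only the .NET base class library, of one way to derive that URI from a file path. The `ConfigUrlExample` class, the `ToFileUri` helper and the `/tmp/rt_config.json` path are illustrative and are not part of the C# interface.

```csharp
using System;
using System.IO;

static class ConfigUrlExample
{
    // Hypothetical helper (not part of the C# interface):
    // converts a local file path into a file URI string.
    public static string ToFileUri(string path)
    {
        // Resolve relative paths against the current working directory first.
        string fullPath = Path.GetFullPath(path);
        return new Uri(fullPath).AbsoluteUri;
    }

    static void Main()
    {
        // Prints a URI that starts with "file://".
        Console.WriteLine(ToFileUri("/tmp/rt_config.json"));
    }
}
```

The resulting string could then be assigned to `ConfigUrl` when creating the parameters object.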
Parameters
The RTParams class contains the following RT connection configuration parameters.
Parameter | Required | Description |
---|---|---|
ConfigUrl | No | URL to configuration JSON |
ClientName | No | The client name that the user can assign to their publisher. It's useful when running multiple publishers from a single node/server. |
Prefix | No | The prefix of the RT stream you want to send data to |
Stream | No | The name of the RT stream you want to send data to |
ClientType | No | publisher or subscriber |
EndPoints | No | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port". Note: these are used only if a config URL is not provided. |
RtDir | No | RT directory |
QueryTimeout | No | Time in milliseconds to wait for the connection to be established |
FetchConfigSleep | No | Time in milliseconds to sleep between reads of the configuration details |
ConfigMaxAge | No | Maximum age of the configuration in milliseconds |
LocalPersistencePeriod | No | Local persistence period in milliseconds |
LogLevel | No | Log level: info, warn, err, or off |
ConsoleLogLevel | No | Console log level: info, warn, err, or off |
CAInfoFile | No | Path to Certificate Authority certificate bundle |
DedupId | No | ID used to deduplicate multiple publishers |
Refer to the C interface documentation for the list of default values.
Refer to the C interface environment variables section for the list of environment variables.
Publishing data
- Create some data and publish it to RT:
    using System;
    using System.Collections.Generic;
    using RtHelperNs;
    using kx;

    internal class CSharpKdb : c {} // Instance of kx.c to access Serialize/Deserialize

    class OneRecordUpload
    {
        static void Main(string[] args)
        {
            DateTime now = DateTime.UtcNow;
            Guid tradeId = Guid.NewGuid();

            // First message: a list of single-element column vectors
            object[] data = {
                new string[] { "hello" },
                new int[] { 1234 },
                new double[] { 3.14 },
                new DateTime[] { now }
            };

            var parameters = new Rt1StreamParams { ClientName = "TestClient1", Stream = "rt-data" };
            using var rt_conn = new RtConnection(parameters);
            using CSharpKdb csharpKDB = new();

            byte[] insertData1 = csharpKDB.Serialize(2, data);
            rt_conn.InsertSerialized("trace", insertData1);

            // Second message: a table (c.Flip) built from column names and column vectors
            string[] columnNames = { "time", "sym", "exch", "side", "price", "size", "tradeID" };
            object[] columns = {
                new DateTime[] { now },
                new string[] { "VOD" },
                new string[] { "LSE" },
                new string[] { "buy" },
                new double[] { 75.90 },
                new int[] { 100 },
                new string[] { tradeId.ToString() }
            };
            c.Flip tab = new c.Flip(new c.Dict(columnNames, columns));
            byte[] insertData2 = csharpKDB.Serialize(2, tab);
            rt_conn.InsertSerialized("trace", insertData2);

            Console.WriteLine("Calling stop!");
            rt_conn.Stop();
        }
    }
Using the C# interface in your own application for subscribing to data
To subscribe to data from an RT stream using the C# interface as part of your own application, use the RtSubHelper class from the RtSubHelperNs namespace. This class contains the functionality to connect to the RT stream and subscribe to data from it.
Authentication and Connectivity
kdb Insights Enterprise provides an Information Service which returns a client URL (KXI_CONFIG_URL).
This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise, you can populate a local JSON file with the connectivity details. Further information on this is available in the Getting Started Guide.
Initializing
- Create the configuration objects StreamInfo, SubscriptionInfo and CallbackInfo. Sample configuration objects are shown below:
    var streamInfo = new StreamInfo
    {
        ConfigUrl = null,
        StreamId = "rt-data",
        TopicPrefix = null,
        Replicas = 3,
        SubPort = 5003,
        ReplPort = 5001,
        FragmentSlots = 10,
        ConnectTimeout = 5000,
        Endpoints = endpoints
    };

    var subInfo = new SubscriptionInfo
    {
        ClientName = "csharp_subscriber",
        RtPosition = 0,
        Filter = filter != "NONE" ? filter : null
    };

    var cbInfo = new CallbackInfo
    {
        MessageAsDummyKByteArray = false,
        ParallelMessages = 2,
        MessageCb = MessageCb,
        EventCb = EventCb,
        Context = IntPtr.Zero
    };
Refer to the Parameters section for the full list of configuration parameters.
ConfigUrl
The ConfigUrl can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. Refer to the Getting Started Guide for more details and example files.
Subscription Parameters
The RTSubParams class in the RtSubHelperNs namespace contains the following RT subscription configuration classes.
StreamInfo
Parameter | Required | Description |
---|---|---|
Endpoints | No | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port" |
ConfigUrl | No | URL to configuration JSON |
StreamId | No | Name of the RT stream |
TopicPrefix | No | Prefix of the RT stream, which is prepended to StreamId |
Replicas | No | Number of RT nodes to connect to |
SubPort | No | Port of the subscriber endpoint(s) on the RT node |
ReplPort | No | Port of the publisher endpoint(s) on the RT node |
FragmentSlots | No | Number of unacknowledged fragments that can be in flight at any time |
ConnectTimeout | No | Timeout in milliseconds when attempting a connection |
SubscriptionInfo
Parameter | Required | Description |
---|---|---|
ClientName | No | Name identifying this subscription |
RtPosition | No | Position in the RT stream from which the subscription should start receiving updates |
Filter | No | Table name filter to apply to the subscription. Use "" to receive all messages |
CallbackInfo
Parameter | Required | Description |
---|---|---|
MessageAsDummyKByteArray | Yes | Must always be set to false |
ParallelMessages | No | If non-zero, the rt_sub_client buffers the specified number of messages in memory for better parallelism |
MessageCb | No | Message callback function. If non-NULL, messages are delivered to this function. |
EventCb | No | Event callback function. If non-NULL, events are delivered to this function. |
Context | No | Pointer passed through to the message and event callback functions |
Subscription functions
The RtSubHelper class in the RtSubHelperNs namespace contains the following RT subscription methods and callbacks.
InitialiseRT
public bool InitialiseRT();
Initialise the RT subscription interface. This function must be called before a subscription is made.
Subscribe
public RtSubscription(StreamInfo streamInfo, SubscriptionInfo subInfo, CallbackInfo cbInfo);
Subscribe to an RT stream.
Parameter | Description |
---|---|
streamInfo | Defines the connection details for the RT stream to which the client is subscribing |
subInfo | Details how the subscription should be made to the RT stream |
cbInfo | Specifies the callback functions to be used to deliver messages and events to the subscriber |
MessageHandled
public bool MessgeHandled(IntPtr subHandle);
Acknowledge that the subscriber has completed processing of the current message.
UpdateEndpoints
public void UpdateEndpoints(string[] endpoints)
Dynamically update, on an active subscription, the list of endpoints for the rt_sub_server running on each RT node.
Parameter | Description |
---|---|
endpoints | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port" |
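Endpoints are described as a comma-separated `host:port` list, while `UpdateEndpoints` takes a `string[]`. The sketch below shows one way to split and validate such a list using only the .NET base class library. The `EndpointListExample` class, the `ParseEndpoints` helper and the port number 5002 are hypothetical and are not part of the C# interface.

```csharp
using System;
using System.Linq;

static class EndpointListExample
{
    // Hypothetical helper (not part of the C# interface):
    // splits "host1:port,host2:port" into entries and checks each has a valid port.
    public static string[] ParseEndpoints(string commaSeparated)
    {
        string[] endpoints = commaSeparated
            .Split(',', StringSplitOptions.RemoveEmptyEntries)
            .Select(e => e.Trim())
            .ToArray();

        foreach (string endpoint in endpoints)
        {
            int colon = endpoint.LastIndexOf(':');
            bool hasPort = colon > 0
                && int.TryParse(endpoint.Substring(colon + 1), out int port)
                && port > 0 && port <= 65535;
            if (!hasPort)
                throw new FormatException($"Expected host:port but got '{endpoint}'");
        }
        return endpoints;
    }

    static void Main()
    {
        string[] endpoints = ParseEndpoints("hostname1:5002,hostname2:5002,hostname3:5002");
        Console.WriteLine(string.Join(" ", endpoints));
    }
}
```

The resulting array could then be passed to `UpdateEndpoints` on an active subscription.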
GetEndpoints
public string[] GetEndpoints()
Obtain the current list of RT endpoints.
Stop
public void Stop()
Unsubscribe from an RT stream.
MessageCallback
public delegate void MessageCallback(IntPtr subHandle, ulong rtPos, IntPtr buffer, ulong bufferLen, IntPtr ctx);
Message callback function. Messages are delivered to this function.
Parameter | Description |
---|---|
subHandle | The handle returned by SubscribeRT() used to identify the subscription |
rtPos | The position in the RT stream of this message |
buffer | The message data |
bufferLen | The length of the message |
ctx | Pointer passed through to the message and event callback functions |
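The message callback receives the payload as an unmanaged pointer (`buffer`) plus a length (`bufferLen`). The following sketch shows one way to copy that payload into a managed `byte[]` with `Marshal.Copy`, which is useful if the data is needed after the message has been acknowledged. It assumes, and this page does not state it, that the buffer should not be relied on once the message has been acknowledged. The `MessageBufferExample` class and `CopyMessage` helper are hypothetical, and the native buffer is simulated with `Marshal.AllocHGlobal` so the example runs without an RT connection.

```csharp
using System;
using System.Runtime.InteropServices;

static class MessageBufferExample
{
    // Hypothetical helper (not part of the C# interface):
    // copies bufferLen bytes from unmanaged memory into a managed array.
    public static byte[] CopyMessage(IntPtr buffer, ulong bufferLen)
    {
        // Marshal.Copy takes an int length; checked() throws if the value does not fit.
        int length = checked((int)bufferLen);
        byte[] managed = new byte[length];
        Marshal.Copy(buffer, managed, 0, length);
        return managed;
    }

    static void Main()
    {
        // Simulate the native message buffer handed to the callback.
        byte[] payload = { 1, 2, 3, 4 };
        IntPtr native = Marshal.AllocHGlobal(payload.Length);
        try
        {
            Marshal.Copy(payload, 0, native, payload.Length);
            byte[] copy = CopyMessage(native, (ulong)payload.Length);
            Console.WriteLine(BitConverter.ToString(copy)); // prints 01-02-03-04
        }
        finally
        {
            Marshal.FreeHGlobal(native);
        }
    }
}
```

Inside a real message callback, the copied array could be handed to application code before the message is acknowledged.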
EventCallback
public delegate void EventCallback(IntPtr subHandle, EventType eventType, ulong rtPos, ulong nextPos, IntPtr ctx);
Event callback function. Events are delivered to this function.
Parameter | Description |
---|---|
subHandle | The handle returned by SubscribeRT() used to identify the subscription |
eventType | The type of event (refer to EventType below) |
rtPos | The position in the RT stream where the event occurred |
nextPos | The next valid position in the RT stream following the event |
ctx | Pointer passed through to the message and event callback functions |
EventType
RT EventType values are defined as an enum in the RtSubHelperNs namespace.
Event type | Description |
---|---|
BadTail | Indicates RT log corruption |
BadMsg | Indicates invalid RT message |
Reset | RT reset event |
SkipForward | RT skip forward event |
LogRoll | RT log roll event |
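An event callback typically branches on the event type. The sketch below illustrates one possible way to turn an event into a log line. To stay self-contained it declares a stand-in `EventType` enum whose member names are copied from the table above. Its numeric values are not documented here and may differ from the real enum, so real code should use the enum from RtSubHelperNs instead. The `EventHandlingExample` class, the `Describe` helper and the ERROR/WARN/INFO severities are illustrative choices, not behaviour defined by the interface.

```csharp
using System;

// Stand-in for the EventType enum in RtSubHelperNs (assumption: names only,
// taken from the table above; the numeric values are not documented here).
enum EventType { BadTail, BadMsg, Reset, SkipForward, LogRoll }

static class EventHandlingExample
{
    // Hypothetical helper: builds a log line for an event.
    public static string Describe(EventType eventType, ulong rtPos, ulong nextPos)
    {
        switch (eventType)
        {
            case EventType.BadTail:
                return $"ERROR: RT log corruption at position {rtPos}; next valid position is {nextPos}";
            case EventType.BadMsg:
                return $"ERROR: invalid RT message at position {rtPos}; next valid position is {nextPos}";
            case EventType.Reset:
                return $"WARN: RT reset at position {rtPos}; next valid position is {nextPos}";
            case EventType.SkipForward:
                return $"WARN: RT skip forward at position {rtPos}; next valid position is {nextPos}";
            case EventType.LogRoll:
                return $"INFO: RT log roll at position {rtPos}";
            default:
                return $"Unknown event {eventType} at position {rtPos}";
        }
    }

    static void Main()
    {
        Console.WriteLine(Describe(EventType.SkipForward, 100, 250));
    }
}
```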
Subscribing to data
Here is an example of subscribing to some data from RT:
```C#
using System;
using System.Threading;
using System.Runtime.InteropServices;
using RtSubHelperNs;
using kx;
using System.Linq;
using System.IO;
using System.Text;
using System.Collections;
using System.Collections.Generic;

internal class CSharpKdb : c {} // Instance of kx.c to access Serialize/Deserialize

class SampleSubscriber
{
    static CSharpKdb csharpKDB = new CSharpKdb();

    // Called for each message delivered on the subscription
    static void MessageCb(IntPtr handle, ulong rtPos, IntPtr buffer, ulong bufferLen, IntPtr ctx)
    {
        Console.WriteLine($"Message Received - Position: {rtPos}");
        // Acknowledge that processing of this message is complete
        RtSubHelper.MessgeHandled(handle);
    }

    // Called for each RT event (see the EventType table above)
    static void EventCb(IntPtr handle, EventType eventType, ulong rtPos, ulong nextPos, IntPtr ctx)
    {
        // EventType is an enum, so string interpolation prints the member name
        Console.WriteLine($"Event occurred - EventType: {eventType}, Position: {rtPos}, Next: {nextPos}");
    }

    static void Main(string[] args)
    {
        if (args.Length < 3)
        {
            Console.Error.WriteLine("Usage: <filter> <loglevel> <host:port>...");
            return;
        }

        string filter = args[0];
        string logLevel = args[1];
        string[] endpoints = args[2..];

        // Must be called before a subscription is made
        RtSubHelper.InitialiseRT();
        RtSubHelper.SetLogLevel(logLevel);

        var streamInfo = new StreamInfo
        {
            ConfigUrl = null,
            StreamId = "rt-data",
            TopicPrefix = null,
            Replicas = 3,
            SubPort = 5003,
            ReplPort = 5001,
            FragmentSlots = 10,
            ConnectTimeout = 5000,
            Endpoints = endpoints
        };

        var subInfo = new SubscriptionInfo
        {
            ClientName = "csharp_subscriber",
            RtPosition = 0,
            // Pass NONE on the command line to subscribe without a table filter
            Filter = filter != "NONE" ? filter : null
        };

        var cbInfo = new CallbackInfo
        {
            MessageAsDummyKByteArray = false,
            ParallelMessages = 2,
            MessageCb = MessageCb,
            EventCb = EventCb,
            Context = IntPtr.Zero
        };

        using var subscription = new RtSubscription(streamInfo, subInfo, cbInfo);
        Console.WriteLine("Subscribed. Waiting for messages...");
        Thread.Sleep(30000);

        subscription.Stop();
        Console.WriteLine("Unsubscribed.");
        Thread.Sleep(5000);
    }
}
```
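The sample above keeps the subscription open for a fixed 30 seconds. A long-running subscriber may prefer to wait until the user presses Ctrl+C. The following sketch shows one way to do that with `Console.CancelKeyPress` and `ManualResetEventSlim` from the .NET base class library. The `WaitForExitExample` class and `WaitForCancel` helper are hypothetical, and the 5-second timeout is only there so the example terminates on its own.

```csharp
using System;
using System.Threading;

static class WaitForExitExample
{
    // Hypothetical helper (not part of the C# interface):
    // blocks until Ctrl+C is pressed or the timeout elapses.
    // Returns true if Ctrl+C was pressed, false on timeout.
    public static bool WaitForCancel(TimeSpan timeout)
    {
        using var cancelled = new ManualResetEventSlim(false);
        ConsoleCancelEventHandler handler = (sender, e) =>
        {
            e.Cancel = true;   // keep the process alive so cleanup can run
            cancelled.Set();
        };

        Console.CancelKeyPress += handler;
        try
        {
            return cancelled.Wait(timeout);
        }
        finally
        {
            Console.CancelKeyPress -= handler;
        }
    }

    static void Main()
    {
        Console.WriteLine("Waiting for messages. Press Ctrl+C to stop.");
        bool interrupted = WaitForCancel(TimeSpan.FromSeconds(5));
        Console.WriteLine(interrupted ? "Interrupted." : "Timed out.");
        // In the subscriber sample, subscription.Stop() would be called at this point.
    }
}
```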