Using the kdb Insights C# Interface

This page describes how to get started with the C# interface.

The C# interface allows you to publish data to RT or subscribe to data from RT in your own .NET application.

The C# language interface is a C# wrapper of the C interface to RT.

Downloading the C# interface

You can download the C# interface from the KX Downloads Portal.

Supported operating systems

The C# interface is supported on the following operating systems:

  • CentOS 8 or later
  • Red Hat Enterprise Linux (RHEL) 8 or later
  • Ubuntu 18.04 or later
  • Windows

Note

The interface is supported only on x86 architectures. Currently, macOS is not supported.

Prerequisites

The C# interface requires the following system and utility packages:

  • libssl
  • libcurl
  • .NET 8 SDK or later
  • CSharpKDB 1.3.0 or later

Installation

You can install the C# interface using NuGet.

  • Install the packages:

    dotnet add package CSharpKDB --version 1.3.0
    dotnet add package RTHelper
    

Using the C# interface in your own application for publishing data

To publish data to an RT stream using the C# interface as part of your own application, use the RtConnection module from the namespace RtHelperNs. This module contains the functionality to connect to and publish the data into the RT stream.

Authentication and Connectivity

kdb Insights Enterprise provides an Information Service which returns a client URL (KXI_CONFIG_URL). This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise, you can populate a local JSON file with the connectivity details. Further information on this is available in the Getting Started Guide.

Initializing

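  • Create an Rt1StreamParams configuration object. The following are two alternative sample objects; use one or the other:

    // Option 1: read the connection details from a configuration URL
    var parameters = new Rt1StreamParams { ConfigUrl = "file:///tmp/rt_config.json", ClientName = "TestPublisher1" };

    // Option 2: identify the stream by its prefix and name
    var parameters = new Rt1StreamParams { Prefix = "rt-", Stream = "data", ClientName = "TestPublisher2" };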
    

    Refer to the Parameters section for the full list of configuration parameters.

    ConfigUrl

    The ConfigUrl can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. Refer to the Getting Started Guide for more details.
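
As a minimal sketch of choosing between the two forms of ConfigUrl, the helper below prefers a client URL when one is supplied and otherwise converts a local JSON path into a file URI. The ConfigUrlExample class and its Resolve method are hypothetical names, not part of the interface, and how the client URL reaches your application (for example, through an environment variable) is an assumption.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the RT C# interface.
public static class ConfigUrlExample
{
    // Returns the client URL when one is supplied; otherwise converts the
    // local JSON path into an absolute file URI (file:///...).
    public static string Resolve(string? clientUrl, string localJsonPath)
    {
        if (!string.IsNullOrWhiteSpace(clientUrl))
        {
            return clientUrl;
        }

        // Path.GetFullPath makes a relative path absolute before conversion.
        return new Uri(Path.GetFullPath(localJsonPath)).AbsoluteUri;
    }
}
```

The returned string could then be assigned to the ConfigUrl property of the configuration object.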

Parameters

The RTParams class contains the following RT connection configuration parameters.

| Parameter | Required | Description |
| --- | --- | --- |
| ConfigUrl | No | URL to configuration JSON |
| ClientName | No | The client name that the user can assign to their publisher. It's useful when running multiple publishers from a single node/server. |
| Prefix | No | The RT stream prefix you want to send data to |
| Stream | No | The RT stream name you want to send data to |
| ClientType | No | publisher or subscriber |
| EndPoints | No | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port". Note: these are used only if the config URL is not provided. |
| RtDir | No | RT directory |
| QueryTimeout | No | Milliseconds to wait for the connection to be established |
| FetchConfigSleep | No | Time in milliseconds to sleep between reads of the configuration details |
| ConfigMaxAge | No | Maximum age of the configuration in milliseconds |
| LocalPersistencePeriod | No | Local persistence period in milliseconds |
| LogLevel | No | Log level: info, warn, err, or off |
| ConsoleLogLevel | No | Console log level: info, warn, err, or off |
| CAInfoFile | No | Path to Certificate Authority certificate bundle |
| DedupId | No | ID to dedup multiple publishers |

Refer to the C interface documentation for the list of default values.

Refer to the C interface environment variables section for the list of environment variables.
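
The EndPoints parameter is a comma-separated list of host:port entries. The sketch below shows one way to check such a list before passing it on. EndpointList, Parse and Join are hypothetical names, and the port-range check is an illustrative choice rather than a rule stated by the interface.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the RT C# interface.
public static class EndpointList
{
    // Splits "host1:port,host2:port" into entries and checks each one
    // has a non-empty host and a numeric port in the range 1-65535.
    public static string[] Parse(string csv)
    {
        var entries = csv.Split(',',
            StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

        foreach (var entry in entries)
        {
            int colon = entry.LastIndexOf(':');
            bool valid = colon > 0
                && int.TryParse(entry.AsSpan(colon + 1), out int port)
                && port is > 0 and <= 65535;
            if (!valid)
            {
                throw new FormatException($"Expected host:port but got '{entry}'");
            }
        }
        return entries;
    }

    // Joins entries back into the comma-separated form.
    public static string Join(IEnumerable<string> endpoints) => string.Join(",", endpoints);
}
```

Parse returns the array form and Join rebuilds the comma-separated string shown in the table.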

Publishing data

  • Create some data and publish it to RT:

    using System;
    using System.Collections.Generic;
    using RtHelperNs;
    using kx;
    
    
    internal class CSharpKdb : c {} // Instance of kx.c to access Serialize/Deserialize
    
    class OneRecordUpload
    {
      static void Main(string[] args)
      {
        DateTime now = DateTime.UtcNow;
        Guid tradeId = Guid.NewGuid();
    
        object[] data = { new string[] { "hello" }, new int[] { 1234 }, new double[] { 3.14 }, new DateTime[] { now } };
    
        var parameters = new Rt1StreamParams { ClientName = "TestClient1", Stream = "rt-data" };
        using var rt_conn = new RtConnection(parameters);    
        using CSharpKdb csharpKDB = new();
    
        byte[] insertData1 = csharpKDB.Serialize(2, data);
        rt_conn.InsertSerialized("trace", insertData1); 
    
        string[] columnNames = { "time", "sym", "exch", "side", "price", "size", "tradeID" };     
        object[] columns = {
            new DateTime[] { now },
            new string[] { "VOD" },
            new string[] { "LSE" },
            new string[] { "buy" },
            new double[] { 75.90 },
            new int[] { 100 },
            new string[] { tradeId.ToString() }
        };
    
        c.Flip tab = new c.Flip(new c.Dict(columnNames, columns));
        byte[] insertData2 = csharpKDB.Serialize(2, tab);
        rt_conn.InsertSerialized("trace", insertData2); 
    
        Console.WriteLine("Calling stop!");
        rt_conn.Stop();
      }
    }
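
The example above publishes a one-row table as parallel column arrays. To send several rows at once, each column array needs one element per row. The following sketch transposes a list of rows into that column layout; the Trade record and the TradeColumns class are hypothetical and only mirror the column names used above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row type mirroring the columns used in the example above.
public record Trade(DateTime Time, string Sym, string Exch, string Side,
                    double Price, int Size, string TradeId);

public static class TradeColumns
{
    public static readonly string[] Names =
        { "time", "sym", "exch", "side", "price", "size", "tradeID" };

    // Transposes rows into one typed array per column, in the same order as Names.
    public static object[] ToColumns(IReadOnlyList<Trade> rows) => new object[]
    {
        rows.Select(r => r.Time).ToArray(),
        rows.Select(r => r.Sym).ToArray(),
        rows.Select(r => r.Exch).ToArray(),
        rows.Select(r => r.Side).ToArray(),
        rows.Select(r => r.Price).ToArray(),
        rows.Select(r => r.Size).ToArray(),
        rows.Select(r => r.TradeId).ToArray()
    };
}
```

The resulting arrays can take the place of columnNames and columns when building the c.Flip in the example above.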
    

Using the C# interface in your own application for subscribing to data

To subscribe to data from an RT stream using the C# interface as part of your own application, use the RtSubHelper module from the namespace RtSubHelperNs. This module contains the functionality to connect to and subscribe to data from the RT stream.

Authentication and Connectivity

kdb Insights Enterprise provides an Information Service which returns a client URL (KXI_CONFIG_URL). This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise, you can populate a local JSON file with the connectivity details. Further information on this is available in the Getting Started Guide.

Initializing

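  • Create the configuration objects StreamInfo, SubscriptionInfo and CallbackInfo. Sample configuration objects are shown below; endpoints, filter, MessageCb and EventCb are defined in the full example under Subscribing to data: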

    var streamInfo = new StreamInfo
    {
      ConfigUrl = null,
      StreamId = "rt-data",
      TopicPrefix = null,
      Replicas = 3,
      SubPort = 5003,
      ReplPort = 5001,
      FragmentSlots = 10,
      ConnectTimeout = 5000,
      Endpoints = endpoints
    };
    
    var subInfo = new SubscriptionInfo
    {
      ClientName = "csharp_subscriber",
      RtPosition = 0,
      Filter = filter != "NONE" ? filter : null
    };
    
    var cbInfo = new CallbackInfo
    {
      MessageAsDummyKByteArray = false,
      ParallelMessages = 2,
      MessageCb = MessageCb,
      EventCb = EventCb,
      Context = IntPtr.Zero
    };    
    

    Refer to the Subscription Parameters section for the full list of configuration parameters.

    ConfigUrl

    The ConfigUrl can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. Refer to the Getting Started Guide for more details and example files.

Subscription Parameters

The RTSubParams class in the RtSubHelperNs namespace contains the following RT subscription configuration classes.

StreamInfo

| Parameter | Required | Description |
| --- | --- | --- |
| Endpoints | No | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port" |
| ConfigUrl | No | URL to configuration JSON |
| StreamId | No | Name of the RT stream |
| TopicPrefix | No | Prefix of the RT stream, which is prepended to StreamId |
| Replicas | No | Number of RT nodes to connect to |
| SubPort | No | Port of the subscriber endpoint(s) on the RT node |
| ReplPort | No | Port of the publisher endpoint(s) on the RT node |
| FragmentSlots | No | Number of unacknowledged fragments that can be in flight at any time |
| ConnectTimeout | No | Timeout in milliseconds when attempting a connection |

SubscriptionInfo

| Parameter | Required | Description |
| --- | --- | --- |
| ClientName | No | Name identifying this subscription |
| RtPosition | No | Position in the RT stream from which the subscription should start receiving updates |
| Filter | No | Table name filter to apply to the subscription. Use "" to receive all messages |

CallbackInfo

| Parameter | Required | Description |
| --- | --- | --- |
| MessageAsDummyKByteArray | Yes | Must always be set to false |
| ParallelMessages | No | If non-zero, the rt_sub_client buffers the specified number of messages in memory for better parallelism |
| MessageCb | No | Message callback function. If non-NULL, messages are delivered to this function |
| EventCb | No | Event callback function. If non-NULL, events are delivered to this function |
| Context | No | Pointer passed through to the message and event callback functions |
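
Context is an IntPtr that is handed back to both callbacks. One common .NET pattern for carrying a managed object through such a pointer is GCHandle. The sketch below illustrates that pattern only; SubscriberState and ContextHandle are hypothetical names, and whether the interface places further constraints on Context is not stated here.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical state object shared with the callbacks.
public sealed class SubscriberState
{
    public long MessagesSeen;
}

// Hypothetical wrapper around GCHandle, not part of the RT C# interface.
public sealed class ContextHandle : IDisposable
{
    private GCHandle _handle;

    // Keeps the target alive and addressable until Dispose is called.
    public ContextHandle(object target) => _handle = GCHandle.Alloc(target);

    // Value suitable for an IntPtr field such as CallbackInfo.Context.
    public IntPtr Pointer => GCHandle.ToIntPtr(_handle);

    // Recovers the managed object inside a callback from its ctx argument.
    public static T Resolve<T>(IntPtr ctx) where T : class
        => (T)GCHandle.FromIntPtr(ctx).Target!;

    // Frees the handle; call only once no more callbacks can arrive.
    public void Dispose()
    {
        if (_handle.IsAllocated)
        {
            _handle.Free();
        }
    }
}
```

Inside a callback, ContextHandle.Resolve<SubscriberState>(ctx) would return the same object that was wrapped when the subscription was created.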

Subscription functions

The RtSubHelper class in the RtSubHelperNs namespace contains the following RT subscription methods and callbacks.

InitialiseRT

public bool InitialiseRT();

Initialise the RT subscription interface. This function must be called before a subscription is made.

Subscribe

public RtSubscription(StreamInfo streamInfo, SubscriptionInfo subInfo, CallbackInfo cbInfo);

Subscribe to an RT stream.

| Parameter | Description |
| --- | --- |
| streamInfo | Defines the connection details for the RT stream to which the client is subscribing |
| subInfo | Details how the subscription should be made to the RT stream |
| cbInfo | Specifies the callback functions to be used to deliver messages and events to the subscriber |

MessageHandled

public bool MessgeHandled(IntPtr subHandle);

Acknowledge that the subscriber has completed processing of the current message.

UpdateEndpoints

public void UpdateEndpoints(string[] endpoints)

On an active subscription, dynamically update the list of endpoints of the rt_sub_server running on each RT node.

| Parameter | Description |
| --- | --- |
| endpoints | RT endpoints, for example, "hostname1:port,hostname2:port,hostname3:port" |

GetEndpoints

public string[] GetEndpoints()

Obtain the current list of RT endpoints.

Stop

public void Stop()

Unsubscribe from an RT stream.

MessageCallback

public delegate void MessageCallback(IntPtr subHandle, ulong rtPos, IntPtr buffer, ulong bufferLen, IntPtr ctx);

Message callback function. Messages are delivered to this function.

| Parameter | Description |
| --- | --- |
| subHandle | The handle returned by SubscribeRT() used to identify the subscription |
| rtPos | The position in the RT stream of this message |
| buffer | The message data |
| bufferLen | The length of the message |
| ctx | Pointer passed through to the message and event callback functions |
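
The message arrives as an unmanaged pointer plus a length. A minimal sketch of copying it into a managed byte array with Marshal.Copy follows, on the assumption that the callback wants its own copy rather than relying on the buffer after the message has been acknowledged. MessageBuffer and ToManaged are hypothetical names.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical helper, not part of the RT C# interface.
public static class MessageBuffer
{
    // Copies bufferLen bytes from the unmanaged buffer into a new managed array.
    public static byte[] ToManaged(IntPtr buffer, ulong bufferLen)
    {
        if (buffer == IntPtr.Zero || bufferLen == 0)
        {
            return Array.Empty<byte>();
        }
        if (bufferLen > int.MaxValue)
        {
            // A managed array cannot be indexed beyond int.MaxValue elements.
            throw new ArgumentOutOfRangeException(nameof(bufferLen), "Message too large to copy");
        }

        var managed = new byte[(int)bufferLen];
        Marshal.Copy(buffer, managed, 0, managed.Length);
        return managed;
    }
}
```

A message callback could call MessageBuffer.ToManaged(buffer, bufferLen) before acknowledging the message and then work on the copy.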

EventCallback

public delegate void EventCallback(IntPtr subHandle, EventType eventType, ulong rtPos, ulong nextPos, IntPtr ctx);

Events are delivered to this function.

| Parameter | Description |
| --- | --- |
| subHandle | The handle returned by SubscribeRT() used to identify the subscription |
| eventType | The type of event (refer to EventType below) |
| rtPos | The position in the RT stream where the event occurred |
| nextPos | The next valid position in the RT stream following the event |
| ctx | Pointer passed through to the message and event callback functions |

EventType

RT EventType values are defined as an enum in the RtSubHelperNs namespace.

| Event type | Description |
| --- | --- |
| BadTail | Indicates RT log corruption |
| BadMsg | Indicates invalid RT message |
| Reset | RT reset event |
| SkipForward | RT skip forward event |
| LogRoll | RT log roll event |
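
An event callback typically needs to turn these values into something readable. The sketch below maps each documented value to its description with a switch expression. The enum declared here is only a stand-in so the sketch compiles on its own; a real application would use the EventType from the library, whose numeric values are not given on this page. EventDescriptions is a hypothetical name.

```csharp
using System;

// Stand-in for the library's EventType so this sketch is self-contained.
// Member names mirror the table above; the numeric values are assumptions.
public enum EventType { BadTail, BadMsg, Reset, SkipForward, LogRoll }

// Hypothetical helper, not part of the RT C# interface.
public static class EventDescriptions
{
    // Maps an event type to the description given in the table above.
    public static string Describe(EventType eventType) => eventType switch
    {
        EventType.BadTail => "RT log corruption",
        EventType.BadMsg => "invalid RT message",
        EventType.Reset => "RT reset event",
        EventType.SkipForward => "RT skip forward event",
        EventType.LogRoll => "RT log roll event",
        _ => $"unrecognised event ({eventType})"
    };
}
```

The default arm keeps the callback from throwing if a value outside the documented set is ever delivered.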

Subscribing to data

Here is an example of subscribing to some data from RT:

```C#
using System;
using System.Threading;
using System.Runtime.InteropServices;
using RtSubHelperNs;
using kx;
using System.Linq;
using System.IO;
using System.Text;
using System.Collections;
using System.Collections.Generic;

internal class CSharpKdb : c {} // Instance of kx.c to access Serialize/Deserialize

class SampleSubscriber
{
  static CSharpKdb csharpKDB = new CSharpKdb();

  static void MessageCb(IntPtr handle, ulong rtPos, IntPtr buffer, ulong bufferLen, IntPtr ctx)
  {
    Console.WriteLine($"Message Received - Position: {rtPos}");
    RtSubHelper.MessgeHandled(handle);
  }

  static void EventCb(IntPtr handle, EventType eventType, ulong rtPos, ulong nextPos, IntPtr ctx)
  {
    // EventType is an enum, so ToString() yields the event name (for example, "LogRoll")
    string eventStr = eventType.ToString();
    Console.WriteLine($"Event occurred - EventType: {eventStr}, Position: {rtPos}, Next: {nextPos}");
  }

  static void Main(string[] args)
  {
    if (args.Length < 3)
    {
      Console.Error.WriteLine("Usage: <filter> <loglevel> <host:port>...");
      return;
    }

    string filter = args[0];
    string logLevel = args[1];
    string[] endpoints = args[2..];

    RtSubHelper.InitialiseRT();
    RtSubHelper.SetLogLevel(logLevel);

    var streamInfo = new StreamInfo
    {
      ConfigUrl = null,
      StreamId = "rt-data",
      TopicPrefix = null,
      Replicas = 3,
      SubPort = 5003,
      ReplPort = 5001,
      FragmentSlots = 10,
      ConnectTimeout = 5000,
      Endpoints = endpoints
    };

    var subInfo = new SubscriptionInfo
    {
      ClientName = "csharp_subscriber",
      RtPosition = 0,
      Filter = filter != "NONE" ? filter : null
    };

    var cbInfo = new CallbackInfo
    {
      MessageAsDummyKByteArray = false,
      ParallelMessages = 2,
      MessageCb = MessageCb,
      EventCb = EventCb,
      Context = IntPtr.Zero
    };

    using var subscription = new RtSubscription(streamInfo, subInfo, cbInfo);
    Console.WriteLine("Subscribed. Waiting for messages...");

    Thread.Sleep(30000);
    subscription.Stop();
    Console.WriteLine("Unsubscribed.");
    Thread.Sleep(5000);
  }
}
```
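
The example above keeps the subscription open for a fixed 30 seconds. For a long-running subscriber, one alternative is to block until the user presses Ctrl+C. The sketch below uses only standard .NET types; Shutdown, Wait and OnCtrlC are hypothetical names and not part of the interface.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the RT C# interface.
public static class Shutdown
{
    // Blocks until the token is cancelled or the timeout elapses.
    // Returns true if cancellation was requested, false on timeout.
    public static bool Wait(CancellationToken token, TimeSpan timeout)
        => token.WaitHandle.WaitOne(timeout);

    // Creates a token source that is cancelled when the user presses Ctrl+C.
    public static CancellationTokenSource OnCtrlC()
    {
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so cleanup can run
            cts.Cancel();
        };
        return cts;
    }
}
```

In Main, the first Thread.Sleep call could be replaced by creating a source with Shutdown.OnCtrlC() and calling Shutdown.Wait(cts.Token, Timeout.InfiniteTimeSpan) before subscription.Stop().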