Kx Stream C# .NET API

The Kx Stream C# API offers programmatic access from C# to core features of Kx Platform. It extends the Kx core C# interface with higher-level, service-based access to those features, which are summarized under Available services below.

Developing with the Kx Stream C# API

The key libraries required to build a C# application with the C# Stream API are available in a number of DeltaCsApi*.zip files. There are both 32-bit and 64-bit versions of two packages:

  • DeltaCsAPICore provides the DLL libraries for implementing an application on top of the C# Stream API
  • DeltaCsAPIDemo provides a Visual Studio demo solution with examples of integrating with the Kx Platform services

DeltaCsAPICore.zip

The file contains two key DLL libraries:

  • DeltaApiCore.dll is the main DLL required to utilize features of the C# Stream API.
  • Newtonsoft.Json.dll is a dependency of the DeltaApiCore providing JSON support to the C# Stream API

Both should be added as references to the application so that it can interface with the Kx Stream services within the Kx Platform.

DeltaCsAPIDemo.zip

To provide examples of usage, the file contains source code and a DeltaApiDemo.sln solution, which can be opened in Visual Studio and run to demonstrate the various services available.

To run the Demo application from within Visual Studio:

  • Open the DeltaApiDemo solution
  • Update the App.config file to point to the Kx Platform environment setting appropriate Kx Control details.
  • Depending on the demo class being executed, a number of additional config values should be updated within the App.config file.
<appSettings>
  <add key="DC_HOST" value="<host>" />
  <add key="DC_PORT" value="<port>" />
  <add key="DC_HOST_SECONDARY" value="" />
  <add key="DC_PORT_SECONDARY" value="" />
  <add key="DC_TLS" value="false" />
  <add key="USER" value="<user>" />
  <add key="PASS" value="<password>" />
  <add key="THIS_INSTANCE_NAME" value="<instanceName>" />
  <add key="ANOTHER_INSTANCE_NAME" value="<instanceName>" />
  <add key="CONNECTION_GROUP_NAME" value="<connectionName>" />
  <add key="SCHEMA_TABLE_GROUP_NAME" value="<SchemaGroupName>" />
  <add key="CONFIG_NAME" value="<configName>" />
  <add key="CONFIG_NAME_OVERRIDE" value="<overrideName>" />
  <add key="CONFIG_NAME_CLONE_SOURCE" value="<configName>" />
  <add key="CONFIG_NAME_CLONE_TARGET" value="<configName>" />
  <add key="CONFIG_NAME_CLONE_OVERRIDE" value="DEFAULT" />
  <add key="QR_QUERY_USER" value="Administrator" />
  <add key="QR_QUERY_TARGET_DB" value="<databaseName>" />
  <add key="DMS_CFG_NAME" value="DS_MESSAGING_SERVER:DS" />
</appSettings>

A number of main classes can be run as part of the demo.

class                         description
------------------------------------------------------------------
DMSServiceDemo                messaging and data discovery service
DeltaAPIServiceDemo           public query API
QueryRouterServiceDemo        Query Router service
StreamingAnalyticServiceDemo  Streaming Analytic API

Right-click the DeltaCsAPIDemo project, select Properties, and set the startup object to the class of the demo you want to run.

DeltaCsAPIDemo application config

The App.config file holds the user configuration for the Demo application. Each service demo uses a subset of the keys below.

key                         description
---------------------------------------------------------------------------
nlog.config                 logging configuration file
DC_HOST                     hostname of the primary Kx Control process
DC_PORT                     port of the primary Kx Control process
DC_HOST_SECONDARY           hostname of the secondary Kx Control process when deployed in a cluster. Can be blank if not in a cluster
DC_PORT_SECONDARY           port of the secondary Kx Control process when deployed in a cluster. Can be blank if not in a cluster
DC_TLS                      whether a TLS connection to Kx Control should be used. Requires TLS enabled on Kx Control
USER                        authorized user to connect as
PASS                        user password
THIS_INSTANCE_NAME          name of this running instance: reported to Kx Control, used to monitor the status of the process, and used to ensure uniqueness of an instance
ANOTHER_INSTANCE_NAME       used as an example of how to look up information of another instance via the Configuration Management Service
CONNECTION_GROUP_NAME       used as an example of how to look up Connection Group information via the Configuration Management Service
SCHEMA_TABLE_GROUP_NAME     used as an example of how to look up Schema Group information via the Configuration Management Service
CONFIG_NAME                 used as an example of how to look up Configuration Parameter information via the Configuration Management Service
CONFIG_NAME_OVERRIDE        used as an example of how to look up Config Parameter override information via the Configuration Management Service
CONFIG_NAME_CLONE_SOURCE    used as an example of how to create and update Config Parameter information via the Configuration Management Service
CONFIG_NAME_CLONE_TARGET    used as an example of how to create and update Config Parameter information via the Configuration Management Service
CONFIG_NAME_CLONE_OVERRIDE  used as an example of how to create and update Config Parameter information via the Configuration Management Service
QR_QUERY_USER               used as part of the Query Router Service to highlight how queries can be executed as specific users
QR_QUERY_TARGET_DB          used as part of the Query Router Service to highlight how queries are targeted to specific processes
DMS_CFG_NAME                key configuration within the Messaging and data discovery service to point the C# Stream API at the environment DMS process
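
As a rough illustration of how the Kx Control keys might be consumed, the sketch below turns the primary and optional secondary settings into a list of endpoints. The ControlEndpoints and Endpoint types are hypothetical helpers and not part of the C# Stream API; in an application the settings collection would typically come from System.Configuration.ConfigurationManager.AppSettings.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Specialized;

// Hypothetical holder for one Kx Control address (not an API type).
public sealed class Endpoint
{
    public string Host;
    public int Port;
    public bool Tls;
}

// Hypothetical helper (not part of the C# Stream API).
public static class ControlEndpoints
{
    // Returns one entry per configured Kx Control process.
    // Entries with a blank host or port are skipped, for example the
    // secondary process when the platform is not deployed as a cluster.
    public static List<Endpoint> Parse(NameValueCollection settings)
    {
        bool tls;
        bool.TryParse(settings["DC_TLS"], out tls); // tls stays false if missing or invalid

        var endpoints = new List<Endpoint>();
        foreach (string suffix in new[] { "", "_SECONDARY" })
        {
            string host = settings["DC_HOST" + suffix];
            string port = settings["DC_PORT" + suffix];
            if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(port))
            {
                continue;
            }
            endpoints.Add(new Endpoint { Host = host, Port = int.Parse(port), Tls = tls });
        }
        return endpoints;
    }
}
```

Each entry could then be passed to a HostPort constructor in the same host, port, TLS order used elsewhere in this document.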

DeltaCsAPIDemo logging config

The DeltaCsAPIDemo uses NLog for logging, with configuration defined in the nlog.config file. By default the logger writes to the trace.log, debug.log, info.log, warn.log and error.log files, depending on log level.

See https://nlog-project.org/ for further details.

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >
  <targets>
    <target name="traceLogger"
            xsi:type="File"
            layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
            fileName="logs\trace.log"
            archiveAboveSize="10485760"
            maxArchiveFiles="10"
            archiveNumbering="Sequence"
            concurrentWrites="true"
            keepFileOpen="false"/>
    <target name="debugLogger"
            xsi:type="File"
            layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
            fileName="logs\debug.log"
            archiveAboveSize="10485760"
            maxArchiveFiles="10"
            archiveNumbering="Sequence"
            concurrentWrites="true"
            keepFileOpen="false"/>
    <target name="infoLogger"
            xsi:type="File"
            layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
            fileName="logs\info.log"
            archiveAboveSize="10485760"
            maxArchiveFiles="10"
            archiveNumbering="Sequence"
            concurrentWrites="true"
            keepFileOpen="false"/>
    <target name="warnLogger"
            xsi:type="File"
            layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
            fileName="logs\warn.log"
            archiveAboveSize="10485760"
            maxArchiveFiles="10"
            archiveNumbering="Sequence"
            concurrentWrites="true"
            keepFileOpen="false"/>
    <target name="errorLogger"
            xsi:type="File"
            layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
            fileName="logs\error.log"
            archiveAboveSize="10485760"
            maxArchiveFiles="10"
            archiveNumbering="Sequence"
            concurrentWrites="true"
            keepFileOpen="false"/>
  </targets>

  <rules>
    <logger name="*"
            minlevel="Trace"
            writeTo="traceLogger"/>
    <logger name="*"
            minlevel="Debug"
            writeTo="debugLogger"/>
    <logger name="*"
            minlevel="Info"
            writeTo="infoLogger"/>
    <logger name="*"
            minlevel="Warn"
            writeTo="warnLogger"/>
    <logger name="*"
            minlevel="Error"
            writeTo="errorLogger"/>
  </rules>
</nlog>

Creating a Kx Stream instance

When writing an application to interface with the Kx Platform, it is necessary to use the core C# Stream API components to create the connection, initialize services and execute the actions.

The main entry point to the C# Kx Stream API is an instance of IDeltaService. This is implemented by the DefaultDeltaService class.

DefaultDeltaService contains a default implementation for all core components.

component              interface               purpose
---------------------------------------------------------------------------
DcConnection           IDcPrclConnection       Core connectivity to Delta Control
ConfigManager          IConfigurationManager   For managing configuration of connections, processes etc. in Delta Control
QueryRouter            IDeltaQueryRouter       A feature to queue and dispatch requests across database resources, taking availability into account
DeltaApiService        IDeltaApiService        Managed calling of public analytics
DeltaMessagingServer   IDeltaMessagingServer   Message publication
StreamingQmConnection  IStreamingQmConnection  For receiving data from streaming analytics

IDeltaService extends the IDisposable interface, so the library core and its components can be created and destroyed either with the C# using pattern or by instantiating them and calling the Dispose method manually.

A single IDeltaService instance can be created as follows:

HostPort[] controlCluster = new HostPort[3];
controlCluster[0] = new HostPort(host1, 2001, false);
controlCluster[1] = new HostPort(host2, 2001, false);
controlCluster[2] = new HostPort(host3, 2001, false);
using (
        IDeltaService deltaService = new DefaultDeltaService(
            controlCluster, USER_PASS, THIS_INSTANCE_NAME,
            dmsCfgName: "DS_MESSAGING_SERVER:DS",
            isExclusive: true
        )
    )
    {
        deltaService.Start(TimeSpan.FromSeconds(5));

        // Execute service logic
        ....
    }

Kx Control connectivity

Kx Control connectivity is key to enable all services available within the Kx Stream API interface. If configured correctly as part of the IDeltaService this should automatically happen as part of the internal workflow of the IDeltaService.Start method.

DcConnection exposes a number of relevant events and members:

event or method  purpose
---------------------------------------------------------------------------
Start            Starts the connection to DC, including registration of this process with DC. Should be called automatically by IDeltaService.Start.
Stop             Stops the connection to DC. Should be called automatically by IDeltaService.Stop.
CurrentHandle    The ID of this process in DC following successful registration. Corresponds to the Handle in the Kx Control UI Process Library Status View window.
Connected        An event whose handlers are called once a connection has been established, but before registration of this process completes.
Registration     An event whose handlers are called once process registration completes.
Disconnected     An event whose handlers are called when a connection is lost. (This implies any process registration is also lost.)

Handlers for any of the above events should generally be attached before IDeltaService.Start (or DcConnection.Start) is called; otherwise the event may be triggered before the handler is in place. A Registration handler could, for example, set a shared waitable object, so that the program pauses for at most X seconds but continues immediately once registration completes. This is preferable to a fixed delay after calling Start, which is always incurred whether or not registration has completed.

private ManualResetEventSlim WAITING_FOR_REGISTRATION =
    new ManualResetEventSlim(false);
private ManualResetEventSlim WAITING_FOR_SERVICES =
    new ManualResetEventSlim(false);

...
...

void run()
{
    using (
            IDeltaService deltaService = new DefaultDeltaService(
                controlCluster, USER_PASS, THIS_INSTANCE_NAME,
                dmsCfgName: DMS_CFG_NAME,
                isExclusive: true
            )
        )
        {
            deltaService.DcConnection.Connected += DcConn_Connected;
            deltaService.DcConnection.Registration += DcConn_Registration;
            deltaService.DcConnection.Disconnected += DcConn_Disconnected;
            deltaService.startupComplete += StreamServices_Ready;

            deltaService.Start(TimeSpan.FromSeconds(5));

            LOG.Info("Waiting for registration...");
            if (!WAITING_FOR_REGISTRATION.Wait(TimeSpan.FromSeconds(5)))
            {
                throw new Exception(
                    "Kx Control Registration did not complete promptly"
                );
            }
            LOG.Info("Waiting for services...");
            if (!WAITING_FOR_SERVICES.Wait(TimeSpan.FromSeconds(5)))
            {
                throw new Exception("Kx Stream Services not available promptly");
            }
            LOG.Info("Kx Stream API ready.");

            // Execute Business Logic
            ....

            deltaService.Stop();

        }
}

void DcConn_Registration(object sender, bool success, HostPort hostPort, int? handle)
{
    if (success)
    {
        LOG.Info("DC Registration successful to {0}, handle:{1}", hostPort, handle);
        WAITING_FOR_REGISTRATION.Set();
    }
    else
    {
        LOG.Info("DC Registration failed to {0}", hostPort);
    }
}

void StreamServices_Ready(object sender, EventArgs e)
{
    WAITING_FOR_SERVICES.Set();
}

void DcConn_Connected(object sender, HostPort hostPort, int? handle)
{
    LOG.Info("Connected to " + hostPort + " , handle:" + handle);
}

void DcConn_Disconnected(object sender, HostPort hostPort)
{
    LOG.Error("Disconnected from " + hostPort);
}

This will create a connection to Kx Control and authorize the user.

Enable TLS Server certificate verification

To allow the C# Stream API to verify the TLS server certificate, it is necessary to import and install the certificate on the client side.

  1. Set up the platform installation with TLS enabled.

  2. Take the ca.pem certificate from the Linux server and save the contents as ca.cer file to allow Windows to automatically recognise it as a certificate.

  3. Save the file to an appropriate location on the Windows client machine.

  4. If you double-click on the certificate you will see the details of the certificate.

  5. Additionally if you check the Certification Path tab you will see the certificate status is not trusted.

  6. To install the certificate follow the steps as detailed at Windows Import Certificate.

  7. Once the certificate has been installed you should see the certificate listed under Trusted Root Certification Authorities/Certificates.

  8. Additionally if you double-click on the certificate and check the Certification Path tab you should see the certificate has been successfully installed and is now trusted.

  9. Provided the local certificate matches the remote certificate on the Linux server, the C# Stream API should automatically validate the connection on startup.
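
The installation described in the steps above can also be checked programmatically. The following is a minimal sketch using the standard .NET X509Store API; the CaCertificateCheck class is a hypothetical helper, and the file path passed to it is whatever location ca.cer was saved to.

```csharp
using System;
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper (not part of the C# Stream API).
public static class CaCertificateCheck
{
    // Returns true when the certificate in the given file is present in the
    // Trusted Root Certification Authorities store of the current user or
    // of the local machine. Throws if the file cannot be read as a certificate.
    public static bool IsInstalledAsTrustedRoot(string cerPath)
    {
        var ca = new X509Certificate2(cerPath);
        return IsInStore(ca, StoreLocation.CurrentUser)
            || IsInStore(ca, StoreLocation.LocalMachine);
    }

    private static bool IsInStore(X509Certificate2 ca, StoreLocation location)
    {
        var store = new X509Store(StoreName.Root, location);
        try
        {
            store.Open(OpenFlags.ReadOnly);
            // validOnly is false so the match does not depend on chain validation.
            return store.Certificates
                .Find(X509FindType.FindByThumbprint, ca.Thumbprint, false)
                .Count > 0;
        }
        finally
        {
            store.Close();
        }
    }
}
```

A result of false before step 6 and true after it would suggest that the import landed in the expected store.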

Available services

Depending on the arguments provided to the DefaultDeltaService constructor, a number of services will be automatically initialized and available for use. Each has a Demo class within the DeltaApiDemo solution as an example of available functionality:

Service                               Demo Class                       Description
---------------------------------------------------------------------------------------------
Configuration management              Program.cs                       Allows interaction with Control configuration data
Query Router Service                  QueryRouterServiceDemo.cs        Execute async queries against a running Kx Platform environment
Public API service                    DeltaAPIServiceDemo.cs           Execute public API functions via the C# Stream API
Streaming Analytics API               StreamingAnalyticServiceDemo.cs  Receive Streaming analytic updates from the Kx Platform
Messaging and data discovery service  DMSServiceDemo.cs                Integrate with the Kx Platform Messaging Service

Configuration management

Configuration management includes reading, updating and deleting various settings inside Delta Control. The ConfigManager property of IDeltaService encapsulates this feature.

The ConfigManager service is available as soon as an IDeltaService instance has been created. The currently available API includes:

// Returns details of the Configuration Parameter
// DS_GW_Servers:ds_gw_ops_a for Key/Value pairs;
// assumes Config parameter has only 2 columns
Properties configParamDetails =
    deltaService.ConfigManager.GetProperties("DS_GW_Servers", "ds_gw_ops_a");

// Returns the configuration parameter as a list of Properties.
// The Properties will relate to the each row of the configuration element.
IList<Properties> configParamDetails =
    deltaService.ConfigManager.GetPropertyList("DS_GW_Servers", "ds_gw_ops_a");

// Returns the configuration parameter as a list of Dictionaries.
// The Dictionary will relate to the each row of the configuration element.
IList<Dictionary<string, object>> configParamDetails =
    deltaService.ConfigManager.GetMapList("DS_GW_Servers", "ds_gw_ops_a");

// Returns the configuration parameter as a kx.c.Flip object
kx.c.Flip configParamDetails =
    deltaService.ConfigManager.GetFlip("DS_GW_Servers", "ds_gw_ops_a");

// Returns a list of overrides for a configuration parameter
string[] configParamOverrides =
    deltaService.ConfigManager.GetOverrides("DS_GW_Servers");

// Returns connection details for a defined Connection entity in the Kx Platform
ExtConnectionDetails extConnDetails =
    deltaService.ConfigManager.GetConnection("rdb_process");

// Returns a kx.c.Dict of publicly available Analytic APIs defined in the Kx Platform
object analyticAPI = deltaService.ConfigManager.GetAnalyticApi();

// Returns true/false depending on whether a configuration parameter exists
bool paramExist =
    deltaService.ConfigManager.DoesParameterExists("DS_GW_Servers", "ds_gw_ops_a");

// Returns schema details for a requested Schema
DeltaSchema schemaDetails =
    deltaService.ConfigManager.GetSchema("dmxsysCPU");

// Returns list of schemas in a Schema Group
string[] schemasInGroup =
    deltaService.ConfigManager.GetSchemasForGroup("DM_Monitoring");

// Returns Connection details for all connections in a group
ConnectionGroupDetails connGroupDetails =
    deltaService.ConfigManager.GetConnectionGroup("ds_hdb_ops");

// Returns connection details for a specific instance
BasicConnectionDetails connDetails2 =
    deltaService.ConfigManager.GetConnectionDetails("ds_hdb_ops_a");

// Retrieves the Process Template name for an instance
string processTemplateName =
    deltaService.ConfigManager.GetProcessTemplateName("ds_rdb_ops_a");

// Return the instance parameters and values
Dictionary<String, object> instanceParams =
    deltaService.ConfigManager.GetProcessInstanceParamValues("ds_rdb_ops_a");

// Returns the task details
Dictionary<String, object> taskInfo =
    deltaService.ConfigManager.GetTaskInfo("ds_rdb_ops_a.1");

// Clone an existing configuration parameter under a new name
List<Dictionary<string, object>> paramData =
    deltaService.ConfigManager.GetMapList("DS_GW_Servers", "ds_gw_ops_a");
deltaService.ConfigManager.AddConfig("clonedConfig", "DEFAULT", paramData);

// Modify paramData and update
deltaService.ConfigManager.UpdateConfig("clonedConfig", "DEFAULT", paramData);

// Remove configuration parameter
deltaService.ConfigManager.UpdateConfig("clonedConfig", "DEFAULT");

Query Router service

The Query Router (QR) is a Kx Platform process used to manage client requests and database availability. The goal of the framework is to queue and dispatch requests in a way that best manages database resources, minimizing contention. While it is beyond the scope of this document to describe the server-side features in more detail, the library support of this feature will be summarized below.

Before use, connectivity (including QR registration) must be established. This should happen automatically as part of the internal workflow of the IDeltaService.Start method when enableQueryAPI is set to true in the DefaultDeltaService constructor.

How to enable the query interface:

IDeltaService deltaService = new DefaultDeltaService(
        controlCluster, USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
        isExclusive: true,
        dmsCfgName: DMS_CFG_NAME
    );

The QueryRouter property of IDeltaService includes a QrRegistration event, whose handlers are called when QR registration completes.

Handlers for this event should generally be attached before IDeltaService.Start (or QueryRouter.Start) is called; otherwise the event may be triggered before the handler is in place. A QrRegistration handler could, for example, set a shared waitable object, so that the program pauses for at most X seconds but continues immediately once registration completes. This is preferable to a fixed delay after calling Start, which is always incurred whether or not registration has completed.

// Listen for QR registration
ManualResetEventSlim qrRegisteredTrigger = new ManualResetEventSlim(false);
deltaService.QueryRouter.QrRegistration += (sender, success) =>
{
    if (success)
    {
        qrRegisteredTrigger.Set();
    }
};
bool successFulQrRegistration = qrRegisteredTrigger.Wait(TimeSpan.FromSeconds(5));
if (!successFulQrRegistration)
{
    throw new Exception("QR Registration did not succeed promptly");
}
// Execute QR Queries

Request/response query

Once fully registered, queries can be executed against the Query Router Service.

object[] queryObj = new object[2];
queryObj[0] = ".api.getQuotes";
queryObj[1] = new c.Dict(
        new object[] { "symList" },
        new object[] { new string[] { "EUR/USD" } }
    );

deltaService.QueryRouter.RunQuery(
        queryObj:queryObj,
        user: "Administrator",
        database: "ds_rdb",
        optParams: new object[] { },
        messageHandler: responseHandler
    );

parameter       required  description
---------------------------------------------------------------------------
queryObj        yes       The query object being executed.
user            yes       User executing the query. The user who authenticated the connection does not have to be the user executing the query.
database        yes       The connection or connection group where the query should be executed. (This may be overridden in the case of routed queries.)
optParams       yes       A Dictionary containing optional parameters for the query. Can be empty.
messageHandler  yes       The IQrMessageHandler instance which handles the results or exception. Alternatively, the IQrBinaryMessageHandler interface can be used; it delivers the results as a raw byte array, which can be deserialized later using the kx core IPC deserializer.

This executes the query asynchronously; the result is delivered to the messageHandler. The format of queryObj is as defined in the Kx C# interface. For example:

object queryObj = "2+2";
queryObj = new object[] { '+', 2, 2 };

Kx Platform adds additional secure-handling checks depending on the executing users and configuration of the environment. In such cases, string queries or functions the calling user is unpermissioned for will fail with a suitable error.

Public Query API

This component is responsible for interacting with the Delta Control API Service to call ‘public’ analytics in a more managed, type-safe manner. Library support for this feature is encapsulated in the DeltaApiService property of an IDeltaService instance.

Before use, connectivity must be established. As the Public Query API remains a part of the Query interface, like the Query Router this service must be enabled as part of the creation of the IDeltaService.

IDeltaService deltaService = new DefaultDeltaService(
        controlCluster, USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
        isExclusive: true, dmsCfgName: DMS_CFG_NAME
    );

The available Public Query APIs can be retrieved via the Api property of the DeltaApiService. This is built from the publicly available APIs defined within the Kx Platform environment.

Dictionary<string, IDeltaApi> availableAPI = deltaService.DeltaApiService.Api;

The key is the name of the API, which corresponds to the alias of the Analytic within your Platform environment, and the value is an IDeltaApi object representing the API which can be executed. The IDeltaApi object is then used to build an IDeltaQuery via the IDeltaApiService:

IDeltaApi api = deltaService.DeltaApiService.Api["aliasName"];
IDeltaQuery query = deltaService.DeltaApiService.CreateDeltaQuery(api);

At this point you have an IDeltaQuery object containing the API definition. Parameters must then be applied to the IDeltaQuery object; which ones depends on what the API expects. The IDeltaApi details the expected parameters, as does the Analytic definition in the Platform environment.

For example to apply Dict input parameters:

// "params" is a reserved word in C#, so the variable is named parms
Dictionary<string, object> parms = new Dictionary<string, object>();
parms["symList"] = new string[] {"EUR/USD"};
query.AddParameterValues(parms);

Alternatively, if the expected input is a Table, the parameters would be applied:

// One AddParameterValues call per table row
Dictionary<string, object> parms = new Dictionary<string, object>();
parms["col1"] = 1;
parms["col2"] = new int[] {1, 2, 3};
query.AddParameterValues(parms);
parms = new Dictionary<string, object>();
parms["col1"] = 10;
parms["col2"] = new int[] {99, 62, 33};
query.AddParameterValues(parms);

Once the IDeltaQuery object has been created, it can be executed with the RunQuery method. This call blocks until the query execution finishes or the call times out. The data returned will be as received from the q process, typically a kx.c.Dict or a kx.c.Flip object.

object result = deltaService.DeltaApiService.RunQuery(query, username, optParams);

parameter  required  description
------------------------------------------------------------------
query      yes       IDeltaQuery being executed
username   yes       user to run query as
optParams  yes       Dictionary of optional parameters. Can be null

The optional parameters provided to the RunQuery method allow customized processing within the Kx Platform Query framework.

name        type       description
---------------------------------------------------------------------------
timeout     int        Override the default timeout for this request (in seconds)
clientTime  timestamp  Timestamp assigned by client when entering request. Used in QR timestamp logging
logCorr     char[]     Client assigned logging correlator
page        int        Number of rows to return (same behavior as sublist)
page        int[]      Two element list of start and number rows (same as sublist). If specified in this format, results will be returned in the Control paged format
cache       boolean    Whether to cache paged results
sort        boolean    Sort results as part of paging
qp          string[]   List of qps to consider executing the query on

When including optional parameters, provide a Dictionary object as part of the method call:

Dictionary<string, object> optParams = new Dictionary<string, object>();
optParams["logCorr"] = "SimpleQueryAPI".ToCharArray();
object result =
    deltaService.DeltaApiService.RunQuery(query, "<username>", optParams);

For full details of the available optional parameters please refer to the Query Router section of the Process Template API documentation.
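
As an illustration of combining several of the optional parameters listed above, the sketch below builds a dictionary with a timeout, a logging correlator and a two-element page range. The QueryOptions class is a hypothetical helper, not part of the API; only the key names and value types are taken from the table.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the C# Stream API).
public static class QueryOptions
{
    // Builds an optional-parameter dictionary:
    //   timeout  int     request timeout in seconds
    //   logCorr  char[]  client-assigned logging correlator
    //   page     int[]   start row and number of rows
    public static Dictionary<string, object> Build(
        int timeoutSeconds, string correlator, int startRow, int rowCount)
    {
        return new Dictionary<string, object>
        {
            { "timeout", timeoutSeconds },
            { "logCorr", correlator.ToCharArray() },
            { "page", new[] { startRow, rowCount } }
        };
    }
}
```

The correlator is converted to char[] rather than passed as a string because, per the type mapping in this document, a C# string maps to a kdb+ symbol while char[] maps to a kdb+ string.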

Streaming Analytics API

The Kx Stream C# API provides access to a Streaming Analytics Service, which can be used to subscribe to and receive updates from a Streaming Analytic. The service is enabled by the same enableQueryAPI argument used for the query interface, provided during construction of the IDeltaService.

There exists an additional trigger to ensure the service is fully connected and ready to stream.

IDeltaService deltaService = new DefaultDeltaService(
        new[] { new HostPort(DC_HOST, DC_PORT, USE_TLS) },
        USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
        isExclusive: true,
        dmsCfgName: DMS_CFG_NAME
    );

ManualResetEventSlim qmStartedTrigger = new ManualResetEventSlim(false);
deltaService.StreamingQmConnection.Started += qmStartedTrigger.Set;
if (!qmStartedTrigger.Wait(TimeSpan.FromSeconds(10)))
{
    throw new Exception("Kx Stream Services not available promptly");
}

To start streaming, call the Subscribe method.

kx.c.Dict parameters = new kx.c.Dict(
        new[] { "size", "sym" },
        (new object[] { 1000.0, "EUR/USD" })
    );
Guid uid=deltaService.StreamingQmConnection.Subscribe(
        "<analyticName>",
        parameters,
        "<databaseName>",
        "<username>",
        (guid, dataTable) => {
            // Business Logic
        }
    );

A number of parameters are required.

parameter                description
---------------------------------------------------------------------------
analyticName             Name of the streaming analytic as defined in the Platform
parameters               Parameters as defined in the analytic within the Platform. The type of this object will depend on the parameter types expected by the analytic.
database                 Database where the Streaming Analytic should be run
Action<Guid, kx.c.Flip>  Handles incoming messages.

To stop streaming, call the Unsubscribe method with the Guid returned by Subscribe:

deltaService.StreamingQmConnection.Unsubscribe(uid);

Messaging and data discovery service

The Delta Messaging Server (DMS) is a feature that allows publishing data to remote subscribers, or subscribing to a remote publisher, based upon a channel/topic configuration. To enable the DMS service, connectivity to the DMS server is required. This should happen automatically as part of the internal workflow of the IDeltaService.Start method when a valid dmsCfgName is provided. The name should point to a valid configuration parameter containing the list of available messaging servers in the environment. As part of Kx Stream, the configuration parameter used is DS_MESSAGING_SERVER:DS, which contains the messaging server instances ds_ms_a and ds_ms_b. At least one of these should be running to have a valid DMS service in your environment.

Note

The configuration parameter containing the DMS instances is fully configurable, as is the ability to run alternative DMS Servers with an appropriately updated configuration. See Kx Stream for more details.

IDeltaService deltaService = new DefaultDeltaService(
        new[] { new HostPort(DC_HOST, DC_PORT, USE_TLS) },
        USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
        isExclusive: true,
        dmsCfgName: DMS_CFG_NAME
    );

Publishing

To publish via DMS, it is necessary to create a class implementing the IDeltaMessagingPublisher interface. This allows it to listen for incoming subscription requests and so maintain its active subscriptions from remote clients.

string channel = "tickerplant";
string table = "dfxAsk";
DmsPublisher publisher = new DmsPublisher(table);
deltaService.DeltaMessagingServer.RegisterPublisher(
    publisher, channel, table, filterColumns: filterColumns);

DMSPublisher

public class DmsPublisher : IDeltaMessagingPublisher
{
    private readonly ConcurrentDictionary<DeltaMessagingSubscription,
        DeltaMessagingSubscription> _subscriptions;
    private readonly string _table;
    private readonly bool syncPublisher;

    public DmsPublisher(string table, bool syncPublisher = false)
    {
        this.syncPublisher = syncPublisher;
        _table = table;
        _subscriptions = new ConcurrentDictionary<DeltaMessagingSubscription,
            DeltaMessagingSubscription>();
    }

    public void SubscriptionAdded(string topicName,
        DeltaMessagingSubscription subscription)
    {
        _subscriptions[subscription] = subscription;
    }

    public void SubscriptionRemoved(DeltaMessagingSubscription subscription)
    {
        DeltaMessagingSubscription s;
        _subscriptions.TryRemove(subscription, out s);
    }


    public void Publish(kx.c.Flip data)
    {
        foreach (DeltaMessagingSubscription subscription in _subscriptions.Values)
        {
            subscription.Publish(_table, data);
        }

    }

    // If set up as a sync publisher, data can be published synchronously
    public Dictionary<string, object> PublishSync(kx.c.Flip data)
    {
        Dictionary<string, object> results = new Dictionary<string, object>();
        foreach (DeltaMessagingSubscription subscription in _subscriptions.Values)
        {
            try {
                results.Add(
                        subscription.ToString(),
                        subscription.Publish(_table, data, true)
                    );
            } catch(Exception e){
                results.Add(subscription.ToString(), e);
            }
        }
        return results;
    }

    public string SyncPublishFunction { get; } = "upd";
}

Once the publisher is initialized and registered, data is published by sending it to each of the DeltaMessagingSubscription instances it holds.

DateTime now = DateTime.UtcNow;
// One column name per entry, and one single-row column of values per name
c.Dict dataRow = new c.Dict(
        new[] { "sym", "price", "time", "size", "src", "sizes", "orderIds" },
        new object[] {
            new object[] { "EUR/USD" },
            new object[] { 1.23 },
            new object[] { now },
            new object[] { 10000 },
            new object[] { "Cantor" },
            new object[] { "M" },
            new object[] { 1234532 }
        }
    );
c.Flip f = new c.Flip(dataRow);
publisher.Publish(f);

Subscribing

To subscribe via DMS, create a class implementing IDeltaMessagingSubscriber. This requires an Upd method to accept the incoming data for processing; it receives the topic and the data, typically a flip. As with publication, the subscriber instance should be registered with a channel, a topic and, if applicable, any filters.

string channel = "tickerplant";
string table = "dfxQuote";
DmsSubscriber subscriber = new DmsSubscriber();
ManualResetEventSlim registrationTrigger = new ManualResetEventSlim(false);
deltaService.DeltaMessagingServer.RegisterSubscriber(
        subscriber, channel, table, filterColumns: null,
        registrationTrigger: registrationTrigger
    );
if (!registrationTrigger.Wait(TimeSpan.FromSeconds(5)))
{
    throw new Exception("Subscriber registration did not complete promptly");
}
...
...
// When complete
deltaService.DeltaMessagingServer.UnregisterSubscriber(
    subscriber, channel, table, filterColumns: null);

DMSSubscriber

public class DmsSubscriber : IDeltaMessagingSubscriber
{
    public readonly ConcurrentDictionary<string,ConcurrentQueue<object>>
        ReceivedData = new ConcurrentDictionary<string, ConcurrentQueue<object>>();

    public void Upd(string baseTopic, object data)
    {
        ConcurrentQueue<object> dataForTopic = ReceivedData.GetOrAdd(
                baseTopic,
                bt => new ConcurrentQueue<object>()
            );
        dataForTopic.Enqueue(data);
    }
}
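
The subscriber above only queues incoming data. A minimal sketch of consuming it is shown below, using only the standard concurrent collections; the ReceivedDataReader class is a hypothetical helper, and the topic key to pass depends on the baseTopic values your environment delivers to Upd.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper (not part of the C# Stream API).
public static class ReceivedDataReader
{
    // Removes and returns everything currently queued for one topic.
    // Returns an empty list when nothing has been received for that topic.
    public static List<object> Drain(
        ConcurrentDictionary<string, ConcurrentQueue<object>> receivedData,
        string topic)
    {
        var drained = new List<object>();
        ConcurrentQueue<object> queue;
        if (receivedData.TryGetValue(topic, out queue))
        {
            object item;
            // TryDequeue is safe to call while Upd enqueues on another thread.
            while (queue.TryDequeue(out item))
            {
                drained.Add(item);
            }
        }
        return drained;
    }
}
```

For the DmsSubscriber shown above this could be called as ReceivedDataReader.Drain(subscriber.ReceivedData, topic), for example on a timer or a worker thread.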

Type mapping

C#                                            kdb+
-----------------------------------------------------------
System.Boolean/bool                           boolean     b
System.Byte/byte                              byte        x
System.Int16/short                            short       h
System.Int32/int                              int         i
System.Int64/long                             long        j
System.Single/float                           real        e
System.Double/double                          float       f
System.String/string                          symbol      s
System.Char[]/char[]                          string      C
System.Char/char                              char        c
DeltaApiCore.Connections.Kdb.Support.Date     date        d
System.DateTime                               datetime    z
DeltaApiCore.Connections.Kdb.Support.Month    month       m
DeltaApiCore.Types.QTimestamp                 timestamp   p
DeltaApiCore.Connections.Kdb.Support.Minute   minute      u
DeltaApiCore.Connections.Kdb.Support.Second   second      v
System.TimeSpan                               time        t
DeltaApiCore.Types.QTimespan                  timespan    n
System.Guid                                   guid        g
DeltaApiCore.Connections.Kdb.Support.Dict     dictionary
DeltaApiCore.Connections.Kdb.Support.Flip     table
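
To make the mapping easier to check when building parameters, the sketch below looks up the kdb+ type character for a value of one of the System types in the table. The KdbTypeChars class is a hypothetical helper written for illustration; it does not cover the DeltaApiCore types and is not how the library itself performs the conversion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical lookup built from the System.* rows of the table above.
public static class KdbTypeChars
{
    private static readonly Dictionary<Type, char> Map = new Dictionary<Type, char>
    {
        { typeof(bool), 'b' },
        { typeof(byte), 'x' },
        { typeof(short), 'h' },
        { typeof(int), 'i' },
        { typeof(long), 'j' },
        { typeof(float), 'e' },
        { typeof(double), 'f' },
        { typeof(string), 's' },
        { typeof(char[]), 'C' },
        { typeof(char), 'c' },
        { typeof(DateTime), 'z' },
        { typeof(TimeSpan), 't' },
        { typeof(Guid), 'g' }
    };

    // Returns the kdb+ type character for the value's runtime type,
    // or null when the type is not one of the System types listed.
    public static char? For(object value)
    {
        if (value == null)
        {
            return null;
        }
        char code;
        return Map.TryGetValue(value.GetType(), out code) ? code : (char?)null;
    }
}
```

Two rows are easy to confuse: a C# string becomes a kdb+ symbol, whereas a kdb+ string requires a char[], and a C# double becomes a kdb+ float, whereas a C# float becomes a kdb+ real.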