KX Stream C# .NET API
The KX Stream C# API offers programmatic access from C# to core features of the KX Delta Platform. It extends the KX core C# interface to provide higher-level, service-based access to key platform features, including:
- Configuration management
- Query Router service
- Public Query API service
- Streaming Analytics API
- Messaging and data discovery service
Developing with the KX Stream C# API
The key libraries required to build a C# application with the C# Stream API are available in a number of DeltaCsApi*.zip files. There are AnyCPU, 32-bit and 64-bit versions of two packages:
- DeltaCsAPICore provides the DLL libraries for implementing an application on top of the C# Stream API
- DeltaCsAPIDemo provides a Visual Studio demo solution with examples of integrating with the KX Delta Platform services
DeltaCsAPICore.zip
The file contains two key DLL libraries:
- DeltaApiCore.dll is the main DLL required to use the features of the C# Stream API.
- Newtonsoft.Json.dll is a dependency of DeltaApiCore that provides JSON support to the C# Stream API.
Add both as references in your application so that it can interface with the KX Stream services within the KX Delta Platform.
DeltaApiCore.dll is compiled against .NET Standard 2.0 and supports applications on:
- .NET Core 2.0 - 3.1
- .NET Framework 4.6.1 - 4.8
See https://dotnet.microsoft.com/platform/dotnet-standard for further details
DeltaCsAPIDemo.zip
To provide examples of usage, the file contains source code and a DeltaApiDemo.sln solution, which can be imported into Visual Studio and run to highlight the various services available.
By default, DeltaApiDemo is compiled as a .NET Core 3.0 application. To build and run it you need the .NET Core 3.0 SDK and runtime installed.
To run the Demo application from within Visual Studio:
- Open the DeltaApiDemo solution.
- Update the App.config file to point to the KX Delta Platform environment, setting the appropriate KX Control details.
- Depending on the demo class being executed, update any additional config values required within the App.config file.
<appSettings>
<add key="DC_HOST" value="<host>" />
<add key="DC_PORT" value="<port>" />
<add key="DC_HOST_SECONDARY" value="" />
<add key="DC_PORT_SECONDARY" value="" />
<add key="DC_TLS" value="false" />
<add key="USER" value="<user>" />
<add key="PASS" value="<password>" />
<add key="THIS_INSTANCE_NAME" value="<instanceName>" />
<add key="ANOTHER_INSTANCE_NAME" value="<instanceName>" />
<add key="CONNECTION_GROUP_NAME" value="<connectionName>" />
<add key="SCHEMA_TABLE_GROUP_NAME" value="<SchemaGroupName>" />
<add key="CONFIG_NAME" value="<configName>" />
<add key="CONFIG_NAME_OVERRIDE" value="<overrideName>" />
<add key="CONFIG_NAME_CLONE_SOURCE" value="<configName>" />
<add key="CONFIG_NAME_CLONE_TARGET" value="<configName>" />
<add key="CONFIG_NAME_CLONE_OVERRIDE" value="DEFAULT" />
<add key="QR_QUERY_USER" value="Administrator" />
<add key="QR_QUERY_TARGET_DB" value="<databaseName>" />
<add key="DMS_CFG_NAME" value="DS_MESSAGING_SERVER:DS" />
</appSettings>
A number of main classes can be run as part of the demo.
class | description |
---|---|
DMSServiceDemo | messaging and data discovery service |
DeltaAPIServiceDemo | public query API |
QueryRouterServiceDemo | Query Router service |
StreamingAnalyticServiceDemo | Streaming Analytic API |
Right-click the DeltaCsAPIDemo project, select Properties, and update the startup object to run the appropriate demo.
DeltaCsAPIDemo application config
The App.config file allows user configuration to be supplied to the Demo application. Each service demo uses a number of key configuration values.
key | description |
---|---|
nlog.config | configuration file |
DC_HOST | hostname of the primary KX Control process |
DC_PORT | port of the primary KX Control process |
DC_HOST_SECONDARY | hostname of the secondary KX Control process when deployed in a cluster. Can be blank if not in a cluster |
DC_PORT_SECONDARY | port of the secondary KX Control process when deployed in a cluster. Can be blank if not in a cluster |
DC_TLS | whether a TLS connection to KX Control should be used. Requires TLS enabled on KX Control |
USER | authorized user to connect as |
PASS | user password |
THIS_INSTANCE_NAME | name of this running instance: reported to KX Control, used to monitor the status of the process, and used to ensure the instance is unique |
ANOTHER_INSTANCE_NAME | used as an example of how to look up information of another instance via the Configuration Management Service |
CONNECTION_GROUP_NAME | used as an example of how to look up Connection Group information via the Configuration Management Service |
SCHEMA_TABLE_GROUP_NAME | used as an example of how to look up Schema Group information via the Configuration Management Service |
CONFIG_NAME | used as an example of how to look up Configuration Parameter information via the Configuration Management Service |
CONFIG_NAME_OVERRIDE | used as an example of how to look up Config Parameter override information via the Configuration Management Service |
CONFIG_NAME_CLONE_SOURCE | used as an example of how to create and update Config Parameter information via the Configuration Management Service |
CONFIG_NAME_CLONE_TARGET | used as an example of how to create and update Config Parameter information via the Configuration Management Service |
CONFIG_NAME_CLONE_OVERRIDE | used as an example of how to create and update Config Parameter information via the Configuration Management Service |
QR_QUERY_USER | used as part of the Query Router Service to highlight how queries can be executed as specific users |
QR_QUERY_TARGET_DB | used as part of the Query Router Service to highlight how queries are targeted to specific processes |
DMS_CFG_NAME | key configuration within the Messaging and data discovery service to point the C# Stream API at the environment DMS process |
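As a rough sketch of how the keys above might be consumed, the helper below turns App.config-style settings into a list of KX Control endpoints. It assumes the settings arrive as a NameValueCollection (the type returned by ConfigurationManager.AppSettings). DemoSettings and Endpoint are hypothetical names used for illustration only; Endpoint merely stands in for the library's HostPort type.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Specialized;

// Hypothetical helper, not part of DeltaApiCore.
public static class DemoSettings
{
    // Stand-in for the library's HostPort type (assumption for illustration).
    public sealed class Endpoint
    {
        public Endpoint(string host, int port, bool tls)
        {
            Host = host;
            Port = port;
            Tls = tls;
        }

        public string Host { get; }
        public int Port { get; }
        public bool Tls { get; }
    }

    // Reads the primary and, if present, secondary KX Control endpoints.
    public static IList<Endpoint> ReadControlEndpoints(NameValueCollection settings)
    {
        // A missing or unparsable DC_TLS value is treated as false.
        bool tls = bool.TryParse(settings["DC_TLS"], out bool parsed) && parsed;

        var endpoints = new List<Endpoint>();
        AddIfPresent(endpoints, settings["DC_HOST"], settings["DC_PORT"], tls);
        AddIfPresent(endpoints, settings["DC_HOST_SECONDARY"], settings["DC_PORT_SECONDARY"], tls);

        if (endpoints.Count == 0)
        {
            throw new InvalidOperationException("DC_HOST and DC_PORT must be set");
        }
        return endpoints;
    }

    // Blank hosts are skipped, which covers the non-clustered case.
    private static void AddIfPresent(List<Endpoint> endpoints, string host, string port, bool tls)
    {
        if (string.IsNullOrWhiteSpace(host))
        {
            return;
        }
        if (!int.TryParse(port, out int portNumber))
        {
            throw new FormatException("Invalid port '" + port + "' for host " + host);
        }
        endpoints.Add(new Endpoint(host, portNumber, tls));
    }
}
```

Leaving DC_HOST_SECONDARY and DC_PORT_SECONDARY blank yields a single endpoint, matching the "can be blank if not in a cluster" behaviour described in the table.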
DeltaCsAPIDemo logging config
The DeltaCsAPIDemo uses NLog for logging, with configuration defined in the nlog.config file. By default the logger writes to trace.log, debug.log, info.log, warn.log and error.log depending on log level: each file receives messages at its named level and above.
See https://nlog-project.org/ for further details.
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >
<targets>
<target name="traceLogger"
xsi:type="File"
layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
fileName="logs\trace.log"
archiveAboveSize="10485760"
maxArchiveFiles="10"
archiveNumbering="Sequence"
concurrentWrites="true"
keepFileOpen="false"/>
<target name="debugLogger"
xsi:type="File"
layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
fileName="logs\debug.log"
archiveAboveSize="10485760"
maxArchiveFiles="10"
archiveNumbering="Sequence"
concurrentWrites="true"
keepFileOpen="false"/>
<target name="infoLogger"
xsi:type="File"
layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
fileName="logs\info.log"
archiveAboveSize="10485760"
maxArchiveFiles="10"
archiveNumbering="Sequence"
concurrentWrites="true"
keepFileOpen="false"/>
<target name="warnLogger"
xsi:type="File"
layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
fileName="logs\warn.log"
archiveAboveSize="10485760"
maxArchiveFiles="10"
archiveNumbering="Sequence"
concurrentWrites="true"
keepFileOpen="false"/>
<target name="errorLogger"
xsi:type="File"
layout="${longdate} [${threadid}] ${level:uppercase=true} ${logger} - ${message}"
fileName="logs\error.log"
archiveAboveSize="10485760"
maxArchiveFiles="10"
archiveNumbering="Sequence"
concurrentWrites="true"
keepFileOpen="false"/>
</targets>
<rules>
<logger name="*"
minlevel="Trace"
writeTo="traceLogger"/>
<logger name="*"
minlevel="Debug"
writeTo="debugLogger"/>
<logger name="*"
minlevel="Info"
writeTo="infoLogger"/>
<logger name="*"
minlevel="Warn"
writeTo="warnLogger"/>
<logger name="*"
minlevel="Error"
writeTo="errorLogger"/>
</rules>
</nlog>
Creating a KX Stream instance
When writing an application that interfaces with the KX Delta Platform, use the core C# Stream API components to create the connection, initialize services and execute actions.
The main entry point to the C# KX Stream API is an instance of IDeltaService. This is implemented by the DefaultDeltaService class.
DefaultDeltaService contains a default implementation for all core components.
property | interface | purpose |
---|---|---|
DcConnection | IDcPrclConnection | Core connectivity to Delta Control |
ConfigManager | IConfigurationManager | For managing configuration of connections, processes etc. in Delta Control |
QueryRouter | IDeltaQueryRouter | A feature to queue and dispatch requests across database resources, taking availability into account |
DeltaApiService | IDeltaApiService | Managed calling of public analytics |
DeltaMessagingServer | IDeltaMessagingServer | Message publication |
StreamingQmConnection | IStreamingQmConnection | For receiving data from streaming analytics |
IDeltaService extends the IDisposable interface, so the library core and its components can be created and destroyed either with the C# using pattern or by instantiating the service and calling the Dispose method manually.
A single IDeltaService instance can be created as follows:
HostPort[] controlCluster = new HostPort[3];
controlCluster[0] = new HostPort(host1, 2001, false);
controlCluster[1] = new HostPort(host2, 2001, false);
controlCluster[2] = new HostPort(host3, 2001, false);
using(
IDeltaService deltaService = new DefaultDeltaService
(
controlCluster, USER_PASS,
THIS_INSTANCE_NAME, dmsCfgName: "DS_MESSAGING_SERVER:DS",
isExclusive: true
)
)
{
deltaService.Start(TimeSpan.FromSeconds(5));
// Execute Service logic
....
}
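The example above uses the using pattern; the manual alternative is a try/finally block that calls Dispose. The sketch below contrasts the two with a stand-in class, since only the IDisposable behaviour is being illustrated. FakeService and DisposalSketch are hypothetical names and do not exist in DeltaApiCore.

```csharp
using System;

// Stand-in for an IDeltaService implementation; only disposal is modelled.
public sealed class FakeService : IDisposable
{
    public bool Started { get; private set; }
    public bool Disposed { get; private set; }

    public void Start()
    {
        Started = true;
    }

    public void Dispose()
    {
        Started = false;
        Disposed = true;
    }
}

public static class DisposalSketch
{
    // The using statement calls Dispose when the block exits,
    // including when an exception is thrown inside it.
    public static FakeService RunWithUsing()
    {
        FakeService observed;
        using (var service = new FakeService())
        {
            service.Start();
            observed = service;
        }
        return observed;
    }

    // Equivalent manual form: try/finally guarantees the Dispose call.
    public static FakeService RunWithManualDispose()
    {
        var service = new FakeService();
        try
        {
            service.Start();
        }
        finally
        {
            service.Dispose();
        }
        return service;
    }
}
```

Both methods return an instance whose Disposed property is true; the using form is usually preferable because the cleanup cannot be forgotten.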
KX Control connectivity
KX Control connectivity is required for all services available within the KX Stream API. If configured correctly as part of the IDeltaService, the connection is established automatically as part of the internal workflow of the IDeltaService.Start method.
DcConnection provides a number of relevant events and methods:
event or method | purpose |
---|---|
Start | Starts the connection to DC, including registration of this process with DC. Should be called automatically by IDeltaService.Start . |
Stop | Stops the connection to DC. Should be called automatically by IDeltaService.Stop . |
CurrentHandle | The ID of this process in DC following successful registration. Corresponds to the Handle in KX Control UI Process Library Status View window. |
Connected | An event whose handlers are called once a connection has been established, but before registration of this process completes. |
Registration | An event whose handlers are called once process registration completes. |
Disconnected | An event whose handlers are called when a connection is lost. (This implies any process registration is also lost.) |
Handlers for any of the above events should generally be set up before IDeltaService.Start (or DcConnection.Start) is called; otherwise the relevant event may be triggered before the handler is in place.
A Registration handler, for example, could set a shared waitable object, so that the program pauses for at most X seconds and continues immediately once registration completes. This is a better alternative to a fixed delay after calling Start, which is always incurred whether or not registration has completed:
private ManualResetEventSlim WAITING_FOR_REGISTRATION =
new ManualResetEventSlim(false);
private ManualResetEventSlim WAITING_FOR_SERVICES =
new ManualResetEventSlim(false);
...
...
void run()
{
using (
IDeltaService deltaService = new DefaultDeltaService(
controlCluster, USER_PASS, THIS_INSTANCE_NAME,
dmsCfgName: DMS_CFG_NAME,
isExclusive: true
)
)
{
deltaService.DcConnection.Connected += DcConn_Connected;
deltaService.DcConnection.Registration += DcConn_Registration;
deltaService.DcConnection.Disconnected += DcConn_Disconnected;
deltaService.startupComplete += StreamServices_Ready;
deltaService.Start(TimeSpan.FromSeconds(5));
LOG.Info("Waiting for registration...");
if (!WAITING_FOR_REGISTRATION.Wait(TimeSpan.FromSeconds(5)))
{
throw new Exception(
"KX Control Registration did not complete promptly"
);
}
LOG.Info("Waiting for services...");
if (!WAITING_FOR_SERVICES.Wait(TimeSpan.FromSeconds(5)))
{
throw new Exception("KX Stream Services not available promptly");
}
LOG.Info("KX Stream API ready.");
// Execute Business Logic
....
deltaService.Stop();
}
}
void DcConn_Registration(object sender, bool success, HostPort hostPort, int? handle)
{
if (success)
{
LOG.Info("DC Registration successful to {0}, handle:{1}", hostPort, handle);
WAITING_FOR_REGISTRATION.Set();
}
else
{
LOG.Info("DC Registration failed to {0}", hostPort);
}
}
void StreamServices_Ready(object sender, EventArgs e)
{
WAITING_FOR_SERVICES.Set();
}
void DcConn_Connected(object sender, HostPort hostPort, int? handle)
{
LOG.Info("Connected to " + hostPort + " , handle:" + handle);
}
void DcConn_Disconnected(object sender, HostPort hostPort)
{
LOG.Error("Disconnected from " + hostPort);
}
This will create a connection to KX Control and authorize the user.
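The wait-with-timeout steps in the example above can be factored into a small helper. This is a minimal sketch using only standard .NET types; RegistrationWaitSketch and WaitOrThrow are hypothetical names and are not part of DeltaApiCore.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class RegistrationWaitSketch
{
    // Blocks until the signal is set or the timeout elapses.
    // Returns the time actually spent waiting; throws if the timeout is reached.
    public static TimeSpan WaitOrThrow(ManualResetEventSlim signal, TimeSpan timeout, string description)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        if (!signal.Wait(timeout))
        {
            throw new TimeoutException(description + " did not complete within " + timeout);
        }
        return stopwatch.Elapsed;
    }
}
```

An event handler would call Set() on the shared ManualResetEventSlim, and the main thread would call, for example, WaitOrThrow(WAITING_FOR_REGISTRATION, TimeSpan.FromSeconds(5), "KX Control registration"). Unlike a fixed sleep, the call returns as soon as the handler fires.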
Enable TLS server certificate verification
To allow the C# Stream API to verify the TLS server certificate, it is necessary to import and install the certificate on the client side.
- Set up the KX Delta Platform installation to have TLS enabled.
- Take the ca.pem certificate from the Linux server and save its contents as a ca.cer file, so that Windows automatically recognises it as a certificate.
- Save the file to an appropriate location on the Windows client machine.
- Double-click the certificate to see its details.
- On the Certification Path tab, the certificate status is shown as not trusted.
- To install the certificate, follow the steps detailed at Windows Import Certificate.
- Once installed, the certificate should be listed under Trusted Root Certification Authorities/Certificates.
- If you double-click the certificate again and check the Certification Path tab, it should now be shown as trusted.
- Provided the local certificate matches the remote certificate on the Linux server, the C# Stream API should automatically validate the connection on startup.
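The trust status shown on the Certification Path tab can also be checked programmatically. The following sketch uses the standard .NET X509Chain class to test whether a certificate chains to a trusted root on the local machine. CertificateTrustCheck is a hypothetical helper name; disabling revocation checking is an assumption that suits a private CA without published revocation lists.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper, not part of DeltaApiCore.
public static class CertificateTrustCheck
{
    // Returns true when the platform can build a trusted chain for the certificate.
    // On failure, status lists the chain errors reported by the platform.
    public static bool IsTrusted(X509Certificate2 certificate, out string status)
    {
        using (var chain = new X509Chain())
        {
            // Assumption: a private CA typically publishes no revocation list.
            chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;

            bool trusted = chain.Build(certificate);
            status = trusted
                ? "trusted"
                : string.Join("; ", chain.ChainStatus.Select(
                    s => s.Status + ": " + (s.StatusInformation ?? string.Empty).Trim()));
            return trusted;
        }
    }

    // Convenience overload that loads the certificate from a .cer file first.
    public static bool IsTrusted(string certificatePath, out string status)
    {
        using (var certificate = new X509Certificate2(certificatePath))
        {
            return IsTrusted(certificate, out status);
        }
    }
}
```

Calling IsTrusted with the path to the saved ca.cer file before and after installing the certificate should show the result change from false to true, mirroring what the Certification Path tab reports.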
Available services
Depending on the arguments provided to the DefaultDeltaService constructor, a number of services are automatically initialized and available for use. Each has a demo class within the DeltaApiDemo solution as an example of available functionality:
Service | Demo Class | Description |
---|---|---|
Configuration management | Program.cs | Allows interaction with Control configuration data |
Query Router Service | QueryRouterServiceDemo.cs | Execute async queries against a running KX Delta Platform environment |
Public API service | DeltaAPIServiceDemo.cs | Execute public API functions via the C# Stream API |
Streaming Analytics API | StreamingAnalyticServiceDemo.cs | Receive Streaming analytic updates from the KX Delta Platform |
Messaging and data discovery service | DMSServiceDemo.cs | Integrate with the KX Delta Messaging Service |
Configuration management
Configuration management includes reading, updating and deleting various settings inside Delta Control. The ConfigManager property of IDeltaService encapsulates this feature.
The ConfigManager service is available as soon as an IDeltaService instance is created. The currently available API includes:
// Returns details of the Configuration Parameter
// DS_GW_Servers:ds_gw_ops_a for Key/Value pairs;
// assumes Config parameter has only 2 columns
Properties configParamDetails =
deltaService.ConfigManager.GetProperties("DS_GW_Servers", "ds_gw_ops_a");
// Returns the configuration parameter as a list of Properties.
// Each Properties object relates to one row of the configuration element.
IList<Properties> configParamDetails =
deltaService.ConfigManager.GetPropertyList("DS_GW_Servers", "ds_gw_ops_a");
// Returns the configuration parameter as a list of Dictionaries.
// Each Dictionary relates to one row of the configuration element.
IList<Dictionary<string, object>> configParamDetails =
deltaService.ConfigManager.GetMapList("DS_GW_Servers", "ds_gw_ops_a");
// Returns the configuration parameter as a kx.c.Flip object
kx.c.Flip configParamDetails =
deltaService.ConfigManager.GetFlip("DS_GW_Servers", "ds_gw_ops_a");
// Returns a list of overrides for a configuration parameter
string[] configParamOverrides =
deltaService.ConfigManager.GetOverrides("DS_GW_Servers");
// Returns connection details for a defined Connection entity in the KX Delta Platform
ExtConnectionDetails extConnDetails =
deltaService.ConfigManager.GetConnection("rdb_process");
// Returns a kx.c.Dict of publicly available Analytic APIs defined in the KX Delta Platform
object analyticAPI = deltaService.ConfigManager.GetAnalyticApi();
// Returns true if the configuration parameter exists, false otherwise
bool paramExist =
deltaService.ConfigManager.DoesParameterExists("DS_GW_Servers", "ds_gw_ops_a");
// Returns schema details for a requested Schema
DeltaSchema schemaDetails =
deltaService.ConfigManager.GetSchema("dmxsysCPU");
// Returns list of schemas in a Schema Group
string[] schemasInGroup =
deltaService.ConfigManager.GetSchemasForGroup("DM_Monitoring");
// Returns Connection details for all connections in a group
ConnectionGroupDetails connGroupDetails =
deltaService.ConfigManager.GetConnectionGroup("ds_hdb_ops");
// Returns connection details for a specific instance
BasicConnectionDetails connDetails2 =
deltaService.ConfigManager.GetConnectionDetails("ds_hdb_ops_a");
// Retrieves the Process Template name for an instance
string processTemplateName =
deltaService.ConfigManager.GetProcessTemplateName("ds_rdb_ops_a");
// Return the instance parameters and values
Dictionary<String, object> instanceParams =
deltaService.ConfigManager.GetProcessInstanceParamValues("ds_rdb_ops_a");
// Returns the task details
Dictionary<String, object> taskInfo =
deltaService.ConfigManager.GetTaskInfo("ds_rdb_ops_a.1");
// Clone an existing configuration parameter under a new name
List<Dictionary<string, object>> paramData =
deltaService.ConfigManager.GetMapList("DS_GW_Servers", "ds_gw_ops_a");
deltaService.ConfigManager.AddConfig("clonedConfig", "DEFAULT", paramData);
// Modify paramData and update
deltaService.ConfigManager.UpdateConfig("clonedConfig", "DEFAULT", paramData);
// Remove configuration parameter
deltaService.ConfigManager.UpdateConfig("clonedConfig", "DEFAULT");
Query router service
The Query Router (QR) is a KX Delta Platform process used to manage client requests and database availability. The goal of the framework is to queue and dispatch requests in a way that best manages database resources, minimizing contention. While it is beyond the scope of this document to describe the server-side features in more detail, the library support of this feature will be summarized below.
Before use, connectivity (including QR registration) must be established. This happens automatically as part of the internal workflow of the IDeltaService.Start method when enableQueryAPI is set to true in the constructor of the DefaultDeltaService class.
How to enable the query interface:
IDeltaService deltaService = new DefaultDeltaService(
controlCluster, USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
isExclusive: true,
dmsCfgName: DMS_CFG_NAME
);
The QueryRouter property of IDeltaService includes a QrRegistration event, whose handlers are called when QR registration completes.
Handlers for this event should generally be set up before IDeltaService.Start (or QueryRouter.Start) is called; otherwise the event may be triggered before the handler is in place.
A QrRegistration handler, for example, could set a shared waitable object, so that the program pauses for at most X seconds and continues immediately once registration completes. This is a better alternative to a fixed delay after calling Start, which is always incurred whether or not registration has completed.
// Listen for QR registration
ManualResetEventSlim qrRegisteredTrigger = new ManualResetEventSlim(false);
deltaService.QueryRouter.QrRegistration += (sender, success) =>
{
if (success)
{
qrRegisteredTrigger.Set();
}
};
bool successFulQrRegistration = qrRegisteredTrigger.Wait(TimeSpan.FromSeconds(5));
if (!successFulQrRegistration)
{
throw new Exception("QR Registration did not succeed promptly");
}
// Execute QR Queries
Request/response query
Once fully registered, queries can be executed against the Query Router Service.
object[] queryObj = new object[2];
queryObj[0] = ".api.getQuotes";
queryObj[1] = new c.Dict(
new object[] { "symList" },
new object[] { new string[] { "EUR/USD" } }
);
deltaService.QueryRouter.RunQuery(
queryObj:queryObj,
user: "Administrator",
database: "ds_rdb",
optParams: new object[] { },
messageHandler: responseHandler
);
parameter | required | description |
---|---|---|
queryObj | yes | The query object which is being executed. |
user | yes | User executing the query. The user who authenticated the connection does not have to be the same user who is executing the query. |
database | yes | The connection or connection group where the query should be executed. (This may be overridden in the case of routed queries.) |
optParams | yes | A Dictionary containing optional parameters for the query. Can be empty |
messageHandler | yes | The IQrMessageHandler handler instance which will handle the results/exception. Alternatively the IQrBinaryMessageHandler interface can be used and will deliver the results in raw byte array which can be deserialised later using kx core IPC deserialiser. |
This executes the query asynchronously, and the result is delivered to the messageHandler. The format of queryObj is as defined in the KX C# interface. For example:
object queryObj = "2+2";
queryObj = new object[] { '+', 2, 2 };
KX Delta Platform applies additional secure-handling checks depending on the executing user and the configuration of the environment. In such cases, string queries, or functions the calling user is not permissioned for, fail with a suitable error.
Configuring query router service
Client applications can configure settings for the Query Router service through DeltaApiCore.QueryRouter.Support.DeltaQueryRouterSettings. To reset the configuration settings, invoke the DeltaApiCore.QueryRouter.Support.DeltaQueryRouterSettings.ResetDefaults() function.
Apply any changes to the configuration settings before initializing the DeltaApiCore.DefaultDeltaService, to avoid inconsistent behaviour.
DeltaAsynConnectionIsListeningInterval
The DeltaAsynConnectionIsListeningInterval setting determines a wait interval, in milliseconds, used when attempting to connect in a DeltaQpConnection or DeltaQrConnection. If the connection is not in a listening state when the client registration is due to be sent, the caller blocks for this interval before re-checking and sending the client registration message. By default the wait is 2000 milliseconds.
// set DeltaAsynConnectionIsListeningInterval wait to 100 milliseconds
DeltaQueryRouterSettings.DeltaAsynConnectionIsListeningInterval = 100;
// initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
Polled queries
The Query Router also supports polled queries: a query is submitted to the Query Router, which executes it on a timer and publishes results back until the client disconnects or StopPolledQueries is called.
string runId = deltaService.QueryRouter
.StartPolledQuery(
queryObj : "100 sublist reverse select from dfxQuote",
user : "Administrator",
frequency : 1000,
database : "ds_rdb_fx_eval",
optParams : new Dictionary<string, object>(),
messageHandler : messageHandler);
parameter | required | description |
---|---|---|
queryObj | yes | The query object which is being executed. |
user | yes | User executing the query. The user who authenticated the connection does not have to be the same user who is executing the query. |
frequency | yes | How often the QR should execute the query, in milliseconds |
database | yes | The connection or connection group where the query should be executed. (This may be overridden in the case of routed queries.) |
optParams | yes | A Dictionary containing optional parameters for the query. Can be empty |
messageHandler | yes | The IQrMessageHandler handler instance which will handle the results/exception. This will return the data in native IPC binary format allowing the application to process the response manually |
Polled queries are stopped via the API:
deltaService.QueryRouter.StopPolledQueries(runIds, user, messageHandler);
parameter | required | description |
---|---|---|
runIds | yes | A string[] of runIds to stop |
user | yes | User requesting the stop. |
messageHandler | yes | The IQrStopPollMessageHandler handler instance which will handle return of the request |
Public query API
This component is responsible for interacting with the Delta Control API Service to call ‘public’ analytics in a more managed, type-safe manner.
Library support for this feature is encapsulated in the DeltaApiService
property of an IDeltaService
instance.
Before use, connectivity must be established. As the Public Query API remains a part of the Query interface, like the Query Router this service must be enabled as part of the creation of the IDeltaService
.
IDeltaService deltaService = new DefaultDeltaService(
controlCluster, USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
isExclusive: true, dmsCfgName: DMS_CFG_NAME
);
The available Public Query APIs can be retrieved via the Api property of the DeltaApiService. This is built from the publicly available APIs defined within the KX Delta Platform environment.
Dictionary<string, IDeltaApi> availableAPI = deltaService.DeltaApiService.Api;
The key is the name of the API, which corresponds to the alias value of the Analytic within your KX Delta Platform environment, and the value is an IDeltaApi object representing the API which can be executed. The IDeltaApi object is then used to build an IDeltaQuery via the IDeltaApiService:
IDeltaApi api = deltaService.DeltaApiService.Api["aliasName"];
IDeltaQuery query = deltaService.DeltaApiService.CreateDeltaQuery(api);
At this point you have an IDeltaQuery framework object containing the API definition. You must then apply parameters to the IDeltaQuery object. These depend on what the API expects: the IDeltaApi details the expected parameters, as does the Analytic definition in the KX Delta Platform environment.
For example to apply Dict input parameters:
// "params" is a reserved word in C#, so the variable is named parms
Dictionary<string, object> parms = new Dictionary<string, object>();
parms["symList"] = new string[] {"EUR/USD"};
query.AddParameterValues(parms);
Alternatively, if the expected input is a Table, the parameters would be applied:
// "params" is a reserved word in C#, so the variable is named parms
Dictionary<string, object> parms = new Dictionary<string, object>();
parms["col1"] = 1;
parms["col2"] = new int[] {1, 2, 3};
query.AddParameterValues(parms);
parms = new Dictionary<string, object>();
parms["col1"] = 10;
parms["col2"] = new int[] {99, 62, 33};
query.AddParameterValues(parms);
Once the IDeltaQuery object has been created, it can be executed with the RunQuery method. This call blocks until the query execution finishes or the call times out. The data returned is as received from the q process, typically a kx.c.Dict or a kx.c.Flip object.
object result = deltaService.DeltaApiService.RunQuery(query, username, optParams);
parameter | required | description |
---|---|---|
query | yes | IDeltaQuery being executed |
username | yes | user to run query as |
optParams | yes | Dictionary of optional parameters. Can be null |
messageHandler | no | The IQrMessageHandler handler instance which will handle the results/exception. Alternatively the IQrBinaryMessageHandler interface can be used and will deliver the results in raw byte array which can be deserialised later using kx core IPC deserializer. |
The optional parameters provided to the RunQuery method allow customized processing within the KX Delta Platform Query framework.
name | type | description |
---|---|---|
timeout | int | Override the default timeout for this request (in seconds) |
clientTime | timestamp | Timestamp assigned by client when entering request. Used in QR timestamp logging |
logCorr | char[] | Client assigned logging correlator |
page | int | Number of rows to return (same behavior as sublist) |
page | int[] | Two element list of start and number rows (same as sublist). If specified in this format, results will be returned in the Control paged format |
cache | boolean | Whether to cache paged results |
sort | boolean | Sort results as part of paging |
qp | string[] | List of qps to consider executing the query on |
When including optional parameters, provide a Dictionary object as part of the method call:
Dictionary<string, object> optParams = new Dictionary<string, object>();
optParams["logCorr"] = "SimpleQueryAPI".ToCharArray();
object result =
deltaService.DeltaApiService.RunQuery(query, "<username>", optParams);
For full details of the available optional parameters please refer to the Query Router section of the Process Template API documentation.
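As a further illustration, several of the optional parameters from the table above can be combined in a single dictionary. This sketch assumes that C# int, char[], int[] and bool values are accepted for the int, char[], int[] and boolean entries listed in the table. QueryOptionsSketch and Build are hypothetical names, not part of DeltaApiCore.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of DeltaApiCore.
public static class QueryOptionsSketch
{
    // Builds an optParams dictionary using keys taken from the table above.
    public static Dictionary<string, object> Build(
        int timeoutSeconds, string logCorrelator, int pageStart, int pageSize)
    {
        return new Dictionary<string, object>
        {
            // Override the default timeout for this request (seconds).
            ["timeout"] = timeoutSeconds,
            // Client-assigned logging correlator, passed as char[].
            ["logCorr"] = logCorrelator.ToCharArray(),
            // Two-element form: start row and number of rows.
            ["page"] = new[] { pageStart, pageSize },
            // Cache the paged results.
            ["cache"] = true
        };
    }
}
```

The resulting dictionary would be passed as the optParams argument, for example Build(30, "SimpleQueryAPI", 0, 100) for the first 100 rows with a 30-second timeout.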
Streaming Analytics API
The KX Stream C# API provides access to a Streaming Analytics Service, which can be used to subscribe to and receive updates from a Streaming Analytic. The service is enabled by the same query interface property, enableQueryAPI, provided during the construction of the IDeltaService. An additional trigger is available to ensure the service is fully connected and ready to stream.
IDeltaService deltaService = new DefaultDeltaService(
new[] { new HostPort(DC_HOST, DC_PORT, USE_TLS) },
USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
isExclusive: true,
dmsCfgName: DMS_CFG_NAME
);
ManualResetEventSlim qmStartedTrigger = new ManualResetEventSlim(false);
deltaService.StreamingQmConnection.Started += qmStartedTrigger.Set;
if (!qmStartedTrigger.Wait(TimeSpan.FromSeconds(10)))
{
throw new Exception("KX Stream Services not available promptly");
}
To start streaming, call Subscribe:
kx.c.Dict parameters = new kx.c.Dict(
new[] { "size", "sym" },
(new object[] { 1000.0, "EUR/USD" })
);
Guid uid=deltaService.StreamingQmConnection.Subscribe(
"<analyticName>",
parameters,
"<databaseName>",
"<username>",
(guid, dataTable) => {
// Business Logic
}
);
A number of parameters are required.
parameter | description |
---|---|
analyticName | Name of the streaming analytic as defined in KX Delta Platform |
parameters | Parameters as defined in the analytic within KX Delta Platform. The type of this object will depend on the parameter types expected by the analytic. |
database | Database where the Streaming Analytic should be run |
username | User making the subscription |
Action<Guid, kx.c.Flip> | Handles incoming messages. |
To stop streaming, call the Unsubscribe method:
deltaService.StreamingQmConnection.Unsubscribe(uid);
Messaging and data discovery service
The Delta Messaging Server (DMS) allows data to be published to remote subscribers, or subscribed to from a remote publisher, based on a channel/topic configuration. Enabling the DMS service requires connectivity to the DMS server. This should happen automatically as part of the internal workflow of the IDeltaService.Start
method when a valid dmsCfgName
is provided. This should point to a valid configuration parameter containing the list of available messaging servers in the environment. As part of KX Stream, the configuration parameter used is DS_MESSAGING_SERVER:DS
. This contains the messaging server instances ds_ms_a
and ds_ms_b
. At least one of these should be running to have a valid DMS service in your environment.
Note
The configuration parameter containing the DMS instances is fully configurable, and alternative DMS servers can be run with an appropriately updated configuration. See KX Stream for more details.
IDeltaService deltaService = new DefaultDeltaService(
new[] { new HostPort(DC_HOST, DC_PORT, USE_TLS) },
USER_PASS, THIS_INSTANCE_NAME, enableQueryAPI: true,
isExclusive: true,
dmsCfgName: DMS_CFG_NAME
);
Publishing
To publish via DMS, create a class implementing the IDeltaMessagingPublisher
interface. This allows it to listen for incoming subscription requests and to maintain active subscriptions for remote clients.
Publisher
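using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
public class DmsPublisher : IDeltaMessagingPublisher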
{
private readonly ConcurrentDictionary<DeltaMessagingSubscription, DeltaMessagingSubscription> _subscriptions;
private readonly ISubscriptionStatusListener _subscriptionStatusListener = new DefaultSubscriptionStatusListener();
private readonly string _table;
private readonly bool syncPublisher;
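/// <summary>
/// Creates a publisher for the given table.
/// </summary>
/// <param name="table">Name of the table (topic) to publish.</param>
/// <param name="syncPublisher">Whether this publisher supports synchronous publishing.</param>
/// <param name="publishSyncFunction">Name of the function used for synchronous publishing.</param>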
public DmsPublisher(string table, bool syncPublisher = false, string publishSyncFunction = "upd")
{
this.syncPublisher = syncPublisher;
this.SyncPublishFunction = publishSyncFunction;
_table = table;
_subscriptions = new ConcurrentDictionary<DeltaMessagingSubscription, DeltaMessagingSubscription>();
}
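/// <summary>
/// Tracks a new subscription so that later publishes reach it.
/// </summary>
/// <param name="topicName">Topic the subscription was made on.</param>
/// <param name="subscription">The subscription to track.</param>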
public void SubscriptionAdded(string topicName, DeltaMessagingSubscription subscription)
{
_subscriptions[subscription] = subscription;
}
/// <summary>
/// Stops tracking a subscription once it has been removed.
/// </summary>
/// <param name="subscription">The subscription to stop tracking.</param>
public void SubscriptionRemoved(DeltaMessagingSubscription subscription)
{
DeltaMessagingSubscription s;
_subscriptions.TryRemove(subscription, out s);
}
/// <summary>
/// Protected Publish. Will throw exception if there are no subscribers.
/// Will return the list of subscribers published to
/// </summary>
/// <param name="data"></param>
public List<string> PublishP(kx.c.Flip data)
{
List<string> publishedList = new List<string>();
if (!HasSubscribers())
{
throw new KxStreamException("No subscribers to topic: " + _table);
}
foreach (DeltaMessagingSubscription subscription in _subscriptions.Values)
{
subscription.Publish(_table, data);
publishedList.Add(subscription.ToString());
}
return publishedList;
}
/// <summary>
/// Synchronous publish. Will throw an exception if there are no subscribers.
/// Returns a dictionary keyed by subscription, holding either the publish
/// result or the exception raised for that subscription.
/// </summary>
/// <param name="data"></param>
public Dictionary<string, object> PublishSync(kx.c.Flip data)
{
return doSyncPublish(data);
}
/// <summary>
/// Simple publish. Publishes to anyone interested. Will not error or report if/who it has been published to.
/// </summary>
/// <param name="data"></param>
public void Publish(kx.c.Flip data)
{
foreach (DeltaMessagingSubscription subscription in _subscriptions.Values)
{
subscription.Publish(_table, data);
}
}
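/// <summary>
/// Checks whether at least one tracked subscription has a connected subscriber.
/// </summary>
/// <returns>True if any subscriber is connected; otherwise false.</returns>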
public bool HasSubscribers()
{
if (!_subscriptions.Any())
{
return false;
}
foreach (DeltaMessagingSubscription s in _subscriptions.Values)
{
if (s.Subscriber.IsConnected)
{
return true;
}
}
return false;
}
private Dictionary<string, object> doSyncPublish(kx.c.Flip data)
{
Dictionary<string, object> publishedDict = new Dictionary<string, object>();
if (!HasSubscribers())
{
throw new KxStreamException("No subscribers to topic: " + _table);
}
foreach (DeltaMessagingSubscription subscription in _subscriptions.Values)
{
try
{
publishedDict.Add(subscription.ToString(), subscription.Publish(_table, data, true));
}
catch (Exception e)
{
publishedDict.Add(subscription.ToString(), e);
}
}
return publishedDict;
}
public bool SyncPublisher()
{
return syncPublisher;
}
public string SyncPublishFunction { get; } = "upd";
public ISubscriptionStatusListener SubscriptionStatusListener
{
get { return _subscriptionStatusListener; }
}
}
Your publisher can then be instantiated and registered with the Delta Messaging Server.
string channel = "tickerplant";
string table = "dfxAsk";
DmsPublisher publisher = new DmsPublisher(table);
deltaService.DeltaMessagingServer.RegisterPublisher(
publisher, channel, table, filterColumns: null);
Once initialized and registered, publish through the publisher, which iterates over its tracked DeltaMessagingSubscription
instances and publishes to each.
DateTime now = DateTime.UtcNow;
// One column per name; each column holds a single row in this example
kx.c.Dict dataRow = new kx.c.Dict(
    new[] { "sym", "price", "time", "size", "src", "sizes", "orderIds" },
    new object[] {
        new object[] { "EUR/USD" },
        new object[] { 1.23 },
        new object[] { now },
        new object[] { 10000 },
        new object[] { "Cantor" },
        new object[] { "M" },
        new object[] { 1234532 }
    }
);
kx.c.Flip f = new kx.c.Flip(dataRow);
publisher.Publish(f);
Publishing Synchronously
In order to publish synchronously, your IDeltaMessagingPublisher
implementation needs to be marked as such. The SyncPublisher()
method on the implementation needs to return true to make use of this functionality; otherwise synchronous publishing calls will be blocked.
This can be achieved in a variety of ways; one option is to pass a boolean to your constructor, store it in a private field, and return it from the SyncPublisher()
method.
public DmsPublisher(string table, bool syncPublisher = false, string publishSyncFunction = "upd")
{
this.syncPublisher = syncPublisher;
this.SyncPublishFunction = publishSyncFunction;
_table = table;
_subscriptions = new ConcurrentDictionary<DeltaMessagingSubscription, DeltaMessagingSubscription>();
}
public bool SyncPublisher()
{
return syncPublisher;
}
Additionally, when calling DeltaMessagingSubscription.Publish
, set the optional parameter publishSync
to true; otherwise the call will attempt to publish asynchronously.
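The `DmsPublisher` shown earlier collects synchronous publish results in a `Dictionary<string, object>`, storing either the reply or the caught exception for each subscription. A caller may want to pick out the failures. The sketch below uses only the base class library; `SyncPublishResults`, the subscriber names and the sample values are hypothetical and chosen purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SyncPublishResults
{
    // Returns the keys whose value is an exception, i.e. the subscriptions that failed.
    public static List<string> Failed(Dictionary<string, object> results)
    {
        return results
            .Where(entry => entry.Value is Exception)
            .Select(entry => entry.Key)
            .ToList();
    }
}

public static class SyncPublishResultsDemo
{
    public static void Main()
    {
        // Hypothetical result shape: subscription name -> reply or exception.
        var results = new Dictionary<string, object>
        {
            { "subscriberA", "ok" },
            { "subscriberB", new TimeoutException("no reply") }
        };
        foreach (string name in SyncPublishResults.Failed(results))
        {
            Console.WriteLine("Publish failed for " + name);
        }
    }
}
```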
Listening to connected subscribers
It's possible to implement your own ISubscriptionStatusListener
, which allows you to react when a subscriber connects to or disconnects from your publisher.
public class YourSubscriptionStatusListener : ISubscriptionStatusListener
{
public YourSubscriptionStatusListener()
{
}
public void SubscriptionConnected(string subUid)
{
// Do something
}
public void SubscriptionDisconnected(string subUid)
{
// Do something
}
}
This subscription status listener needs to be returned from your publisher's SubscriptionStatusListener
property.
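//DmsPublisher.cs
// Hold a single listener instance so every access returns the same object
private readonly ISubscriptionStatusListener _subscriptionStatusListener = new YourSubscriptionStatusListener();
public ISubscriptionStatusListener SubscriptionStatusListener
{
get { return _subscriptionStatusListener; }
}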
Every time a subscriber connects to your publisher, the SubscriptionConnected
method is invoked with the subscription identifier as a parameter. Likewise, when a subscriber disconnects, the SubscriptionDisconnected
method is invoked.
Subscribing
To subscribe via DMS, you should create a class implementing IDeltaMessagingSubscriber
. This requires an Upd
method to accept the incoming data for processing. It receives the topic and a flip of the data. As with publishing, the subscriber should be registered with a channel, topic and, if applicable, any filters.
string channel = "tickerplant";
string table = "dfxQuote";
DmsSubscriber subscriber = new DmsSubscriber();
ManualResetEventSlim registrationTrigger = new ManualResetEventSlim(false);
deltaService.DeltaMessagingServer.RegisterSubscriber(
subscriber, channel, table, filterColumns: null,
registrationTrigger: registrationTrigger
);
if (!registrationTrigger.Wait(TimeSpan.FromSeconds(5)))
{
throw new Exception("Subscriber registration did not complete promptly");
}
...
...
// When complete
deltaService.DeltaMessagingServer.UnregisterSubscriber(
subscriber, channel, table, filterColumns: null);
DMSSubscriber
public class DmsSubscriber : IDeltaMessagingSubscriber
{
public readonly ConcurrentDictionary<string,ConcurrentQueue<object>>
ReceivedData = new ConcurrentDictionary<string, ConcurrentQueue<object>>();
public void Upd(string baseTopic, object data)
{
ConcurrentQueue<object> dataForTopic = ReceivedData.GetOrAdd(
baseTopic,
bt => new ConcurrentQueue<object>()
);
dataForTopic.Enqueue(data);
}
}
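The subscriber above only buffers incoming data per topic, so application code still has to read it back out. One possible approach is to drain a topic's queue periodically. This sketch works on the same `ConcurrentDictionary<string, ConcurrentQueue<object>>` shape using base class library types only; `ReceivedDataReader` and the sample payloads are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ReceivedDataReader
{
    // Removes and returns everything currently queued for a topic.
    // Returns an empty list if nothing has been received for that topic.
    public static List<object> Drain(
        ConcurrentDictionary<string, ConcurrentQueue<object>> receivedData, string topic)
    {
        var drained = new List<object>();
        ConcurrentQueue<object> queue;
        if (receivedData.TryGetValue(topic, out queue))
        {
            object item;
            while (queue.TryDequeue(out item))
            {
                drained.Add(item);
            }
        }
        return drained;
    }
}

public static class ReceivedDataReaderDemo
{
    public static void Main()
    {
        var received = new ConcurrentDictionary<string, ConcurrentQueue<object>>();
        // Mimic two updates arriving for the same topic.
        received.GetOrAdd("dfxQuote", t => new ConcurrentQueue<object>()).Enqueue("first update");
        received.GetOrAdd("dfxQuote", t => new ConcurrentQueue<object>()).Enqueue("second update");

        List<object> batch = ReceivedDataReader.Drain(received, "dfxQuote");
        Console.WriteLine("Drained " + batch.Count + " updates");
    }
}
```

Because `TryDequeue` is safe to call while another thread enqueues, draining can run alongside incoming updates; anything that arrives after the loop ends is picked up by the next drain.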
Configuring DMS
Client applications can configure settings for DMS Publishers and Subscribers by accessing the DeltaApiCore.Dms.Config.DmsConfigurationSettings
class. To reset configuration settings simply invoke the DeltaApiCore.Dms.Config.DmsConfigurationSettings.ResetDefaults()
function.
Any changes to the configuration settings should be set before initializing the main DeltaApiCore.DefaultDeltaService
to avoid inconsistent behaviour.
RemoteSubscriberQueueDequeueTimeout
The RemoteSubscriberQueueDequeueTimeout setting determines the timeout, in milliseconds, for dequeuing data to publish from the RemoteSubscriber
's underlying queue. If the timeout is exceeded, a System.TimeoutException
will be thrown by the underlying queue. By default the timeout is 1 millisecond.
// set RemoteSubscriber dequeue timeout to 100 milliseconds
DmsConfigurationSettings.RemoteSubscriberQueueDequeueTimeout = 100;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
RemoteSubscriberTcpNoDelay
The RemoteSubscriberTcpNoDelay setting allows clients to enable or disable TCP NoDelay on the underlying TCP connection in the RemoteSubscriber
when creating a new connection. This effectively sets System.Net.Sockets.TcpClient.NoDelay
.
// set RemoteSubscriber to enable TCP NoDelay
DmsConfigurationSettings.RemoteSubscriberTcpNoDelay = TcpNoDelay.True;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
// set RemoteSubscriber to disable TCP NoDelay
DmsConfigurationSettings.RemoteSubscriberTcpNoDelay = TcpNoDelay.False;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
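For reference, this is what the underlying .NET option looks like on a plain `System.Net.Sockets.TcpClient`. Setting `NoDelay` to true disables Nagle's algorithm, so small writes are sent immediately rather than being coalesced. The sketch is independent of the KX Stream API; `NoDelayDemo` is a hypothetical helper, and how the library applies the setting internally is not shown here.

```csharp
using System;
using System.Net.Sockets;

public static class NoDelayDemo
{
    // Creates an unconnected IPv4 TcpClient, applies the option and reads it back.
    public static bool CreateAndReadNoDelay(bool noDelay)
    {
        using (var client = new TcpClient(AddressFamily.InterNetwork))
        {
            client.NoDelay = noDelay;
            return client.NoDelay;
        }
    }

    public static void Main()
    {
        Console.WriteLine("NoDelay enabled: " + CreateAndReadNoDelay(true));
        Console.WriteLine("NoDelay enabled: " + CreateAndReadNoDelay(false));
    }
}
```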
RemoteSubscribeSendBufferSize
The RemoteSubscribeSendBufferSize setting allows clients to set the SendBufferSize on the underlying connection in the RemoteSubscriber
when creating a new connection. This effectively sets System.Net.Sockets.TcpClient.SendBufferSize
.
Note: RemoteSubscribeSendBufferSize must be greater than 0 to take effect; null, 0 or negative values are ignored.
// set RemoteSubscriber SendBufferSize to 10000
DmsConfigurationSettings.RemoteSubscribeSendBufferSize = 10000;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
RemoteSubscriberZipEnabled
The RemoteSubscriberZipEnabled setting allows clients to zip data being published on the underlying connection in the RemoteSubscriber
. Default is false (disabled).
Note: RemoteSubscriberZipEnabled is determined when the underlying connection is initialized and connected, so it should be set before the RemoteSubscriber
is created.
// enable zip on RemoteSubscriber
DmsConfigurationSettings.RemoteSubscriberZipEnabled = true;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
DmsConnectionMaximumRetryPeriod
The DmsConnectionMaximumRetryPeriod setting allows clients to set the maximum retry period, in milliseconds, for attempts to open the underlying connection in the DmsConnection
. Default is 10,000 milliseconds.
Note: DmsConnectionMaximumRetryPeriod is determined when the underlying connection is initialized and connected, so it should be set before the DmsConnection
is created.
// set period to 20,000 milliseconds
DmsConfigurationSettings.DmsConnectionMaximumRetryPeriod = 20000;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
DmsConnectionRetryInterval
The DmsConnectionRetryInterval setting allows clients to set the retry interval, in milliseconds, between attempts to open the underlying connection in the DmsConnection
. Default is 2,000 milliseconds.
Note: DmsConnectionRetryInterval is determined when the underlying connection is initialized and connected, so it should be set before the DmsConnection
is created.
// set interval to 500 milliseconds
DmsConfigurationSettings.DmsConnectionRetryInterval = 500;
//initialize API
using(var service = new DefaultDeltaService(...))
{
...
}
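To get a feel for how a maximum retry period and a retry interval can combine, the sketch below implements a generic retry loop. It is one plausible reading only: the library's actual retry logic is not documented here and may differ. `RetrySketch`, its parameters and the injected `connect` and `sleep` delegates are all hypothetical.

```csharp
using System;

public static class RetrySketch
{
    // Calls connect() until it succeeds or another wait would exceed maxRetryPeriodMs.
    // Returns true on success; attempts reports how many times connect() was called.
    public static bool TryConnect(
        Func<bool> connect, int maxRetryPeriodMs, int retryIntervalMs,
        Action<int> sleep, out int attempts)
    {
        if (retryIntervalMs <= 0)
        {
            throw new ArgumentOutOfRangeException("retryIntervalMs");
        }
        attempts = 0;
        int waitedMs = 0;
        while (true)
        {
            attempts++;
            if (connect())
            {
                return true;
            }
            if (waitedMs + retryIntervalMs > maxRetryPeriodMs)
            {
                return false;   // retry budget used up
            }
            sleep(retryIntervalMs);
            waitedMs += retryIntervalMs;
        }
    }

    public static void Main()
    {
        int attempts;
        // A connection that never succeeds, with the documented default values.
        // The sleep delegate is a no-op so the demo finishes immediately.
        bool connected = TryConnect(() => false, 10000, 2000, ms => { }, out attempts);
        Console.WriteLine("connected=" + connected + ", attempts=" + attempts);
    }
}
```

Under this reading, the documented defaults (10,000 and 2,000 milliseconds) allow an initial attempt plus five retries before giving up.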
Type mapping
C# type | kdb+ type | kdb+ type character |
---|---|---|
System.Boolean/bool | boolean | b |
System.Byte/byte | byte | x |
System.Int16/short | short | h |
System.Int32/int | int | i |
System.Int64/long | long | j |
System.Single/float | real | e |
System.Double/double | float | f |
System.String/string | symbol | s |
System.Char[]/char[] | string | C |
System.Char/char | char | c |
DeltaApiCore.Connections.Kdb.Support.Date | date | d |
System.DateTime | datetime | z |
DeltaApiCore.Connections.Kdb.Support.Month | month | m |
DeltaApiCore.Types.QTimestamp | timestamp | p |
DeltaApiCore.Connections.Kdb.Support.Minute | minute | u |
DeltaApiCore.Connections.Kdb.Support.Second | second | v |
System.TimeSpan | time | t |
DeltaApiCore.Types.QTimespan | timespan | n |
System.Guid | guid | g |
DeltaApiCore.Connections.Kdb.Support.Dict | dictionary | |
DeltaApiCore.Connections.Kdb.Support.Flip | table | |
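Because the mapping is driven by the C# type, the type of a literal decides the kdb+ type it arrives as: `10000` is an int, `10000L` a long, and a `string` becomes a symbol while a `char[]` becomes a kdb+ string. The lookup below restates the System rows of the mapping as code so the type of a value can be checked before publishing. `KdbTypeChars` is a hypothetical helper, not part of the API, and it does not cover the DeltaApiCore types.

```csharp
using System;
using System.Collections.Generic;

public static class KdbTypeChars
{
    // System types from the mapping and their kdb+ type characters.
    private static readonly Dictionary<Type, char> Map = new Dictionary<Type, char>
    {
        { typeof(bool), 'b' }, { typeof(byte), 'x' }, { typeof(short), 'h' },
        { typeof(int), 'i' }, { typeof(long), 'j' }, { typeof(float), 'e' },
        { typeof(double), 'f' }, { typeof(string), 's' }, { typeof(char[]), 'C' },
        { typeof(char), 'c' }, { typeof(DateTime), 'z' }, { typeof(TimeSpan), 't' },
        { typeof(Guid), 'g' }
    };

    // Returns the kdb+ type character for a value, or null if its type is not listed.
    public static char? For(object value)
    {
        if (value == null)
        {
            return null;
        }
        char typeChar;
        return Map.TryGetValue(value.GetType(), out typeChar) ? typeChar : (char?)null;
    }

    public static void Main()
    {
        Console.WriteLine(For(10000));                    // int literal
        Console.WriteLine(For(10000L));                   // long literal
        Console.WriteLine(For("EUR/USD"));                // string maps to symbol
        Console.WriteLine(For("EUR/USD".ToCharArray()));  // char[] maps to kdb+ string
    }
}
```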