Publishing data
Example publisher to Refinery environment
The sections below give an example of a C# client used to publish data to Refinery. In order for the client to create a publisher instance, it needs to connect to a particular channel which is a subscriber to the data that will be published. This will typically be a Tickerplant channel (process) that is subscribing to a given topic (table).
As discussed in Pipelines, a Refinery environment can exist with multiple pipelines. These pipelines can cross multiple DataClasses. In addition, the installs can be clustered across multiple servers for fault tolerance.
The example below focuses on an environment with a single pipeline clustered across a primary and a secondary host. The environment has a single Tickerplant process on each host, subscribing to a single topic (demoTable).
Two publishers will be used to publish the same data to the demoTable topic on both primary and secondary hosts. This would be useful for hot-hot deployments where data ingested needs to be published to both primary and secondary servers (see note on Hot-Hot in Publishers).
In order to publish to both Tickerplant channels, the channel (process) names need to be known before the publisher is created. This can be achieved in two ways:
- Hardcode the names or store them in an accessible location.
- Expose a public server-side API that returns the Tickerplant instance names.
For demonstration purposes the former is chosen in the following example.
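As a minimal sketch of the first option, the channel names can be built from the pipeline name and the instance number, following the `<pipeline>.<instance>.tp.0` pattern that the RefineryClient class uses later in this guide. The `TickerplantChannels` class and its methods are hypothetical helpers, not part of the Refinery API, and the mapping of instance 0 to the primary host and instance 1 to the secondary host is the convention of this example only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the Refinery API): derives the Tickerplant
// channel names this guide assumes, in the form "<pipeline>.<instance>.tp.0".
public static class TickerplantChannels
{
    // Builds the channel name for one instance (0 = primary, 1 = secondary in this guide).
    public static string ChannelName(string pipeline, int instance)
    {
        if (string.IsNullOrWhiteSpace(pipeline))
            throw new ArgumentException("Pipeline name must not be empty.", nameof(pipeline));
        if (instance < 0)
            throw new ArgumentOutOfRangeException(nameof(instance), "Instance must be zero or greater.");
        return $"{pipeline}.{instance}.tp.0";
    }

    // Returns the channel names for instances 0 .. instanceCount - 1.
    public static IReadOnlyList<string> ForInstances(string pipeline, int instanceCount)
    {
        var names = new List<string>();
        for (var i = 0; i < instanceCount; i++)
            names.Add(ChannelName(pipeline, i));
        return names;
    }
}
```

Calling `TickerplantChannels.ForInstances("DemoPipeline", 2)` returns the names `DemoPipeline.0.tp.0` and `DemoPipeline.1.tp.0`.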
Prerequisites
Make sure you follow the prerequisites. These include:
- The required version of Visual Studio.
- The version of .NET Framework used.
- The packages that allow Visual Studio to work with KX processes; these are found on Nexus.
Getting started
- In the top-left corner of Visual Studio, click File > New Project > Console App (C#).
- Give the application a name (MyFirstPublisher) and select a location in which to store it.
- Select the Framework to be .NET 6.0.
- Press Create.
Next let's look into getting those downloaded files from Nexus installed.
- Installing DLL files:
  - Right-click the application name > Add > Project Reference.. > Browse.
  - Check all available files and click OK.
- Install additional Packages:
  - Right-click the application name > Manage NuGet Packages.. > Browse.
  - Search and Install:
    - CSharpKDB (1.3.0)
    - Microsoft.Extensions.Configuration.Json (7.0.0)
    - Microsoft.NETCore.Platforms (7.0.1)
    - NLog (5.1.2)
    - Serilog.Settings.appSettings (2.2.2)
    - System.Configuration.ConfigurationManager (4.7.0)
Now that all the behind-the-scenes packages are installed, let's get on to starting our Data Publisher!
Setting up the refinerySettings.json file
- Right-click on your application name > Add > New Item.
- Select any of the JSON Configuration Files available.
- Rename the file by right-clicking it and selecting Rename, calling it refinerySettings.json.
- Fill in the JSON file with our Refinery Settings:
refinerySettings (JSON)
Below is an example of the client-specific settings used to connect to Refinery. These are referenced in the Main() method of Program.cs shown later on:
{
"Host": "aaa.refinery.aws.kx.com",
"Port": 8081,
"secondaryHost": "bbb.refinery.aws.kx.com",
"secondaryPort": 8081,
"Username": "Administrator",
"Password": "password",
"InstanceName": "MyFirstPublisher",
"MessagingServerConfigName": "DS_MESSAGING_SERVER:refinery_a",
"Timeout": 60000,
"Exclusive": true,
"Encrypt": false,
"Pipeline": "DemoPipeline"
}
Retrieving and using the refinerySettings
- Add a New Item again, but this time select a Code File (C#).
- Rename it to RefinerySettings.
RefinerySettings.cs
The RefinerySettings class defines the properties that the values in the JSON file are loaded into:
namespace CSRefineryGeneric
{
class RefinerySettings
{
public const string Refinery = "Refinery";
/// <summary>
/// Primary Kx Control host
/// </summary>
public string Host { get; set; }
/// <summary>
/// Primary Kx Control port
/// </summary>
public int Port { get; set; }
/// <summary>
/// Secondary Kx Control host
/// </summary>
public string secondaryHost { get; set; }
/// <summary>
/// Secondary Kx Control port
/// </summary>
public int secondaryPort { get; set; }
/// <summary>
/// Authorised user to connect as.
/// </summary>
public string Username { get; set; }
/// <summary>
/// Authorised user password.
/// </summary>
public string Password { get; set; }
/// <summary>
/// Name of this running instance. Reported to Kx Control and used to allow process monitoring.
/// </summary>
public string InstanceName { get; set; }
/// <summary>
/// Set to true if duplicate instances are not allowed.
/// </summary>
public bool Exclusive { get; set; }
/// <summary>
/// Encrypt login details.
/// </summary>
public bool Encrypt { get; set; }
/// <summary>
/// Name of Delta Messaging Server (DMS) configuration parameter.
/// </summary>
public string MessagingServerConfigName { get; set; }
/// <summary>
/// Timeout in milliseconds.
/// </summary>
public double Timeout { get; set; }
/// <summary>
/// Name of pipeline being published to.
/// </summary>
public string Pipeline { get; set; }
}
}
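Since the values arrive from a hand-edited file, it can help to reject missing entries before any connection is attempted. The sketch below is one possible approach, not part of the Refinery API: `MinimalSettings` is a cut-down, hypothetical stand-in for RefinerySettings so that the example compiles on its own, and `SettingsLoader` is a hypothetical helper. Note that System.Text.Json matches property names case-sensitively by default, so a key such as `host` would not populate `Host`.

```csharp
using System;
using System.Text.Json;

// Cut-down, hypothetical stand-in for RefinerySettings so the sketch compiles on its own.
public class MinimalSettings
{
    public string Host { get; set; } = "";
    public int Port { get; set; }
    public string Pipeline { get; set; } = "";
}

// Hypothetical helper: deserializes the JSON text and validates the result.
public static class SettingsLoader
{
    // Rejects missing values early instead of failing later during the connection attempt.
    public static MinimalSettings Parse(string json)
    {
        var settings = JsonSerializer.Deserialize<MinimalSettings>(json);
        if (settings == null)
            throw new InvalidOperationException("Settings file is empty.");
        if (string.IsNullOrWhiteSpace(settings.Host))
            throw new InvalidOperationException("Host is missing from the settings.");
        if (settings.Port <= 0)
            throw new InvalidOperationException("Port is missing or invalid.");
        if (string.IsNullOrWhiteSpace(settings.Pipeline))
            throw new InvalidOperationException("Pipeline is missing from the settings.");
        return settings;
    }
}
```

The same checks could be applied to the full RefinerySettings class, extended with whichever properties your deployment treats as mandatory.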
Now let's load the values from the JSON into our project.
- Using the Program.cs file that was created with the project, add the code below to load the settings and print whichever value you want to the console.
Note
The full Program.cs file is available at the bottom of the guide.
Program.cs - beginning
using System;
using System.IO;
using System.Text.Json;
namespace CSRefineryGeneric
{
public class GenericPublisher
{
static void Main(string[] args)
{
try
{
Console.WriteLine("Publisher Started !!");
//Right click on refinerySettings.json in Solution Explorer, click "Copy Full Path" and paste below
string jsonLocation = "<Path>\\refinerySettings.json";
string text = File.ReadAllText(jsonLocation);
var jsonSettings = JsonSerializer.Deserialize<RefinerySettings>(text);
string Pipeline = jsonSettings.Pipeline;
var settings = new RefinerySettings()
{
Encrypt = jsonSettings.Encrypt,
Host = jsonSettings.Host,
Port = jsonSettings.Port,
InstanceName = jsonSettings.InstanceName,
MessagingServerConfigName = jsonSettings.MessagingServerConfigName,
Exclusive = jsonSettings.Exclusive,
Username = jsonSettings.Username,
Password = jsonSettings.Password,
Timeout = jsonSettings.Timeout,
secondaryHost = jsonSettings.secondaryHost,
secondaryPort = jsonSettings.secondaryPort
};
//Prints to the Console your host name in the refinerySettings
Console.WriteLine(settings.Host);
}
catch (Exception ex)
{
Console.WriteLine($"{ex}");
}
}
}
}
Let's test that it works so far by running the debugger: click the green triangle with the application name beside it in the toolbar at the top.
Note
Make sure that you change the JSON file location (jsonLocation in the Program.cs code) to the full path of your own refinerySettings.json file.
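An alternative to a hardcoded full path is to load the file from the directory that contains the compiled executable. The sketch below assumes that refinerySettings.json has its Copy to Output Directory property set to Copy Always (the same property this guide sets for nlog.config), so that the file sits next to the executable. `SettingsPath` is a hypothetical helper name.

```csharp
using System;
using System.IO;

// Hypothetical helper: locates a settings file that sits next to the compiled executable.
public static class SettingsPath
{
    // Builds the absolute path from the application's base directory and the file name.
    public static string Resolve(string fileName = "refinerySettings.json")
    {
        return Path.Combine(AppContext.BaseDirectory, fileName);
    }

    // Reads the file, failing with a clear message if it was not copied to the output directory.
    public static string ReadAll(string fileName = "refinerySettings.json")
    {
        var path = Resolve(fileName);
        if (!File.Exists(path))
            throw new FileNotFoundException($"Settings file not found at {path}.", path);
        return File.ReadAllText(path);
    }
}
```

With this in place, `string text = SettingsPath.ReadAll();` could replace the jsonLocation lookup, and the project would no longer depend on a machine-specific path.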
Creating the publisher client
- Within the Program.cs file, remove the Console.WriteLine() and add in:
// create publisher client
var client = new RefineryClient(settings);
This is the start of using the imported JSON data.
- Next we want to add two Code File (C#) items, one called RefineryClient.cs and the other IClient.cs.
- The RefineryClient creates a connection and a service, registers and creates publishers, and provides methods to send data.
- The IClient is used for configuring and publishing data.
RefineryClient.cs
/* kx specific code */
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using DaaSCsApiCore;
using NLog;
namespace CSRefineryGeneric
{
class RefineryClient : IClient
{
// table name ---> same table name used for both primary and secondary
private const string DemoTblTopic = "DemoTable";
private RefinerySettings _settings;
private Service _service;
// create a publisher for both primary and secondary
private Publisher _demoTbl_pub_primary;
private Publisher _demoTbl_pub_secondary;
private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
public RefineryClient(RefinerySettings settings)
{
_settings = settings;
}
// function to connect to Refinery and start service
public void Connect()
{
Logger.Debug("Connecting to Refinery");
var timeout = TimeSpan.FromMilliseconds(_settings.Timeout);
_service = CreateRefineryService();
_service.Start(timeout);
}
// get publishing channel for primary
public void InitPubPrimary(String pipeline, int instance)
{
String TP = pipeline + "." + Convert.ToString(instance) + ".tp.0";
GetPublisher(TP, DemoTblTopic, instance);
}
// get publishing channel for secondary
public void InitPubSecondary(String pipeline, int instance)
{
String TP = pipeline + "." + Convert.ToString(instance) + ".tp.0";
GetPublisher(TP, DemoTblTopic, instance);
}
// publish to both hosts
public void SendDemoTbl(string[] columnNames, object[] columnValues)
{
SendData(_demoTbl_pub_primary, columnNames, columnValues);
Console.WriteLine("Published data to primary TP");
SendData(_demoTbl_pub_secondary, columnNames, columnValues);
Console.WriteLine("Published data to secondary TP");
}
// Prepares the data table to be published
void SendData(Publisher publisher, string[] columnNames, object[] columnValues)
{
var result = publisher.PublishSync(columnNames, columnValues);
var errors = result.Where(x => x.Value is Exception).ToList();
if (errors.Count != 0)
{
throw new AggregateException("An error occurred sending data to Refinery", errors.Select(x => (Exception)x.Value));
}
}
// Creates a publisher and registers the pipeline to be published to
private void GetPublisher(string TP, string tableName, int instance)
{
if (instance == 0)
{
_demoTbl_pub_primary = new Publisher(tableName, true);
_service.RegisterPublisher(_demoTbl_pub_primary, TP, tableName, null, TimeSpan.FromSeconds(60));
_demoTbl_pub_primary.WaitForSubscribers(TimeSpan.FromSeconds(60));
}
if (instance == 1)
{
_demoTbl_pub_secondary = new Publisher(tableName, true);
_service.RegisterPublisher(_demoTbl_pub_secondary, TP, tableName, null, TimeSpan.FromSeconds(60));
_demoTbl_pub_secondary.WaitForSubscribers(TimeSpan.FromSeconds(60));
}
}
// Assigns the Refinery Settings to a service
Service CreateRefineryService()
{
Service.RegisterLogger(new NLogTraceListener(), SourceLevels.All, true);
return new Service(
_settings.Host,
_settings.Port,
_settings.secondaryHost,
_settings.secondaryPort,
_settings.Username,
_settings.Password,
_settings.InstanceName,
_settings.Exclusive,
_settings.MessagingServerConfigName,
_settings.Encrypt);
}
// Unregisters both publishers and stops the service; call this once the client is no longer needed
public void Dispose()
{
if (_service != null)
{
try
{
// repeat these blocks for each publisher on that topic
if (_demoTbl_pub_primary != null)
{
_service.UnregisterPublisher(_demoTbl_pub_primary, DemoTblTopic, null);
}
if (_demoTbl_pub_secondary != null)
{
_service.UnregisterPublisher(_demoTbl_pub_secondary, DemoTblTopic, null);
}
_service.Stop();
}
catch (SynchronizationLockException ex)
{
// Pass the exception as the first argument so that NLog records it with the message
Logger.Error(ex, "Error during shutdown.");
}
}
}
}
}
IClient.cs
using System;
namespace CSRefineryGeneric
{
interface IClient : IDisposable
{
void Connect();
void InitPubPrimary(String pipeline, int instance);
void InitPubSecondary(String pipeline, int instance);
void SendDemoTbl(string[] columnNames, object[] columnValues);
}
}
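Because IClient extends IDisposable, the client can be wrapped in a `using` statement so that Dispose (which unregisters the publishers and stops the service) runs even when publishing throws. The sketch below illustrates the pattern only: `FakeClient` is a hypothetical stand-in for RefineryClient that records calls, so the example runs without a Refinery environment.

```csharp
using System;

// Hypothetical stand-in for RefineryClient; it only records calls.
public sealed class FakeClient : IDisposable
{
    public bool Connected { get; private set; }
    public bool Disposed { get; private set; }

    public void Connect() => Connected = true;

    public void Dispose() => Disposed = true;
}

public static class DisposeDemo
{
    // Runs the publishing work and guarantees Dispose is called, even if the work throws.
    public static FakeClient Run(Action<FakeClient> work)
    {
        var client = new FakeClient();
        try
        {
            using (client)
            {
                client.Connect();
                work(client);
            }
        }
        catch (Exception ex)
        {
            // Dispose has already run by the time the exception reaches this handler.
            Console.WriteLine($"Publishing failed: {ex.Message}");
        }
        return client;
    }
}
```

In Program.cs the equivalent change would be to declare the client with `using var client = new RefineryClient(settings);` inside the existing try block.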
Publish data
Now that the inner workings are taken care of, it's time to finish off the Program.cs file so that data can be published to the Tickerplants.
The additional code will:
- Connect to Refinery.
- Create a publisher for the Tickerplant on the primary host.
- Create a publisher for the Tickerplant on the secondary host.
- Create the data table.
- Publish the table to both Tickerplants (TPs).
Add the code below after the initialisation of the client variable.
Program.cs - Ending
// Connect to Refinery and start service
client.Connect();
// Sets up primary host (so instance 0)
client.InitPubPrimary(Pipeline, 0);
// Sets up secondary host (so instance 1)
client.InitPubSecondary(Pipeline, 1);
// Create a sample run for publishing
var DemoTblCols = new[]
{
"time",
"sym",
"price",
"volume"
};
var DemoTblData = new object[]
{
new DateTime[] {DateTime.UtcNow },
new string[] { "MyFirstPublisher" },
new double[] { 123.456 },
new long[] { 876543L }
};
// Sends the data to the TPs
client.SendDemoTbl(DemoTblCols, DemoTblData);
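The table above is column-oriented: each entry of DemoTblData is an array holding one column, in the same order as DemoTblCols. A kdb+ table requires every column to have the same number of rows, so a quick check before publishing can catch mismatched arrays early. The sketch below is a hypothetical helper, not part of the Refinery API, and assumes every column is supplied as a .NET array.

```csharp
using System;

// Hypothetical helper: validates a column-oriented table before it is published.
public static class ColumnCheck
{
    // Returns the row count if the table is well formed; throws ArgumentException otherwise.
    public static int RowCount(string[] columnNames, object[] columnValues)
    {
        if (columnNames.Length != columnValues.Length)
            throw new ArgumentException("Each column name needs exactly one column of values.");

        var rows = -1; // -1 means no column has been inspected yet
        for (var i = 0; i < columnValues.Length; i++)
        {
            if (columnValues[i] is not Array column)
                throw new ArgumentException($"Column '{columnNames[i]}' is not an array.");
            if (rows == -1)
                rows = column.Length;
            else if (column.Length != rows)
                throw new ArgumentException(
                    $"Column '{columnNames[i]}' has {column.Length} rows, expected {rows}.");
        }
        return Math.Max(rows, 0);
    }
}
```

Calling `ColumnCheck.RowCount(DemoTblCols, DemoTblData)` on the sample data returns 1, since each column holds a single value.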
Logs implementation
To finish things off, we want to be able to see the inner workings of how our publisher connects to our Refinery.
- Add a New Item again, but this time select Application Configuration file.
- Rename it to nlog.config.
- Add the below code:
<?xml version="1.0" encoding="utf-8" ?>
<!-- XSD manual extracted from package NLog.Schema: https://www.nuget.org/packages/NLog.Schema-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="console-example-internal.log"
internalLogLevel="Info" >
<!-- the targets to write to -->
<targets>
<!-- write logs to file -->
<target xsi:type="File" name="logfile" fileName="console-example.log"
layout="${longdate}|${level}|${message} |${all-event-properties} ${exception:format=tostring}" />
<target xsi:type="Console" name="logconsole"
layout="${longdate}|${level}|${message} |${all-event-properties} ${exception:format=tostring}" />
</targets>
<!-- rules to map from logger name to target -->
<rules>
<logger name="*" minlevel="Trace" writeTo="logfile,logconsole" />
</rules>
</nlog>
Lastly, we want to change the settings of the nlog.config file to print out the logs.
- Go to the nlog.config file in the Solution Explorer and left-click it.
- The properties will populate below.
- In the Advanced section, change the Copy to Output Directory value to Copy Always.
Now run your application and watch the logs flow in and publish data to your pipeline.
Note
Congratulations! You have successfully published data to your pipeline! Check out Notes for some expansion ideas.
Appendix
Program.cs - full
using System;
using System.IO;
using System.Text.Json;
namespace CSRefineryGeneric
{
public class GenericPublisher
{
static void Main(string[] args)
{
try
{
Console.WriteLine("Publisher Started !!");
//Right click on refinerySettings.json in Solution Explorer, click "Copy Full Path" and paste below
string jsonLocation = "<Path>\\refinerySettings.json";
string text = File.ReadAllText(jsonLocation);
var jsonSettings = JsonSerializer.Deserialize<RefinerySettings>(text);
string Pipeline = jsonSettings.Pipeline;
var settings = new RefinerySettings()
{
Encrypt = jsonSettings.Encrypt,
Host = jsonSettings.Host,
Port = jsonSettings.Port,
InstanceName = jsonSettings.InstanceName,
MessagingServerConfigName = jsonSettings.MessagingServerConfigName,
Exclusive = jsonSettings.Exclusive,
Username = jsonSettings.Username,
Password = jsonSettings.Password,
Timeout = jsonSettings.Timeout,
secondaryHost = jsonSettings.secondaryHost,
secondaryPort = jsonSettings.secondaryPort
};
// Create publisher client
var client = new RefineryClient(settings);
// Connect to Refinery and start service
client.Connect();
// Sets up primary host (so instance 0)
client.InitPubPrimary(Pipeline, 0);
// Sets up secondary host (so instance 1)
client.InitPubSecondary(Pipeline, 1);
// Create a sample run for publishing
var DemoTblCols = new[]
{
"time",
"sym",
"price",
"volume"
};
var DemoTblData = new object[]
{
new DateTime[] {DateTime.UtcNow },
new string[] { "MyFirstPublisher" },
new double[] { 123.456 },
new long[] { 876543L }
};
// Sends the data to the TPs
client.SendDemoTbl(DemoTblCols, DemoTblData);
}
catch (Exception ex)
{
Console.WriteLine($"{ex}");
}
}
}
}
Notes
- The server-side function could be extended to check for more metadata, for example to extract the processes that match a given pipeline or dataClass.
- The client could be modified to publish to multiple pipelines on a single host.
- The server-side API is not necessary if the channels are known and can be provided to the client by some other means (such as command-line arguments or a reference file).
IMPORTANT: Error handling is needed in case a messaging channel goes down. For example, if the primary Tickerplant channel fails, a C# exception will be raised when trying to publish to that process.
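One way to approach this, sketched below under stated assumptions, is to attempt each publish independently so that a failure on the primary Tickerplant does not prevent publishing to the secondary. `HotHotPublish` is a hypothetical helper, not part of the Refinery API; each publish is passed in as an `Action`, so the sketch runs without a Refinery environment. What to do with the collected failures (log, retry, alert) is left to the caller.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: publishes to several targets and isolates their failures.
public static class HotHotPublish
{
    // Runs every publish action, even if an earlier one fails, and returns the
    // failures keyed by target name. An empty result means every publish succeeded.
    public static IReadOnlyDictionary<string, Exception> PublishToAll(
        IEnumerable<KeyValuePair<string, Action>> targets)
    {
        var failures = new Dictionary<string, Exception>();
        foreach (var target in targets)
        {
            try
            {
                target.Value();
            }
            catch (Exception ex)
            {
                // Record the failure and carry on with the remaining targets.
                failures[target.Key] = ex;
            }
        }
        return failures;
    }
}
```

Inside SendDemoTbl, the two actions could wrap the existing calls, for example `() => SendData(_demoTbl_pub_primary, columnNames, columnValues)` for the primary and the equivalent for the secondary.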