Kafka integration
KX Delta Platform offers direct integration with Kafka through a KX Delta Platform asset package. This asset packages all library requirements along with a simple configuration-based implementation, allowing Kafka producers and consumers to be set up with minimal effort. It offers the full functionality of the KX Fusion interface detailed at code.kx.com, without the need to manually compile and install any Kafka libraries. This includes the full library support available via librdkafka. The only requirement is the ability to update $QHOME with the necessary libraries and kdb+ script wrapper.
Currently, all Linux flavors supported by the KX Delta Platform, including Ubuntu 18/20/22 and Red Hat 7/8/9, can integrate with Kafka.
KX Delta Platform Kafka asset installation
The Kafka KX Delta Platform Asset and Eval pack are installed in the same way as all KX Delta Platform, KX Delta Platform Asset, and Solution packages: place the package in the relevant package directory and run through the installation script. Further details on KX Delta Platform and package deployment can be found in the deployment guides.
System configuration
The Kafka KX Delta Platform asset supports the full range of configuration options detailed in the librdkafka configuration documentation. This configuration is initialized via the config parameter kxkfkCfg. The base Kafka package includes a DEFAULT and a system override containing simple configuration. This can and should be extended by a client for all client-specific overrides.
Kafka broker list
At a minimum, the kxkfkCfg parameter should be updated to point at a relevant Kafka broker list via metadata.broker.list.
By default, a number of configurations are provided in the DEFAULT and system overrides:
param | default | description |
---|---|---|
metadata.broker.list | localhost:9092 | Initial list of brokers as a CSV list of broker host or host:port |
queue.buffering.max.ms | 1 | Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. |
fetch.wait.max.ms | 10 | Maximum time the broker may wait to fill the response with fetch.min.bytes |
group.id | 0 | Client group ID string. All clients sharing the same group.id belong to the same group |
This is the basic configuration which can be used as a starting point for connecting to Kafka.
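As an illustrative sketch, a client-specific override could extend this base configuration. The override name uatConfig, the broker hosts, and the assumption that kxkfkCfg overrides are a dictionary of librdkafka setting names to string values are all placeholders; consult your Delta Platform configuration for the exact shape.

```q
// Hypothetical uatConfig override for kxkfkCfg; keys are standard
// librdkafka settings, broker hosts and group ID are placeholders
uatConfig:(`$("metadata.broker.list";"queue.buffering.max.ms";"group.id"))!
  ("uatbroker1:9092,uatbroker2:9092";"1";"uatConsumers")
```

The keys themselves follow librdkafka naming, so any setting from the librdkafka configuration documentation can be added in the same way.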
Publishing data to Kafka
To publish, a producer must be initialized. The first step is to initialize the Kafka API available within the Kafka KX Delta Platform Asset by calling the kxkfkInit instruction.
```q
// Load Kafka Integration library instruction
.al.loadinstruction[`kxkfkInit];
```
This will load the necessary functions and initialize the underlying Kafka system libraries necessary to communicate with Kafka.
The .kx.kfk.InitProducer Analytic within the KxKafka package allows for the easy initialization of a producer. It relies on the kxkfkCfg parameter for Kafka broker configuration details. The DEFAULT and system configuration are used by default, but can be overridden by providing an appropriate kafkaConfig value as an input to the Analytic.
parameter | type | example | description |
---|---|---|---|
kafkaConfig | symbol | `uatConfig | Override for kxkfkCfg detailing Kafka broker config |
topic | symbol | `quotes | Topic for producer |
topicConfig | Dict | ()!() | Topic config dict; see code.kx.com for available config |
This Analytic returns a unique ID for the producer:

```q
producer:.al.callfunction[`.kx.kfk.InitProducer][`;`quotes;()!()];
.log.out[`kafka;"Producer initialized with id";producer];
```
A single process can be a producer for multiple topics by calling the initialization Analytic .kx.kfk.InitProducer multiple times with the relevant topics.
Once initialized, data can be published to the Kafka broker. This is achieved via the function .kx.kfk.pub, which was also loaded via the kxkfkInit instruction.
parameter | type | example | description |
---|---|---|---|
topic | symbol | `quotes | Topic to publish data on |
kval | string | string .z.t | Key for the published record |
data | Any | | Data to be published. Can be already serialized, or serialized via the sFunc Analytic below |
partition | symbol | .kfk.PARTITION_UA | Partition to publish to |
sFunc | symbol | `.kx.kfk.JSONSerialize | Serializer function. If an empty symbol, the data is assumed already prepared |
A publish example:

```q
.kx.kfk.pub[`Quotes;"dfxQuote";pubData;.kfk.PARTITION_UA;`.kx.kfk.IPCSerialize];
```
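For comparison, a sketch of the pre-serialized path described in the table above: if the data is serialized up front, an empty symbol can be passed as sFunc so no further serialization is applied. The topic, key, and table contents here are illustrative.

```q
// Serialize with qIPC (-8!) before publishing, then pass an empty
// symbol as sFunc to indicate the data is already prepared
pubData:-8!([]time:enlist .z.p;sym:enlist `$"USD/CAD";price:enlist 1.2345);
.kx.kfk.pub[`Quotes;"dfxQuote";pubData;.kfk.PARTITION_UA;`];
```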
Example publisher
Bringing this all together, once the kxkfkCfg parameter has been configured correctly, any KX Delta Platform-based q Process Template can be initialized as a Kafka producer and begin publishing data with just a few lines of code.
An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:
```q
kafkaProducerInit:{[]
  .al.loadinstruction[`kxkfkInit];
  .al.callfunction[`.kx.kfk.InitProducer][`;`Quotes;()!()];
  pubData:(`time`sym`price)!(enlist .z.p;enlist `$"USD/CAD";enlist 1.2345);
  .kx.kfk.pub[`Quotes;"dfxQuote";pubData;.kfk.PARTITION_UA;{[x] -8!x}];
  }
```
Consuming
In the same fashion as a producer, a consumer must be initialized. Again, the first step is to initialize the Kafka API available within the Kafka KX Delta Platform Asset.
```q
// Load Kafka Integration library instruction
.al.loadinstruction[`kxkfkInit];
```
The .kx.kfk.InitConsumer Analytic within the KxKafka package provides for the initialization of a consumer. It relies on the kxkfkCfg parameter for Kafka broker configuration details.
parameter | type | example | description |
---|---|---|---|
kafkaConfig | symbol | `uatConfig | Override for kxkfkCfg detailing Kafka broker config |
topic | symbol | `quotes | Topic to consume |
partitions | symbol[] | (.kfk.PARTITION_UA) | List of partitions to consume from |
consumerFunc | symbol | `.kx.kfk.IPCDeserializeUpd | Analytic to consume data from Kafka |
consumerFuncOptParams | Dict | ()!() | Optional params which are passed to the consumer function for custom behavior |
To ensure compatibility with the KxKafka package and KX Delta Platform consumer structure, the consumerFunc function used must have the following signature:
parameter | description |
---|---|
msg | Relates to the incoming Kafka message for consumption |
optParams | Optional parameters defined during initialization of the Kafka consumer function. Can be null, though it must be defined if the consumer function relies on its contents |
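A minimal consumerFunc matching this signature might look like the following sketch. The global kafkaMsgs and the optParams key tagSource are illustrative names, not part of the package.

```q
// Illustrative consumer function with the required (msg;optParams) signature
kafkaMsgs:();                           // hypothetical global collecting messages
myConsumerFunc:{[msg;optParams]
  // optParams can be null, so guard before reading from it
  if[(not optParams~(::)) and `tagSource in key optParams;
    msg[`source]:optParams`tagSource];
  kafkaMsgs,::enlist msg;
  };
```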
Initializing a consumer with this function will return an ID for the consumer.

```q
consumer:.al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);`.kx.kfk.IPCDeserializeUpd;(`kfkRetainMsgs`kfkRetainTimings)!(1b;0b)];
.log.out[`kafka;"Consumer initialized with id";consumer];
```
The consumer function consumerFunc maps directly to the KX core .kfk.consumecb function, which must be defined to consume Kafka messages through the librdkafka library. The KX Delta Platform Asset provides a means to pass additional optional parameters for data processing to the consumerFunc through the use of consumerFuncOptParams.
Once initialized, the consumer process will start receiving messages via the consumerFunc provided during initialization. The behavior of that consumerFunc determines how the data is consumed. A number of default consumers are documented below.
Example consumer
Bringing this all together, any KX Delta Platform-based q Process Template can be initialized as a Kafka consumer and begin consuming data with just a few lines of code.
An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:
```q
kafkaConsumerInit:{[]
  .al.loadinstruction[`kxkfkInit];
  kafkaData::();
  kfkConsumerFunc:{[msg;optParams]
    // Simple example of using the optional parameters passed to the consume function
    if[optParams[`kfkRetainTimings];
      msg[`rcvtime]:.z.p;
      ];
    kafkaData,::enlist msg;
    };
  .al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);kfkConsumerFunc;(enlist `kfkRetainTimings)!(enlist 1b)];
  }
```
Serializers and deserializers
By default, there are a number of serializers and deserializers available within the KX Delta Platform Asset package. These provide basic support for JSON and IPC data, as well as hooking into the upd functionality. However, there is no limitation preventing solutions from implementing and integrating their own serializers and deserializers into the KX Delta Platform Kafka architecture.
analytic | description |
---|---|
.kx.kfk.IPCSerialize | Serializes KX data to qIPC format |
.kx.kfk.JSONSerialize | Serializes KX data to JSON format |
.kx.kfk.IPCDeserializeUpd | Deserializes KX data from qIPC format and publishes it to the relevant upd function defined in the process |
.kx.kfk.JSONDeserializeUpd | Deserializes KX data from JSON format and publishes it to the relevant upd function defined in the process |
Using SSL
If the Kafka broker is set up to communicate over TLS/SSL, it will be necessary to add configuration to the client to allow the creation of secure communication channels. This can be achieved through additional configuration values added to the kxkfkCfg configuration parameter.
param | example | description |
---|---|---|
security.protocol | SSL | Protocol used to communicate with brokers |
ssl.ca.location | /home/deploy/certs/cert.pem | File or directory path to CA certificate(s) for verifying the broker's key |
Full details on the SSL configuration available to the client can be viewed in the librdkafka configuration documentation.
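Collecting the SSL settings into an override might look like the following sketch. The paths are placeholders, and the extra keys ssl.certificate.location and ssl.key.location (used for mutual TLS) are standard librdkafka settings shown here as an assumption about your broker setup.

```q
// Hypothetical SSL additions to a kxkfkCfg override; all keys follow
// librdkafka naming, all paths are placeholders
sslCfg:(`$("security.protocol";"ssl.ca.location";
           "ssl.certificate.location";"ssl.key.location"))!
  ("SSL";"/home/deploy/certs/ca.pem";
   "/home/deploy/certs/client.pem";"/home/deploy/certs/client.key")
```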