Kafka integration
KX Delta Platform offers direct integration with Kafka through a KX Delta Platform asset package. This asset bundles all library requirements together with a simple configuration-based implementation, allowing Kafka producers and consumers to be set up with minimal effort. It offers the full functionality of the KX Fusion interface detailed at code.kx.com, without the need to manually compile and install any Kafka libraries. This includes the full library support available via librdkafka. The only requirement is the ability to update $QHOME with the necessary libraries and kdb+ script wrapper.
Currently, all Linux flavors supported by the KX Delta Platform, including Ubuntu 20/22/24 and Red Hat 7/8/9, can integrate with Kafka.
KX Delta Platform Kafka asset installation
The Kafka KX Delta Platform Asset and Eval pack are installed in the same way as all other KX Delta Platform, KX Delta Platform asset, and Solution packages: place the package in the relevant package directory and run the installation script. Further details on KX Delta Platform and package deployment are available in the deployment guides.
System configuration
The Kafka KX Delta Platform asset supports the full range of configuration offered by the underlying librdkafka library, as detailed in the librdkafka configuration documentation. This configuration is initialized via the config parameter kxkfkCfg. The base Kafka package includes a DEFAULT and a system override containing simple configuration. Clients can and should extend this with their own client-specific overrides.
Kafka broker list
At a minimum, kxkfkCfg should be updated so that metadata.broker.list points at the relevant Kafka broker list.
By default, the following configuration values are provided in the DEFAULT and system overrides:
| param | default | description |
|---|---|---|
| metadata.broker.list | localhost:9092 | Initial list of brokers as a CSV list of broker host or host:port |
| queue.buffering.max.ms | 1 | Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. |
| fetch.wait.max.ms | 10 | Maximum time the broker may wait to fill the response with fetch.min.bytes |
| group.id | 0 | Client group ID string. All clients sharing the same group.id belong to the same group |
This basic configuration can be used as a starting point for connecting to Kafka.
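As an illustration, the defaults in the table can be written as a q dictionary of symbol keys and symbol values, which is the shape the published KX Fusion interface examples use for client configuration. This is a sketch only: how kxkfkCfg stores these values inside the platform is not shown here, the name `kfkCfg` is hypothetical, and the broker host names are placeholders.

```q
// Sketch only: the default settings as symbol key/value pairs.
// kfkCfg is a hypothetical name; broker1/broker2 are placeholder hosts.
kfkCfg:(!) . flip(
    (`metadata.broker.list;`$"broker1:9092,broker2:9092");
    (`queue.buffering.max.ms;`1);
    (`fetch.wait.max.ms;`10);
    (`group.id;`0)
    );
```

The broker list is cast from a string because a comma cannot appear in a symbol literal.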
Publishing data to Kafka
To publish, a producer must be initialized. The first step is to initialize the Kafka API available within the Kafka KX Delta Platform Asset. To do this, load the kxkfkInit instruction.
```q
// Load Kafka Integration library instruction
.al.loadinstruction[`kxkfkInit];
```
This loads the required functions and initializes the underlying Kafka system libraries needed to communicate with Kafka.
The .kx.kfk.InitProducer Analytic within the KxKafka package allows for the easy initialization of a producer. It relies on the kxkfkCfg parameter for Kafka broker configuration details. The DEFAULT and system configuration are used by default, but can be overridden by providing the appropriate kafkaConfig value as an input to the Analytic.
| parameter | type | example | description |
|---|---|---|---|
| kafkaConfig | symbol | `uatConfig | Override for kxkfkCfg detailing Kafka Broker config |
| topic | symbol | `quotes | Topic for producer |
| topicConfig | Dict | ()!() | Topic config dict, see code.kx.com for available config |
This analytic returns a unique ID for the producer:
```q
producer:.al.callfunction[`.kx.kfk.InitProducer][`;`quotes;()!()];
.log.out[`kafka;"Producer initialized with id";producer];
```
A single process can be a producer for multiple topics by calling the .kx.kfk.InitProducer Analytic multiple times with the relevant topics.
Once a producer is initialized, data can be published to the Kafka broker. This is done via the function .kx.kfk.pub, which is also made available by the previously loaded kxkfkInit instruction.
| parameter | type | example | description |
|---|---|---|---|
| topic | symbol | `quotes | Topic to publish data on |
| kval | string | string .z.t | Key for published record |
| data | Any | | Data to be published. Can be already serialized or serialized via the sFunc analytic below |
| partition | symbol | .kfk.PARTITION_UA | Partition to publish to |
| sFunc | symbol | .kx.kfk.JSONSerialize | Serializer function. If this is the empty symbol, the data is assumed to be already serialized |
A publish example:
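```q
// Topic name matches the `quotes producer initialized above; topic names are case-sensitive
.kx.kfk.pub[`quotes;"dfxQuote"; pubData; .kfk.PARTITION_UA; `.kx.kfk.IPCSerialize];
```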
Example publisher
Bringing this all together: once the kxkfkCfg parameter has been configured correctly, any KX Delta Platform-based q Process Template can be initialized as a Kafka producer and begin publishing data with just a few lines of code.
An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:
kafkaProducerInit
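```q
{[]
    // Load the Kafka integration library
    .al.loadinstruction[`kxkfkInit];
    // Initialize a producer for the Quotes topic using the default broker config
    .al.callfunction[`.kx.kfk.InitProducer][`;`Quotes;()!()];
    // Build a single-row record and publish it, serialized to qIPC bytes
    pubData:(`time`sym`price)!(enlist .z.p;enlist `$"USD/CAD";enlist 1.2345);
    .kx.kfk.pub[`Quotes;"dfxQuote";pubData;.kfk.PARTITION_UA;{[x] -8!x}];
    }
```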
Consuming
In the same fashion as a producer, a consumer must be initialized. Again, the first step is to initialize the Kafka KX Delta Platform Asset.
```q
// Load Kafka Integration library instruction
.al.loadinstruction[`kxkfkInit];
```
The .kx.kfk.InitConsumer analytic within the KxKafka package provides for the initialization of a consumer. It relies on the kxkfkCfg parameter for Kafka broker configuration details.
| parameter | type | example | description |
|---|---|---|---|
| kafkaConfig | symbol | `uatConfig | Override for kxkfkCfg detailing Kafka Broker config |
| topic | symbol | `quotes | Topic to consume |
| partitions | symbol[] | (.kfk.PARTITION_UA) | List of partitions to consume from |
| consumerFunc | symbol | .kx.kfk.IPCDeserializeUpd | Analytic to consume data from Kafka |
| consumerFuncOptParams | Dict | ()!() | Optional parameters which are passed to the consumer function for custom behavior |
To ensure compatibility with the KxKafka package and KX Delta Platform consumer structure, the consumerFunc function must have the following signature:
| parameter | description |
|---|---|
| msg | Relates to the incoming Kafka message for consumption |
| optParams | Optional parameters defined during initialization of the Kafka consumer function. Can be null, but must be defined if the consumer function relies on their contents |
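A consumer function with this signature can be exercised in a plain q session before it is attached to a real consumer. The sketch below is illustrative only: `myConsumer`, the `received` table and the `stamp` option are hypothetical names, and the test message contains only the `topic` and `data` fields, whereas the real message dictionary is built by the Kafka interface. It shows one way to tolerate null optional parameters.

```q
// Hypothetical store for consumed messages: topic, receive time, payload size in bytes
received:([]topic:`symbol$();rcvtime:`timestamp$();size:`long$());

// Hypothetical consumer function following the msg/optParams signature
myConsumer:{[msg;optParams]
    // Fall back to an empty dictionary when no optional parameters were supplied
    opts:$[99h=type optParams;optParams;()!()];
    // `stamp is a hypothetical option; a missing key is treated as false
    ts:$[1b~opts`stamp;.z.p;0Np];
    `received insert (msg`topic;ts;count msg`data);
    };

// Simulated message carrying a qIPC-serialized payload
testMsg:`topic`data!(`quotes;-8!`sym`price!(`USDCAD;1.2345));
myConsumer[testMsg;()];                          // no optional parameters
myConsumer[testMsg;enlist[`stamp]!enlist 1b];    // request a receive timestamp
```

After both calls, `received` holds two rows: the first with a null `rcvtime`, the second stamped with the current time.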
Initializing a consumer with this function will return an ID for the consumer.
```q
consumer:.al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);`.kx.kfk.IPCDeserializeUpd;(`kfkRetainMsgs`kfkRetainTimings)!(rm;0b)];
.log.out[`kafka;"Consumer initialized with id";consumer];
```
The consumer function consumerFunc maps directly to the KX core .kfk.consumecb function, which must be defined in order to consume Kafka messages through the librdkafka library. The KX Delta Platform Asset provides a means to pass additional optional parameters for data processing to the consumerFunc through consumerFuncOptParams.
Once initialized, the consumer process should start receiving messages in the consumerFunc provided during initialization. How the data is consumed depends on the functionality of that consumerFunc. A number of default consumers are documented below.
Example consumer
Bringing this all together, any KX Delta Platform-based q Process Template can be initialized as a Kafka consumer and begin consuming data with just a few lines of code.
An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:
kafkaConsumerInit
```q
{[]
    .al.loadinstruction[`kxkfkInit];
    kafkaData::();
    kfkConsumerFunc:{[msg;optParams]
        // Simple Example how to use passed in optParameters to the consume function
        if[optParams[`kfkRetainTimings];
            msg[`rcvtime]:.z.p;
            ];
        kafkaData,::enlist msg;
        };
    .al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);kfkConsumerFunc;(enlist `kfkRetainTimings)!(enlist 1b)];
    }
```
Serializers and deserializers
By default, a number of serializers and deserializers are available within the KX Delta Platform Asset package. These provide basic support for JSON and IPC data, as well as hooking into the upd functionality. However, nothing prevents solutions from implementing and integrating their own serializers and deserializers into the KX Delta Platform Kafka architecture.
| analytic | description |
|---|---|
| .kx.kfk.IPCSerialize | Serializes KX data to qIPC format |
| .kx.kfk.JSONSerialize | Serializes KX data to JSON format |
| .kx.kfk.IPCDeserializeUpd | Deserializes KX data from qIPC format, and publishes data to the relevant upd function defined in the process |
| .kx.kfk.JSONDeserializeUpd | Deserializes KX data from JSON format, and publishes data to the relevant upd function defined in the process |
Using SSL
If the Kafka broker is set up to communicate over TLS/SSL, it will be necessary to add configuration to the client to allow the creation of the secure communication channels. This can be achieved through additional configuration values added to the kxkfkCfg configuration parameter.
| param | example | description |
|---|---|---|
| security.protocol | SSL | Protocol used to communicate with brokers |
| ssl.ca.location | /hom/deploy/certs/cert.pem | File or directory path to CA certificate(s) for verifying the broker's key. |
Full details on the SSL configuration available to the client can be found in the librdkafka configuration documentation.
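As an illustration, the two settings in the table can be expressed as symbol key/value pairs and layered over a base configuration. This is a sketch under assumptions: the platform's own override mechanism for kxkfkCfg is not shown here, the names `baseCfg`, `sslCfg` and `effectiveCfg` are hypothetical, and the certificate path is a placeholder.

```q
// Sketch only: TLS-related client settings as symbol key/value pairs.
// The certificate path is a hypothetical placeholder.
sslCfg:(!) . flip(
    (`security.protocol;`SSL);
    (`ssl.ca.location;`$"/path/to/ca-cert.pem")
    );

// Hypothetical base configuration holding only the broker list
baseCfg:enlist[`metadata.broker.list]!enlist `$"localhost:9092";

// Dictionary join: entries on the right are added to, or replace, those on the left
effectiveCfg:baseCfg,sslCfg;
```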