# Kafka integration

Platform offers direct integration with Kafka through a Platform Asset package. This asset packages all library requirements along with a simple configuration-based implementation, allowing Kafka producers and consumers to be set up with minimum effort. It offers the full functionality of the Kx Fusion interface detailed at code.kx.com without the need to manually compile and install any Kafka libraries. This includes the full library support available via librdkafka. The only requirement is the ability to update $QHOME with the necessary libraries and kdb+ script wrapper.

Currently all Linux flavours supported by the Platform support Kafka integration, including Ubuntu 16, Red Hat 6 and Red Hat 7.

## Platform Kafka asset installation

The Kafka Platform Asset and Eval pack are installed in the same way as all Platform, Platform Asset and Solution packages. Place the package in the relevant package directory and run the installation script. Further details on Platform and package deployment can be found in the deployment guides.

## System configuration

The Kafka Platform Asset supports the full range of configuration offered by the underlying library, as detailed in the librdkafka configuration documentation. This configuration is initialized via the config parameter kxkfkCfg. The base Kafka package includes a DEFAULT and a system override containing simple configuration. This can and should be extended by a client for all client-specific overrides.

### Kafka broker list

At a minimum, kxkfkCfg should be updated to point at a relevant Kafka broker list via metadata.broker.list. By default a number of configurations are provided in the DEFAULT and system overrides:

| param | default | description |
|---|---|---|
| metadata.broker.list | localhost:9092 | Initial list of brokers as a CSV list of broker host or host:port |
| queue.buffering.max.ms | 1 | Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers |
| fetch.wait.max.ms | 10 | Maximum time the broker may wait to fill the response with fetch.min.bytes |
| group.id | 0 | Client group ID string. All clients sharing the same group.id belong to the same group |

This is the basic configuration, which can be used as a starting point for connecting to Kafka.

## Publishing data to Kafka

To publish, a producer must be initialized. The first step is to initialize the Kafka API available within the Kafka Platform Asset. To do this, call the init instruction.

    // Load Kafka Integration library instruction
    .al.loadinstruction[`kxkfkInit];

This will load the necessary functions and initialize the underlying Kafka system libraries necessary to communicate with Kafka.

The .kx.kfk.InitProducer Analytic within the KxKafka package allows for the easy initialization of a producer. It relies on the kxkfkCfg parameter for Kafka broker configuration details. The DEFAULT and system configuration will be used by default, but can be overridden by providing the appropriate kafkaConfig value as an input to the Analytic.

| parameter | type | example | description |
|---|---|---|---|
| kafkaConfig | symbol | uatConfig | Override for kxkfkCfg detailing Kafka Broker config |
| topic | symbol | quotes | Topic for producer |
| topicConfig | Dict | ()!() | Topic config dict, see code.kx.com for available config |

This analytic will return a unique ID for the producer.

    producer:.al.callfunction[`.kx.kfk.InitProducer][`;`quotes;()!()];
    .log.out[`kafka;"Producer initialized with id";producer];

A single process can be a producer for multiple topics by calling the initialize Analytic .kx.kfk.InitProducer multiple times with the relevant topics.

Once a producer is initialized, data can be published to the Kafka broker. This is achieved via the function .kx.kfk.pub, which is also made available by the previously loaded kxkfkInit instruction.

| parameter | type | example | description |
|---|---|---|---|
| topic | symbol | quotes | Topic to publish data on |
| kval | string | string .z.t | Key for published record |
| data | Any | | Data to be published. Can be already serialized or serialized via the sFunc analytic below |
| partition | symbol | .kfk.PARTITION_UA | Partition to publish to |
| sFunc | symbol | .kx.kfk.JSONSerialize | Serializer function. If this is the empty symbol, the data is assumed to be already prepared |

A publish example:

    .kx.kfk.pub[`Quotes;"dfxQuote"; pubData; .kfk.PARTITION_UA; `.kx.kfk.IPCSerialize];

### Example Publisher

Bringing this all together, once the kxkfkCfg parameter has been configured correctly, any Platform-based q Process Template can be initialized as a Kafka producer and begin publishing data with just a few lines of code.

An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:

kafkaProducerInit

    {[]
      .al.loadinstruction[`kxkfkInit];
      .al.callfunction[`.kx.kfk.InitProducer][`;`Quotes;()!()];
      pubData:(`time`sym`price)!(enlist .z.p;enlist `$"USD/CAD";enlist 1.2345);
      .kx.kfk.pub[`Quotes;"dfxQuote";(`a`b`c)!(enlist 1;enlist 2;enlist 3);.kfk.PARTITION_UA;{[x] -8!x}];
     }
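
The sFunc behaviour described for .kx.kfk.pub (serialize with the named function, or pass the data through when the empty symbol is given) can be pictured with a small stand-alone sketch. The names `jsonSer` and `prepare` are hypothetical and exist only for illustration; this is not the Platform's implementation, only an approximation of the documented rule using core q.

```q
// Hypothetical stand-ins: jsonSer and prepare are illustrative names, not Platform analytics
jsonSer:{[x] .j.j x};

// Apply the serializer named by sFunc, or pass data through when sFunc is the empty symbol
prepare:{[data;sFunc] $[null sFunc; data; (value sFunc) data]};

pubData:([] a:enlist 1; b:enlist 2; c:enlist 3);
viaFunc:prepare[pubData;`jsonSer];     // serialized by the named function
preBuilt:prepare[.j.j pubData;`];      // already serialized, so passed through unchanged
```

Under these assumptions both calls yield the same JSON string, which is the practical meaning of "already prepared" data.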

## Consuming

In the same fashion as a producer, a consumer must be initialized. Again, the first step is to initialize the Kafka Platform Asset.

    // Load Kafka Integration library instruction
    .al.loadinstruction[`kxkfkInit];

The .kx.kfk.InitConsumer analytic within the KxKafka package provides for the initialization of a consumer. It relies on the kxkfkCfg parameter for Kafka broker configuration details.

| parameter | type | example | description |
|---|---|---|---|
| kafkaConfig | symbol | uatConfig | Override for kxkfkCfg detailing Kafka Broker config |
| topic | symbol | quotes | Topic to consume |
| partitions | symbol[] | (.kfk.PARTITION_UA) | List of partitions to consume from |
| consumerFunc | symbol | .kx.kfk.IPCDeserializeUpd | Analytic to consume data from Kafka |
| consumerFuncOptParams | Dict | ()!() | Optional parameters which are passed to the consumer function for custom behavior |

To ensure compatibility with the KxKafka package and Platform consumer structure, the consumerFunc function must have the following signature:

| parameter | description |
|---|---|
| msg | The incoming Kafka message for consumption |
| optParams | Optional parameters defined during initialization of the Kafka consumer function. Can be null, though they must be defined if the consumer function relies on their contents |

Initializing a consumer with this function will return an ID for the consumer.

    consumer:.al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);`.kx.kfk.IPCDeserializeUpd;(`kfkRetainMsgs`kfkRetainTimings)!(`rm;0b)];
    .log.out[`kafka;"Consumer initialized with id";consumer];

The consumer function consumerFunc maps directly to the Kx core .kfk.consumecb function, which must be defined in order to consume Kafka messages through the librdkafka library. The Platform Asset provides a means to pass additional optional parameters for data processing to the consumerFunc through consumerFuncOptParams.
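
To make the two-argument signature concrete, here is a minimal stand-alone sketch that can be run without a broker. It assumes the incoming message is a dictionary whose `data` entry holds the payload as a qIPC byte vector; that layout, the function name `exampleConsumer`, and the simulated message `fakeMsg` are assumptions for illustration rather than details confirmed by this guide.

```q
// Hypothetical consumer callback: decode an IPC payload and optionally stamp a receive time
exampleConsumer:{[msg;optParams]
  // msg`data is assumed to hold the raw payload as a byte vector
  payload:-9!msg`data;
  // optParams may be an empty dictionary, so default the flag to false
  retain:$[`kfkRetainTimings in key optParams; optParams`kfkRetainTimings; 0b];
  if[retain; payload:update rcvtime:.z.p from payload];
  payload
  };

// Simulated message, standing in for what the Kafka library would deliver
quote:([] time:enlist .z.p; sym:enlist `$"USD/CAD"; price:enlist 1.2345);
fakeMsg:`topic`partition`data!(`quotes;0i;-8!quote);

stamped:exampleConsumer[fakeMsg;(enlist `kfkRetainTimings)!enlist 1b];
plain:exampleConsumer[fakeMsg;()!()];
```

With the flag set, `stamped` carries an extra `rcvtime` column; with an empty dictionary, `plain` is the original table. Checking for the key before reading it is one way to honour the note that optParams may be empty.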

Once initialized, the consumer process should start receiving messages in the consumerFunc provided during initialization. How the data is consumed depends on the functionality of that consumerFunc. There are a number of default consumers documented below.

### Example Consumer

Bringing this all together, any Platform-based q Process Template can be initialized as a Kafka consumer and begin consuming data with just a few lines of code.

An example initialization Analytic which could be assigned to the appropriate initStateFunc parameter of a process instance would be:

kafkaConsumerInit

    {[]
      .al.loadinstruction[`kxkfkInit];

      kfkConsumerFunc:{[msg;optParams]
        // Simple example of how to use the optional parameters passed to the consumer function
        if[optParams[`kfkRetainTimings];
          msg[`rcvtime]:.z.p;
          ];
        };

      .al.callfunction[`.kx.kfk.InitConsumer][`;`quotes;(.kfk.PARTITION_UA);kfkConsumerFunc;(enlist `kfkRetainTimings)!(enlist 1b)];
     }

## Serializers and Deserializers

By default there are a number of serializers and deserializers available within the Platform Asset package. These provide basic support for JSON and IPC data as well as hooking into the upd functionality. However, solutions are free to implement and integrate their own serializers and deserializers into the Platform Kafka architecture.

| analytic | description |
|---|---|
| .kx.kfk.IPCSerialize | Serializes Kx data to qIPC format |
| .kx.kfk.JSONSerialize | Serializes Kx data to JSON format |
| .kx.kfk.IPCDeserializeUpd | Deserializes Kx data from qIPC format, and publishes data to the relevant upd function defined in the process |
| .kx.kfk.JSONDeserializeUpd | Deserializes Kx data from JSON format, and publishes data to the relevant upd function defined in the process |
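
The practical difference between the two formats can be seen with core q primitives alone. The sketch below uses `-8!` and `-9!` for qIPC and `.j.j` and `.j.k` for JSON; it does not call the Platform analytics and makes no claim about how they are implemented internally. The sample table is invented for illustration.

```q
quote:([] time:enlist 2024.01.02D09:30:00.000000000; sym:enlist `$"USD/CAD"; price:enlist 1.2345);

// qIPC: -8! serializes to a byte vector and -9! reverses it, preserving types
ipcBytes:-8!quote;
ipcBack:-9!ipcBytes;

// JSON: .j.j produces a string and .j.k parses it; q types are not preserved
jsonText:.j.j quote;
jsonBack:.j.k jsonText;
```

Here `ipcBack` matches the original table exactly, while `jsonBack` returns the symbol and timestamp columns as strings, so a JSON consumer typically needs to cast columns before passing data to a typed upd function.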

## Using SSL

If the Kafka broker is set up to communicate over TLS/SSL, it will be necessary to add configuration to the client to allow the creation of the secure communication channels. This can be achieved through additional configuration values added to the kxkfkCfg configuration parameter.

| param | example | description |
|---|---|---|
| security.protocol | SSL | Protocol used to communicate with brokers |
| ssl.ca.location | /hom/deploy/certs/cert.pem | File or directory path to CA certificate(s) for verifying the broker's key |

Full details on the SSL configuration available to the client can be found in the librdkafka configuration documentation.
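
As a rough illustration of layering SSL settings over a base configuration, the sketch below builds both as q dictionaries of symbols and joins them. The dictionary form is an assumption: this guide does not show how kxkfkCfg stores its values, and the certificate path is a placeholder.

```q
// Assumed dictionary form of the settings; how kxkfkCfg stores them is not shown in this guide
baseCfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`group.id;`$"0"));

// Placeholder certificate path, to be replaced with the real CA location
sslCfg:(!) . flip(
  (`security.protocol;`SSL);
  (`ssl.ca.location;`$"/path/to/ca-cert.pem"));

// Dictionary join: entries on the right take precedence on any key clash
cfg:baseCfg,sslCfg;
```

Because the right-hand dictionary wins on duplicate keys, the same pattern can express a client override on top of default values.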