Using Kafka with kdb+

KxSystems/kafka

kfk is a thin wrapper for kdb+ around the edenhill/librdkafka C API for Apache Kafka.

Follow the installation instructions in the KxSystems/kafka repository to set up the library.

To run the examples on this page you need a running Kafka broker. A local instance for testing is easy to set up; see Testing below.

API

The library follows the librdkafka API closely where possible. As described in the librdkafka introduction, it is built around:

  • A base container, rd_kafka_t: a client created by .kfk.Client, which provides global configuration and shared state. .kfk.Producer and .kfk.Consumer are provided as convenience functions for creating producer and consumer clients.
  • One or more topics, rd_kafka_topic_t, each of which is either a producer or a consumer topic. Topics are created with .kfk.Topic.

Both clients and topics accept an optional configuration dictionary. .kfk.Client and .kfk.Topic return an int that acts as a client or topic ID (an index into an internal array). Client IDs are used to create topics and, as in the consumer example below, to subscribe to topics; topic IDs are used to publish data to that topic. IDs can also be used to query metadata, such as subscription state and pending queues.

Minimal producer example

\l kfk.q
// specify the Kafka brokers to connect to and the statistics interval
kfk_cfg:`metadata.broker.list`statistics.interval.ms!`localhost:9092`10000
// create producer with the config above
producer:.kfk.Producer[kfk_cfg]
// setup producer topic "test"
test_topic:.kfk.Topic[producer;`test;()!()]
// publish current time with a key "time"
.kfk.Pub[test_topic;.kfk.PARTITION_UA;string .z.t;"time"];
show "Published 1 message";

KxSystems/kafka/test_producer.q

Minimal consumer example

\l kfk.q
// create consumer process within group 0
client:.kfk.Consumer[`metadata.broker.list`group.id!`localhost:9092`0];
data:();
// set up a meaningful consumer callback (the default does nothing)
.kfk.consumecb:{[msg]
    msg[`data]:"c"$msg[`data];
    msg[`rcvtime]:.z.p;
    data,::enlist msg;}
// subscribe to the "test" topic with default partitioning
.kfk.Sub[client;`test;enlist .kfk.PARTITION_UA];

KxSystems/kafka/test_consumer.q for a slightly more elaborate version

Configuration

The library supports all configuration options exposed by librdkafka except callback functions. By design of librdkafka, these options are identical to the Apache Kafka options.

See edenhill/librdkafka/CONFIGURATION.md for the full list of options.
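As a sketch of how such options are passed, the following builds a consumer configuration dictionary. It assumes, following the examples above, that option names are given as symbol keys and that values are given as symbols, numbers and booleans included. client.id and enable.auto.commit are standard librdkafka options; the client name kdb_consumer is hypothetical, and all values are illustrative rather than recommended.

```q
\l kfk.q
// option names: librdkafka configuration keys, as symbols
opts:`metadata.broker.list`group.id`client.id`enable.auto.commit;
// option values: passed as symbols, numbers and booleans included (assumption based on the examples above)
// kdb_consumer is a hypothetical client name
vals:`localhost:9092`0`kdb_consumer`false;
cfg:opts!vals;
// create a consumer client with this configuration
client:.kfk.Consumer[cfg];
```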

Testing

You can use either an existing Kafka broker or start a test broker as described below.

Setting up a test Kafka instance

See the Apache Kafka tutorial for more detail.

Download and unzip Kafka.

$ cd $HOME
$ wget http://www-us.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
$ tar xzvf kafka_2.11-0.10.2.0.tgz
$ cd $HOME/kafka_2.11-0.10.2.0

Start ZooKeeper.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

In a separate terminal, start the Kafka broker.

$ bin/kafka-server-start.sh config/server.properties

Running examples

Start the producer.

q)\l test_producer.q
q)\t 1000

In a separate q session, start the consumer.

q)\l test_consumer.q

Messages now flow from the producer to the consumer. Adjust the publishing rate with \t x in the producer process, where x is the timer interval in milliseconds.
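The publishing rate is tied to the q timer: \t sets the interval, in milliseconds, at which q calls .z.ts. The sketch below shows a timer-driven producer built only from the calls in the minimal producer example. It assumes test_producer.q works along these lines; its actual contents may differ.

```q
\l kfk.q
// producer and topic set up as in the minimal producer example
kfk_cfg:`metadata.broker.list`statistics.interval.ms!`localhost:9092`10000;
producer:.kfk.Producer[kfk_cfg];
test_topic:.kfk.Topic[producer;`test;()!()];
// timer callback: publish the current time with key "time" on every timer tick
.z.ts:{.kfk.Pub[test_topic;.kfk.PARTITION_UA;string .z.t;"time"]}
// fire the timer every 500 ms, i.e. roughly two messages per second
\t 500
```

Running \t 0 in the producer process turns the timer off and therefore stops publishing.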

Performance and tuning

For latency tuning, see edenhill/librdkafka/wiki/How-to-decrease-message-latency.

There are numerous configuration options, and the best settings depend on your requirements and setup. See Configuration above.
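For example, a latency-oriented setup might shorten how long the producer waits to batch messages and how long the broker may hold a fetch request. The sketch below assumes queue.buffering.max.ms and fetch.wait.max.ms are appropriate for your librdkafka version (check CONFIGURATION.md). The values 1 and 10 are illustrative only; reducing batching delay generally trades throughput for lower latency.

```q
\l kfk.q
// producer: queue.buffering.max.ms is how long messages may wait in the producer queue before a batch is sent
// the value 1 is illustrative, not a recommendation
pcfg:`metadata.broker.list`queue.buffering.max.ms!`localhost:9092`1;
// consumer: fetch.wait.max.ms is the maximum time the broker may wait to fill a fetch response
// the value 10 is illustrative, not a recommendation
ccfg:`metadata.broker.list`group.id`fetch.wait.max.ms!`localhost:9092`0`10;
producer:.kfk.Producer[pcfg];
client:.kfk.Consumer[ccfg];
```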