Using Kafka with kdb+
kfk is a thin wrapper for kdb+ around the edenhill/librdkafka C API for Apache Kafka.
Follow the installation instructions for set-up.
To run the examples on this page you need an available Kafka broker. A local instance for testing is easy to set up, as described under Testing.
API
The library follows the librdkafka API closely where possible. As per its introduction:
- Base container rd_kafka_t: a client created by .kfk.Client. It provides global configuration and shared state. The shortcuts .kfk.Producer and .kfk.Consumer are provided for simplicity.
- One or more topics rd_kafka_topic_t, which are either producers or consumers and are created by the function .kfk.Topic.
Both clients and topics accept an optional configuration dictionary.
.kfk.Client and .kfk.Topic return an int which acts as a Client or Topic ID (an index into an internal array). Client IDs are used to create topics, and Topic IDs are used to publish data on that topic; subscribing goes through the Client ID, as the call to .kfk.Sub in the consumer example shows. The IDs can also be used to query metadata: state of subscription, pending queues, etc.
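To make the Client ID / Topic ID relationship concrete, here is a minimal sketch that creates one producer client and two topics on it, one with default settings and one with a topic-level option. It assumes a broker on localhost:9092, that the topic dictionary is passed through to librdkafka topic configuration, and that values are given as symbols like in the examples on this page; the topic name `test2 and the choice of request.required.acks are illustrative only.

```q
\l kfk.q
// client configuration: only the broker list (assumes a broker on localhost:9092)
cfg:(enlist`metadata.broker.list)!enlist`localhost:9092
// .kfk.Producer returns an int Client ID
producer:.kfk.Producer[cfg]
// a Topic ID is created from a Client ID; ()!() requests default topic settings
t1:.kfk.Topic[producer;`test;()!()]
// a second topic on the same client with one topic-level librdkafka option;
// the topic name `test2 is hypothetical and the value is passed as a symbol
t2:.kfk.Topic[producer;`test2;(enlist`request.required.acks)!enlist`1]
// the IDs are plain q ints that the library uses as indices
show (producer;t1;t2)
```

Both topics share the producer's global configuration and state; only the Topic IDs differ.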
Minimal producer example
\l kfk.q
// specify kafka brokers to connect to and statistics settings.
kfk_cfg:`metadata.broker.list`statistics.interval.ms!`localhost:9092`10000
// create producer with the config above
producer:.kfk.Producer[kfk_cfg]
// set up producer topic "test" with default topic settings
test_topic:.kfk.Topic[producer;`test;()!()]
// publish current time with a key "time"
.kfk.Pub[test_topic;.kfk.PARTITION_UA;string .z.t;"time"];
show "Published 1 message";
See KxSystems/kafka/test_producer.q.
Minimal consumer example
\l kfk.q
// create a consumer client in consumer group 0
client:.kfk.Consumer[`metadata.broker.list`group.id!`localhost:9092`0];
data:();
// define the consumer callback (the default does nothing): convert the payload
// from bytes to chars, stamp the receive time and append the message to data
.kfk.consumecb:{[msg]
msg[`data]:"c"$msg[`data];
msg[`rcvtime]:.z.p;
data,::enlist msg;}
// subscribe to the "test" topic with default partitioning
.kfk.Sub[client;`test;enlist .kfk.PARTITION_UA];
See KxSystems/kafka/test_consumer.q for a slightly more elaborate version.
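Because the callback appends each message dictionary to data, the collected messages can be inspected from the consumer session once some have arrived. A minimal sketch, assuming every message dictionary has the same keys, so that q treats the list of dictionaries as a table; only the data and rcvtime keys set by the callback above are relied on.

```q
// run in the consumer session after some messages have arrived
// number of messages received so far
count data
// payload and receive time of the (up to) 5 most recent messages
-5 sublist select data,rcvtime from data
// messages received during the last minute
select from data where rcvtime>.z.p-0D00:01:00
```

sublist is used rather than take (#), since take would repeat rows cyclically when fewer than five messages have been received.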
Configuration
The library supports and uses all configuration options exposed by librdkafka, except callback functions. By design of librdkafka, these options are identical to the Kafka options.
See edenhill/librdkafka/CONFIGURATION.md for a list of options.
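As a sketch of how further librdkafka options are passed, the client configuration dictionary simply gains more keys. This assumes, following the examples above, that every value is given as a symbol (numbers included); client.id and socket.timeout.ms are standard librdkafka global properties, and the values chosen here are illustrative, not recommendations.

```q
\l kfk.q
// keys are librdkafka property names; values are symbols, as in the examples above
// client.id names this client in broker logs; socket.timeout.ms bounds network requests
cfg:`metadata.broker.list`client.id`socket.timeout.ms!`localhost:9092`kdb_producer`30000
producer:.kfk.Producer[cfg]
```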
Testing
You can use either an existing Kafka broker or start a test Kafka broker as described below.
Setting up a test Kafka instance
Download and unzip Kafka.
$ cd $HOME
$ wget http://www-us.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
$ tar xzvf kafka_2.11-0.10.2.0.tgz
$ cd $HOME/kafka_2.11-0.10.2.0
Start ZooKeeper.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker.
$ bin/kafka-server-start.sh config/server.properties
Running examples
Start the producer in one q session and set the timer interval to 1000 milliseconds.
q)\l test_producer.q
q)\t 1000
Start the consumer in a separate q session.
q)\l test_consumer.q
Messages now flow from producer to consumer. The publishing rate can be adjusted in the producer process with \t x, where x is the timer interval in milliseconds.
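The timer-driven publishing can be sketched on top of the minimal producer example by defining a timer handler. This is an illustration under the assumption that test_topic from that example exists in the session; KxSystems/kafka/test_producer.q may be written differently. Enter the lines one at a time to see the rate change.

```q
// run in the producer session after the minimal producer example, which defines test_topic
// timer handler: publish the current time with key "time" on every tick
.z.ts:{.kfk.Pub[test_topic;.kfk.PARTITION_UA;string .z.t;"time"];}
// publish once per second
\t 1000
// publish ten times per second
\t 100
// stop publishing
\t 0
```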
Performance and tuning
See edenhill/librdkafka/wiki/How-to-decrease-message-latency.
There are numerous configuration options and it is best to find settings that suit your needs and setup. See Configuration above.
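For example, latency-oriented settings might be tried as in the sketch below. It assumes symbol values as in the examples above; queue.buffering.max.ms (how long the producer waits to batch messages before sending) and fetch.wait.max.ms (how long the broker may hold a fetch request waiting for data) are librdkafka properties, and the values are illustrative starting points only. Lower values generally trade throughput for latency, since less batching takes place.

```q
\l kfk.q
// producer: send almost immediately instead of waiting to fill batches
pcfg:`metadata.broker.list`queue.buffering.max.ms!`localhost:9092`1
producer:.kfk.Producer[pcfg]
// consumer: cap how long the broker may wait before answering a fetch request
ccfg:`metadata.broker.list`group.id`fetch.wait.max.ms!`localhost:9092`0`10
client:.kfk.Consumer[ccfg]
```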