librdkafka consumer
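A minimal librdkafka C++ consumer built on the legacy simple-consumer API (RdKafka::Consumer): it connects to a broker at localhost:9092, consumes partition 0 of topic "test" from the beginning, and prints each message to stdout. Assuming librdkafka and its C++ headers are installed in the default system paths, it can typically be built with something like `g++ consumer.cpp -lrdkafka++ -lrdkafka -o consumer` (library names and paths may differ per platform).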
#include <iostream>
#include <string>
#include <list>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include "rdkafkacpp.h"
/* Run flag for the main consume loop */
static bool run = true;
/* When true, exit once the end of the partition is reached */
static bool exit_eof = false;
/* SIGINT/SIGTERM handler: request a clean shutdown */
static void sigterm(int sig) { run = false; }
/* Delivery report callback (producer-side; unused by this consumer) */
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb(RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len()
              << " bytes): " << message.errstr() << std::endl;
  }
};
/* Event callback: reports errors, statistics and log lines from librdkafka */
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event &event) {
    switch (event.type()) {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err())
                  << "): " << event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false;
        break;
      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;
      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(),
                event.fac().c_str(), event.str().c_str());
        break;
      default:
        std::cerr << "EVENT " << event.type() << " ("
                  << RdKafka::err2str(event.err()) << "): " << event.str()
                  << std::endl;
        break;
    }
  }
};
/* Custom partitioner using the djb2 string hash (producer-side; unused here) */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
 public:
  int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                         int32_t partition_cnt, void *msg_opaque) {
    return djb_hash(key->c_str(), key->size()) % partition_cnt;
  }

 private:
  /* djb2: hash = hash * 33 + c */
  static inline unsigned int djb_hash(const char *str, size_t len) {
    unsigned int hash = 5381;
    for (size_t i = 0; i < len; i++) hash = ((hash << 5) + hash) + str[i];
    return hash;
  }
};
/* Handle one consumed message or error event */
void msg_consume(RdKafka::Message *message, void *opaque) {
  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      /* No message within the timeout; keep polling */
      break;
    case RdKafka::ERR_NO_ERROR:
      /* Real message */
      std::cout << "Read msg at offset " << message->offset() << std::endl;
      if (message->key()) {
        std::cout << "Key: " << *message->key() << std::endl;
      }
      printf("%.*s\n", static_cast<int>(message->len()),
             static_cast<const char *>(message->payload()));
      break;
    case RdKafka::ERR__PARTITION_EOF:
      /* Reached the end of the partition */
      if (exit_eof) {
        run = false;
      }
      break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      run = false;
      break;
    default:
      /* Other errors */
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      run = false;
  }
}
/* Consume callback: forwards each message to msg_consume() */
class ExampleConsumeCb : public RdKafka::ConsumeCb {
 public:
  void consume_cb(RdKafka::Message &msg, void *opaque) {
    msg_consume(&msg, opaque);
  }
};
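/*
 * Two consumption styles appear in main() below: a pull style, where
 * consume() returns one message per call, and a callback style, where
 * consume_callback() pushes each message to ExampleConsumeCb. The
 * use_ccb flag selects between them (it defaults to the pull style).
 */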
int main(int argc, char **argv) {
  std::string brokers = "localhost:9092";
  std::string errstr;
  std::string topic_str = "test";
  std::string mode = "C";
  std::string debug;
  int32_t partition = 0;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  MyHashPartitionerCb hash_partitioner;
  int use_ccb = 0;

  /*
   * Create global and topic configuration objects.
   */
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  conf->set("metadata.broker.list", brokers, errstr);

  if (!debug.empty()) {
    if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
      std::cerr << errstr << std::endl;
      exit(1);
    }
  }

  ExampleEventCb ex_event_cb;
  conf->set("event_cb", &ex_event_cb, errstr);
  /*
   * Dump the configuration to stdout. Conf::dump() returns a flat list
   * in which keys and values alternate, hence the double increment below.
   */
  for (int pass = 0; pass < 2; pass++) {
    std::list<std::string> *dump;
    if (pass == 0) {
      dump = conf->dump();
      std::cout << "# Global config" << std::endl;
    } else {
      dump = tconf->dump();
      std::cout << "# Topic config" << std::endl;
    }
    for (std::list<std::string>::iterator it = dump->begin();
         it != dump->end();) {
      std::cout << *it << " = ";
      it++;
      std::cout << *it << std::endl;
      it++;
    }
    std::cout << std::endl;
  }
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  /*
   * Create the consumer using the accumulated global configuration.
   */
  RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    exit(1);
  }
  std::cout << "% Created consumer " << consumer->name() << std::endl;
  /*
   * Create topic handle.
   */
  RdKafka::Topic *topic =
      RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
  if (!topic) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    exit(1);
  }

  /*
   * Start consuming topic+partition at start_offset.
   */
  RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp)
              << std::endl;
    exit(1);
  }
  ExampleConsumeCb ex_consume_cb;

  /*
   * Consume messages
   */
  while (run) {
    if (use_ccb) {
      /* Callback style: consume_callback() pushes messages to the callback */
      consumer->consume_callback(topic, partition, 1000, &ex_consume_cb,
                                 &use_ccb);
    } else {
      /* Pull style: consume() returns a single message (or error event) */
      RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
      msg_consume(msg, NULL);
      delete msg;
    }
    /* Serve queued event callbacks (e.g. ExampleEventCb) */
    consumer->poll(0);
  }
  /*
   * Stop consumer
   */
  consumer->stop(topic, partition);
  consumer->poll(1000);

  delete topic;
  delete consumer;

  /*
   * Wait for RdKafka to decommission. This is not strictly needed,
   * but it allows RdKafka to clean up all its resources before the
   * application exits, so that memory profilers such as valgrind
   * won't complain about memory leaks.
   */
  RdKafka::wait_destroyed(5000);

  return 0;
}
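To try it out, run it against a broker that already has a topic named "test" (for example, produce a few messages with the kafka-console-producer tool that ships with Kafka); the consumer prints each message's offset, optional key and payload, and stops cleanly on Ctrl-C.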