Created
September 23, 2015 08:21
-
-
Save c9n/104c4088a4ada941e7e9 to your computer and use it in GitHub Desktop.
librdkafka consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <string> | |
#include <cstdlib> | |
#include <cstdio> | |
#include <csignal> | |
#include <cstring> | |
#include <getopt.h> | |
#include "rdkafkacpp.h" | |
/* for loop */ | |
static bool run = true; | |
/* flag for last message */ | |
static bool exit_eof = false; | |
/* int & term */ | |
static void sigterm(int sig) { run = false; } | |
/* Delivery Report */ | |
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { | |
public: | |
void dr_cb(RdKafka::Message &message) { | |
std::cout << "Message delivery for (" << message.len() | |
<< " bytes): " << message.errstr() << std::endl; | |
} | |
}; | |
/* Event callback */ | |
class ExampleEventCb : public RdKafka::EventCb { | |
public: | |
void event_cb(RdKafka::Event &event) { | |
switch (event.type()) { | |
case RdKafka::Event::EVENT_ERROR: | |
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) | |
<< "): " << event.str() << std::endl; | |
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; | |
break; | |
case RdKafka::Event::EVENT_STATS: | |
std::cerr << "\"STATS\": " << event.str() << std::endl; | |
break; | |
case RdKafka::Event::EVENT_LOG: | |
fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), | |
event.fac().c_str(), event.str().c_str()); | |
break; | |
default: | |
std::cerr << "EVENT " << event.type() << " (" | |
<< RdKafka::err2str(event.err()) << "): " << event.str() | |
<< std::endl; | |
break; | |
} | |
} | |
}; | |
/* unknown */ | |
class MyHashPartitionerCb : public RdKafka::PartitionerCb { | |
public: | |
int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key, | |
int32_t partition_cnt, void *msg_opaque) { | |
return djb_hash(key->c_str(), key->size()) % partition_cnt; | |
} | |
private: | |
static inline unsigned int djb_hash(const char *str, size_t len) { | |
unsigned int hash = 5381; | |
for (size_t i = 0; i < len; i++) hash = ((hash << 5) + hash) + str[i]; | |
return hash; | |
} | |
}; | |
/* consume */ | |
void msg_consume(RdKafka::Message *message, void *opaque) { | |
switch (message->err()) { | |
case RdKafka::ERR__TIMED_OUT: | |
break; | |
case RdKafka::ERR_NO_ERROR: | |
/* Real message */ | |
std::cout << "Read msg at offset " << message->offset() << std::endl; | |
if (message->key()) { | |
std::cout << "Key: " << *message->key() << std::endl; | |
} | |
printf("%.*s\n", static_cast<int>(message->len()), | |
static_cast<const char *>(message->payload())); | |
break; | |
case RdKafka::ERR__PARTITION_EOF: | |
/* Last message */ | |
if (exit_eof) { | |
run = false; | |
} | |
break; | |
case RdKafka::ERR__UNKNOWN_TOPIC: | |
case RdKafka::ERR__UNKNOWN_PARTITION: | |
std::cerr << "Consume failed: " << message->errstr() << std::endl; | |
run = false; | |
break; | |
default: | |
/* Errors */ | |
std::cerr << "Consume failed: " << message->errstr() << std::endl; | |
run = false; | |
} | |
} | |
/* consume callback */ | |
class ExampleConsumeCb : public RdKafka::ConsumeCb { | |
public: | |
void consume_cb(RdKafka::Message &msg, void *opaque) { | |
msg_consume(&msg, opaque); | |
} | |
}; | |
/* this is the entry */ | |
int main(int argc, char **argv) { | |
std::string brokers = "localhost:9092"; | |
std::string errstr; | |
std::string topic_str = "test"; | |
std::string mode = "C"; | |
std::string debug; | |
int32_t partition = 0; | |
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; | |
MyHashPartitionerCb hash_partitioner; | |
int use_ccb = 0; | |
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); | |
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); | |
conf->set("metadata.broker.list", brokers, errstr); | |
if (!debug.empty()) { | |
if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) { | |
std::cerr << errstr << std::endl; | |
exit(1); | |
} | |
} | |
ExampleEventCb ex_event_cb; | |
conf->set("event_cb", &ex_event_cb, errstr); | |
if (true) { | |
int pass; | |
for (pass = 0; pass < 2; pass++) { | |
std::list<std::string> *dump; | |
if (pass == 0) { | |
dump = conf->dump(); | |
std::cout << "# Global config" << std::endl; | |
} else { | |
dump = tconf->dump(); | |
std::cout << "# Topic config" << std::endl; | |
} | |
for (std::list<std::string>::iterator it = dump->begin(); | |
it != dump->end();) { | |
std::cout << *it << " = "; | |
it++; | |
std::cout << *it << std::endl; | |
it++; | |
} | |
std::cout << std::endl; | |
} | |
} | |
signal(SIGINT, sigterm); | |
signal(SIGTERM, sigterm); | |
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr); | |
if (!consumer) { | |
std::cerr << "Failed to create consumer: " << errstr << std::endl; | |
exit(1); | |
} | |
std::cout << "% Created consumer " << consumer->name() << std::endl; | |
/* | |
* Create topic handle. | |
*/ | |
RdKafka::Topic *topic = | |
RdKafka::Topic::create(consumer, topic_str, tconf, errstr); | |
if (!topic) { | |
std::cerr << "Failed to create topic: " << errstr << std::endl; | |
exit(1); | |
} | |
/* | |
* Start consumer for topic+partition at start offset | |
*/ | |
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset); | |
if (resp != RdKafka::ERR_NO_ERROR) { | |
std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) | |
<< std::endl; | |
exit(1); | |
} | |
ExampleConsumeCb ex_consume_cb; | |
/* | |
* Consume messages | |
*/ | |
while (run) { | |
if (use_ccb) { | |
consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, | |
&use_ccb); | |
} else { | |
RdKafka::Message *msg = consumer->consume(topic, partition, 1000); | |
msg_consume(msg, NULL); | |
delete msg; | |
} | |
consumer->poll(0); | |
} | |
/* | |
* Stop consumer | |
*/ | |
consumer->stop(topic, partition); | |
consumer->poll(1000); | |
delete topic; | |
delete consumer; | |
/* | |
* Wait for RdKafka to decommission. | |
* This is not strictly needed (when check outq_len() above), but | |
* allows RdKafka to clean up all its resources before the application | |
* exits so that memory profilers such as valgrind wont complain about | |
* memory leaks. | |
*/ | |
RdKafka::wait_destroyed(5000); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment