Created
June 21, 2019 09:16
-
-
Save avgerin0s/561f18e417f74cb05a264f402b369118 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <ctype.h> | |
#include <signal.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include <syslog.h> | |
#include <sys/time.h> | |
#include <errno.h> | |
#include <getopt.h> | |
#include <librdkafka/rdkafka.h> | |
#include <sys/types.h> | |
#include <sys/wait.h> | |
static int run = 1; | |
static rd_kafka_t *rk; | |
static int exit_eof = 0; | |
static int wait_eof = 0; /* number of partitions awaiting EOF */ | |
static int quiet = 0; | |
static enum { | |
OUTPUT_HEXDUMP, | |
OUTPUT_RAW, | |
} output = OUTPUT_HEXDUMP; | |
static void stop (int sig) { | |
if (!run) | |
exit(1); | |
fprintf(stdout, "Closing\n"); | |
run = 0; | |
fclose(stdin); /* abort fgets() */ | |
} | |
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) { | |
const char *p = (const char *)ptr; | |
unsigned int of = 0; | |
if (name) | |
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len); | |
for (of = 0 ; of < len ; of += 16) { | |
char hexen[16*3+1]; | |
char charen[16+1]; | |
int hof = 0; | |
int cof = 0; | |
int i; | |
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) { | |
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff); | |
cof += sprintf(charen+cof, "%c", | |
isprint((int)p[i]) ? p[i] : '.'); | |
} | |
fprintf(fp, "%08x: %-48s %-16s\n", | |
of, hexen, charen); | |
} | |
} | |
/** | |
* Kafka logger callback (optional) | |
*/ | |
static void logger (const rd_kafka_t *rk, int level, | |
const char *fac, const char *buf) { | |
struct timeval tv; | |
gettimeofday(&tv, NULL); | |
fprintf(stdout, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", | |
(int)tv.tv_sec, (int)(tv.tv_usec / 1000), | |
level, fac, rd_kafka_name(rk), buf); | |
} | |
/** | |
* Handle and print a consumed message. | |
* Internally crafted messages are also used to propagate state from | |
* librdkafka to the application. The application needs to check | |
* the `rkmessage->err` field for this purpose. | |
*/ | |
static void msg_consume (rd_kafka_message_t *rkmessage) { | |
if (rkmessage->err) { | |
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { | |
fprintf(stderr, | |
"%% Consumer reached end of %s [%"PRId32"] " | |
"message queue at offset %"PRId64"\n", | |
rd_kafka_topic_name(rkmessage->rkt), | |
rkmessage->partition, rkmessage->offset); | |
if (exit_eof && --wait_eof == 0) { | |
fprintf(stderr, | |
"%% All partition(s) reached EOF: " | |
"exiting\n"); | |
run = 0; | |
} | |
return; | |
} | |
if (rkmessage->rkt) | |
fprintf(stderr, "%% Consume error for " | |
"topic \"%s\" [%"PRId32"] " | |
"offset %"PRId64": %s\n", | |
rd_kafka_topic_name(rkmessage->rkt), | |
rkmessage->partition, | |
rkmessage->offset, | |
rd_kafka_message_errstr(rkmessage)); | |
else | |
fprintf(stderr, "%% Consumer error: %s: %s\n", | |
rd_kafka_err2str(rkmessage->err), | |
rd_kafka_message_errstr(rkmessage)); | |
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || | |
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) | |
run = 0; | |
return; | |
} | |
if (!quiet) | |
fprintf(stdout, "%% Message (topic %s [%"PRId32"], " | |
"offset %"PRId64", %zd bytes):\n", | |
rd_kafka_topic_name(rkmessage->rkt), | |
rkmessage->partition, | |
rkmessage->offset, rkmessage->len); | |
if (rkmessage->key_len) { | |
if (output == OUTPUT_HEXDUMP) | |
hexdump(stdout, "Message Key", | |
rkmessage->key, rkmessage->key_len); | |
else | |
printf("Key: %.*s\n", | |
(int)rkmessage->key_len, (char *)rkmessage->key); | |
} | |
if (output == OUTPUT_HEXDUMP) | |
hexdump(stdout, "Message Payload", | |
rkmessage->payload, rkmessage->len); | |
else | |
printf("%.*s\n", | |
(int)rkmessage->len, (char *)rkmessage->payload); | |
} | |
static void print_partition_list (FILE *fp, | |
const rd_kafka_topic_partition_list_t | |
*partitions) { | |
int i; | |
for (i = 0 ; i < partitions->cnt ; i++) { | |
fprintf(stderr, "%s %s [%"PRId32"] offset %"PRId64, | |
i > 0 ? ",":"", | |
partitions->elems[i].topic, | |
partitions->elems[i].partition, | |
partitions->elems[i].offset); | |
} | |
fprintf(stderr, "\n"); | |
} | |
static void rebalance_cb (rd_kafka_t *rk, | |
rd_kafka_resp_err_t err, | |
rd_kafka_topic_partition_list_t *partitions, | |
void *opaque) { | |
fprintf(stderr, "%% Consumer group rebalanced: "); | |
switch (err) | |
{ | |
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: | |
fprintf(stderr, "assigned:\n"); | |
print_partition_list(stderr, partitions); | |
rd_kafka_assign(rk, partitions); | |
wait_eof += partitions->cnt; | |
break; | |
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: | |
fprintf(stderr, "revoked:\n"); | |
print_partition_list(stderr, partitions); | |
rd_kafka_assign(rk, NULL); | |
wait_eof = 0; | |
break; | |
default: | |
fprintf(stderr, "failed: %s\n", | |
rd_kafka_err2str(err)); | |
rd_kafka_assign(rk, NULL); | |
break; | |
} | |
} | |
static int describe_groups (rd_kafka_t *rk, const char *group) { | |
rd_kafka_resp_err_t err; | |
const struct rd_kafka_group_list *grplist; | |
int i; | |
err = rd_kafka_list_groups(rk, group, &grplist, 10000); | |
if (err) { | |
fprintf(stderr, "%% Failed to acquire group list: %s\n", | |
rd_kafka_err2str(err)); | |
return -1; | |
} | |
for (i = 0 ; i < grplist->group_cnt ; i++) { | |
const struct rd_kafka_group_info *gi = &grplist->groups[i]; | |
int j; | |
printf("Group \"%s\" in state %s on broker %d (%s:%d)\n", | |
gi->group, gi->state, | |
gi->broker.id, gi->broker.host, gi->broker.port); | |
if (gi->err) | |
printf(" Error: %s\n", rd_kafka_err2str(gi->err)); | |
printf(" Protocol type \"%s\", protocol \"%s\", " | |
"with %d member(s):\n", | |
gi->protocol_type, gi->protocol, gi->member_cnt); | |
for (j = 0 ; j < gi->member_cnt ; j++) { | |
const struct rd_kafka_group_member_info *mi; | |
mi = &gi->members[j]; | |
printf(" \"%s\", client id \"%s\" on host %s\n", | |
mi->member_id, mi->client_id, mi->client_host); | |
printf(" metadata: %d bytes\n", | |
mi->member_metadata_size); | |
printf(" assignment: %d bytes\n", | |
mi->member_assignment_size); | |
} | |
printf("\n"); | |
} | |
if (group && !grplist->group_cnt) | |
fprintf(stderr, "%% No matching group (%s)\n", group); | |
rd_kafka_group_list_destroy(grplist); | |
return 0; | |
} | |
static void sig_usr1 (int sig) { | |
rd_kafka_dump(stdout, rk); | |
} | |
int main (int argc, char **argv) { | |
char mode = 'C'; | |
char *brokers = "localhost:9092"; | |
int opt; | |
rd_kafka_conf_t *conf; | |
rd_kafka_topic_conf_t *topic_conf; | |
char errstr[512]; | |
const char *debug = NULL; | |
int do_conf_dump = 0; | |
char tmp[16]; | |
rd_kafka_resp_err_t err; | |
char *group = NULL; | |
rd_kafka_topic_partition_list_t *topics; | |
int is_subscription; | |
int i; | |
quiet = !isatty(STDIN_FILENO); | |
/* Kafka configuration */ | |
conf = rd_kafka_conf_new(); | |
/* Set logger */ | |
rd_kafka_conf_set_log_cb(conf, logger); | |
/* Quick termination */ | |
snprintf(tmp, sizeof(tmp), "%i", SIGIO); | |
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); | |
/* Topic configuration */ | |
topic_conf = rd_kafka_topic_conf_new(); | |
while ((opt = getopt(argc, argv, "g:b:qd:eX:ADO")) != -1) { | |
switch (opt) { | |
case 'b': | |
brokers = optarg; | |
break; | |
case 'g': | |
group = optarg; | |
break; | |
case 'e': | |
exit_eof = 1; | |
break; | |
case 'd': | |
debug = optarg; | |
break; | |
case 'q': | |
quiet = 1; | |
break; | |
case 'A': | |
output = OUTPUT_RAW; | |
break; | |
case 'X': | |
{ | |
char *name, *val; | |
rd_kafka_conf_res_t res; | |
if (!strcmp(optarg, "list") || | |
!strcmp(optarg, "help")) { | |
rd_kafka_conf_properties_show(stdout); | |
exit(0); | |
} | |
if (!strcmp(optarg, "dump")) { | |
do_conf_dump = 1; | |
continue; | |
} | |
name = optarg; | |
if (!(val = strchr(name, '='))) { | |
fprintf(stderr, "%% Expected " | |
"-X property=value, not %s\n", name); | |
exit(1); | |
} | |
*val = '\0'; | |
val++; | |
res = RD_KAFKA_CONF_UNKNOWN; | |
/* Try "topic." prefixed properties on topic | |
* conf first, and then fall through to global if | |
* it didnt match a topic configuration property. */ | |
if (!strncmp(name, "topic.", strlen("topic."))) | |
res = rd_kafka_topic_conf_set(topic_conf, | |
name+ | |
strlen("topic."), | |
val, | |
errstr, | |
sizeof(errstr)); | |
if (res == RD_KAFKA_CONF_UNKNOWN) | |
res = rd_kafka_conf_set(conf, name, val, | |
errstr, sizeof(errstr)); | |
if (res != RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% %s\n", errstr); | |
exit(1); | |
} | |
} | |
break; | |
case 'D': | |
case 'O': | |
mode = opt; | |
break; | |
default: | |
goto usage; | |
} | |
} | |
if (do_conf_dump) { | |
const char **arr; | |
size_t cnt; | |
int pass; | |
for (pass = 0 ; pass < 2 ; pass++) { | |
if (pass == 0) { | |
arr = rd_kafka_conf_dump(conf, &cnt); | |
printf("# Global config\n"); | |
} else { | |
printf("# Topic config\n"); | |
arr = rd_kafka_topic_conf_dump(topic_conf, | |
&cnt); | |
} | |
for (i = 0 ; i < (int)cnt ; i += 2) | |
printf("%s = %s\n", | |
arr[i], arr[i+1]); | |
printf("\n"); | |
rd_kafka_conf_dump_free(arr, cnt); | |
} | |
exit(0); | |
} | |
if (strchr("OC", mode) && optind == argc) { | |
usage: | |
fprintf(stderr, | |
"Usage: %s [options] <topic[:part]> <topic[:part]>..\n" | |
"\n" | |
"librdkafka version %s (0x%08x)\n" | |
"\n" | |
" Options:\n" | |
" -g <group> Consumer group (%s)\n" | |
" -b <brokers> Broker address (%s)\n" | |
" -e Exit consumer when last message\n" | |
" in partition has been received.\n" | |
" -D Describe group.\n" | |
" -O Get commmitted offset(s)\n" | |
" -d [facs..] Enable debugging contexts:\n" | |
" %s\n" | |
" -q Be quiet\n" | |
" -A Raw payload output (consumer)\n" | |
" -X <prop=name> Set arbitrary librdkafka " | |
"configuration property\n" | |
" Properties prefixed with \"topic.\" " | |
"will be set on topic object.\n" | |
" Use '-X list' to see the full list\n" | |
" of supported properties.\n" | |
"\n" | |
"For balanced consumer groups use the 'topic1 topic2..'" | |
" format\n" | |
"and for static assignment use " | |
"'topic1:part1 topic1:part2 topic2:part1..'\n" | |
"\n", | |
argv[0], | |
rd_kafka_version_str(), rd_kafka_version(), | |
group, brokers, | |
RD_KAFKA_DEBUG_CONTEXTS); | |
exit(1); | |
} | |
signal(SIGINT, stop); | |
signal(SIGUSR1, sig_usr1); | |
if (debug && | |
rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) != | |
RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% Debug configuration failed: %s: %s\n", | |
errstr, debug); | |
exit(1); | |
} | |
/* | |
* Client/Consumer group | |
*/ | |
if (strchr("CO", mode)) { | |
/* Consumer groups require a group id */ | |
if (!group) | |
group = "rdkafka_consumer_example"; | |
if (rd_kafka_conf_set(conf, "group.id", group, | |
errstr, sizeof(errstr)) != | |
RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% %s\n", errstr); | |
exit(1); | |
} | |
/* Consumer groups always use broker based offset storage */ | |
if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method", | |
"broker", | |
errstr, sizeof(errstr)) != | |
RD_KAFKA_CONF_OK) { | |
fprintf(stderr, "%% %s\n", errstr); | |
exit(1); | |
} | |
/* Set default topic config for pattern-matched topics. */ | |
rd_kafka_conf_set_default_topic_conf(conf, topic_conf); | |
/* Callback called on partition assignment changes */ | |
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); | |
rd_kafka_conf_set(conf, "enable.partition.eof", "true", | |
NULL, 0); | |
} | |
/* Create Kafka handle */ | |
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, | |
errstr, sizeof(errstr)))) { | |
fprintf(stderr, | |
"%% Failed to create new consumer: %s\n", | |
errstr); | |
exit(1); | |
} | |
/* Add brokers */ | |
if (rd_kafka_brokers_add(rk, brokers) == 0) { | |
fprintf(stderr, "%% No valid brokers specified\n"); | |
exit(1); | |
} | |
if (mode == 'D') { | |
int r; | |
/* Describe groups */ | |
r = describe_groups(rk, group); | |
rd_kafka_destroy(rk); | |
exit(r == -1 ? 1 : 0); | |
} | |
/* Redirect rd_kafka_poll() to consumer_poll() */ | |
rd_kafka_poll_set_consumer(rk); | |
topics = rd_kafka_topic_partition_list_new(argc - optind); | |
is_subscription = 1; | |
for (i = optind ; i < argc ; i++) { | |
/* Parse "topic[:part] */ | |
char *topic = argv[i]; | |
char *t; | |
int32_t partition = -1; | |
if ((t = strstr(topic, ":"))) { | |
*t = '\0'; | |
partition = atoi(t+1); | |
is_subscription = 0; /* is assignment */ | |
wait_eof++; | |
} | |
rd_kafka_topic_partition_list_add(topics, topic, partition); | |
} | |
if (mode == 'O') { | |
/* Offset query */ | |
err = rd_kafka_committed(rk, topics, 5000); | |
if (err) { | |
fprintf(stderr, "%% Failed to fetch offsets: %s\n", | |
rd_kafka_err2str(err)); | |
exit(1); | |
} | |
for (i = 0 ; i < topics->cnt ; i++) { | |
rd_kafka_topic_partition_t *p = &topics->elems[i]; | |
printf("Topic \"%s\" partition %"PRId32, | |
p->topic, p->partition); | |
if (p->err) | |
printf(" error %s", | |
rd_kafka_err2str(p->err)); | |
else { | |
printf(" offset %"PRId64"", | |
p->offset); | |
if (p->metadata_size) | |
printf(" (%d bytes of metadata)", | |
(int)p->metadata_size); | |
} | |
printf("\n"); | |
} | |
goto done; | |
} | |
if (is_subscription) { | |
fprintf(stderr, "%% Subscribing to %d topics\n", topics->cnt); | |
if ((err = rd_kafka_subscribe(rk, topics))) { | |
fprintf(stderr, | |
"%% Failed to start consuming topics: %s\n", | |
rd_kafka_err2str(err)); | |
exit(1); | |
} | |
} else { | |
fprintf(stderr, "%% Assigning %d partitions\n", topics->cnt); | |
if ((err = rd_kafka_assign(rk, topics))) { | |
fprintf(stderr, | |
"%% Failed to assign partitions: %s\n", | |
rd_kafka_err2str(err)); | |
} | |
} | |
// This is the part I modified to introduce a use case of a real | |
// application of mine. Basically, we poll from the parent process and | |
// just print the message in the child one. Before exiting the child | |
// process we call `rd_kafka_destroy` which hangs indefinitely. | |
fprintf(stdout, "Main PID: %d\n", getpid()); | |
while (run) { | |
rd_kafka_message_t *rkmessage; | |
rkmessage = rd_kafka_consumer_poll(rk, 1000); | |
if (rkmessage) { | |
pid_t cpid; | |
cpid = fork(); | |
// child proc | |
if (cpid == 0) { | |
msg_consume(rkmessage); | |
rd_kafka_message_destroy(rkmessage); | |
// This is the part that hangs | |
rd_kafka_destroy(rk); | |
exit(0); | |
// parent | |
} else if (cpid > 0) { | |
fprintf(stdout, "Waiting for %d\n", cpid); | |
pid_t r = waitpid(cpid, NULL, 0); | |
fprintf(stdout, "Exited %d\n", cpid); | |
} else { | |
fprintf(stderr, "Fork failed\n"); | |
exit(1); | |
} | |
} | |
} | |
// End of modified part | |
done: | |
err = rd_kafka_consumer_close(rk); | |
if (err) | |
fprintf(stderr, "%% Failed to close consumer: %s\n", | |
rd_kafka_err2str(err)); | |
else | |
fprintf(stderr, "%% Consumer closed\n"); | |
rd_kafka_topic_partition_list_destroy(topics); | |
/* Destroy handle */ | |
rd_kafka_destroy(rk); | |
/* Let background threads clean up and terminate cleanly. */ | |
run = 5; | |
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) | |
printf("Waiting for librdkafka to decommission\n"); | |
if (run <= 0) | |
rd_kafka_dump(stdout, rk); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment