Skip to content

Instantly share code, notes, and snippets.

@ept
Created April 15, 2015 15:50
Show Gist options
  • Save ept/a7d6edc2e46f896b03a6 to your computer and use it in GitHub Desktop.
Save ept/a7d6edc2e46f896b03a6 to your computer and use it in GitHub Desktop.
#include <librdkafka/rdkafka.h>
#include <string.h>
#include <stdio.h>
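/* Payload length passed to rd_kafka_produce() together with a NULL payload
 * (despite the macro's name, the message key itself is non-NULL).
 * If this is zero, the message is produced correctly. Other values cause problems. */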
#define LENGTH_FOR_NULL_KEY 5
#define ERROR_LEN 512
#define TOPIC "foo"
#define MSG_KEY "hello"
int main(int argc, char **argv) {
char error[ERROR_LEN];
rd_kafka_t *kafka = rd_kafka_new(RD_KAFKA_PRODUCER, rd_kafka_conf_new(), error, ERROR_LEN);
if (!kafka) {
fprintf(stderr, "Cannot create Kafka producer\n");
return 1;
}
if (rd_kafka_brokers_add(kafka, "localhost:9092") == 0) {
fprintf(stderr, "Cannot add broker\n");
return 1;
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
rd_kafka_topic_t *topic = rd_kafka_topic_new(kafka, TOPIC, topic_conf);
if (!topic) {
fprintf(stderr, "Cannot create topic\n");
return 1;
}
int err = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
NULL, LENGTH_FOR_NULL_KEY, strdup(MSG_KEY), strlen(MSG_KEY), NULL);
if (err != 0) {
fprintf(stderr, "Error %d producing message\n", err);
return 1;
}
while (rd_kafka_outq_len(kafka) > 0) rd_kafka_poll(kafka, 100);
rd_kafka_topic_destroy(topic);
rd_kafka_destroy(kafka);
rd_kafka_wait_destroyed(2000);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment