This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_END | |
import sys | |
if __name__ == '__main__': | |
broker = "localhost:51895" | |
topics = ["test"] | |
group = "mygroup" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @brief Buffer segment | |
*/ | |
typedef struct rd_segment_s { | |
TAILQ_ENTRY(rd_segment_s) seg_link; /*<< rbuf_segments Link */ | |
char *seg_p; /**< Backing-store memory */ | |
size_t seg_of; /**< Current relative write-position | |
* (length of payload in this segment) */ | |
size_t seg_size; /**< Allocated size of seg_p */ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example function-based high-level Apache Kafka consumer | |
package main | |
/** | |
* Copyright 2016 Confluent Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": broker, | |
"group.id": group, | |
"go.events.channel.enable": true, | |
"go.application.rebalance.enable": true, | |
"security.protocol": "ssl", | |
"ssl.ca.location": "/path/to/ca-cert-file", | |
"ssl.certificate.location": "/path/to/client.pem", | |
"ssl.key.location": "/path/to/client.key", | |
//"ssl.key.password": "maybe_a_password_if_needed", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*!
 * Partition (int32_t)
 */
#define RD_KAFKA_V_PARTITION(partition)                                 \
        _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition),   \
        /* Parenthesize the argument so expression arguments            \
         * (e.g. base + i) are cast as a whole, not just their          \
         * first operand. */                                            \
        (int32_t)(partition)
/*! | |
* Message value/payload pointer and length (void *, size_t) | |
*/ | |
#define RD_KAFKA_V_VALUE(VALUE,LEN) \ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
If your processing rate is high, this might not be optimal, since you'll be committing for each message; even when using async commits there is some degree of performance penalty. | |
So another alternative is to keep `enable.auto.commit` set to True (default) but disable the automatic offset store. | |
So what is the offset store? | |
Each time a message is passed from the client to your application, its offset is stored for a future commit; the next intervalled commit will then use this stored offset. If the stored offset has not changed since the last commit, nothing happens.
So by setting `enable.auto.offset.store` to False you keep the convenient intervalled auto commit behaviour but you control what offsets are actually eligible for commit. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * @brief Compile-time type-check helper (GNU statement expression).
 *
 * Evaluates to \p RET while forcing a compile error if \p ARG is not
 * assignable to a variable of type \p TYPE. The `if (0)` branch is
 * never executed, so the check carries no runtime cost.
 */
#define _LRK_TYPECHECK(RET,TYPE,ARG)                    \
        ({                                              \
                if (0) {                                \
                        TYPE __t RD_UNUSED = (ARG);     \
                }                                       \
                RET;                                    \
        })
#define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \ | |
({ \ | |
if (0) { \ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @brief Test sockets | |
* | |
* By plugging in to librdkafka's socket_cb and connect_cb the test framework | |
* adds an interim socket between the socket exposed to librdkafka (tsk_fd) | |
* and the socket connecting to the broker (tsk_intfd). | |
* A thread is created to pass data between the two sockets according | |
* to test parameters. | |
* This allows the following network simulations: | |
* - connection close (abrupt or timeout) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
add_str_rep (const char *typestr, const char *rep (void *obj)) { | |
LIST_INSERT..(&str_reps, {typestr, rep}); | |
} | |
const char *to_str (const char *typestr, void *obj) { | |
str_rep = LIST_FIND(&str_reps, trim_stuff(typestr); /* remove "const", "*", etc.. */)); | |
if (!str_rep) | |
return rsprintf("(%s)%p", typestr, obj); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
rd_ts_t r = 1234; | |
if (rkb->rkb_rk->rk_conf.api_version_request && | |
(r = rd_interval(&rkb->rkb_ApiVersion_fail_intvl, 0, 0)) > 0) { | |
/* Use ApiVersion to query broker for supported API versions. */ | |
rd_rkb_dbg(rkb, PROTOCOL, "X", "Enabling ApiVersion"); | |
rd_kafka_broker_feature_enable(rkb, RD_KAFKA_FEATURE_APIVERSION); | |
} else { | |
rd_rkb_dbg(rkb, PROTOCOL, "X", "Not enabling: %"PRId64, r); | |
} |
NewerOlder