@13h3r
Created March 14, 2016 15:13
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.*;

public class KafkaWTF2 {

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        List<String> topics = new ArrayList<>();
        topics.add("test.wtf2");
        consumer.subscribe(topics);

        // Poll until at least 10 records have been fetched.
        List<ConsumerRecord<byte[], byte[]>> toCommit = new ArrayList<>();
        do {
            ConsumerRecords<byte[], byte[]> msgs = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> msg : msgs) {
                toCommit.add(msg);
            }
        } while (toCommit.size() < 10);

        // Commit the first 10 offsets one by one, asynchronously.
        for (ConsumerRecord<byte[], byte[]> msg : toCommit.subList(0, 10)) {
            Thread.sleep(100);
            System.out.println("Committing offset " + msg.offset());
            Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
            commitMap.put(new TopicPartition(msg.topic(), msg.partition()),
                          new OffsetAndMetadata(msg.offset()));
            consumer.commitAsync(commitMap, (offsets, exception) -> {
                long offset = offsets.values().iterator().next().offset();
                System.out.println("Offset " + offset + " confirmed. Result - " + exception);
            });
        }

        Thread.sleep(100);
        System.out.println("Poll!");
        // The commit callbacks are only invoked once the consumer polls again.
        consumer.poll(100);
        Thread.sleep(100000);
    }

    private static KafkaConsumer<byte[], byte[]> getConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wtf3");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaConsumer<>(
                props,
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer()
        );
    }
}
Committing offset 2
Committing offset 3
Committing offset 4
Committing offset 5
Committing offset 6
Committing offset 7
Committing offset 8
Committing offset 9
Committing offset 10
Committing offset 11
Poll!
Offset 4 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 5 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 6 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 7 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 8 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 9 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 10 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 11 confirmed. Result - org.apache.kafka.clients.consumer.internals.SendFailedException
Offset 2 confirmed. Result - null
Offset 3 confirmed. Result - null
gzoller commented Apr 15, 2016

Did you ever solve this problem? I think I'm having the exact same issue: some of my commitAsync() calls work fine, others get SendFailedException, and I don't know why. I'm using Kafka 0.9.0.1.
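
A possible workaround, sketched below (my reading, not the gist author's; it assumes the Kafka 0.9.x consumer API and reuses the consumer and toCommit variables from the program above): commitAsync only queues the offset-commit request, and the consumer flushes queued requests the next time it does network I/O, e.g. inside poll() or commitSync(). A SendFailedException in the callback means a queued request was failed before it was ever transmitted, which matches the log above, where no callback fires until the final poll(). Committing synchronously, or consolidating the commits into one map and committing once, avoids leaving requests sitting in the unsent queue.

// Sketch only (assumes Kafka 0.9.x): one blocking commit for all fetched records.
// commitSync(Map) retries until the broker acknowledges the offsets, so the
// request cannot be dropped from the client's unsent queue the way a queued
// async commit can.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<byte[], byte[]> msg : toCommit.subList(0, 10)) {
    // Kafka convention: commit the offset of the next record to consume.
    offsets.put(new TopicPartition(msg.topic(), msg.partition()),
                new OffsetAndMetadata(msg.offset() + 1));
}
consumer.commitSync(offsets);

The trade-off is that commitSync blocks the polling thread; if async commits are required, continuing to call consumer.poll() regularly after commitAsync at least gives the queued requests a chance to be sent.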
