Skip to content

Instantly share code, notes, and snippets.

@darionyaphet
Last active August 29, 2015 14:02
Show Gist options
  • Save darionyaphet/ab9586c31cf158b6e958 to your computer and use it in GitHub Desktop.
Save darionyaphet/ab9586c31cf158b6e958 to your computer and use it in GitHub Desktop.
ComsumerMultPartitionExamples.java
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
public class ComsumerMultPartitionExamples {
final static String topic = "topic.mult.partitions";
final static int partition = 1;
final static long maxReads = 1024;
// static List<String> host = Lists.newArrayList("", "");
static long readOffset = 5;
static final int fetchSize = 1024 * 128;
public static void main(String[] args) throws UnsupportedEncodingException {
String clientID = "client-" + topic + "-" + partition;
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 100000,
64 * 1024, clientID);
FetchRequest request = new FetchRequestBuilder().clientId(clientID)
.addFetch(topic, partition, readOffset, fetchSize).build();
FetchResponse response = consumer.fetch(request);
for (MessageAndOffset messageAndOffset : response.messageSet(topic,
partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
+ new String(bytes, "UTF-8"));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment