Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created May 6, 2013 15:29
Show Gist options
  • Select an option

  • Save tf0054/5525847 to your computer and use it in GitHub Desktop.

Select an option

Save tf0054/5525847 to your computer and use it in GitHub Desktop.
package com.digipepper.test.kafka.consumer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.api.OffsetRequest;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
/**
 * Minimal Kafka high-level consumer example (written against Kafka 0.7.x).
 *
 * <p>Usage: {@code ConsumerExample <zookeeper> <groupId> <topic> <threads>}. Requests
 * {@code threads} streams for {@code topic}, consumes each stream on its own worker thread and
 * prints every message payload to stdout.
 */
public class ConsumerExample {

  private static final String USAGE =
      "usage: ConsumerExample <zookeeper> <groupId> <topic> <threads>";

  /**
   * Charset used to decode message payloads. It is stated explicitly so the output does not
   * depend on the platform default charset. NOTE(review): assumes producers write UTF-8 text —
   * confirm.
   */
  private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");

  /**
   * Connects to the cluster and starts one printing worker per requested stream.
   *
   * @param args {@code zookeeper groupId topic threads}; extra arguments are ignored
   * @throws IllegalArgumentException if fewer than four arguments are given, or if
   *     {@code threads} is not a positive integer
   */
  public static void main(String[] args) {
    if (args.length < 4) {
      throw new IllegalArgumentException(USAGE);
    }
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    // NumberFormatException (an IllegalArgumentException) propagates for non-numeric input.
    int threads = Integer.parseInt(args[3]);
    if (threads < 1) {
      throw new IllegalArgumentException(
          "threads must be >= 1 but was " + threads + "; " + USAGE);
    }

    // Create the connection to the cluster.
    final ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig(zooKeeper, groupId));

    // Request `threads` streams for the topic so each worker thread can own exactly one stream.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, threads);
    Map<String, List<KafkaStream<Message>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<Message>> streams = consumerMap.get(topic);

    // Size the pool from `threads`, not a hard-coded 4. Otherwise, when threads > 4, some
    // streams would never be drained.
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    for (final KafkaStream<Message> stream : streams) {
      // execute() rather than submit(): an exception thrown by the worker then reaches the
      // thread's uncaught-exception handler instead of vanishing into an unread Future.
      executor.execute(new Runnable() {
        @Override
        public void run() {
          ConsumerIterator<Message> it = stream.iterator();
          while (it.hasNext()) {
            System.out.println("Msg: " + getMessage(it.next().message()));
          }
        }
      });
    }
    // No further tasks will be submitted. Already-running workers continue until their
    // iterators report no more messages.
    executor.shutdown();

    // Close the connector when the JVM exits (e.g. Ctrl-C).
    // NOTE(review): presumably this also commits consumed offsets and releases the group
    // registration — confirm against the Kafka 0.7 ConsumerConnector docs.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        consumer.shutdown();
      }
    });
  }

  /**
   * Builds the consumer configuration using Kafka 0.7 property names.
   *
   * @param zooKeeper ZooKeeper connect string for the cluster
   * @param groupId consumer group whose committed position is shared by its members
   * @return the consumer configuration
   */
  private static ConsumerConfig createConsumerConfig(String zooKeeper, String groupId) {
    Properties props = new Properties();
    props.put("zk.connect", zooKeeper);
    props.put("groupid", groupId);
    // These use the Kafka 0.7 key names. The 0.8-style names used previously
    // ("zookeeper.session.timeout.ms", "zookeeper.sync.time.ms", "auto.commit.interval.ms")
    // are not read by the 0.7 ConsumerConfig, so those values were silently ignored.
    // NOTE(review): a 400 ms session timeout is below ZooKeeper's usual minimum
    // (2 x tickTime); the server will likely negotiate it upward — confirm.
    props.put("zk.sessiontimeout.ms", "400");
    props.put("zk.synctime.ms", "200");
    props.put("autocommit.interval.ms", "1000");
    // Where to start when the group has no committed offset: the largest (newest) offset.
    props.put("autooffset.reset", OffsetRequest.LargestTimeString());
    System.out.println("LargestTimeString:" + OffsetRequest.LargestTimeString());
    return new ConsumerConfig(props);
  }

  /**
   * Decodes a message payload as text.
   *
   * @param message the consumed message
   * @return the payload bytes decoded with {@link #PAYLOAD_CHARSET}
   */
  private static String getMessage(Message message) {
    // Read from a duplicate so the position of the buffer returned by payload() is untouched.
    ByteBuffer buffer = message.payload().duplicate();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, PAYLOAD_CHARSET);
  }
}
@tf0054
Copy link
Copy Markdown
Author

tf0054 commented May 6, 2013

Kafkaのバージョンはv0.7.2でやってます。

@tf0054
Copy link
Copy Markdown
Author

tf0054 commented May 6, 2013

なお、複数のGroup名を使ってしまうと、Group名Aで読んだ位置(読み進んだ位置)と、Group名Bで読んだ位置が異なってしまい、AがBより進んだ後にBが接続すると、Bにはその差分が、接続時に流れ込んでくることになります。

@johtani
Copy link
Copy Markdown

johtani commented May 7, 2013

コメント書いたらメールとか飛ぶんですかね?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment