Created
March 24, 2015 01:44
-
-
Save dmitrig01/3c7e8963228f987a4c13 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.test.groups; | |
import kafka.consumer.ConsumerConfig; | |
import kafka.consumer.KafkaStream; | |
import kafka.javaapi.consumer.ConsumerConnector; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
public class ConsumerGroupExample { | |
private final ConsumerConnector consumer; | |
private final String topic; | |
private ExecutorService executor; | |
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { | |
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( | |
createConsumerConfig(a_zookeeper, a_groupId)); | |
this.topic = a_topic; | |
} | |
public void shutdown() { | |
if (consumer != null) consumer.shutdown(); | |
if (executor != null) executor.shutdown(); | |
try { | |
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { | |
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); | |
} | |
} catch (InterruptedException e) { | |
System.out.println("Interrupted during shutdown, exiting uncleanly"); | |
} | |
} | |
public void run(int a_numThreads) { | |
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); | |
topicCountMap.put(topic, new Integer(a_numThreads)); | |
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); | |
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); | |
// now launch all the threads | |
// | |
executor = Executors.newFixedThreadPool(a_numThreads); | |
// now create an object to consume the messages | |
// | |
int threadNumber = 0; | |
for (final KafkaStream stream : streams) { | |
executor.submit(new ConsumerTest(stream, threadNumber)); | |
threadNumber++; | |
} | |
} | |
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { | |
Properties props = new Properties(); | |
props.put("zookeeper.connect", a_zookeeper); | |
props.put("group.id", a_groupId); | |
props.put("zookeeper.session.timeout.ms", "400"); | |
props.put("zookeeper.sync.time.ms", "200"); | |
props.put("auto.commit.interval.ms", "1000"); | |
return new ConsumerConfig(props); | |
} | |
public static void main(String[] args) { | |
//String zooKeeper = args[0]; | |
//String groupId = args[1]; | |
//String topic = args[2]; | |
String zooKeeper = "54.67.1.134"; | |
String groupId = "test2"; | |
String topic = "session-open"; | |
//int threads = Integer.parseInt(args[3]); | |
int threads = 4; | |
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); | |
example.run(threads); | |
try { | |
Thread.sleep(10000); | |
} catch (InterruptedException ie) { | |
} | |
example.shutdown(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.test.groups; | |
import kafka.consumer.ConsumerIterator; | |
import kafka.consumer.KafkaStream; | |
public class ConsumerTest implements Runnable { | |
private KafkaStream m_stream; | |
private int m_threadNumber; | |
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { | |
m_threadNumber = a_threadNumber; | |
m_stream = a_stream; | |
} | |
public void run() { | |
ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); | |
while (it.hasNext()) | |
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); | |
System.out.println("Shutting down Thread: " + m_threadNumber); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment