Created
April 16, 2015 16:15
-
-
Save mostlylikeable/a2ce9d13275af4b055a9 to your computer and use it in GitHub Desktop.
Kafka Consumer Test - implements kafka examples for testing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Grab("org.apache.kafka:kafka_2.10:0.8.1") | |
| @GrabExclude("com.sun.jmx:jmxri") | |
| @GrabExclude("javax.jms:jms") | |
| @GrabExclude("com.sun.jdmk:jmxtools") | |
| @GrabExclude("junit:junit") | |
| import kafka.consumer.ConsumerConfig; | |
| import kafka.consumer.ConsumerIterator; | |
| import kafka.consumer.KafkaStream; | |
| import kafka.javaapi.consumer.ConsumerConnector; | |
| import java.util.HashMap; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Properties; | |
| import java.util.concurrent.* | |
| /** | |
| * Kafka examples taken from website. This allows simple testing of kafka consumer configuration. | |
| */ | |
| // zookeeper-server-start.sh /usr/local/Cellar/kafka/0.8.1.1/libexec/config/zookeeper.properties | |
| // kafka-server-start.sh /usr/local/Cellar/kafka/0.8.1.1/libexec/config/server.properties | |
| // kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic | |
| // kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic | |
// Settings for the consumer group started below.
String zooKeeper = "localhost"
String groupId = "g2"
String topic = "test_topic"
int threads = 1

// Start the consumer group, let it run for ten seconds, then stop it.
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic)
example.run(threads)
try {
    TimeUnit.SECONDS.sleep(10)
} catch (InterruptedException ignored) {
    // An interrupt only cuts the wait short; continue to the shutdown below.
}
example.shutdown()
println 'done'
/**
 * Worker that reads every message from a single Kafka stream and prints it to
 * standard output, prefixed with the worker's thread number.
 */
public class ConsumerTest implements Runnable {
    private final KafkaStream<byte[], byte[]> m_stream;
    private final int m_threadNumber;

    /**
     * @param a_stream       stream whose messages this worker prints
     * @param a_threadNumber label printed in front of every message
     */
    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    /**
     * Prints each message until the stream's iterator reports that there are no
     * more, then prints a shutdown notice.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            // Decode explicitly as UTF-8 so the output does not depend on the
            // JVM's platform default charset.
            // NOTE(review): assumes producers write UTF-8 text - confirm.
            String text = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Thread " + m_threadNumber + ": " + text);
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
/**
 * Creates a Kafka consumer connector for one topic and prints the topic's
 * messages from a fixed pool of {@link ConsumerTest} workers.
 */
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    // Stays null until run() has been called.
    private ExecutorService executor;

    /**
     * @param a_zookeeper value for the "zookeeper.connect" consumer property
     * @param a_groupId   value for the "group.id" consumer property
     * @param a_topic     topic whose messages are consumed by {@link #run(int)}
     */
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Shuts down the connector and, if {@link #run(int)} was called, the worker
     * pool, waiting up to five seconds for the workers to finish.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor == null) {
            // run() was never called, so there is no pool to stop or wait for.
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
            // Restore the flag so the caller can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests {@code a_numThreads} streams for the topic and submits one
     * {@link ConsumerTest} per returned stream to a new fixed-size pool.
     *
     * @param a_numThreads number of streams requested and size of the pool
     */
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Create the pool that runs the workers.
        executor = Executors.newFixedThreadPool(a_numThreads);

        // Hand each stream to its own numbered worker.
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration from the given connection string and
     * group id plus fixed timeout, commit-interval and offset-reset settings.
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment