Last active
August 29, 2015 14:05
-
-
Save samuelgmartinez/1b3588b739e60db3d492 to your computer and use it in GitHub Desktop.
Dummy Consumers rebalancing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.testing.kafka; | |
import com.google.common.collect.ImmutableMap; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import kafka.consumer.Consumer; | |
import kafka.consumer.ConsumerConfig; | |
import kafka.consumer.ConsumerIterator; | |
import kafka.consumer.KafkaStream; | |
import kafka.javaapi.consumer.ConsumerConnector; | |
import kafka.message.MessageAndMetadata; | |
import org.apache.log4j.Logger; | |
import org.junit.Assert; | |
import org.junit.Test; | |
/** | |
* | |
* @author samuelgmartinez | |
*/ | |
public class ConsumerRebalanceTest { | |
private static final Logger log = Logger.getLogger(ConsumerRebalanceTest.class); | |
@Test | |
public void testRebalance() throws Exception { | |
int consumers = 2; | |
int tpoolSize = 2; | |
ExecutorService executor = Executors.newFixedThreadPool(tpoolSize * consumers); | |
List<Future<Result>> futures = new ArrayList<Future<Result>>(tpoolSize * consumers); | |
List<ConsumerConnector> connectors = new ArrayList<ConsumerConnector>(tpoolSize * consumers); | |
AtomicBoolean running = new AtomicBoolean(true); | |
for (int c = 1; c <= consumers; c++) { | |
// specify some consumer properties | |
Properties props = new Properties(); | |
props.put("zookeeper.connect", "localhost:2181"); | |
props.put("group.id", "mygroup2"); | |
//Just for testing | |
props.put("consumer.id", "instance" + c); | |
ConsumerConfig consumerConfig = new ConsumerConfig(props); | |
ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); | |
connectors.add(connector); | |
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams; | |
topicMessageStreams = | |
connector.createMessageStreams(ImmutableMap.of("multi1", new Integer(tpoolSize))); | |
for (int i = 1; i <= tpoolSize; i++) { | |
futures.add(executor.submit( | |
new MyConsumer("instance-" + c + "-" + i, topicMessageStreams.get("multi1").get(i - 1), running))); | |
} | |
//to be able to sleep here, the topic must have a bunch of consumable msgs | |
Thread.sleep(2000); | |
} | |
Thread.sleep(10000); | |
log.info("shutting down..."); | |
running.set(false); | |
//test | |
for (ConsumerConnector connector : connectors) { | |
connector.commitOffsets(); | |
connector.shutdown(); | |
} | |
executor.shutdown(); | |
Map<String, Result> results = new HashMap<String, Result>(consumers * tpoolSize); | |
for (Future<Result> future : futures) { | |
Result result = future.get(); | |
results.put(result.id, result); | |
log.info(result.id + " consumed " + result.count); | |
} | |
//this must be true | |
Assert.assertTrue(results.get("instance-1-1").count > 0); | |
Assert.assertTrue(results.get("instance-1-2").count > 0); | |
//if the rebalance worked correctly, this consumer instance would have consumed a bunch of msgs | |
Assert.assertTrue(results.get("instance-2-1").count > 0 || results.get("instance-2-2").count > 0); | |
} | |
public static class MyConsumer implements Callable<Result> { | |
private final KafkaStream<byte[], byte[]> stream; | |
private final AtomicBoolean running; | |
private final String id; | |
public MyConsumer(String id, KafkaStream<byte[], byte[]> stream, AtomicBoolean running) { | |
this.stream = stream; | |
this.id = id; | |
this.running = running; | |
} | |
@Override | |
public Result call() throws Exception { | |
int i = 0; | |
log.trace(Thread.currentThread().getName() + " - " + this.id + " - starting to consume msgs"); | |
ConsumerIterator<byte[], byte[]> it = stream.iterator(); | |
while(running.get() && it.hasNext()) { | |
MessageAndMetadata<byte[], byte[]> msg = it.next(); | |
i++; | |
if (i % 200 == 0) { | |
log.info(Thread.currentThread().getName() | |
+ " - " + this.id + " - " + i + " msgs consumed"); | |
} | |
// log.info(Thread.currentThread().getName() | |
// + " - " + this.id + " - " + new String(msg.message())); | |
Thread.sleep(10); | |
} | |
log.trace(Thread.currentThread().getName() + " - " + this.id + " is over"); | |
return new Result(id, i); | |
} | |
} | |
private static class Result { | |
String id; | |
int count; | |
public Result(String id, int count) { | |
this.id = id; | |
this.count = count; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment