diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..d9bd99f 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -22,24 +22,15 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
import kafka.common.KafkaException
+// RR TODO: we should get rid of this topiccount trait since it only makes sense for static topic count
+// RR TODO: and rename this to SubscriptionConfig
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+ def getThreadIds(label: String): Set[String]
def getTopicCountMap: Map[String, Int]
def pattern: String
-
- protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
- topicCountMap: Map[String, Int]) = {
- val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
- for ((topic, nConsumers) <- topicCountMap) {
- val consumerSet = new mutable.HashSet[String]
- assert(nConsumers >= 1)
- for (i <- 0 until nConsumers)
- consumerSet += consumerIdString + "-" + i
- consumerThreadIdsPerTopicMap.put(topic, consumerSet)
- }
- consumerThreadIdsPerTopicMap
- }
+
}
private[kafka] object TopicCount extends Logging {
@@ -89,6 +80,24 @@ private[kafka] object TopicCount extends Logging {
}
}
+ def getConsumerThreadIdsPerTopic(topicCount: StaticTopicCount) = {
+ val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+ for ((topic, nConsumers) <- topicCount.topicCountMap) {
+ val consumerSet = new mutable.HashSet[String]
+ assert(nConsumers >= 1)
+ for (i <- 0 until nConsumers)
+ consumerSet += topicCount.consumerIdString + "-" + i
+ consumerThreadIdsPerTopicMap.put(topic, consumerSet)
+ }
+ println(consumerThreadIdsPerTopicMap)
+ consumerThreadIdsPerTopicMap
+ }
+
+ protected def getWildcardThreadIds(topicCount: WildcardTopicCount) = {
+ (0 until topicCount.numStreams)
+ .map(threadId => "__wildcard-%s-%d".format(topicCount.consumerIdString, threadId)).toSet
+ }
+
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
@@ -101,7 +110,9 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
val topicCountMap: Map[String, Int])
extends TopicCount {
- def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
+ def getThreadIds(label: String) = null
+
+ def getConsumerThreadIdsPerTopic: Map[String, Set[String]] = null
override def equals(obj: Any): Boolean = {
obj match {
@@ -117,18 +128,17 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
}
private[kafka] class WildcardTopicCount(zkClient: ZkClient,
- consumerIdString: String,
+ val consumerIdString: String,
topicFilter: TopicFilter,
- numStreams: Int,
+ val numStreams: Int,
excludeInternalTopics: Boolean) extends TopicCount {
- def getConsumerThreadIdsPerTopic = {
- val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
- .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
- makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
- }
def getTopicCountMap = Map(topicFilter.regex -> numStreams)
+ def getThreadIds(label: String) = (0 until numStreams).map(i => "%s-%s-%d".format(consumerIdString, label, i)).toSet
+
+ def getConsumerThreadIdsPerTopic: Map[String, Set[String]] = null
+
def pattern: String = {
topicFilter match {
case wl: Whitelist => TopicCount.whiteListPattern
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 1dde4fc..e8501b5 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -90,7 +90,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
- private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
+ // RR TODO: can be thread-id -> queue
+ private val topicThreadIdAndQueues = new Pool[String /* thread id - e.g., <cid>-topicA-0 */, BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
@@ -102,6 +103,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val offsetsChannelLock = new Object
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+ private var wildcardTopicFilter: TopicFilter = null
// useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
@@ -155,6 +157,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
numStreams: Int,
keyDecoder: Decoder[K] = new DefaultDecoder(),
valueDecoder: Decoder[V] = new DefaultDecoder()) = {
+ wildcardTopicFilter = topicFilter
val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
wildcardStreamsHandler.streams
}
@@ -195,6 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case None =>
}
sendShutdownToAllQueues()
+ // RR TODO: unclear on this: i.e., we go ahead and commit offsets after sending the signal, but have not
if (config.autoCommitEnable)
commitOffsets()
if (zkClient != null) {
@@ -220,7 +224,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
- val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
+ val topicThreadIds = TopicCount.getConsumerThreadIdsPerTopic(topicCount)
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
@@ -236,6 +240,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
+ // RR TODO: if we remove the topic->list map from load balancer, we need to reconstruct here
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
@@ -254,6 +259,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def sendShutdownToAllQueues() = {
+ // RR TODO: seems we need a rebalancer that returns queue for given topic/partition (comment, but won't implement)
+ // RR TODO: if topicThreadIdAndQueues does not apply for wildcard then need to enumerate all wildcard queues
for (queue <- topicThreadIdAndQueues.values) {
debug("Clearing up queue")
queue.clear()
@@ -512,6 +519,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
+
+ // RR TODO: maybe have a queueFor map here that takes topic-partition to queue and handles both wildcard and static
+
+ val queueFor = mutable.Map[TopicAndPartition, BlockingQueue[FetchedDataChunk]]()
+
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@@ -574,6 +586,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
def syncedRebalance() {
+ println("Rebalancing")
rebalanceLock synchronized {
if(isShuttingDown.get()) {
return
@@ -612,8 +625,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def rebalance(cluster: Cluster): Boolean = {
- val myTopicThreadIdsMap = TopicCount.constructTopicCount(
- group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+ val topicCount = TopicCount.constructTopicCount(
+ group, consumerIdString, zkClient, config.excludeInternalTopics)
+ val myTopicThreadIdsMap = topicCount.getConsumerThreadIdsPerTopic
+ // RR TODO: can change getConsumersPerTopic to be getConsumersForTopic and move down and execute per-topic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
val brokers = getAllBrokersInCluster(zkClient)
if (brokers.size == 0) {
@@ -641,39 +656,58 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
- currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
- val curConsumers = consumersPerTopicMap.get(topic).get
- val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
- val nPartsPerConsumer = curPartitions.size / curConsumers.size
- val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
- info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
- " for topic " + topic + " with consumers: " + curConsumers)
-
- for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
- assert(myConsumerPosition >= 0)
- val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
- val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
- /**
- * Range-partition the sorted partitions to consumers for better locality.
- * The first few consumers pick up an extra partition, if any.
- */
- if (nParts <= 0)
- warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
- else {
- for (i <- startPart until startPart + nParts) {
- val partition = curPartitions(i)
- info(consumerThreadId + " attempting to claim partition " + partition)
- // record the partition ownership decision
- partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+ topicCount match {
+ case wildcardCount: WildcardTopicCount =>
+ // get list of current consumers
+ // consumersPerTopicMap has wildcard-0, wildcard-1, wildcard-2, etc.
+// val allConsumerThreadIds = wildcardCount.getThreadIds
+
+ // get list of all partitions of all allowed topics
+ val allTopicMetadata = ClientUtils.fetchTopicMetadata(Set("topics"), brokers,
+ config.clientId, config.socketTimeoutMs).topicsMetadata
+ val allowedTopicsMetadata = allTopicMetadata.filter(
+ metadata => wildcardTopicFilter.isTopicAllowed(metadata.topic, config.excludeInternalTopics))
+
+// val allowedPartitions = allowedTopicsMetadata.flatMap(
+// tm => tm.partitionsMetadata.map(
+// pm => TopicAndPartition(tm.topic, pm.partitionId))).sorted
+
+ // round-robin assignment of partitions to topic-thread-ids
+ case staticCount: StaticTopicCount =>
+ for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+ currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+
+ val curConsumers = consumersPerTopicMap.get(topic).get
+ val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+
+ val nPartsPerConsumer = curPartitions.size / curConsumers.size
+ val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+ info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+ " for topic " + topic + " with consumers: " + curConsumers)
+
+ for (consumerThreadId <- consumerThreadIdSet) {
+ val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
+ assert(myConsumerPosition >= 0)
+ val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+ val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+ /**
+ * Range-partition the sorted partitions to consumers for better locality.
+ * The first few consumers pick up an extra partition, if any.
+ */
+ if (nParts <= 0)
+ warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+ else {
+ for (i <- startPart until startPart + nParts) {
+ val partition = curPartitions(i)
+ info(consumerThreadId + " attempting to claim partition " + partition)
+ // record the partition ownership decision
+ partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+ }
+ }
}
}
- }
}
// fetch current offsets for all topic-partitions
@@ -688,6 +722,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionOwnershipDecision(topicAndPartition)
+ // RR TODO: make sure the threadid is complete (of the form <cid>-topic-threadid
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
}
@@ -716,7 +751,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
fetcher match {
case Some(f) =>
f.stopConnections
- clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+ clearFetcherQueues(queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -732,8 +767,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
- queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+ private def clearFetcherQueues(queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
messageStreams: Map[String,List[KafkaStream[_,_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
@@ -752,6 +786,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
+ // RR TODO: topicthreadidandqueues not needed - will clear all queues off messagestreams
val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
@@ -802,12 +837,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
else true
}
+ // RR TODO: this should not take topic-threadid. just the queue; or fully qualified threadid
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
partition: Int, topic: String,
offset: Long, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
- val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
+ val queue = topicThreadIdAndQueues.get(consumerThreadId)
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
@@ -827,13 +863,16 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
+ println("reinitializing")
+
+ // RR TODO: most of these can actually move out to a once-only initialization
+ // The only thing that really needs to be "re-initialized" is the partition change listener
val dirs = new ZKGroupDirs(config.groupId)
- // listener to consumer and partition changes
+ // listener for consumer and partition changes
if (loadBalancerListener == null) {
- val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
- loadBalancerListener = new ZKRebalancerListener(
- config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
+ val streams = queuesAndStreams.map(_._2)
+ loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, streams)
}
// create listener for session expired event if not exist yet
@@ -848,9 +887,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
+ // RR TODO: this can be commented
val consumerThreadIdsPerTopic: Map[String, Set[String]] =
topicCount.getConsumerThreadIdsPerTopic
+ // RR TODO: this can be commented
val allQueuesAndStreams = topicCount match {
case wildTopicCount: WildcardTopicCount =>
/*
@@ -898,6 +939,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+ // RR TODO: if we go with the wildcard changes then this will need to be revisited. i.e., we need to have the
+ // list of topics to add watchers explicitly
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations_2.8.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation
diff --git a/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala b/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala
new file mode 100644
index 0000000..f5347de
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala
@@ -0,0 +1,67 @@
+package kafka.consumer
+
+import junit.framework.Assert._
+import org.junit.Test
+import scala.collection._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util
+import kafka.consumer.RebalanceTest.ConsumerScenario
+
+
+@RunWith(value = classOf[Parameterized])
+class RebalanceTest(scenario: ConsumerScenario) {
+
+ @Test
+ def testRoundRobinRebalance() {
+ val consumerIds = (0 until scenario.numConsumers).map("id" + _)
+ val topicCounts =
+ Map(consumerIds.map(id => (id, new WildcardTopicCount(null, id, new Whitelist(".*"), scenario.numStreams, true))):_*)
+ val ownershipMap = mutable.Map[Int, String]()
+
+ val expectedMinOwnerCount = scenario.numPartitions / (scenario.numConsumers * scenario.numStreams)
+ val expectedMaxOwnerCount = expectedMinOwnerCount + 1
+
+ // process one consumer at a time
+ (0 until scenario.numConsumers).map(consumerIdx => {
+ val id = consumerIds(consumerIdx)
+ val topicCount = topicCounts.get(id).get
+ val threadIds = topicCount.getThreadIds("wildcard").toIndexedSeq.sorted
+ val numThreads = threadIds.size
+
+ val threadStep = scenario.numStreams * scenario.numConsumers
+
+ // within each consumer, process one thread at a time
+ (0 until numThreads).foreach(threadIdx => {
+ var ownerCount = 0
+ var partitionIdx = consumerIdx + threadIdx * scenario.numConsumers
+ while (partitionIdx < scenario.numPartitions) {
+ assertTrue("Unique owner constraint failed for partition " + partitionIdx, ownershipMap.get(partitionIdx).isEmpty)
+ ownerCount += 1
+ ownershipMap.put(partitionIdx, threadIds(threadIdx))
+ partitionIdx += threadStep
+ }
+ // check that round-robin partitioning yields even layout
+ assertTrue("Balanced ownership constraint failed", ownerCount == expectedMinOwnerCount || ownerCount == expectedMaxOwnerCount)
+ })
+ })
+
+ assertTrue("Full coverage constraint failed", ownershipMap.size == scenario.numPartitions)
+ }
+
+}
+
+object RebalanceTest {
+ final case class ConsumerScenario(numConsumers: Int, numStreams: Int, numPartitions: Int)
+ @Parameters def parameters: java.util.Collection[Array[ConsumerScenario]] = {
+ val params = new util.ArrayList[Array[ConsumerScenario]]()
+ params.add(Array(ConsumerScenario(numConsumers = 3, numStreams = 3, numPartitions = 10)))
+ params.add(Array(ConsumerScenario(numConsumers = 3, numStreams = 2, numPartitions = 12)))
+ params.add(Array(ConsumerScenario(numConsumers = 1, numStreams = 2, numPartitions = 0)))
+ params.add(Array(ConsumerScenario(numConsumers = 10, numStreams = 2, numPartitions = 2)))
+ params.add(Array(ConsumerScenario(numConsumers = 16, numStreams = 3, numPartitions = 2045)))
+ // can also generate some random configurations
+ params
+ }
+}
\ No newline at end of file
diff --git a/gradle.properties b/gradle.properties
index 4827769..236e243 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
group=org.apache.kafka
version=0.8.1
-scalaVersion=2.8.0
+scalaVersion=2.9.2
task=build
mavenUrl=
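
Note (not part of the patch): the WildcardTopicCount branch in rebalance() stops at the "round-robin assignment of partitions to topic-thread-ids" TODO, and RebalanceTest only checks the resulting layout (unique ownership, balanced counts, full coverage). Below is a minimal, self-contained Scala sketch of one such round-robin assignment; the object name, the (topic, partition) tuples standing in for kafka.common.TopicAndPartition, the example thread-id strings, and the sorted thread ordering are illustrative assumptions, not the patch's eventual implementation.

// Sketch: deal sorted partitions one at a time across the sorted list of all
// wildcard consumer thread ids, so every partition gets exactly one owner and
// each thread owns either floor(P/T) or ceil(P/T) partitions.
object RoundRobinAssignmentSketch {
  def assign(partitions: Seq[(String, Int)],           // (topic, partition) pairs
             threadIds: Set[String]): Map[(String, Int), String] = {
    val orderedThreads = threadIds.toSeq.sorted        // e.g. id0-wildcard-0, id0-wildcard-1, id1-wildcard-0, ...
    val orderedPartitions = partitions.sorted          // deterministic order, same on every consumer
    orderedPartitions.zipWithIndex.map { case (tp, i) =>
      tp -> orderedThreads(i % orderedThreads.size)    // round-robin over the thread list
    }.toMap
  }

  def main(args: Array[String]) {
    val partitions = for (topic <- Seq("topicA", "topicB"); p <- 0 until 5) yield (topic, p)
    val threadIds = Set("id0-wildcard-0", "id0-wildcard-1", "id1-wildcard-0", "id1-wildcard-1")
    assign(partitions, threadIds).toSeq.sorted.foreach(println)
  }
}

With the example inputs this prints ten partitions spread over four thread ids, no thread owning more than three (ceil(10/4)) and none fewer than two (floor(10/4)), which is the balance property the test asserts.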