@jjkoshy
Created June 19, 2014 17:16
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..d9bd99f 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -22,24 +22,15 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
import kafka.common.KafkaException
+// RR TODO: we should get rid of this topiccount trait since it only makes sense for static topic count
+// RR TODO: and rename this to SubscriptionConfig
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+ def getThreadIds(label: String): Set[String]
def getTopicCountMap: Map[String, Int]
def pattern: String
-
- protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
- topicCountMap: Map[String, Int]) = {
- val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
- for ((topic, nConsumers) <- topicCountMap) {
- val consumerSet = new mutable.HashSet[String]
- assert(nConsumers >= 1)
- for (i <- 0 until nConsumers)
- consumerSet += consumerIdString + "-" + i
- consumerThreadIdsPerTopicMap.put(topic, consumerSet)
- }
- consumerThreadIdsPerTopicMap
- }
+
}
private[kafka] object TopicCount extends Logging {
@@ -89,6 +80,24 @@ private[kafka] object TopicCount extends Logging {
}
}
+ def getConsumerThreadIdsPerTopic(topicCount: StaticTopicCount) = {
+ val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+ for ((topic, nConsumers) <- topicCount.topicCountMap) {
+ val consumerSet = new mutable.HashSet[String]
+ assert(nConsumers >= 1)
+ for (i <- 0 until nConsumers)
+ consumerSet += topicCount.consumerIdString + "-" + i
+ consumerThreadIdsPerTopicMap.put(topic, consumerSet)
+ }
+ println(consumerThreadIdsPerTopicMap)
+ consumerThreadIdsPerTopicMap
+ }
+
+ protected def getWildcardThreadIds(topicCount: WildcardTopicCount) = {
+ (0 until topicCount.numStreams)
+ .map(threadId => "__wildcard-%s-%d".format(topicCount.consumerIdString, threadId)).toSet
+ }
+
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
@@ -101,7 +110,9 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
val topicCountMap: Map[String, Int])
extends TopicCount {
- def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
+ def getThreadIds(label: String) = null
+
+ def getConsumerThreadIdsPerTopic: Map[String, Set[String]] = null
override def equals(obj: Any): Boolean = {
obj match {
@@ -117,18 +128,17 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
}
private[kafka] class WildcardTopicCount(zkClient: ZkClient,
- consumerIdString: String,
+ val consumerIdString: String,
topicFilter: TopicFilter,
- numStreams: Int,
+ val numStreams: Int,
excludeInternalTopics: Boolean) extends TopicCount {
- def getConsumerThreadIdsPerTopic = {
- val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
- .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
- makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
- }
def getTopicCountMap = Map(topicFilter.regex -> numStreams)
+ def getThreadIds(label: String) = (0 until numStreams).map(i => "%s-%s-%d".format(consumerIdString, label, i)).toSet
+
+ def getConsumerThreadIdsPerTopic: Map[String, Set[String]] = null
+
def pattern: String = {
topicFilter match {
case wl: Whitelist => TopicCount.whiteListPattern
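
The TopicCount changes above move thread-id construction out of the trait: the static path goes through a new TopicCount.getConsumerThreadIdsPerTopic(StaticTopicCount) helper, while the wildcard path gets getThreadIds(label), which keys ids by a label instead of a topic. Below is a minimal standalone sketch of the two naming schemes as they read from this diff; the object name and method signatures in the sketch are illustrative, not part of the patch.

```scala
// Illustrative sketch only -- mirrors the naming conventions in the diff above,
// not the patched classes themselves.
object ThreadIdSketch {
  // Static subscription: one id per requested stream, keyed by topic,
  // as in TopicCount.getConsumerThreadIdsPerTopic in the patch.
  def staticThreadIds(consumerIdString: String,
                      topicCountMap: Map[String, Int]): Map[String, Set[String]] =
    topicCountMap.map { case (topic, nStreams) =>
      topic -> (0 until nStreams).map(i => consumerIdString + "-" + i).toSet
    }

  // Wildcard subscription: ids are keyed by a label rather than a topic,
  // as in WildcardTopicCount.getThreadIds in the patch.
  def wildcardThreadIds(consumerIdString: String,
                        label: String,
                        numStreams: Int): Set[String] =
    (0 until numStreams).map(i => "%s-%s-%d".format(consumerIdString, label, i)).toSet

  def main(args: Array[String]) {
    println(staticThreadIds("group1_host-1234", Map("topicA" -> 2)))
    // e.g., Map(topicA -> Set(group1_host-1234-0, group1_host-1234-1))
    println(wildcardThreadIds("group1_host-1234", "wildcard", 2))
    // e.g., Set(group1_host-1234-wildcard-0, group1_host-1234-wildcard-1)
  }
}
```
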
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 1dde4fc..e8501b5 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -90,7 +90,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
- private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
+ // RR TODO: can be thread-id -> queue
+ private val topicThreadIdAndQueues = new Pool[String /* thread id - e.g., <cid>-topicA-0 */, BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
@@ -102,6 +103,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val offsetsChannelLock = new Object
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+ private var wildcardTopicFilter: TopicFilter = null
// useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
@@ -155,6 +157,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
numStreams: Int,
keyDecoder: Decoder[K] = new DefaultDecoder(),
valueDecoder: Decoder[V] = new DefaultDecoder()) = {
+ wildcardTopicFilter = topicFilter
val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
wildcardStreamsHandler.streams
}
@@ -195,6 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case None =>
}
sendShutdownToAllQueues()
+ // RR TODO: unclear on this: i.e., we go ahead and commit offsets after sending the signal, but have not
if (config.autoCommitEnable)
commitOffsets()
if (zkClient != null) {
@@ -220,7 +224,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
- val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
+ val topicThreadIds = TopicCount.getConsumerThreadIdsPerTopic(topicCount)
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
@@ -236,6 +240,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
+ // RR TODO: if we remove the topic->list map from load balancer, we need to reconstruct here
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
@@ -254,6 +259,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def sendShutdownToAllQueues() = {
+ // RR TODO: seems we need a rebalancer that returns queue for given topic/partition (comment, but won't implement)
+ // RR TODO: if topicThreadIdAndQueues does not apply for wildcard then need to enumerate all wildcard queues
for (queue <- topicThreadIdAndQueues.values) {
debug("Clearing up queue")
queue.clear()
@@ -512,6 +519,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
+
+ // RR TODO: maybe have a queueFor map here that takes topic-partition to queue and handles both wildcard and static
+
+ val queueFor = mutable.Map[TopicAndPartition, BlockingQueue[FetchedDataChunk]]()
+
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@@ -574,6 +586,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
def syncedRebalance() {
+ println("Rebalancing")
rebalanceLock synchronized {
if(isShuttingDown.get()) {
return
@@ -612,8 +625,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def rebalance(cluster: Cluster): Boolean = {
- val myTopicThreadIdsMap = TopicCount.constructTopicCount(
- group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+ val topicCount = TopicCount.constructTopicCount(
+ group, consumerIdString, zkClient, config.excludeInternalTopics)
+ val myTopicThreadIdsMap = topicCount.getConsumerThreadIdsPerTopic
+ // RR TODO: can change getConsumersPerTopic to be getConsumersForTopic and move down and execute per-topic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
val brokers = getAllBrokersInCluster(zkClient)
if (brokers.size == 0) {
@@ -641,39 +656,58 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
- currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
- val curConsumers = consumersPerTopicMap.get(topic).get
- val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
- val nPartsPerConsumer = curPartitions.size / curConsumers.size
- val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
- info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
- " for topic " + topic + " with consumers: " + curConsumers)
-
- for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
- assert(myConsumerPosition >= 0)
- val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
- val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
- /**
- * Range-partition the sorted partitions to consumers for better locality.
- * The first few consumers pick up an extra partition, if any.
- */
- if (nParts <= 0)
- warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
- else {
- for (i <- startPart until startPart + nParts) {
- val partition = curPartitions(i)
- info(consumerThreadId + " attempting to claim partition " + partition)
- // record the partition ownership decision
- partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+ topicCount match {
+ case wildcardCount: WildcardTopicCount =>
+ // get list of current consumers
+ // consumersPerTopicMap has wildcard-0, wildcard-1, wildcard-2, etc.
+// val allConsumerThreadIds = wildcardCount.getThreadIds
+
+ // get list of all partitions of all allowed topics
+ val allTopicMetadata = ClientUtils.fetchTopicMetadata(Set("topics"), brokers,
+ config.clientId, config.socketTimeoutMs).topicsMetadata
+ val allowedTopicsMetadata = allTopicMetadata.filter(
+ metadata => wildcardTopicFilter.isTopicAllowed(metadata.topic, config.excludeInternalTopics))
+
+// val allowedPartitions = allowedTopicsMetadata.flatMap(
+// tm => tm.partitionsMetadata.map(
+// pm => TopicAndPartition(tm.topic, pm.partitionId))).sorted
+
+ // round-robin assignment of partitions to topic-thread-ids
+ case staticCount: StaticTopicCount =>
+ for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+ currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+
+ val curConsumers = consumersPerTopicMap.get(topic).get
+ val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+
+ val nPartsPerConsumer = curPartitions.size / curConsumers.size
+ val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+ info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+ " for topic " + topic + " with consumers: " + curConsumers)
+
+ for (consumerThreadId <- consumerThreadIdSet) {
+ val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
+ assert(myConsumerPosition >= 0)
+ val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+ val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+ /**
+ * Range-partition the sorted partitions to consumers for better locality.
+ * The first few consumers pick up an extra partition, if any.
+ */
+ if (nParts <= 0)
+ warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+ else {
+ for (i <- startPart until startPart + nParts) {
+ val partition = curPartitions(i)
+ info(consumerThreadId + " attempting to claim partition " + partition)
+ // record the partition ownership decision
+ partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+ }
+ }
}
}
- }
}
// fetch current offsets for all topic-partitions
@@ -688,6 +722,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionOwnershipDecision(topicAndPartition)
+ // RR TODO: make sure the threadid is complete (of the form <cid>-topic-threadid)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
}
@@ -716,7 +751,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
fetcher match {
case Some(f) =>
f.stopConnections
- clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+ clearFetcherQueues(queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -732,8 +767,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
- queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+ private def clearFetcherQueues(queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
messageStreams: Map[String,List[KafkaStream[_,_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
@@ -752,6 +786,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
+ // RR TODO: topicthreadidandqueues not needed - will clear all queues off messagestreams
val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
@@ -802,12 +837,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
else true
}
+ // RR TODO: this should not take topic-threadid. just the queue; or fully qualified threadid
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
partition: Int, topic: String,
offset: Long, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
- val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
+ val queue = topicThreadIdAndQueues.get(consumerThreadId)
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
@@ -827,13 +863,16 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
+ println("reinitializing")
+
+ // RR TODO: most of these can actually move out to a once-only initialization
+ // The only thing that really needs to be "re-initialized" is the partition change listener
val dirs = new ZKGroupDirs(config.groupId)
- // listener to consumer and partition changes
+ // listener for consumer and partition changes
if (loadBalancerListener == null) {
- val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
- loadBalancerListener = new ZKRebalancerListener(
- config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
+ val streams = queuesAndStreams.map(_._2)
+ loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, streams)
}
// create listener for session expired event if not exist yet
@@ -848,9 +887,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
+ // RR TODO: this can be commented
val consumerThreadIdsPerTopic: Map[String, Set[String]] =
topicCount.getConsumerThreadIdsPerTopic
+ // RR TODO: this can be commented
val allQueuesAndStreams = topicCount match {
case wildTopicCount: WildcardTopicCount =>
/*
@@ -898,6 +939,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+ // RR TODO: if we go with the wildcard changes then this will need to be revisited. i.e., we need to have the
+ // list of topics to add watchers explicitly
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
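
The rebalance() changes above leave the wildcard branch as a stub: it fetches metadata for the allowed topics, but the "round-robin assignment of partitions to topic-thread-ids" is still commented out. Below is a hedged sketch of what that assignment could look like, using simplified stand-in types rather than the connector's internals; the sorting and modulo dealing here are one reasonable reading of the RR TODO comments, not the final implementation.

```scala
// Hedged sketch: deal the sorted list of all allowed partitions out to the
// sorted list of all wildcard thread ids, round-robin. Stand-in types only.
object RoundRobinSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  def assignRoundRobin(partitions: Seq[TopicAndPartition],
                       threadIds: Seq[String]): Map[TopicAndPartition, String] = {
    require(threadIds.nonEmpty, "need at least one consumer thread id")
    val sortedPartitions = partitions.sortBy(tp => (tp.topic, tp.partition))
    val sortedThreads = threadIds.sorted
    sortedPartitions.zipWithIndex.map { case (tp, i) =>
      tp -> sortedThreads(i % sortedThreads.size)
    }.toMap
  }

  def main(args: Array[String]) {
    val partitions = (0 until 5).map(p => TopicAndPartition("topicA", p))
    val threads = Seq("c1-wildcard-0", "c1-wildcard-1", "c2-wildcard-0")
    assignRoundRobin(partitions, threads).toSeq.sortBy(_._1.partition).foreach(println)
    // partition 0 -> c1-wildcard-0, 1 -> c1-wildcard-1, 2 -> c2-wildcard-0,
    // partition 3 -> c1-wildcard-0, 4 -> c1-wildcard-1
  }
}
```
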
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations_2.8.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation
diff --git a/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala b/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala
new file mode 100644
index 0000000..f5347de
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/consumer/RebalanceTest.scala
@@ -0,0 +1,67 @@
+package kafka.consumer
+
+import junit.framework.Assert._
+import org.junit.Test
+import scala.collection._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.util
+import kafka.consumer.RebalanceTest.ConsumerScenario
+
+
+@RunWith(value = classOf[Parameterized])
+class RebalanceTest(scenario: ConsumerScenario) {
+
+ @Test
+ def testRoundRobinRebalance() {
+ val consumerIds = (0 until scenario.numConsumers).map("id" + _)
+ val topicCounts =
+ Map(consumerIds.map(id => (id, new WildcardTopicCount(null, id, new Whitelist(".*"), scenario.numStreams, true))):_*)
+ val ownershipMap = mutable.Map[Int, String]()
+
+ val expectedMinOwnerCount = scenario.numPartitions / (scenario.numConsumers * scenario.numStreams)
+ val expectedMaxOwnerCount = expectedMinOwnerCount + 1
+
+ // process one consumer at a time
+ (0 until scenario.numConsumers).map(consumerIdx => {
+ val id = consumerIds(consumerIdx)
+ val topicCount = topicCounts.get(id).get
+ val threadIds = topicCount.getThreadIds("wildcard").toIndexedSeq.sorted
+ val numThreads = threadIds.size
+
+ val threadStep = scenario.numStreams * scenario.numConsumers
+
+ // within each consumer, process one thread at a time
+ (0 until numThreads).foreach(threadIdx => {
+ var ownerCount = 0
+ var partitionIdx = consumerIdx + threadIdx * scenario.numConsumers
+ while (partitionIdx < scenario.numPartitions) {
+ assertTrue("Unique owner constraint failed for partition " + partitionIdx, ownershipMap.get(partitionIdx).isEmpty)
+ ownerCount += 1
+ ownershipMap.put(partitionIdx, threadIds(threadIdx))
+ partitionIdx += threadStep
+ }
+ // check that round-robin partitioning yields even layout
+ assertTrue("Balanced ownership constraint failed", ownerCount == expectedMinOwnerCount || ownerCount == expectedMaxOwnerCount)
+ })
+ })
+
+ assertTrue("Full coverage constraint failed", ownershipMap.size == scenario.numPartitions)
+ }
+
+}
+
+object RebalanceTest {
+ final case class ConsumerScenario(numConsumers: Int, numStreams: Int, numPartitions: Int)
+ @Parameters def parameters: java.util.Collection[Array[ConsumerScenario]] = {
+ val params = new util.ArrayList[Array[ConsumerScenario]]()
+ params.add(Array(ConsumerScenario(numConsumers = 3, numStreams = 3, numPartitions = 10)))
+ params.add(Array(ConsumerScenario(numConsumers = 3, numStreams = 2, numPartitions = 12)))
+ params.add(Array(ConsumerScenario(numConsumers = 1, numStreams = 2, numPartitions = 0)))
+ params.add(Array(ConsumerScenario(numConsumers = 10, numStreams = 2, numPartitions = 2)))
+ params.add(Array(ConsumerScenario(numConsumers = 16, numStreams = 3, numPartitions = 2045)))
+ // can also generate some random configurations
+ params
+ }
+}
\ No newline at end of file
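
RebalanceTest above checks three invariants of the simulated round-robin layout: each partition has exactly one owner, every partition is owned, and ownership is balanced (each thread owns either P / (C * S) partitions, integer division, or one more). Below is a small standalone sketch of that balance invariant, independent of the JUnit harness; the object name and helper are illustrative only.

```scala
// Illustrative sketch of the balance invariant asserted by RebalanceTest:
// with P partitions dealt round-robin across C * S threads, every thread owns
// either P / (C * S) partitions (integer division) or one more.
object BalanceInvariantSketch {
  def ownerCounts(numPartitions: Int, numConsumers: Int, numStreams: Int): Map[Int, Int] = {
    val numThreads = numConsumers * numStreams
    (0 until numPartitions)
      .groupBy(_ % numThreads)                       // thread index -> partitions it owns
      .map { case (thread, parts) => thread -> parts.size }
  }

  def main(args: Array[String]) {
    val counts = ownerCounts(numPartitions = 10, numConsumers = 3, numStreams = 3)
    val expectedMin = 10 / (3 * 3)
    assert(counts.values.forall(c => c == expectedMin || c == expectedMin + 1))
    println(counts) // thread 0 owns 2 partitions (0 and 9), threads 1-8 own 1 each
  }
}
```
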
diff --git a/gradle.properties b/gradle.properties
index 4827769..236e243 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
group=org.apache.kafka
version=0.8.1
-scalaVersion=2.8.0
+scalaVersion=2.9.2
task=build
mavenUrl=