Created
September 29, 2015 22:00
-
-
Save alq666/1796511f5caff0743561 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 1814e7c904072f0f67c5128d53a20d26ebb56b1a Mon Sep 17 00:00:00 2001 | |
From: Jun Rao <[email protected]> | |
Date: Tue, 12 May 2015 15:37:21 -0700 | |
Subject: [PATCH 1/3] synchronize on getting size from watchers | |
--- | |
core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++- | |
1 file changed, 5 insertions(+), 1 deletion(-) | |
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
index 87ee3be..098679c 100644 | |
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
@@ -196,7 +196,11 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt | |
private val requests = new util.LinkedList[T] | |
// return the size of the watch list | |
- def watched() = requests.size() | |
+ def watched() = { | |
+ synchronized { | |
+ requests.size() | |
+ } | |
+ } | |
// add the element to the watcher list if it's not already satisfied | |
def addIfNotSatisfied(t: T): Boolean = { | |
-- | |
1.8.5.2 (Apple Git-48) | |
From ef381eaefea768eb95da279d1bf197aeab27a66b Mon Sep 17 00:00:00 2001 | |
From: Jun Rao <[email protected]> | |
Date: Thu, 14 May 2015 09:04:38 -0700 | |
Subject: [PATCH 2/3] add instrumentation | |
--- | |
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | 2 +- | |
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | 2 +- | |
core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++- | |
3 files changed, 7 insertions(+), 3 deletions(-) | |
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | |
index ed13188..5a20233 100644 | |
--- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | |
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | |
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit | |
* The purgatory holding delayed fetch requests | |
*/ | |
class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel) | |
- extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) { | |
+ extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests, "FetchPurgatory") { | |
this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId) | |
private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { | |
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | |
index e7ff411..852841a 100644 | |
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | |
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | |
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit | |
* The purgatory holding delayed producer requests | |
*/ | |
class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) | |
- extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { | |
+ extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests, "ProducePurgatory") { | |
this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) | |
private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup { | |
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
index 098679c..701583e 100644 | |
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
@@ -65,7 +65,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de | |
* this function handles delayed requests that have hit their time limit without being satisfied. | |
* | |
*/ | |
-abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) | |
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000, purgatoryName: String = "") | |
extends Logging with KafkaMetricsGroup { | |
/* a list of requests watching each key */ | |
@@ -280,6 +280,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt | |
expire(curr) | |
} | |
} | |
+ | |
+ debug("Checking for purging in %s; watched items: %d, purge threshold: %d". | |
+ format(purgatoryName, RequestPurgatory.this.watched(), purgeInterval)) | |
+ | |
// see if we need to purge the watch lists | |
if (RequestPurgatory.this.watched() >= purgeInterval) { | |
debug("Begin purging watch lists") | |
-- | |
1.8.5.2 (Apple Git-48) | |
From 62f5112a398f58975ad4c163fcbd08af9e852cd0 Mon Sep 17 00:00:00 2001 | |
From: Jun Rao <[email protected]> | |
Date: Fri, 15 May 2015 15:52:27 -0700 | |
Subject: [PATCH 3/3] return from pollExpired() on expired items | |
--- | |
core/src/main/scala/kafka/server/RequestPurgatory.scala | 13 +++++++------ | |
1 file changed, 7 insertions(+), 6 deletions(-) | |
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
index 701583e..9fa00e8 100644 | |
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala | |
@@ -286,15 +286,15 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt | |
// see if we need to purge the watch lists | |
if (RequestPurgatory.this.watched() >= purgeInterval) { | |
- debug("Begin purging watch lists") | |
+ debug("Begin purging watch lists in %s".format(purgatoryName)) | |
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum | |
- debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers)) | |
+ debug("Purged %d elements from watch lists in %s.".format(numPurgedFromWatchers, purgatoryName)) | |
} | |
// see if we need to purge the delayed request queue | |
if (delayed() >= purgeInterval) { | |
- debug("Begin purging delayed queue") | |
+ debug("Begin purging delayed queue in %s".format(purgatoryName)) | |
val purged = purgeSatisfied() | |
- debug("Purged %d requests from delayed queue.".format(purged)) | |
+ debug("Purged %d requests from delayed queue %s.".format(purged, purgatoryName)) | |
} | |
} catch { | |
case e: Exception => | |
@@ -326,9 +326,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt | |
if (curr == null) | |
return null.asInstanceOf[T] | |
val updated = curr.satisfied.compareAndSet(false, true) | |
- if(updated) { | |
+ if(updated) | |
return curr | |
- } | |
+ else | |
+ return null.asInstanceOf[T] | |
} | |
throw new RuntimeException("This should not happen") | |
} | |
-- | |
1.8.5.2 (Apple Git-48) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment