Created
January 22, 2014 15:11
-
-
Save dejanb/8560346 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java | |
index 0ae4463..f903090 100755 | |
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java | |
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java | |
@@ -17,18 +17,7 @@ | |
package org.apache.activemq.broker.region; | |
import java.io.IOException; | |
-import java.util.ArrayList; | |
-import java.util.Collection; | |
-import java.util.Collections; | |
-import java.util.Comparator; | |
-import java.util.HashSet; | |
-import java.util.Iterator; | |
-import java.util.LinkedHashMap; | |
-import java.util.LinkedHashSet; | |
-import java.util.LinkedList; | |
-import java.util.List; | |
-import java.util.Map; | |
-import java.util.Set; | |
+import java.util.*; | |
import java.util.concurrent.CancellationException; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
@@ -113,7 +102,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
// Messages that are paged in but have not yet been targeted at a subscription | |
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); | |
protected PendingList pagedInPendingDispatch = new OrderedPendingList(); | |
- protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); | |
private MessageGroupMap messageGroupOwners; | |
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); | |
private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); | |
@@ -344,10 +332,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { | |
pagedInPendingDispatch = new PrioritizedPendingList(); | |
- redeliveredWaitingDispatch = new PrioritizedPendingList(); | |
} else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { | |
pagedInPendingDispatch = new OrderedPendingList(); | |
- redeliveredWaitingDispatch = new OrderedPendingList(); | |
} | |
} | |
@@ -556,9 +542,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
} | |
} | |
} | |
- | |
- for (MessageReference ref : unAckedMessages) { | |
- QueueMessageReference qmr = (QueueMessageReference) ref; | |
+ ListIterator unAckedMessagesIterator = unAckedMessages.listIterator(unAckedMessages.size()); | |
+ while (unAckedMessagesIterator.hasPrevious()) { | |
+ QueueMessageReference qmr = (QueueMessageReference)unAckedMessagesIterator.previous(); | |
if (qmr.getLockOwner() == sub) { | |
qmr.unlock(); | |
@@ -569,21 +555,22 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
if (markAsRedelivered) { | |
qmr.incrementRedeliveryCounter(); | |
} | |
- if (ref == lastDeliveredRef) { | |
+ System.out.println("qmr " + (qmr == lastDeliveredRef)); | |
+ if (qmr == lastDeliveredRef) { | |
// all that follow were not redelivered | |
markAsRedelivered = false; | |
} | |
} | |
} | |
if (!qmr.isDropped()) { | |
- redeliveredWaitingDispatch.addMessageLast(qmr); | |
+ pagedInPendingDispatch.addMessageFirst(qmr); | |
} | |
} | |
if (sub instanceof QueueBrowserSubscription) { | |
((QueueBrowserSubscription)sub).decrementQueueRef(); | |
browserDispatches.remove(sub); | |
} | |
- if (!redeliveredWaitingDispatch.isEmpty()) { | |
+ if (!pagedInPendingDispatch.isEmpty()) { | |
doDispatch(new OrderedPendingList()); | |
} | |
} finally { | |
@@ -1654,7 +1641,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
// then we do a dispatch. | |
boolean hasBrowsers = browserDispatches.size() > 0; | |
- if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) { | |
+ if (pageInMoreMessages || hasBrowsers) { | |
try { | |
pageInMessages(hasBrowsers); | |
} catch (Throwable e) { | |
@@ -2019,11 +2006,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { | |
pagedInPendingDispatchLock.writeLock().lock(); | |
try { | |
- if (!redeliveredWaitingDispatch.isEmpty()) { | |
- // Try first to dispatch redelivered messages to keep an | |
- // proper order | |
- redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); | |
- } | |
if (!pagedInPendingDispatch.isEmpty()) { | |
// Next dispatch anything that had not been | |
// dispatched before. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment