Skip to content

Instantly share code, notes, and snippets.

@dejanb
Created January 22, 2014 15:11
Show Gist options
  • Save dejanb/8560346 to your computer and use it in GitHub Desktop.
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 0ae4463..f903090 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -17,18 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -113,7 +102,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Messages that are paged in but have not yet been targeted at a subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
protected PendingList pagedInPendingDispatch = new OrderedPendingList();
- protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
@@ -344,10 +332,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
pagedInPendingDispatch = new PrioritizedPendingList();
- redeliveredWaitingDispatch = new PrioritizedPendingList();
} else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
pagedInPendingDispatch = new OrderedPendingList();
- redeliveredWaitingDispatch = new OrderedPendingList();
}
}
@@ -556,9 +542,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
}
-
- for (MessageReference ref : unAckedMessages) {
- QueueMessageReference qmr = (QueueMessageReference) ref;
+ ListIterator unAckedMessagesIterator = unAckedMessages.listIterator(unAckedMessages.size());
+ while (unAckedMessagesIterator.hasPrevious()) {
+ QueueMessageReference qmr = (QueueMessageReference)unAckedMessagesIterator.previous();
if (qmr.getLockOwner() == sub) {
qmr.unlock();
@@ -569,21 +555,22 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (markAsRedelivered) {
qmr.incrementRedeliveryCounter();
}
- if (ref == lastDeliveredRef) {
+ System.out.println("qmr " + (qmr == lastDeliveredRef));
+ if (qmr == lastDeliveredRef) {
// all that follow were not redelivered
markAsRedelivered = false;
}
}
}
if (!qmr.isDropped()) {
- redeliveredWaitingDispatch.addMessageLast(qmr);
+ pagedInPendingDispatch.addMessageFirst(qmr);
}
}
if (sub instanceof QueueBrowserSubscription) {
((QueueBrowserSubscription)sub).decrementQueueRef();
browserDispatches.remove(sub);
}
- if (!redeliveredWaitingDispatch.isEmpty()) {
+ if (!pagedInPendingDispatch.isEmpty()) {
doDispatch(new OrderedPendingList());
}
} finally {
@@ -1654,7 +1641,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// then we do a dispatch.
boolean hasBrowsers = browserDispatches.size() > 0;
- if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
+ if (pageInMoreMessages || hasBrowsers) {
try {
pageInMessages(hasBrowsers);
} catch (Throwable e) {
@@ -2019,11 +2006,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pagedInPendingDispatchLock.writeLock().lock();
try {
- if (!redeliveredWaitingDispatch.isEmpty()) {
- // Try first to dispatch redelivered messages to keep an
- // proper order
- redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
- }
if (!pagedInPendingDispatch.isEmpty()) {
// Next dispatch anything that had not been
// dispatched before.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment