Created
March 2, 2011 16:39
-
-
Save dejanb/851227 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java | |
index 9db3fa0..94dfbd0 100755 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java | |
@@ -17,6 +17,7 @@ | |
package org.apache.activemq.broker; | |
import java.net.URI; | |
+import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import org.apache.activemq.Service; | |
@@ -24,15 +25,7 @@ import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.MessageReference; | |
import org.apache.activemq.broker.region.Region; | |
import org.apache.activemq.broker.region.Subscription; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.SessionInfo; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.network.NetworkBridge; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
@@ -384,5 +377,7 @@ public interface Broker extends Region, Service { | |
void networkBridgeStopped(BrokerInfo brokerInfo); | |
+ /** | |
+ * Looks up subscriptions for the given destination, keyed by consumer id. | |
+ * NOTE(review): the RegionBroker implementation in this patch returns every subscription | |
+ * of the region selected by the destination's type, not only those bound to it - confirm | |
+ * this is the intended contract. | |
+ * | |
+ * @param destination the destination the lookup is based on | |
+ * @return a map from consumer id to subscription | |
+ * @throws Exception if the lookup fails | |
+ */ | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception; | |
+ | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java | |
index 244e886..47b7ec9 100755 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java | |
@@ -23,23 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor; | |
import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.MessageReference; | |
import org.apache.activemq.broker.region.Subscription; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
-import org.apache.activemq.command.SessionInfo; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.network.NetworkBridge; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
@@ -319,4 +303,9 @@ public class BrokerFilter implements Broker { | |
public void networkBridgeStopped(BrokerInfo brokerInfo) { | |
next.networkBridgeStopped(brokerInfo); | |
} | |
+ | |
+ /** | |
+ * Passes the subscription lookup straight through to the next broker in the chain. | |
+ * | |
+ * @param destination the destination handed on to the next broker | |
+ * @return whatever the next broker returns | |
+ * @throws Exception if the next broker's lookup fails | |
+ */ | |
+ @Override | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception { | |
+ return next.getSubscriptions(destination); | |
+ } | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java | |
index 3373971..21c6a18 100644 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java | |
@@ -24,23 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; | |
import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.MessageReference; | |
import org.apache.activemq.broker.region.Subscription; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
-import org.apache.activemq.command.SessionInfo; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
import org.apache.activemq.usage.Usage; | |
@@ -288,6 +272,11 @@ public class EmptyBroker implements Broker { | |
public void networkBridgeStopped(BrokerInfo brokerInfo) { | |
} | |
+ /** | |
+ * An empty broker has no subscriptions, so an empty, immutable map is returned. | |
+ * Returning an empty map rather than null lets callers iterate the result directly. | |
+ * | |
+ * @param destination ignored | |
+ * @return an empty map, never null | |
+ * @throws Exception never thrown by this implementation | |
+ */ | |
+ @Override | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception { | |
+ // Fully qualified so this hunk does not depend on an import outside the shown context. | |
+ return java.util.Collections.<ConsumerId, Subscription>emptyMap(); | |
+ } | |
+ | |
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, | |
ConsumerControl control) { | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java | |
index 14469ab..af7bf8f 100644 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java | |
@@ -24,23 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; | |
import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.MessageReference; | |
import org.apache.activemq.broker.region.Subscription; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
-import org.apache.activemq.command.SessionInfo; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
import org.apache.activemq.usage.Usage; | |
@@ -320,4 +304,9 @@ public class ErrorBroker implements Broker { | |
public void networkBridgeStopped(BrokerInfo brokerInfo) { | |
throw new BrokerStoppedException(this.message); | |
} | |
+ | |
+ /** | |
+ * Always fails: throws a BrokerStoppedException carrying this broker's message. | |
+ * | |
+ * @param destination ignored | |
+ * @return never returns normally | |
+ * @throws Exception declared by the interface; a BrokerStoppedException is always thrown | |
+ */ | |
+ @Override | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception { | |
+ throw new BrokerStoppedException(this.message); | |
+ } | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java | |
index d243bf8..5f97edb 100644 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java | |
@@ -24,23 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; | |
import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.MessageReference; | |
import org.apache.activemq.broker.region.Subscription; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
-import org.apache.activemq.command.SessionInfo; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
import org.apache.activemq.usage.Usage; | |
@@ -329,4 +313,9 @@ public class MutableBrokerFilter implements Broker { | |
public void networkBridgeStopped(BrokerInfo brokerInfo) { | |
getNext().networkBridgeStopped(brokerInfo); | |
} | |
+ | |
+ /** | |
+ * Passes the subscription lookup to the broker currently returned by getNext(). | |
+ * | |
+ * @param destination the destination handed on to the next broker | |
+ * @return whatever the next broker returns | |
+ * @throws Exception if the next broker's lookup fails | |
+ */ | |
+ @Override | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception { | |
+ return getNext().getSubscriptions(destination); | |
+ } | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java | |
index 4333632..f1a32ea 100755 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java | |
@@ -22,16 +22,7 @@ import org.apache.activemq.Service; | |
import org.apache.activemq.broker.ConnectionContext; | |
import org.apache.activemq.broker.ConsumerBrokerExchange; | |
import org.apache.activemq.broker.ProducerBrokerExchange; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
+import org.apache.activemq.command.*; | |
/** | |
* A Region is used to implement the different QOS options available to | |
@@ -151,5 +142,5 @@ public interface Region extends Service { | |
Set <Destination>getDestinations(ActiveMQDestination destination); | |
void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control); | |
- | |
+ | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java | |
index 8ca0e76..329441e 100755 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java | |
@@ -40,23 +40,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange; | |
import org.apache.activemq.broker.TransportConnector; | |
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; | |
import org.apache.activemq.broker.region.policy.PolicyMap; | |
-import org.apache.activemq.command.ActiveMQDestination; | |
-import org.apache.activemq.command.BrokerId; | |
-import org.apache.activemq.command.BrokerInfo; | |
-import org.apache.activemq.command.ConnectionId; | |
-import org.apache.activemq.command.ConnectionInfo; | |
-import org.apache.activemq.command.ConsumerControl; | |
-import org.apache.activemq.command.ConsumerInfo; | |
-import org.apache.activemq.command.DestinationInfo; | |
-import org.apache.activemq.command.Message; | |
-import org.apache.activemq.command.MessageAck; | |
-import org.apache.activemq.command.MessageDispatch; | |
-import org.apache.activemq.command.MessageDispatchNotification; | |
-import org.apache.activemq.command.MessagePull; | |
-import org.apache.activemq.command.ProducerInfo; | |
-import org.apache.activemq.command.RemoveSubscriptionInfo; | |
-import org.apache.activemq.command.Response; | |
-import org.apache.activemq.command.TransactionId; | |
+import org.apache.activemq.command.*; | |
import org.apache.activemq.state.ConnectionState; | |
import org.apache.activemq.store.kahadb.plist.PListStore; | |
import org.apache.activemq.thread.Scheduler; | |
@@ -163,6 +147,21 @@ public class RegionBroker extends EmptyBroker { | |
return queueRegion; | |
} | |
+ /** | |
+ * Returns the subscription map of the region that handles the destination's type. | |
+ * The map covers the whole region; it is not narrowed to the given destination. | |
+ * | |
+ * @param destination only its destination type is inspected | |
+ * @return the selected region's subscriptions keyed by consumer id | |
+ * @throws Exception if the destination type is not queue, topic, temp queue or temp topic | |
+ */ | |
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception { | |
+ Region region; | |
+ switch (destination.getDestinationType()) { | |
+ case ActiveMQDestination.QUEUE_TYPE: | |
+ region = queueRegion; | |
+ break; | |
+ case ActiveMQDestination.TOPIC_TYPE: | |
+ region = topicRegion; | |
+ break; | |
+ case ActiveMQDestination.TEMP_QUEUE_TYPE: | |
+ region = tempQueueRegion; | |
+ break; | |
+ case ActiveMQDestination.TEMP_TOPIC_TYPE: | |
+ region = tempTopicRegion; | |
+ break; | |
+ default: | |
+ throw createUnknownDestinationTypeException(destination); | |
+ } | |
+ // Cast to the shared base class: casting the topic and temp regions to QueueRegion | |
+ // fails with a ClassCastException for every non-queue destination. | |
+ // NOTE(review): assumes all four regions extend AbstractRegion (same package), where | |
+ // getSubscriptions() is declared - confirm. | |
+ return ((AbstractRegion)region).getSubscriptions(); | |
+ } | |
+ | |
public Region getTempQueueRegion() { | |
return tempQueueRegion; | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java | |
index eefedd9..dd011e5 100644 | |
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java | |
@@ -16,12 +16,20 @@ | |
*/ | |
package org.apache.activemq.broker.region.virtual; | |
+import org.apache.activemq.broker.Broker; | |
import org.apache.activemq.broker.ProducerBrokerExchange; | |
import org.apache.activemq.broker.region.Destination; | |
import org.apache.activemq.broker.region.DestinationFilter; | |
+import org.apache.activemq.broker.region.Subscription; | |
import org.apache.activemq.command.ActiveMQDestination; | |
import org.apache.activemq.command.ActiveMQQueue; | |
+import org.apache.activemq.command.ConsumerId; | |
import org.apache.activemq.command.Message; | |
+import sun.tools.tree.ContinueStatement; | |
+ | |
+import javax.xml.bind.SchemaOutputResolver; | |
+import java.util.Map; | |
+import java.util.Set; | |
/** | |
* A Destination which implements <a | |
@@ -43,7 +51,37 @@ public class VirtualTopicInterceptor extends DestinationFilter { | |
+ /** | |
+ * Sends a copy of every non-advisory message to the consumer queues selected by | |
+ * {@code getQueueConsumersWildcard(...)}, then hands the original to {@code super.send}. | |
+ * <p> | |
+ * If the broker already has destinations matching the wildcard, each one receives a copy. | |
+ * Otherwise a concrete queue name is derived from every matching subscription, the queue | |
+ * is added to the broker, and one copy is sent per distinct queue name. | |
+ * | |
+ * @param context the producer exchange the message arrived on | |
+ * @param message the message to distribute | |
+ * @throws Exception if a lookup, destination creation or send fails | |
+ */ | |
public void send(ProducerBrokerExchange context, Message message) throws Exception { | |
if (!message.isAdvisory()) { | |
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination()); | |
- send(context, message, queueConsumers); | |
+ Broker broker = context.getConnectionContext().getBroker(); | |
+ Set<Destination> destinations = broker.getDestinations(queueConsumers); | |
+ if (!destinations.isEmpty()) { | |
+ for (Destination dest : destinations) { | |
+ dest.send(context, message.copy()); | |
+ } | |
+ } else { | |
+ Map<ConsumerId, Subscription> subs = broker.getSubscriptions(queueConsumers); | |
+ // A broker implementation may report nothing at all (EmptyBroker returns null). | |
+ if (subs != null) { | |
+ String[] virtPaths = queueConsumers.getDestinationPaths(); | |
+ // Consumers sharing one wildcard queue resolve to the same name; send once per name. | |
+ Set<String> resolvedNames = new java.util.HashSet<String>(); | |
+ for (Subscription sub : subs.values()) { | |
+ if (!sub.matches(queueConsumers)) { | |
+ continue; | |
+ } | |
+ String[] subPaths = sub.getActiveMQDestination().getDestinationPaths(); | |
+ StringBuilder destName = new StringBuilder(); | |
+ String separator = ""; | |
+ for (int i = 0; i < subPaths.length; i++) { | |
+ if (subPaths[i].equals(virtPaths[i]) || virtPaths[i].equals(org.apache.activemq.filter.DestinationFilter.ANY_CHILD)) { | |
+ // The subscription's segment fills in a "*" of the wildcard path. | |
+ destName.append(separator).append(subPaths[i]); | |
+ } else if (subPaths[i].equals(org.apache.activemq.filter.DestinationFilter.ANY_DESCENDENT)) { | |
+ // ">" in the subscription: copy the remainder of the wildcard path. | |
+ for (int j = i; j < virtPaths.length; j++) { | |
+ destName.append(separator).append(virtPaths[j]); | |
+ // Keep segments separated even when ">" is the first path element. | |
+ separator = ActiveMQDestination.PATH_SEPERATOR; | |
+ } | |
+ } | |
+ separator = ActiveMQDestination.PATH_SEPERATOR; | |
+ } | |
+ String queueName = destName.toString(); | |
+ if (resolvedNames.add(queueName)) { | |
+ ActiveMQDestination dest = ActiveMQDestination.createDestination(queueName, ActiveMQDestination.QUEUE_TYPE); | |
+ broker.addDestination(context.getConnectionContext(), dest, false).send(context, message.copy()); | |
+ } | |
+ } | |
+ } | |
+ } | |
} | |
super.send(context, message); | |
} | |
diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java | |
index ec2e45e..c7b8e88 100755 | |
--- a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java | |
+++ b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java | |
@@ -19,6 +19,8 @@ package org.apache.activemq.filter; | |
import org.apache.activemq.command.ActiveMQDestination; | |
+import java.util.Arrays; | |
+ | |
/** | |
* Matches messages which match a prefix like "A.B.>" | |
@@ -47,7 +49,7 @@ public class PrefixDestinationFilter extends DestinationFilter { | |
if (path.length >= length) { | |
int size = length - 1; | |
for (int i = 0; i < size; i++) { | |
- if (!prefixes[i].equals(path[i])) { | |
+ if (!path[i].equals(ANY_CHILD) && !prefixes[i].equals(path[i])) { | |
return false; | |
} | |
} | |
diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java | |
index f4644b6..b1049ce 100644 | |
--- a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java | |
+++ b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java | |
@@ -82,9 +82,9 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple | |
startAllBrokers(); | |
sendReceive("Consumer.a.local.test.>", false, "Consumer.a.local.test.>", false, 1, 1); | |
- sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created | |
+ sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1); | |
sendReceive("Consumer.a.global.test.>", false, "Consumer.a.global.test.>", false, 1, 1); | |
- sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created | |
+ sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1); | |
destroyAllBrokers(); | |
} | |
@@ -110,7 +110,7 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple | |
private BrokerService createAndConfigureBroker(URI uri) throws Exception { | |
BrokerService broker = createBroker(uri); | |
// without this testVirtualDestinationsWithWildcardWithoutIndividualVirtualQueue will fail | |
- broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")}); | |
+ //broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")}); | |
configurePersistenceAdapter(broker); | |
diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties | |
index 35d2015..c4ef52d 100755 | |
--- a/activemq-core/src/test/resources/log4j.properties | |
+++ b/activemq-core/src/test/resources/log4j.properties | |
@@ -24,7 +24,7 @@ log4j.rootLogger=INFO, out, stdout | |
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG | |
#log4j.logger.org.apache.activemq=TRACE | |
#log4j.logger.org.apache.activemq.store.jdbc=TRACE | |
-#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG | |
+log4j.logger.org.apache.activemq.broker.region.AbstractRegion=DEBUG | |
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG | |
# CONSOLE appender not used by default |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment