Skip to content

Instantly share code, notes, and snippets.

@dejanb
Created March 2, 2011 16:39
Show Gist options
  • Save dejanb/851227 to your computer and use it in GitHub Desktop.
Save dejanb/851227 to your computer and use it in GitHub Desktop.
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
index 9db3fa0..94dfbd0 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
@@ -17,6 +17,7 @@
package org.apache.activemq.broker;
import java.net.URI;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
@@ -24,15 +25,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
@@ -384,5 +377,7 @@ public interface Broker extends Region, Service {
void networkBridgeStopped(BrokerInfo brokerInfo);
+    /**
+     * Looks up the subscriptions associated with the given destination.
+     * <p>
+     * Callers should not assume the result is limited to subscriptions on
+     * exactly this destination: an implementation may return a broader set
+     * (for example every subscription of the region serving the
+     * destination's type), so filter with {@code Subscription.matches(...)}
+     * when an exact match is required.
+     *
+     * @param destination the destination used to select the subscriptions
+     * @return the subscriptions, keyed by consumer id
+     * @throws Exception if the lookup cannot be performed
+     */
+    Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception;
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
index 244e886..47b7ec9 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
@@ -23,23 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
@@ -319,4 +303,9 @@ public class BrokerFilter implements Broker {
public void networkBridgeStopped(BrokerInfo brokerInfo) {
next.networkBridgeStopped(brokerInfo);
}
+
+ /**
+ * Passes the subscription lookup straight through to {@code next};
+ * this filter adds no behavior of its own to the call.
+ *
+ * @param destination forwarded unchanged to {@code next}
+ * @return whatever {@code next.getSubscriptions(destination)} returns
+ * @throws Exception if {@code next} throws
+ */
+ @Override
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception {
+ return next.getSubscriptions(destination);
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
index 3373971..21c6a18 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
@@ -24,23 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@@ -288,6 +272,11 @@ public class EmptyBroker implements Broker {
public void networkBridgeStopped(BrokerInfo brokerInfo) {
}
+    /**
+     * Default implementation: reports that there are no subscriptions.
+     * <p>
+     * An empty, immutable map is returned instead of {@code null} so that
+     * callers can iterate over the result without a null check.
+     *
+     * @param destination ignored by this implementation
+     * @return an empty immutable map, never {@code null}
+     * @throws Exception never thrown here; declared for overriding subclasses
+     */
+    @Override
+    public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception {
+        // Fully qualified so no additional import is needed in this file.
+        return java.util.Collections.<ConsumerId, Subscription>emptyMap();
+    }
+
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
index 14469ab..af7bf8f 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
@@ -24,23 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@@ -320,4 +304,9 @@ public class ErrorBroker implements Broker {
public void networkBridgeStopped(BrokerInfo brokerInfo) {
throw new BrokerStoppedException(this.message);
}
+
+ /**
+ * Never returns a result: always throws a {@code BrokerStoppedException}
+ * constructed with this broker's {@code message}.
+ *
+ * @param destination not used
+ * @return never returns normally
+ * @throws Exception always, as a {@code BrokerStoppedException}
+ */
+ @Override
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception {
+ throw new BrokerStoppedException(this.message);
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
index d243bf8..5f97edb 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
@@ -24,23 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@@ -329,4 +313,9 @@ public class MutableBrokerFilter implements Broker {
public void networkBridgeStopped(BrokerInfo brokerInfo) {
getNext().networkBridgeStopped(brokerInfo);
}
+
+ /**
+ * Forwards the subscription lookup to the broker returned by
+ * {@code getNext()} at the time of the call.
+ *
+ * @param destination forwarded unchanged
+ * @return whatever {@code getNext().getSubscriptions(destination)} returns
+ * @throws Exception if the delegate throws
+ */
+ @Override
+ public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception {
+ return getNext().getSubscriptions(destination);
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
index 4333632..f1a32ea 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
@@ -22,16 +22,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
/**
* A Region is used to implement the different QOS options available to
@@ -151,5 +142,5 @@ public interface Region extends Service {
Set <Destination>getDestinations(ActiveMQDestination destination);
void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control);
-
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 8ca0e76..329441e 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -40,23 +40,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
@@ -163,6 +147,21 @@ public class RegionBroker extends EmptyBroker {
return queueRegion;
}
+    /**
+     * Returns the subscriptions held by the region that serves the given
+     * destination's type (queue, topic, temporary queue or temporary topic).
+     * <p>
+     * The region's whole subscription map is returned, not only the entries
+     * for {@code destination}; callers that need an exact match must filter.
+     *
+     * @param destination selects the region through its destination type
+     * @return the selected region's subscriptions, keyed by consumer id
+     * @throws Exception the exception built by
+     *         {@code createUnknownDestinationTypeException} when the
+     *         destination type is none of the four handled types
+     */
+    @Override
+    public Map<ConsumerId, Subscription> getSubscriptions(ActiveMQDestination destination) throws Exception {
+        final Region region;
+        switch (destination.getDestinationType()) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            region = queueRegion;
+            break;
+        case ActiveMQDestination.TOPIC_TYPE:
+            region = topicRegion;
+            break;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            region = tempQueueRegion;
+            break;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            region = tempTopicRegion;
+            break;
+        default:
+            throw createUnknownDestinationTypeException(destination);
+        }
+        // Cast to the common base class rather than QueueRegion: casting the
+        // topic and temporary regions to QueueRegion fails with a
+        // ClassCastException unless they really are QueueRegion instances.
+        // NOTE(review): assumes getSubscriptions() is declared on
+        // AbstractRegion and that all four regions extend it - confirm.
+        return ((AbstractRegion) region).getSubscriptions();
+    }
+
public Region getTempQueueRegion() {
return tempQueueRegion;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index eefedd9..dd011e5 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -16,12 +16,20 @@
*/
package org.apache.activemq.broker.region.virtual;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
+import sun.tools.tree.ContinueStatement;
+
+import javax.xml.bind.SchemaOutputResolver;
+import java.util.Map;
+import java.util.Set;
/**
* A Destination which implements <a
@@ -43,7 +51,37 @@ public class VirtualTopicInterceptor extends DestinationFilter {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
if (!message.isAdvisory()) {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
- send(context, message, queueConsumers);
+        Broker broker = context.getConnectionContext().getBroker();
+        Set<Destination> destinations = broker.getDestinations(queueConsumers);
+        if (!destinations.isEmpty()) {
+            // Destinations matching the consumer wildcard already exist:
+            // every one of them receives its own copy of the message.
+            for (Destination dest : destinations) {
+                dest.send(context, message.copy());
+            }
+        } else {
+            // No matching destination yet: derive the queue names from the
+            // subscriptions that match the wildcard and create them on demand.
+            Map<ConsumerId, Subscription> subs = broker.getSubscriptions(queueConsumers);
+            // A Broker implementation may return null instead of an empty map.
+            if (subs != null) {
+                String[] virtPaths = queueConsumers.getDestinationPaths();
+                // Several subscriptions can resolve to the same queue name;
+                // remember the names already served so each queue gets one copy.
+                Set<String> delivered = new java.util.HashSet<String>();
+                for (Subscription sub : subs.values()) {
+                    if (!sub.matches(queueConsumers)) {
+                        continue;
+                    }
+                    String destName = resolveQueueName(sub.getActiveMQDestination().getDestinationPaths(), virtPaths);
+                    if (delivered.add(destName)) {
+                        ActiveMQDestination dest = ActiveMQDestination.createDestination(destName, ActiveMQDestination.QUEUE_TYPE);
+                        broker.addDestination(context.getConnectionContext(), dest, false).send(context, message.copy());
+                    }
+                }
+            }
+        }
}
super.send(context, message);
}
+
+    /**
+     * Builds a queue name by merging a subscription's destination paths with
+     * the paths of the virtual-topic consumer wildcard.
+     * <p>
+     * For each position the subscription's segment is kept when it equals the
+     * wildcard's segment or the wildcard's segment is {@code ANY_CHILD}; when
+     * the subscription's segment is {@code ANY_DESCENDENT} the wildcard's
+     * segments from that position onwards are appended instead. Any other
+     * segment is left out, as is a position the wildcard has no segment for.
+     *
+     * @param subPaths  destination paths of the matching subscription
+     * @param virtPaths destination paths of the consumer wildcard
+     * @return the segments joined with {@code ActiveMQDestination.PATH_SEPERATOR}
+     */
+    private static String resolveQueueName(String[] subPaths, String[] virtPaths) {
+        StringBuilder name = new StringBuilder();
+        for (int i = 0; i < subPaths.length; i++) {
+            String subPath = subPaths[i];
+            // Guard the index: the subscription may have more segments than the wildcard.
+            if (i < virtPaths.length
+                    && (subPath.equals(virtPaths[i])
+                        || virtPaths[i].equals(org.apache.activemq.filter.DestinationFilter.ANY_CHILD))) {
+                appendPath(name, subPath);
+            } else if (subPath.equals(org.apache.activemq.filter.DestinationFilter.ANY_DESCENDENT)) {
+                for (int j = i; j < virtPaths.length; j++) {
+                    appendPath(name, virtPaths[j]);
+                }
+            }
+        }
+        return name.toString();
+    }
+
+    /**
+     * Appends one path segment, preceded by the path separator unless it is
+     * the first segment written to {@code name}.
+     */
+    private static void appendPath(StringBuilder name, String path) {
+        if (name.length() > 0) {
+            name.append(ActiveMQDestination.PATH_SEPERATOR);
+        }
+        name.append(path);
+    }
diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
index ec2e45e..c7b8e88 100755
--- a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
@@ -19,6 +19,8 @@ package org.apache.activemq.filter;
import org.apache.activemq.command.ActiveMQDestination;
+import java.util.Arrays;
+
/**
* Matches messages which match a prefix like "A.B.>"
@@ -47,7 +49,7 @@ public class PrefixDestinationFilter extends DestinationFilter {
if (path.length >= length) {
int size = length - 1;
for (int i = 0; i < size; i++) {
- if (!prefixes[i].equals(path[i])) {
+ if (!path[i].equals(ANY_CHILD) && !prefixes[i].equals(path[i])) {
return false;
}
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
index f4644b6..b1049ce 100644
--- a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java
@@ -82,9 +82,9 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
startAllBrokers();
sendReceive("Consumer.a.local.test.>", false, "Consumer.a.local.test.>", false, 1, 1);
- sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
+ sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1);
sendReceive("Consumer.a.global.test.>", false, "Consumer.a.global.test.>", false, 1, 1);
- sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
+ sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1);
destroyAllBrokers();
}
@@ -110,7 +110,7 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
private BrokerService createAndConfigureBroker(URI uri) throws Exception {
BrokerService broker = createBroker(uri);
// without this testVirtualDestinationsWithWildcardWithoutIndividualVirtualQueue will fail
- broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")});
+ //broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")});
configurePersistenceAdapter(broker);
diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties
index 35d2015..c4ef52d 100755
--- a/activemq-core/src/test/resources/log4j.properties
+++ b/activemq-core/src/test/resources/log4j.properties
@@ -24,7 +24,7 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
-#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
+log4j.logger.org.apache.activemq.broker.region.AbstractRegion=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
# CONSOLE appender not used by default
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment