Skip to content

Instantly share code, notes, and snippets.

@xzel23
Last active October 28, 2024 06:36
Show Gist options
  • Save xzel23/670918bb82b781f3cc7e1019ebade1a7 to your computer and use it in GitHub Desktop.
Save xzel23/670918bb82b781f3cc7e1019ebade1a7 to your computer and use it in GitHub Desktop.
Intercepting AMQ Messages

Intercepting ActiveMQ classic Messages without consuming

This is a set of three minimal classes to demonstrate how to intercept messages being sent to a queue without consuming the messages.

The process of intercepting messages is transparent to both sender and receiver. A virtual topic is created in the ActiveMQ configuration that publishes each message sent by the sender while the receiver still gets the original message.

How to:

  • Unpack Apache ActiveMQ classic to the subdirectory apache-activemq in the project root.
  • Use the start.sh script to copy the activemq.xml configuration file into the activemq configuration folder and start the ActiveMQ instance.
  • Build the project and run the Main class.

Classes

  • Main: this is just for convenience. It creates instances of the three other classes and runs them.
  • Sender: creates a queue named "testQueue" and sends messages to that queue.
  • Receiver: connects to the queue and consumes the messages.
  • Interceptor: subscribes to the virtual topic defined in activemq.xml, where messages are published.
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers from blocking producers and affecting other consumers
by limiting the number of messages that are retained.
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
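<!-- composite queue used for interception: messages sent to testQueue still reach the queue itself (forwardOnly="false") and a copy is forwarded to the topic VirtualTopic.receivedMessages -->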
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="testQueue" forwardOnly="false">
<forwardTo>
<topic physicalName="VirtualTopic.receivedMessages" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web console requires login by default; you can disable this in the jetty.xml file.
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
package amq;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo client that logs every message published on the topic
 * {@code VirtualTopic.receivedMessages} of the broker at {@code tcp://localhost:61616}.
 *
 * <p>The class opens a connection with the client ID {@code InterceptorClientID}, creates a
 * durable subscriber named {@code Interceptors} on that topic, and then polls for messages
 * in an endless loop. It only returns from {@link #run()} when a {@link JMSException} occurs.
 */
public class Interceptor implements Runnable {

    // The logger must be named after Interceptor; naming it after Receiver would attribute
    // intercepted messages to the wrong class in the log output.
    private static final Logger LOG = LoggerFactory.getLogger(Interceptor.class);
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String VIRTUAL_TOPIC_SUBSCRIPTION = "VirtualTopic.receivedMessages";
    private static final String CLIENT_ID = "InterceptorClientID"; // A unique client ID for the durable subscription
    private static final String DURABLE_SUBSCRIPTION_NAME = "Interceptors";
    // How long a single receive call waits before the loop logs that nothing arrived.
    private static final long RECEIVE_TIMEOUT_MILLIS = 60_000;

    /**
     * Runs the interceptor on the calling thread.
     *
     * @param args ignored
     * @throws JMSException declared for compatibility; {@link #run()} handles JMS errors itself
     * @throws InterruptedException declared for compatibility; not thrown by the current code
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        Interceptor interceptor = new Interceptor();
        interceptor.run();
    }

    /**
     * Connects to the broker, subscribes durably to the topic and logs each message received.
     *
     * <p>A {@link JMSException} ends the loop and is logged; the connection is closed by the
     * try-with-resources statement.
     */
    @Override
    public void run() {
        LOG.info("connecting to broker at {}", BROKER_URL);
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        try (Connection connection = connectionFactory.createConnection()) {
            connection.setClientID(CLIENT_ID); // Important for durable subscription
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(VIRTUAL_TOPIC_SUBSCRIPTION);

            LOG.info("subscribing to durable topic '{}'", VIRTUAL_TOPIC_SUBSCRIPTION);
            MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUBSCRIPTION_NAME);

            while (true) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
                if (message == null) {
                    // receive(timeout) yields null when nothing arrived in time; keep polling.
                    LOG.debug("no message received within the timeout period");
                    continue;
                }
                LOG.info("intercepted message: {}", contentOf(message));
            }
        } catch (JMSException e) {
            LOG.error("exception caught", e);
        }
    }

    /**
     * Returns the value to log for a message: the text of a {@link TextMessage}, otherwise the
     * message object itself.
     */
    private static Object contentOf(Message message) throws JMSException {
        if (message instanceof TextMessage textMessage) {
            return textMessage.getText();
        }
        return message;
    }
}
package amq;
/**
 * Convenience launcher for the demo: starts a {@code Sender}, a {@code Receiver} and an
 * {@code Interceptor}, each on its own thread, in that order.
 */
public class Main {

    /**
     * Creates the three demo participants and starts one thread per participant.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Runnable[] participants = {new Sender(), new Receiver(), new Interceptor()};
        for (Runnable participant : participants) {
            Thread worker = new Thread(participant);
            worker.start();
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>issues.spotbugs</groupId>
<artifactId>amq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>6.1.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
</dependency>
</dependencies>
</project>
package amq;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo client that consumes messages from the queue {@code testQueue} on the broker at
 * {@code tcp://localhost:61616} and logs each one.
 */
public class Receiver implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "testQueue";

    /**
     * Runs the receiver on the calling thread.
     *
     * @param args ignored
     * @throws JMSException declared for compatibility; {@link #run()} handles JMS errors itself
     * @throws InterruptedException declared for compatibility; not thrown by the current code
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        Receiver receiver = new Receiver();
        receiver.run();
    }

    /**
     * Connects to the broker and logs every message taken from the queue.
     *
     * <p>The loop ends when {@code receive()} returns {@code null} or when a
     * {@link JMSException} is thrown; the connection is closed by the try-with-resources
     * statement in either case.
     */
    @Override
    public void run() {
        LOG.info("connecting to queue '{}'", QUEUE_NAME);
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);

            while (true) {
                Message message = consumer.receive();
                if (message == null) {
                    // The blocking receive() only returns null when the consumer has been
                    // closed concurrently; stop instead of logging a "null" message.
                    LOG.info("consumer closed, stopping receiver");
                    break;
                }

                Object content;
                if (message instanceof TextMessage textMessage) {
                    content = textMessage.getText();
                } else {
                    content = message;
                }
                LOG.info("received message: {}", content);
            }
        } catch (JMSException e) {
            LOG.error("exception caught", e);
        }
    }
}
package amq;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo client that sends a numbered text message to the queue {@code testQueue} on the
 * broker at {@code tcp://localhost:61616} every 15 seconds.
 */
public class Sender implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "testQueue";
    // Pause between two consecutive messages.
    private static final long SEND_INTERVAL_MILLIS = 15_000;

    /**
     * Runs the sender on the calling thread.
     *
     * @param args ignored
     * @throws JMSException declared for compatibility; {@link #run()} handles JMS errors itself
     * @throws InterruptedException declared for compatibility; {@link #run()} handles
     *     interruption itself
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        Sender sender = new Sender();
        sender.run();
    }

    /**
     * Connects to the broker and sends "Message #1", "Message #2", ... with non-persistent
     * delivery until a {@link JMSException} occurs or the thread is interrupted.
     *
     * <p>On interruption the thread's interrupt flag is restored so that code further up the
     * call stack can still observe it.
     */
    @Override
    public void run() {
        LOG.info("connecting to queue '{}'", QUEUE_NAME);
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            int messageCount = 0;
            while (true) {
                String text = "Message #" + ++messageCount;
                TextMessage message = session.createTextMessage(text);
                LOG.info("sending message: {}", text);
                producer.send(message);
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        } catch (JMSException e) {
            LOG.error("exception caught", e);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again so the
            // interruption is not silently lost.
            Thread.currentThread().interrupt();
            LOG.error("exception caught", e);
        }
    }
}
#!/usr/bin/env sh
# Installs the project's activemq.xml into the unpacked ActiveMQ distribution located in
# ./apache-activemq next to this script, then starts the broker with `activemq console`.

# Work relative to the script's own directory so it can be invoked from anywhere.
cd "$(dirname "$0")" || exit 1

# Abort if the configuration cannot be installed; otherwise the broker would start with
# whatever configuration is already in place and the interception setup would be missing.
cp activemq.xml ./apache-activemq/conf/activemq.xml || exit 1

cd ./apache-activemq/bin || exit 1
./activemq console
#!/usr/bin/env sh
# Stops the ActiveMQ instance from the distribution in ./apache-activemq next to this script.

# Resolve the script's own directory first so the relative path below is valid from anywhere.
script_dir="$(dirname "$0")"
if ! cd "$script_dir"; then
    exit 1
fi

./apache-activemq/bin/activemq stop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment