Skip to content

Instantly share code, notes, and snippets.

@areddy7021
Last active June 1, 2016 05:24
Show Gist options
  • Save areddy7021/3a00cb54eb293d1058e2a550c9cd21bc to your computer and use it in GitHub Desktop.
Save areddy7021/3a00cb54eb293d1058e2a550c9cd21bc to your computer and use it in GitHub Desktop.
junit log
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!--<bean id="myMBean" class="com.cengage.ceq.plugin.broker.IPAuthenticationPlugin">
<property name="allowedIPAddresses">
<list>
<value>127.0.0.1</value>
</list>
</property>
</bean>
<bean id="myTestMBean" class="com.cengage.ceq.plugin.broker.TestBrokerService">
</bean>
<bean class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
<property name="beans">
<map>
<entry key="bean:name=MyMBeanName" value-ref="myMBean"/>
<entry key="bean:name=MyMBeanTestName" value-ref="myTestMBean"/>
</map>
</property>
</bean>-->
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<plugins>
<!--
Installs the custom broker plugin com.cengage.ceq.plugin.broker.IPAuthenticationPlugin.
The bean is declared in the Spring beans namespace because it is a plain
Spring bean nested inside the ActiveMQ broker element.
NOTE(review): no properties are configured on this bean, unlike the
commented-out "myMBean" definition earlier in this file, which sets
allowedIPAddresses. If the deployed plugin checks remote addresses against
that list, an empty list would presumably reject every client, including
127.0.0.1. Confirm against the deployed plugin jar.
-->
<bean id="ipAuthenticationPlugin" class="com.cengage.ceq.plugin.broker.IPAuthenticationPlugin" xmlns="http://www.springframework.org/schema/beans">
</bean>
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers from blocking producers and affecting other consumers
by limiting the number of messages that are retained.
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos.
The web consoles require login by default; you can disable this in the jetty.xml file.
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details.
-->
<import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
2016-06-01 01:06:01,492 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5a61f5df: startup date [Wed Jun 01 01:06:01 EDT 2016]; root of context hierarchy | org.apache.activemq.xbean.XBeanBrokerFactory$1 | main
2016-06-01 01:06:04,151 | INFO | PListStore:[/Users/avootukuri/Downloads/apache-activemq-latest/data/localhost/tmp_storage] started | org.apache.activemq.store.kahadb.plist.PListStoreImpl | main
2016-06-01 01:06:04,184 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[/Users/avootukuri/Downloads/apache-activemq-latest/data/kahadb] | org.apache.activemq.broker.BrokerService | main
2016-06-01 01:06:04,399 | INFO | KahaDB is version 5 | org.apache.activemq.store.kahadb.MessageDatabase | main
2016-06-01 01:06:04,422 | INFO | Recovering from the journal ... | org.apache.activemq.store.kahadb.MessageDatabase | main
2016-06-01 01:06:04,437 | INFO | Recovery replayed 171 operations from the journal in 0.027 seconds. | org.apache.activemq.store.kahadb.MessageDatabase | main
2016-06-01 01:06:04,584 | INFO | Apache ActiveMQ 5.11.1 (localhost, ID:mabosanandan-m1.local-55157-1464757564457-0:1) is starting | org.apache.activemq.broker.BrokerService | main
2016-06-01 01:06:04,605 | INFO | Listening for connections at: tcp://mabosanandan-m1.local:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.transport.TransportServerThreadSupport | main
2016-06-01 01:06:04,611 | INFO | Connector openwire started | org.apache.activemq.broker.TransportConnector | main
2016-06-01 01:06:04,620 | INFO | Listening for connections at: amqp://mabosanandan-m1.local:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.transport.TransportServerThreadSupport | main
2016-06-01 01:06:04,626 | INFO | Connector amqp started | org.apache.activemq.broker.TransportConnector | main
2016-06-01 01:06:04,633 | INFO | Listening for connections at: stomp://mabosanandan-m1.local:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.transport.TransportServerThreadSupport | main
2016-06-01 01:06:04,639 | INFO | Connector stomp started | org.apache.activemq.broker.TransportConnector | main
2016-06-01 01:06:04,647 | INFO | Listening for connections at: mqtt://mabosanandan-m1.local:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.transport.TransportServerThreadSupport | main
2016-06-01 01:06:04,653 | INFO | Connector mqtt started | org.apache.activemq.broker.TransportConnector | main
2016-06-01 01:06:04,737 | INFO | Listening for connections at ws://mabosanandan-m1.local:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.transport.ws.WSTransportServer | main
2016-06-01 01:06:04,743 | INFO | Connector ws started | org.apache.activemq.broker.TransportConnector | main
2016-06-01 01:06:04,749 | INFO | Apache ActiveMQ 5.11.1 (localhost, ID:mabosanandan-m1.local-55157-1464757564457-0:1) started | org.apache.activemq.broker.BrokerService | main
2016-06-01 01:06:04,755 | INFO | For help or more information please see: http://activemq.apache.org | org.apache.activemq.broker.BrokerService | main
2016-06-01 01:06:05,044 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/ | org.apache.activemq.web.WebConsoleStarter | main
2016-06-01 01:06:05,077 | INFO | Initializing Spring FrameworkServlet 'dispatcher' | /admin | main
2016-06-01 01:06:05,225 | INFO | jolokia-agent: No access restrictor found at classpath:/jolokia-access.xml, access to all MBeans is allowed | /api | main
2016-06-01 01:07:32,243 | WARN | Failed to add Connection ID:mabosanandan-m1.local-55242-1464757651530-0:0 | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///127.0.0.1:55243@61616
java.lang.SecurityException: Invalid remote address tcp://127.0.0.1:55243
at com.cengage.ceq.plugin.broker.IPAuthenticationBroker.addConnection(IPAuthenticationBroker.java:36)[ceq-plugin-1.0.0-SNAPSHOT.jar:]
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:102)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:809)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:79)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)[activemq-all-5.11.1.jar:5.11.1]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)[activemq-all-5.11.1.jar:5.11.1]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_51]
2016-06-01 01:07:32,250 | WARN | Security Error occurred: Invalid remote address tcp://127.0.0.1:55243 | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///127.0.0.1:55243@61616
2016-06-01 01:07:34,256 | INFO | Stopping tcp://127.0.0.1:55243 because Failed with SecurityException: Invalid remote address tcp://127.0.0.1:55243 | org.apache.activemq.broker.TransportConnection | ActiveMQ BrokerService[localhost] Task-2
package com.cengage.ceq.plugin.broker;
public interface IPAuth {
public int getSessionCount();
}
package com.cengage.ceq.plugin.broker;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.state.ConnectionState;
public class IPAuthenticationBroker extends BrokerFilter implements IPAuth {
public IPAuthenticationBroker(Broker next) {
super(next);
}
public int getSessionCount() {
RegionBroker rb = null;
try {
rb = (RegionBroker) getBrokerService().getBroker().getAdaptor(
RegionBroker.class);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Map<ConnectionId, ConnectionState> connStates = rb
.getConnectionStates();
Iterator it = connStates.entrySet().iterator();
int totalCount = 0;
while (it.hasNext()) {
Map.Entry pair = (Map.Entry) it.next();
ConnectionState connectionState = (ConnectionState) pair.getValue();
totalCount += connectionState.getSessionStates().size();
System.out.println(pair.getKey() + " = " + pair.getValue());
it.remove();
}
return totalCount;
}
}
package com.cengage.ceq.plugin.broker;
import java.lang.management.ManagementFactory;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
public class IPAuthenticationPlugin implements BrokerPlugin{
public Broker installPlugin(Broker broker) throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
// Construct the ObjectName for the Hello MBean we will register
ObjectName mbeanName = new ObjectName("com.cengage:type=Hello");
// Create the Hello World MBean
IPAuth ipAuth = new IPAuthenticationBroker(broker);
StandardMBean mbean = new StandardMBean(ipAuth, IPAuth.class);
// Register the Hello World MBean
mbs.registerMBean(mbean, mbeanName);
return (Broker)ipAuth;
}
}
01:07:31.659 [main] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=3, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
01:07:31.686 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=104857600, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
01:07:31.687 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=3, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}
01:07:31.687 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}
01:07:34.270 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.apache.activemq.ActiveMQConnection - Async exception with no exception listener: java.io.EOFException
java.io.EOFException: null
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_51]
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) ~[activemq-all-5.1.0.jar:na]
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:203) ~[activemq-all-5.1.0.jar:na]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:195) ~[activemq-all-5.1.0.jar:na]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183) ~[activemq-all-5.1.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
01:07:34.273 [ActiveMQ Connection Worker: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
package com.activemq.dev.transition.test;
import java.io.FileNotFoundException;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Message;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.Before;
import org.junit.Test;
public class JmsQueueBrowserTest {
ConnectionFactory factory = null;
Connection connection = null;
Session session = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
Destination destination = null;
String test = "";
@Before
public void setUp() throws Exception {
factory =
new ActiveMQConnectionFactory("tcp://localhost:61616");
//factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?randomize=true");
// factory = new ActiveMQConnectionFactory("failover:(tcp://10.170.5.229:61616,tcp://10.170.105.44:61616)?randomize=true");
// factory = new ActiveMQConnectionFactory("failover://(tcp://qa-mindtap-qa-001-app-1-10-171-4-17.cloud.cengage.com:61616,qa-mindtap-qa-001-app-1-10-171-107-35.cloud.cengage.com:61616)?randomize=true");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("test");
producer = session.createProducer(destination);
consumer = session.createConsumer(destination);
}
@Test
public void test() throws FileNotFoundException, IOException, ParseException {
JSONParser parser = new JSONParser();
try{
connection.start();
for(int i =0; i< 100000 ; i++){
TextMessage message = session.createTextMessage();
message.setText("Hello ...This is a sample message..sending from FirstClient---- "+i);
producer.setTimeToLive(60000);
producer.send(message);
System.out.println("Sent: " + message.getText());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("message received is "+(Message)consumer.receive());
}
} catch (JMSException e) {
e.printStackTrace();
}
finally{
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment