Skip to content

Instantly share code, notes, and snippets.

@felipeg48
Forked from monzou/HornetQClient.java
Last active August 26, 2015 16:42
Show Gist options
  • Save felipeg48/2c1417c0d92a85abc598 to your computer and use it in GitHub Desktop.
Save felipeg48/2c1417c0d92a85abc598 to your computer and use it in GitHub Desktop.
HornetQ : Embedded + JMS + Remote Example
<!--
  HornetQ core server configuration for the embedded example.
  Declares one Netty connector/acceptor pair and the permissions of the "guest" role.
  The ${hornetq.remoting.netty.*} placeholders are not defined in this document; the example
  loads matching keys into JVM system properties (see HornetQExampleBase).
  NOTE(review): presumably HornetQ substitutes them from system properties - confirm.
-->
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<!-- Persistence is switched off for this example. -->
<persistence-enabled>false</persistence-enabled>
<!-- Connector named "netty"; referenced by name from the JMS connection-factory definition. -->
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host}"/>
<param key="port" value="${hornetq.remoting.netty.port}"/>
</connector>
</connectors>
<!-- Acceptor using the same host/port placeholders as the connector above. -->
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host}"/>
<param key="port" value="${hornetq.remoting.netty.port}"/>
</acceptor>
</acceptors>
<!--
  Grants the "guest" role every permission listed below.
  NOTE(review): match="#" is presumably HornetQ's match-everything address wildcard - confirm
  against the HornetQ security documentation.
-->
<security-settings>
<security-setting match="#">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</configuration>
<!--
  HornetQ JMS configuration for the embedded example: one connection factory and one topic.
  The ${hornetq.topic.example.*} placeholders are not defined in this document.
  NOTE(review): presumably resolved from JVM system properties - confirm.
-->
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!-- Connection factory that refers to the connector named "netty" and is registered under the entry "ConnectionFactory". -->
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
</connection-factory>
<!-- Example topic; both its name and its entry come from placeholders. -->
<topic name="${hornetq.topic.example.name}">
<entry name="${hornetq.topic.example.entry}" />
</topic>
</configuration>
<!--
  HornetQ users configuration: a single default user "guest" holding the role "guest".
  NOTE(review): the credentials are hard-coded in plain text - acceptable for an example only.
-->
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
<defaultuser name="guest" password="guest">
<role name="guest"/>
</defaultuser>
</configuration>
package hornetq;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Topic;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
/**
 * HornetQClient
 *
 * <p>Client-side helper that builds a JMS {@link ConnectionFactory} over the Netty connector,
 * opens started connections, and resolves the example topic. Host, port and topic name are
 * read from JVM system properties.
 *
 * @author monzou
 */
class HornetQClient {

    /** Holds the connection factory once it has been created; empty until the first request. */
    private final AtomicReference<ConnectionFactory> connectionFactoryReference;

    HornetQClient() {
        this.connectionFactoryReference = new AtomicReference<>();
    }

    /**
     * Returns the connection factory, creating it on first use.
     *
     * <p>If several callers arrive before a factory has been stored, each may build one, but only
     * the first stored instance is kept and returned to everybody.
     *
     * @return the stored connection factory
     */
    ConnectionFactory getConnectionFactory() {
        ConnectionFactory cached = connectionFactoryReference.get();
        if (cached == null) {
            // Only the first successful compareAndSet wins; re-read to pick up the winner.
            connectionFactoryReference.compareAndSet(null, newConnectionFactory());
            cached = connectionFactoryReference.get();
        }
        return cached;
    }

    /** Builds a non-HA connection factory from the Netty host/port system properties. */
    private static ConnectionFactory newConnectionFactory() {
        Map<String, Object> transportParams = new HashMap<String, Object>();
        transportParams.put("host", System.getProperty("hornetq.remoting.netty.host"));
        transportParams.put("port", System.getProperty("hornetq.remoting.netty.port"));
        TransportConfiguration transport =
                new TransportConfiguration(NettyConnectorFactory.class.getName(), transportParams);
        return (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transport);
    }

    /**
     * Creates a new connection from the factory and starts it.
     *
     * @return the started connection
     * @throws JMSException if the connection cannot be created or started
     */
    Connection connect() throws JMSException {
        Connection started = getConnectionFactory().createConnection();
        started.start();
        return started;
    }

    /**
     * Creates the example topic named by the {@code hornetq.topic.example.name} system property.
     *
     * @return the example topic
     */
    Topic getExampleTopic() {
        String topicName = System.getProperty("hornetq.topic.example.name");
        return HornetQJMSClient.createTopic(topicName);
    }
}
package hornetq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HornetQConsumer
 *
 * <p>Example subscriber: connects through {@link HornetQClient}, creates a consumer on the
 * example topic and prints every text message it receives from a background thread.
 *
 * @author monzou
 */
class HornetQConsumer extends HornetQExampleBase {

    private static final Logger LOGGER = LoggerFactory.getLogger(HornetQConsumer.class);

    public static void main(String[] args) {
        new HornetQConsumer().start();
    }

    private final HornetQClient client;

    HornetQConsumer() {
        client = new HornetQClient();
    }

    /**
     * Creates a consumer on the example topic and starts a thread that receives from it.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if the connection, session or
     *         consumer cannot be created
     */
    void start() {
        try {
            Receiver receiver = new Receiver(createSession().createConsumer(client.getExampleTopic()));
            new Thread(receiver).start();
        } catch (JMSException e) {
            throw new RuntimeException("Failed to establish connection", e);
        }
    }

    /** Opens a started connection and creates a non-transacted, auto-acknowledge session on it. */
    private Session createSession() throws JMSException {
        return client.connect().createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Receive loop that prints the body of each text message delivered to the consumer. */
    private static class Receiver implements Runnable {

        final MessageConsumer consumer;

        Receiver(MessageConsumer consumer) {
            this.consumer = consumer;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (true) {
                try {
                    // A timeout of 0 blocks until a message arrives.
                    Message message = consumer.receive(0);
                    if (message == null) {
                        // With no timeout, null means the consumer was closed; looping on would spin forever.
                        LOGGER.warn("Consumer was closed; stopping receiver");
                        return;
                    }
                    if (message instanceof TextMessage) {
                        System.out.println("Received message: " + ((TextMessage) message).getText());
                    } else {
                        // An unchecked cast here would throw ClassCastException and kill this thread.
                        LOGGER.warn("Ignoring non-text message: {}", message);
                    }
                } catch (JMSException e) {
                    LOGGER.warn("Failed to receive message", e);
                }
            }
        }
    }
}
package hornetq;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * HornetQExampleBase
 *
 * <p>Base class of the example programs. When the class is initialized it looks up the
 * classpath resource {@code system.properties} and, if present, copies every entry into the
 * JVM system properties.
 *
 * @author monzou
 */
class HornetQExampleBase {

    static {
        // try-with-resources closes the stream on every path; a null resource (file absent) is skipped.
        try (InputStream in = HornetQExampleBase.class.getClassLoader().getResourceAsStream("system.properties")) {
            if (in != null) {
                Properties properties = new Properties();
                properties.load(in);
                System.getProperties().putAll(properties);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
package hornetq;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HornetQProducer
 *
 * <p>Example publisher: connects through {@link HornetQClient} and publishes a "Ping" text
 * message to the example topic roughly once per second from a background thread.
 *
 * @author monzou
 */
class HornetQProducer extends HornetQExampleBase {

    private static final Logger LOGGER = LoggerFactory.getLogger(HornetQProducer.class);

    public static void main(String[] args) {
        new HornetQProducer().start();
    }

    private final HornetQClient client;

    HornetQProducer() {
        client = new HornetQClient();
    }

    /**
     * Creates a producer on the example topic and starts the thread that sends the pings.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if the connection, session or
     *         producer cannot be created
     */
    void start() {
        try {
            Session session = createSession();
            MessageProducer producer = session.createProducer(client.getExampleTopic());
            new Thread(new Ping(session, producer)).start();
        } catch (JMSException e) {
            throw new RuntimeException("Failed to start session", e);
        }
    }

    /** Opens a started connection and creates a non-transacted, auto-acknowledge session on it. */
    private Session createSession() throws JMSException {
        return client.connect().createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Sends one timestamped text message per second until the thread is interrupted. */
    private static class Ping implements Runnable {

        final Session session;
        final MessageProducer producer;

        Ping(Session session, MessageProducer producer) {
            this.session = session;
            this.producer = producer;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TextMessage message = session.createTextMessage("Ping: " + new Date());
                    producer.send(message);
                } catch (JMSException e) {
                    LOGGER.warn("Failed to send a message", e);
                }
                // Pause outside the send try-block so that a failing send is also throttled.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag and stop; continuing would make every later sleep() throw
                    // immediately and turn this loop into a tight send loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
package hornetq;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HornetQServer
 *
 * <p>Wraps an {@link EmbeddedJMS} server and serializes its start and stop calls with a lock.
 *
 * @author monzou
 */
class HornetQServer extends HornetQExampleBase {

    private static final Logger LOGGER = LoggerFactory.getLogger(HornetQServer.class);

    public static void main(String[] args) {
        new HornetQServer().start();
    }

    /** Guards start/stop so the two cannot run concurrently (non-fair lock). */
    private final Lock lock;
    private final EmbeddedJMS server;

    HornetQServer() {
        lock = new ReentrantLock(false);
        server = new EmbeddedJMS();
    }

    /**
     * Starts the embedded JMS server.
     *
     * @throws RuntimeException wrapping the cause if the server fails to start
     */
    void start() {
        lock.lock();
        try {
            server.start();
        } catch (Exception e) {
            throw new RuntimeException("Failed to start JMS server", e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops the embedded JMS server. A failure is logged as a warning and not rethrown.
     */
    void stop() {
        lock.lock();
        try {
            server.stop();
        } catch (Exception e) {
            LOGGER.warn("Failed to stop JMS server", e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Misspelled original name of {@link #stop()}, kept so existing callers keep compiling.
     *
     * @deprecated use {@link #stop()} instead
     */
    @Deprecated
    void sotp() {
        stop();
    }
}
# Settings shared by the example server, producer and consumer.
# NOTE(review): presumably this is the "system.properties" classpath resource that
# HornetQExampleBase copies into the JVM system properties - confirm the file name.

# Topic name read by HornetQClient.getExampleTopic() and used by the ${hornetq.topic.example.name} placeholder.
hornetq.topic.example.name = exampleTopic
# Entry used by the ${hornetq.topic.example.entry} placeholder of the topic definition.
hornetq.topic.example.entry = example
# Netty host/port read by HornetQClient and used by the connector/acceptor placeholders.
hornetq.remoting.netty.host = localhost
hornetq.remoting.netty.port = 5445
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment