-
-
Save felipeg48/2c1417c0d92a85abc598 to your computer and use it in GitHub Desktop.
HornetQ : Embedded + JMS + Remote Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!--
  Core HornetQ server configuration for the example:
  a non-persistent broker with one Netty connector/acceptor pair whose host and
  port come from the hornetq.remoting.netty.* properties.
-->
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

    <!-- Persistence is switched off for this example. -->
    <persistence-enabled>false</persistence-enabled>

    <!-- Outgoing transport definition; referenced by name ("netty") from the JMS configuration. -->
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="${hornetq.remoting.netty.host}"/>
            <param key="port" value="${hornetq.remoting.netty.port}"/>
        </connector>
    </connectors>

    <!-- Incoming transport definition; uses the same host/port placeholders as the connector. -->
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="${hornetq.remoting.netty.host}"/>
            <param key="port" value="${hornetq.remoting.netty.port}"/>
        </acceptor>
    </acceptors>

    <!-- The "guest" role is granted every listed permission for the match pattern "#". -->
    <security-settings>
        <security-setting match="#">
            <permission type="createDurableQueue" roles="guest"/>
            <permission type="deleteDurableQueue" roles="guest"/>
            <permission type="createNonDurableQueue" roles="guest"/>
            <permission type="deleteNonDurableQueue" roles="guest"/>
            <permission type="consume" roles="guest"/>
            <permission type="send" roles="guest"/>
        </security-setting>
    </security-settings>

</configuration>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!--
  JMS-level configuration for the example: one connection factory bound to the
  "netty" connector and one topic whose name and entry come from the
  hornetq.topic.example.* properties.
-->
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

    <!-- Connection factory that uses the connector named "netty". -->
    <connection-factory name="ConnectionFactory">
        <connectors>
            <connector-ref connector-name="netty"/>
        </connectors>
        <entries>
            <entry name="ConnectionFactory"/>
        </entries>
    </connection-factory>

    <!-- Example topic; name and entry are resolved from property placeholders. -->
    <topic name="${hornetq.topic.example.name}">
        <entry name="${hornetq.topic.example.entry}"/>
    </topic>

</configuration>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!--
  User configuration for the example: a single default user "guest" that
  carries the "guest" role. Example credentials only.
-->
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">

    <defaultuser name="guest" password="guest">
        <role name="guest"/>
    </defaultuser>

</configuration>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hornetq; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.atomic.AtomicReference; | |
import javax.jms.Connection; | |
import javax.jms.ConnectionFactory; | |
import javax.jms.JMSException; | |
import javax.jms.Topic; | |
import org.hornetq.api.core.TransportConfiguration; | |
import org.hornetq.api.jms.HornetQJMSClient; | |
import org.hornetq.api.jms.JMSFactoryType; | |
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; | |
/** | |
* HornetQClient | |
* | |
* @author monzou | |
*/ | |
class HornetQClient { | |
private final AtomicReference<ConnectionFactory> connectionFactoryReference; | |
HornetQClient() { | |
connectionFactoryReference = new AtomicReference<>(); | |
} | |
ConnectionFactory getConnectionFactory() { | |
if (connectionFactoryReference.get() != null) { | |
return connectionFactoryReference.get(); | |
} | |
Map<String, Object> params = new HashMap<String, Object>(); | |
params.put("host", System.getProperty("hornetq.remoting.netty.host")); | |
params.put("port", System.getProperty("hornetq.remoting.netty.port")); | |
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); | |
ConnectionFactory factory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, config); | |
connectionFactoryReference.compareAndSet(null, factory); | |
return connectionFactoryReference.get(); | |
} | |
Connection connect() throws JMSException { | |
Connection connection = getConnectionFactory().createConnection(); | |
connection.start(); | |
return connection; | |
} | |
Topic getExampleTopic() { | |
return HornetQJMSClient.createTopic(System.getProperty("hornetq.topic.example.name")); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hornetq; | |
import javax.jms.JMSException; | |
import javax.jms.Message; | |
import javax.jms.MessageConsumer; | |
import javax.jms.Session; | |
import javax.jms.TextMessage; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* HornetQConsumer | |
* | |
* @author monzou | |
*/ | |
class HornetQConsumer extends HornetQExampleBase { | |
private static final Logger LOGGER = LoggerFactory.getLogger(HornetQConsumer.class); | |
public static void main(String[] args) { | |
new HornetQConsumer().start(); | |
} | |
private final HornetQClient client; | |
HornetQConsumer() { | |
client = new HornetQClient(); | |
} | |
void start() { | |
try { | |
Receiver receiver = new Receiver(createSession().createConsumer(client.getExampleTopic())); | |
new Thread(receiver).start(); | |
} catch (JMSException e) { | |
throw new RuntimeException("Failed to establish connection", e); | |
} | |
} | |
private Session createSession() throws JMSException { | |
return client.connect().createSession(false, Session.AUTO_ACKNOWLEDGE); | |
} | |
private static class Receiver implements Runnable { | |
final MessageConsumer consumer; | |
Receiver(MessageConsumer consumer) { | |
this.consumer = consumer; | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void run() { | |
do { | |
try { | |
Message message = consumer.receive(0); | |
if (message != null) { | |
System.out.println("Received message: " + ((TextMessage) message).getText()); | |
} | |
} catch (JMSException e) { | |
LOGGER.warn("Failed to receive message", e); | |
} | |
} while (true); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hornetq; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.Properties; | |
/**
 * Base class for the example programs. When the class is initialized it looks up
 * {@code system.properties} through its class loader and, if the resource exists, copies
 * every entry into the JVM system properties.
 *
 * @author monzou
 */
class HornetQExampleBase {

    static {
        // try-with-resources closes the stream on every path; a null resource is simply skipped.
        try (InputStream in =
                HornetQExampleBase.class.getClassLoader().getResourceAsStream("system.properties")) {
            if (in != null) {
                Properties properties = new Properties();
                properties.load(in);
                System.getProperties().putAll(properties);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hornetq; | |
import java.util.Date; | |
import javax.jms.JMSException; | |
import javax.jms.MessageProducer; | |
import javax.jms.Session; | |
import javax.jms.TextMessage; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* HornetQProducer | |
* | |
* @author monzou | |
*/ | |
class HornetQProducer extends HornetQExampleBase { | |
private static final Logger LOGGER = LoggerFactory.getLogger(HornetQProducer.class); | |
public static void main(String[] args) { | |
new HornetQProducer().start(); | |
} | |
private final HornetQClient client; | |
HornetQProducer() { | |
client = new HornetQClient(); | |
} | |
void start() { | |
try { | |
Session session = createSession(); | |
MessageProducer producer = session.createProducer(client.getExampleTopic()); | |
new Thread(new Ping(session, producer)).start(); | |
} catch (JMSException e) { | |
throw new RuntimeException("Failed to start session", e); | |
} | |
} | |
private Session createSession() throws JMSException { | |
return client.connect().createSession(false, Session.AUTO_ACKNOWLEDGE); | |
} | |
private static class Ping implements Runnable { | |
final Session session; | |
final MessageProducer producer; | |
Ping(Session session, MessageProducer producer) { | |
this.session = session; | |
this.producer = producer; | |
} | |
/** {@inheritDoc} */ | |
@Override | |
public void run() { | |
do { | |
try { | |
TextMessage message = session.createTextMessage("Ping: " + new Date()); | |
producer.send(message); | |
Thread.sleep(1000); | |
} catch (JMSException e) { | |
LOGGER.warn("Failed to send a message", e); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} while (true); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hornetq; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
import org.hornetq.jms.server.embedded.EmbeddedJMS; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* HornetQServer | |
* | |
* @author monzou | |
*/ | |
class HornetQServer extends HornetQExampleBase { | |
private static final Logger LOGGER = LoggerFactory.getLogger(HornetQServer.class); | |
public static void main(String[] args) { | |
new HornetQServer().start(); | |
} | |
private final Lock lock; | |
private final EmbeddedJMS server; | |
HornetQServer() { | |
lock = new ReentrantLock(false); | |
server = new EmbeddedJMS(); | |
} | |
void start() { | |
lock.lock(); | |
try { | |
server.start(); | |
} catch (Exception e) { | |
throw new RuntimeException("Failed to start JMS server", e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
void sotp() { | |
lock.lock(); | |
try { | |
try { | |
server.stop(); | |
} catch (Exception e) { | |
LOGGER.warn("Failed to stop JMS server", e); | |
} | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example topic: its name and the entry it is bound under in the JMS configuration.
hornetq.topic.example.name=exampleTopic
hornetq.topic.example.entry=example

# Netty transport endpoint shared by the server configuration and the clients.
hornetq.remoting.netty.host=localhost
hornetq.remoting.netty.port=5445
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment