Skip to content

Instantly share code, notes, and snippets.

@andy722
Created October 8, 2015 14:05
Show Gist options
  • Select an option

  • Save andy722/3411ac7ae61e0ee57918 to your computer and use it in GitHub Desktop.

Select an option

Save andy722/3411ac7ae61e0ee57918 to your computer and use it in GitHub Desktop.
class MQConnection implements ExceptionListener {
private final MQQueueConnectionFactory brokerConnectionFactory;
private Connection connection;
private boolean receiverTransacted;
private int receiverAcknowledge;
private final Map<String, Consumer> consumers;
private final Map<String, Producer> producers;
public MQConnection(String hosts) throws JMSException {
this.brokerConnectionFactory = new MQQueueConnectionFactory();
brokerConnectionFactory.setConnectionNameList(hosts);
brokerConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
brokerConnectionFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
this.consumers = new HashMap<String, Consumer>();
this.producers = new HashMap<String, Producer>();
}
public MQConnection setChannel(String channel) throws JMSException {
brokerConnectionFactory.setChannel(channel);
return this;
}
public MQConnection setClientReconnectTimeout(int timeoutSeconds) throws JMSException {
brokerConnectionFactory.setClientReconnectTimeout(timeoutSeconds);
return this;
}
public MQConnection setCCSID(int ccsid) throws JMSException {
brokerConnectionFactory.setCCSID(ccsid);
return this;
}
public MQConnection setQueueManager(String queueManager) throws JMSException {
brokerConnectionFactory.setQueueManager(queueManager);
return this;
}
public void setReceiverTransacted(boolean receiverTransacted) {
this.receiverTransacted = receiverTransacted;
}
public void setReceiverAcknowledge(int receiverAcknowledge) {
this.receiverAcknowledge = receiverAcknowledge;
}
public void connect() throws JMSException {
connection = brokerConnectionFactory.createConnection();
connection.setExceptionListener(MQConnection.this);
connection.start();
}
public void connect(String username, String password) throws JMSException {
connection = brokerConnectionFactory.createConnection(username, password);
connection.setExceptionListener(MQConnection.this);
connection.start();
}
private Consumer getConsumer(String queueName) throws JMSException {
Consumer c;
synchronized (consumers) {
c = consumers.get(queueName);
if (c == null) {
c = new Consumer(connection, queueName);
consumers.put(queueName, c);
}
}
return c;
}
private Producer getProducer(String queueName) throws JMSException {
Producer p;
synchronized (producers) {
p = producers.get(queueName);
if (p == null) {
p = new Producer(connection, queueName);
producers.put(queueName, p);
}
}
return p;
}
public void setMessageListener(String queueName, MessageListener listener) throws JMSException {
getConsumer(queueName).setMessageListener(listener);
}
public Message receive(String queueName, long timeoutMillis) throws JMSException {
return getConsumer(queueName).receive(timeoutMillis);
}
public void sendTextMessage(String queueName, String text) throws JMSException {
getProducer(queueName).sendTextMessage(text);
}
public void sendTextMessage(String queueName,
String message,
Map<String, Object> properties) throws JMSException {
getProducer(queueName).sendTextMessage(message, properties);
}
public void sendMessage(String queueName,
Message message) throws JMSException {
getProducer(queueName).sendMessage(message);
}
public void sendBytesMessage(String queueName, byte[] bytes) throws JMSException {
getProducer(queueName).sendBytesMessage(bytes);
}
public void shutdown() throws JMSException {
synchronized (consumers) {
for (Consumer c : consumers.values())
c.shutdown();
consumers.clear();
}
synchronized (producers) {
for (Producer c : producers.values())
c.shutdown();
producers.clear();
}
if (connection != null)
connection.close();
}
@Override
public void onException(JMSException e) {
System.err.println("NEW ERROR: " + e.getMessage());
e.printStackTrace();
}
private class Consumer {
private final Session session;
private final MessageConsumer consumer;
private Consumer(Connection connection, String queueName) throws JMSException {
this.session = connection.createSession(receiverTransacted, receiverAcknowledge);
consumer = session.createConsumer(session.createQueue(queueName));
}
void setMessageListener(MessageListener l) throws JMSException {
consumer.setMessageListener(l);
}
Message receive() throws JMSException {
return consumer.receive();
}
Message receive(long timeoutMillis) throws JMSException {
return consumer.receive(timeoutMillis);
}
public void shutdown() throws JMSException {
consumer.close();
session.close();
}
}
private class Producer {
private final Session session;
private final MessageProducer producer;
private Producer(Connection connection, String queueName) throws JMSException {
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.producer = session.createProducer(session.createQueue(queueName));
}
public void sendTextMessage(String text) throws JMSException {
this.producer.send(session.createTextMessage(text));
}
public void sendTextMessage(String text, Map<String, Object> properties) throws JMSException {
final TextMessage message = session.createTextMessage(text);
setProperties(message, properties);
this.producer.send(message);
}
private void setProperties(Message m, Map<String, Object> properties) throws JMSException {
if (properties.isEmpty()) {
return;
}
for (Map.Entry<String, Object> entry : properties.entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
if (value instanceof String) {
m.setStringProperty(key, (String) value);
} else {
throw new IllegalArgumentException("Unexpected property type:" +
" key = [" + key + "], value = [" + value + "]");
}
}
}
public void sendBytesMessage(byte[] bytes) throws JMSException {
final BytesMessage m = session.createBytesMessage();
m.writeBytes(bytes);
this.producer.send(m);
}
public void sendMessage(Message message) throws JMSException {
this.producer.send(message);
}
public void shutdown() throws JMSException {
producer.close();
session.close();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment