Created October 8, 2015 14:05
-
-
Save andy722/3411ac7ae61e0ee57918 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class MQConnection implements ExceptionListener { | |
| private final MQQueueConnectionFactory brokerConnectionFactory; | |
| private Connection connection; | |
| private boolean receiverTransacted; | |
| private int receiverAcknowledge; | |
| private final Map<String, Consumer> consumers; | |
| private final Map<String, Producer> producers; | |
| public MQConnection(String hosts) throws JMSException { | |
| this.brokerConnectionFactory = new MQQueueConnectionFactory(); | |
| brokerConnectionFactory.setConnectionNameList(hosts); | |
| brokerConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); | |
| brokerConnectionFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT); | |
| this.consumers = new HashMap<String, Consumer>(); | |
| this.producers = new HashMap<String, Producer>(); | |
| } | |
| public MQConnection setChannel(String channel) throws JMSException { | |
| brokerConnectionFactory.setChannel(channel); | |
| return this; | |
| } | |
| public MQConnection setClientReconnectTimeout(int timeoutSeconds) throws JMSException { | |
| brokerConnectionFactory.setClientReconnectTimeout(timeoutSeconds); | |
| return this; | |
| } | |
| public MQConnection setCCSID(int ccsid) throws JMSException { | |
| brokerConnectionFactory.setCCSID(ccsid); | |
| return this; | |
| } | |
| public MQConnection setQueueManager(String queueManager) throws JMSException { | |
| brokerConnectionFactory.setQueueManager(queueManager); | |
| return this; | |
| } | |
| public void setReceiverTransacted(boolean receiverTransacted) { | |
| this.receiverTransacted = receiverTransacted; | |
| } | |
| public void setReceiverAcknowledge(int receiverAcknowledge) { | |
| this.receiverAcknowledge = receiverAcknowledge; | |
| } | |
| public void connect() throws JMSException { | |
| connection = brokerConnectionFactory.createConnection(); | |
| connection.setExceptionListener(MQConnection.this); | |
| connection.start(); | |
| } | |
| public void connect(String username, String password) throws JMSException { | |
| connection = brokerConnectionFactory.createConnection(username, password); | |
| connection.setExceptionListener(MQConnection.this); | |
| connection.start(); | |
| } | |
| private Consumer getConsumer(String queueName) throws JMSException { | |
| Consumer c; | |
| synchronized (consumers) { | |
| c = consumers.get(queueName); | |
| if (c == null) { | |
| c = new Consumer(connection, queueName); | |
| consumers.put(queueName, c); | |
| } | |
| } | |
| return c; | |
| } | |
| private Producer getProducer(String queueName) throws JMSException { | |
| Producer p; | |
| synchronized (producers) { | |
| p = producers.get(queueName); | |
| if (p == null) { | |
| p = new Producer(connection, queueName); | |
| producers.put(queueName, p); | |
| } | |
| } | |
| return p; | |
| } | |
| public void setMessageListener(String queueName, MessageListener listener) throws JMSException { | |
| getConsumer(queueName).setMessageListener(listener); | |
| } | |
| public Message receive(String queueName, long timeoutMillis) throws JMSException { | |
| return getConsumer(queueName).receive(timeoutMillis); | |
| } | |
| public void sendTextMessage(String queueName, String text) throws JMSException { | |
| getProducer(queueName).sendTextMessage(text); | |
| } | |
| public void sendTextMessage(String queueName, | |
| String message, | |
| Map<String, Object> properties) throws JMSException { | |
| getProducer(queueName).sendTextMessage(message, properties); | |
| } | |
| public void sendMessage(String queueName, | |
| Message message) throws JMSException { | |
| getProducer(queueName).sendMessage(message); | |
| } | |
| public void sendBytesMessage(String queueName, byte[] bytes) throws JMSException { | |
| getProducer(queueName).sendBytesMessage(bytes); | |
| } | |
| public void shutdown() throws JMSException { | |
| synchronized (consumers) { | |
| for (Consumer c : consumers.values()) | |
| c.shutdown(); | |
| consumers.clear(); | |
| } | |
| synchronized (producers) { | |
| for (Producer c : producers.values()) | |
| c.shutdown(); | |
| producers.clear(); | |
| } | |
| if (connection != null) | |
| connection.close(); | |
| } | |
| @Override | |
| public void onException(JMSException e) { | |
| System.err.println("NEW ERROR: " + e.getMessage()); | |
| e.printStackTrace(); | |
| } | |
| private class Consumer { | |
| private final Session session; | |
| private final MessageConsumer consumer; | |
| private Consumer(Connection connection, String queueName) throws JMSException { | |
| this.session = connection.createSession(receiverTransacted, receiverAcknowledge); | |
| consumer = session.createConsumer(session.createQueue(queueName)); | |
| } | |
| void setMessageListener(MessageListener l) throws JMSException { | |
| consumer.setMessageListener(l); | |
| } | |
| Message receive() throws JMSException { | |
| return consumer.receive(); | |
| } | |
| Message receive(long timeoutMillis) throws JMSException { | |
| return consumer.receive(timeoutMillis); | |
| } | |
| public void shutdown() throws JMSException { | |
| consumer.close(); | |
| session.close(); | |
| } | |
| } | |
| private class Producer { | |
| private final Session session; | |
| private final MessageProducer producer; | |
| private Producer(Connection connection, String queueName) throws JMSException { | |
| this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
| this.producer = session.createProducer(session.createQueue(queueName)); | |
| } | |
| public void sendTextMessage(String text) throws JMSException { | |
| this.producer.send(session.createTextMessage(text)); | |
| } | |
| public void sendTextMessage(String text, Map<String, Object> properties) throws JMSException { | |
| final TextMessage message = session.createTextMessage(text); | |
| setProperties(message, properties); | |
| this.producer.send(message); | |
| } | |
| private void setProperties(Message m, Map<String, Object> properties) throws JMSException { | |
| if (properties.isEmpty()) { | |
| return; | |
| } | |
| for (Map.Entry<String, Object> entry : properties.entrySet()) { | |
| final String key = entry.getKey(); | |
| final Object value = entry.getValue(); | |
| if (value instanceof String) { | |
| m.setStringProperty(key, (String) value); | |
| } else { | |
| throw new IllegalArgumentException("Unexpected property type:" + | |
| " key = [" + key + "], value = [" + value + "]"); | |
| } | |
| } | |
| } | |
| public void sendBytesMessage(byte[] bytes) throws JMSException { | |
| final BytesMessage m = session.createBytesMessage(); | |
| m.writeBytes(bytes); | |
| this.producer.send(m); | |
| } | |
| public void sendMessage(Message message) throws JMSException { | |
| this.producer.send(message); | |
| } | |
| public void shutdown() throws JMSException { | |
| producer.close(); | |
| session.close(); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment