Skip to content

Instantly share code, notes, and snippets.

@vadimii
Last active December 20, 2015 23:18
Show Gist options
  • Save vadimii/6210930 to your computer and use it in GitHub Desktop.
Save vadimii/6210930 to your computer and use it in GitHub Desktop.
ActiveMQ Publisher class implemented in Java and Groovy
package amq
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import static amq.Constants.*
/**
 * Shared constants for the publisher example.
 */
class Constants {
    /** URL of the ActiveMQ broker the publisher connects to. */
    static final String BROKER_URL = 'tcp://localhost:61616'
}
/**
 * Publishes numbered job messages to ActiveMQ queues.
 *
 * Each message carries an incrementing id as an object message and is sent
 * to a queue named "JOBS.&lt;job&gt;", where the job is picked at random
 * from the configured job names.
 */
class Publisher {
    private static factory
    private connection
    private session
    private producer
    private jobs = ['suspend', 'delete']
    private id = 1000000

    /**
     * Connects to the broker at BROKER_URL, starts the connection and creates
     * a non-transacted, auto-acknowledge session together with a producer
     * that has no default destination.
     */
    public Publisher() {
        factory = new ActiveMQConnectionFactory(BROKER_URL)
        connection = factory.createConnection()
        connection.start()
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        // No default destination: the queue is chosen per message in sendMessage().
        producer = session.createProducer(null)
    }

    /**
     * Sends one object message holding the next id to the queue of a
     * randomly chosen job, and prints what was sent.
     */
    public sendMessage() {
        // Truncating random * size yields every valid index with equal probability.
        int idx = (int) (Math.random() * jobs.size())
        def job = jobs[idx]
        def destination = session.createQueue("JOBS.${job}")
        def message = session.createObjectMessage(id++)
        println "Sending: id: ${message.object} on queue: ${destination}"
        producer.send(destination, message)
    }

    /**
     * Closes the connection if one was created.
     */
    public close() {
        if (connection != null) {
            connection.close()
        }
    }

    /**
     * Publishes messages in batches of 10, pausing one second after each
     * batch, until 1000 messages have been sent; then closes the publisher.
     */
    static void main(args) {
        def count = 10
        def total = 0
        def publisher = new Publisher()
        try {
            while (total < 1000) {
                // Exactly `count` sends per batch so the reported totals match what was sent.
                count.times {
                    publisher.sendMessage()
                }
                total += count
                println "Published '${count}' of '${total}' job messages"
                Thread.sleep(1000)
            }
        } finally {
            // Release the connection even when sending or sleeping fails.
            publisher.close()
        }
    }
}
package org.apache.activemq.book.ch3.jobs;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publishes numbered job messages to ActiveMQ queues.
 *
 * <p>Each message carries an incrementing id as an {@link ObjectMessage} and is
 * sent to a queue named {@code JOBS.<job>}, where the job is picked at random
 * from the configured job names.
 */
public class Publisher {
    private static final String brokerURL = "tcp://localhost:61616";
    /** Number of messages sent per batch by {@link #main(String[])}. */
    private static final int BATCH_SIZE = 10;
    /** Number of messages after which {@link #main(String[])} stops. */
    private static final int MESSAGE_LIMIT = 1000;

    private static ConnectionFactory factory;
    /** Next message id; shared by all publishers in this JVM. */
    private static int id = 1000000;

    private Connection connection;
    private Session session;
    private MessageProducer producer;
    private final String[] jobs = new String[]{"suspend", "delete"};

    /**
     * Connects to the broker, starts the connection and creates a
     * non-transacted, auto-acknowledge session together with a producer that
     * has no default destination.
     *
     * @throws JMSException if the connection, session or producer cannot be created
     */
    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // No default destination: the queue is chosen per message in sendMessage().
        producer = session.createProducer(null);
    }

    /**
     * Closes the connection if one was created.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Publishes messages in batches, pausing one second after each batch,
     * until the message limit is reached or the thread is interrupted; the
     * publisher is closed in either case.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        Publisher publisher = new Publisher();
        try {
            int total = 0;
            while (total < MESSAGE_LIMIT) {
                for (int i = 0; i < BATCH_SIZE; i++) {
                    publisher.sendMessage();
                }
                total += BATCH_SIZE;
                System.out.println("Published '" + BATCH_SIZE + "' of '" + total + "' job messages");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop publishing.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the connection even when sending fails.
            publisher.close();
        }
    }

    /**
     * Sends one object message holding the next id to the queue of a randomly
     * chosen job, and prints what was sent.
     *
     * @throws JMSException if the queue or message cannot be created, or the send fails
     */
    public void sendMessage() throws JMSException {
        // Truncating random * length yields every valid index with equal probability.
        int idx = (int) (Math.random() * jobs.length);
        String job = jobs[idx];
        Destination destination = session.createQueue("JOBS." + job);
        ObjectMessage message = session.createObjectMessage(id++);
        System.out.println("Sending: id: " + message.getObject() + " on queue: " + destination);
        producer.send(destination, message);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment