Created
October 7, 2016 15:47
-
-
Save AlejandroRivera/34235c35bb62ab572932b373444420a0 to your computer and use it in GitHub Desktop.
Apache Qpid as embedded MQ broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.redmart.commons.test.mq; | |
import com.google.common.base.Charsets; | |
import com.google.common.collect.ImmutableMap; | |
import com.google.common.io.Resources; | |
import ch.qos.logback.classic.LoggerContext; | |
import ch.qos.logback.classic.util.ContextInitializer; | |
import ch.qos.logback.core.joran.spi.JoranException; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.redmart.commons.test.integration.RandomPortProvider; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.qpid.server.Broker; | |
import org.apache.qpid.server.BrokerOptions; | |
import org.junit.rules.ExternalResource; | |
import org.junit.rules.TemporaryFolder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.net.URL; | |
import java.util.Map; | |
import java.util.function.Supplier; | |
/** | |
* A JUnit resource that starts a Apache Qpid Message Queue Broker. | |
* | |
* <pre><code> | |
* public class MyTest { | |
* {@literal @}ClassRule | |
* public static EmbeddedMessageQueueResource mqBroker = new EmbeddedMessageQueueResource(); | |
* | |
* private Connection connection; | |
* | |
* {@literal @}BeforeClass | |
* public static void setupClass() { | |
* ConnectionFactory connectionFactory = new ConnectionFactory(); | |
* connectionFactory.setHost("localhost"); | |
* connectionFactory.setPort(mqBroker.getAmqpPort()); | |
* connectionFactory.setVirtualHost(mqBroker.getVirtualHost()); | |
* connectionFactory.setUsername(mqBroker.getUsername()); | |
* connectionFactory.setPassword(mqBroker.getPassword()); | |
* | |
* Connection connection = connectionFactory.newConnection(); | |
* // ... | |
* } | |
* | |
* {@literal @}AfterClass | |
* public static void afterClass() { | |
* this.connection.close(); | |
* // ... | |
* } | |
* | |
* {@literal @}Test | |
* public testSomething(){ | |
* Channel channel = connection.createChannel(); | |
* // ... | |
* } | |
* } | |
* </code></pre> | |
*/ | |
public class EmbeddedMessageQueueResource extends ExternalResource { | |
public static final Map<String, Object> DEFAULT_VHOST_CONFIG = ImmutableMap.of("type", "Memory"); | |
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedMessageQueueResource.class); | |
private final int amqpPort; | |
private final String virtualHost; | |
private final String username; | |
private final String password; | |
private final Map<String, Object> virtualHostConfig; | |
private TemporaryFolder temporaryFolder; | |
private Broker mqBroker; | |
public EmbeddedMessageQueueResource() { | |
this(new RandomPortProvider()); | |
} | |
public EmbeddedMessageQueueResource(Supplier<Integer> supplier) { | |
this(supplier.get()); | |
} | |
public EmbeddedMessageQueueResource(int port) { | |
this(port, "default", "guest", "guest", DEFAULT_VHOST_CONFIG, new TemporaryFolder()); | |
} | |
/** | |
* A constructor, d'uh. | |
*/ | |
public EmbeddedMessageQueueResource(int amqpPort, String virtualHost, String username, String password, | |
Map<String, Object> virtualHostConfig, TemporaryFolder temporaryFolder) { | |
this.amqpPort = amqpPort; | |
this.virtualHost = virtualHost; | |
this.virtualHostConfig = virtualHostConfig; | |
this.username = username; | |
this.password = password; | |
this.temporaryFolder = temporaryFolder; | |
} | |
@Override | |
protected void before() throws Throwable { | |
super.before(); | |
this.temporaryFolder.create(); | |
final File homeDirectory = temporaryFolder.newFolder("home"); | |
final File workDirectory = temporaryFolder.newFolder("wordkir"); | |
String config = Resources.toString(Resources.getResource("apache-qpid/conf.json"), Charsets.UTF_8); | |
IOUtils.write(config, new FileOutputStream(new File(homeDirectory, "conf.json"))); | |
String passwd = this.username + ":" + this.password; | |
IOUtils.write(passwd, new FileOutputStream(new File(homeDirectory, "passwd"))); | |
mqBroker = new Broker(); | |
BrokerOptions options = new BrokerOptions(); | |
options.setConfigProperty("qpid.work_dir", workDirectory.getAbsolutePath()); | |
options.setConfigProperty("qpid.home_dir", homeDirectory.getAbsolutePath()); | |
options.setConfigProperty("qpid.amqp_port", String.valueOf(amqpPort)); | |
options.setConfigProperty("qpid.virtual_host", this.virtualHost); | |
options.setConfigProperty("qpid.virtual_host_json_config", | |
new ObjectMapper().writeValueAsString(this.virtualHostConfig)); | |
options.setConfigProperty("queue.deadLetterQueueEnabled", "true"); | |
options.setStartupLoggedToSystemOut(false); | |
options.setInitialConfigurationLocation(homeDirectory.getAbsolutePath() + File.separator + "conf.json"); | |
mqBroker.startup(options); | |
reloadLogbackConfig(); | |
LOGGER.info("Embedded Message Queue Broker (Apache Qpid) started on port {} using username '{}'", amqpPort, username); | |
LOGGER.warn("Apache Qpid isn't RabbitMQ! Learn more about it: https://www.rabbitmq.com/interoperability.html"); | |
} | |
/** | |
* Apache Qpid modifies the Logging configuration manually since it was designed to run as a standalone server. | |
* | |
* <p>We need to reload the config to go back to the expected settings.</p> | |
*/ | |
private void reloadLogbackConfig() throws JoranException { | |
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); | |
ContextInitializer ci = new ContextInitializer(loggerContext); | |
URL url = ci.findURLOfDefaultConfigurationFile(true); | |
if (url == null) { | |
LOGGER.error("Could not reload Logback config to be reloaded. This will reduce logging visibility."); | |
return; | |
} | |
loggerContext.reset(); | |
ci.configureByResource(url); | |
} | |
@Override | |
protected void after() { | |
super.after(); | |
mqBroker.shutdown(); | |
temporaryFolder.delete(); | |
} | |
public int getAmqpPort() { | |
return amqpPort; | |
} | |
public String getPassword() { | |
return password; | |
} | |
public String getUsername() { | |
return username; | |
} | |
public String getVirtualHost() { | |
return virtualHost; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment