Created
November 8, 2012 17:41
-
-
Save fieldju/4040309 to your computer and use it in GitHub Desktop.
ActiveMQBrokerMonitorThread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cnwk.foreman.ingestion; | |
import cnwk.foreman.util.LangUtils; | |
import org.apache.activemq.broker.BrokerService; | |
import org.apache.activemq.util.ServiceStopper; | |
import org.apache.commons.lang.StringUtils; | |
import org.apache.log4j.Logger; | |
import java.net.InetAddress; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
/** | |
* Created by Justin Field | [email protected] on 10/2/12 | |
* Description: simple thread for handling the starting of the active mq | |
* broker so that it does not block foreman when it gets blocked after not getting | |
* the lock on the DB | |
*/ | |
public class ActiveMQBrokerMonitorThread implements Runnable { | |
private static final Logger LOG = Logger.getLogger(ActiveMQBrokerMonitorThread.class); | |
private static final long ONE_MINUTE = 60000; | |
private AtomicBoolean running = new AtomicBoolean(true); | |
private BrokerService broker; | |
private ParallelIngestionService parallelIngestionService; | |
private BrokerFactory brokerFactory; | |
public Boolean isRunning() { | |
return running.get(); | |
} | |
@SuppressWarnings({"UnusedDeclaration"}) | |
public void setRunning(final Boolean running) { | |
this.running.set(running); | |
} | |
public ActiveMQBrokerMonitorThread(final BrokerFactory brokerFactory, final ParallelIngestionService parallelIngestionService) { | |
this.brokerFactory = brokerFactory; | |
this.parallelIngestionService = parallelIngestionService; | |
} | |
public void run() { | |
while (isRunning()) { | |
checkAndStartBroker(); | |
try { | |
if (! broker.isStarted()) | |
broker.waitUntilStarted(); | |
} catch (Exception e) { | |
LOG.error("failed to wait for broker to start", e); | |
} | |
try { | |
Thread.sleep(ONE_MINUTE); | |
} catch (Exception e) { | |
LOG.error("sleep interrupted", e); | |
} | |
verifyMasterLock(); | |
} | |
} | |
/** | |
* checks to see if the broker is started | |
* if not it tries to start it | |
*/ | |
public void checkAndStartBroker() { | |
if (broker == null || ! broker.isStarted()) { | |
try { | |
broker = getNewBroker(); | |
broker.start(); | |
} catch (Exception e) { | |
LOG.error("Broker failed to start", e); | |
} | |
} | |
} | |
/** | |
* queries the DB to verify who has the master lock | |
* if its another broker and this is not a slave then it shuts down the broker | |
*/ | |
public void verifyMasterLock() { | |
String currentLockHolder = parallelIngestionService.getActiveMQLockHolder(); | |
String hostname = null; | |
try { | |
hostname = InetAddress.getLocalHost().getHostName(); | |
} catch (Exception e) { | |
LOG.error("failed to get hostname", e); | |
} | |
// if the broker is not a slave and is not in the db as the master | |
// then we have to masters and this is bad | |
if ( ! StringUtils.equals(currentLockHolder, hostname) && ! broker.isSlave()) { | |
LOG.error("Broker: " + LangUtils.ifNull(broker.getBrokerName(), "null") + | |
" thinks its a master when the DB says Broker: " + currentLockHolder + | |
" is the master, restarting broker"); | |
try { | |
broker.stopAllConnectors(new ServiceStopper()); | |
broker.stop(); | |
broker.waitUntilStopped(); | |
} catch (Exception e) { | |
LOG.error("failed to stop broker", e); | |
} | |
} | |
} | |
// Please note that this is a custom brokerFactory and not the one that is included with ActiveMQ | |
private BrokerService getNewBroker() { | |
return brokerFactory.getBean(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment