Created
September 10, 2014 10:55
-
-
Save danieldk/d9e42fd5cce4fcf0eaea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package de.tuebingen.uni.sfs.clarind.jesque.listeners; | |
| import com.google.common.base.Preconditions; | |
| import net.greghaines.jesque.Job; | |
| import net.greghaines.jesque.utils.JedisUtils; | |
| import net.greghaines.jesque.worker.Worker; | |
| import net.greghaines.jesque.worker.WorkerEvent; | |
| import net.greghaines.jesque.worker.WorkerListener; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import redis.clients.jedis.Jedis; | |
| import redis.clients.jedis.JedisPool; | |
| import java.util.concurrent.locks.Condition; | |
| import java.util.concurrent.locks.Lock; | |
| import java.util.concurrent.locks.ReentrantLock; | |
| /** | |
| * This listener sends heartbeats during the worker's lifetime. Heartbeats | |
| * are stopped when: | |
| * <p/> | |
| * <ul> | |
| * <li>The worker is shut down.</li> | |
| * <li>The worker thread is not alive.</li> | |
| * </ul> | |
| * | |
| * @author Daniël de Kok <[email protected]> | |
| */ | |
| public class HeartbeatListener implements WorkerListener { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatListener.class); | |
| private final JedisPool jedisPool; | |
| private final int heartbeatExpire; | |
| private final int heartbeatInterval; | |
| private HeartbeatRunnable heartbeatRunnable; | |
| private final Lock heartbeatLock = new ReentrantLock(); | |
| private final Condition heartbeatCondition = heartbeatLock.newCondition(); | |
| /** | |
| * Construct a heartbeat listener. | |
| * | |
| * @param jedisPool The Jedis pool to use for Jedis instances/connections. | |
| * @param heartbeatInterval The interval for heartbeats in seconds. | |
| * @param heartbeatExpire The expiration time for a heartbeat. The heartbeat is removed from | |
| * redis after this amount of time. | |
| */ | |
| public HeartbeatListener(JedisPool jedisPool, int heartbeatInterval, int heartbeatExpire) { | |
| this.jedisPool = jedisPool; | |
| this.heartbeatInterval = heartbeatInterval; | |
| this.heartbeatExpire = heartbeatExpire; | |
| heartbeatRunnable = null; | |
| } | |
| /** | |
| * Construct a heartbeat listener with a heartbeat interval of 1 second and an | |
| * expiration time of 10 seconds. | |
| */ | |
| public HeartbeatListener(JedisPool jedisPool) { | |
| this(jedisPool, 1, 10); | |
| } | |
| @Override | |
| public void onEvent(WorkerEvent event, Worker worker, String queue, Job job, Object runner, Object result, Exception ex) { | |
| if (event == WorkerEvent.WORKER_START) { | |
| heartbeatRunnable = new HeartbeatRunnable(Thread.currentThread(), worker.getName()); | |
| heartbeatLock.lock(); | |
| try { | |
| new Thread(heartbeatRunnable).start(); | |
| // Assure that we have a heartbeat before we continue... | |
| // | |
| // Note: do we want to use a timeout plus throw an exception if there was no heartbeat, | |
| // or just wait forever? | |
| heartbeatCondition.await(); | |
| } catch (InterruptedException e) { | |
| heartbeatRunnable.stop(); | |
| } finally { | |
| heartbeatLock.unlock(); | |
| } | |
| } else if (event == WorkerEvent.WORKER_STOP) { | |
| if (heartbeatRunnable != null) { | |
| heartbeatRunnable.stop(); | |
| heartbeatRunnable = null; | |
| } | |
| } | |
| } | |
| private class HeartbeatRunnable implements Runnable { | |
| private String workerName; | |
| private Boolean stopped; | |
| private Thread workerThread; | |
| private HeartbeatRunnable(Thread workerThread, String workerName) { | |
| this.workerThread = workerThread; | |
| this.workerName = workerName; | |
| stopped = false; | |
| } | |
| @Override | |
| public void run() { | |
| Preconditions.checkNotNull(workerName, "Worker name cannot be null"); | |
| stopped = false; | |
| while (!stopped) { | |
| if (!workerThread.isAlive()) { | |
| LOGGER.warn("Worker thread is not alive: {}", workerThread); | |
| break; | |
| } | |
| heartbeatLock.lock(); | |
| try { | |
| beat(); | |
| heartbeatCondition.signalAll(); | |
| } finally { | |
| heartbeatLock.unlock(); | |
| } | |
| try { | |
| Thread.sleep(heartbeatInterval * 1000); | |
| } catch (InterruptedException e) { | |
| break; | |
| } | |
| } | |
| } | |
| private void beat() { | |
| Jedis jedis = jedisPool.getResource(); | |
| try { | |
| JedisUtils.ensureJedisConnection(jedis); | |
| jedis.setex(String.format("heartbeat:%s", workerName), heartbeatExpire, "1"); | |
| } finally { | |
| jedisPool.returnResource(jedis); | |
| } | |
| } | |
| public void stop() { | |
| stopped = true; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment