Skip to content

Instantly share code, notes, and snippets.

@danieldk
Created September 10, 2014 10:55
Show Gist options
  • Save danieldk/d9e42fd5cce4fcf0eaea to your computer and use it in GitHub Desktop.
Save danieldk/d9e42fd5cce4fcf0eaea to your computer and use it in GitHub Desktop.
package de.tuebingen.uni.sfs.clarind.jesque.listeners;
import com.google.common.base.Preconditions;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* This listener sends heartbeats during the worker's lifetime. Heartbeats
* are stopped when:
* <p/>
* <ul>
* <li>The worker is shut down.</li>
* <li>The worker thread is not alive.</li>
* </ul>
*
* @author Daniël de Kok <[email protected]>
*/
public class HeartbeatListener implements WorkerListener {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatListener.class);
private final JedisPool jedisPool;
private final int heartbeatExpire;
private final int heartbeatInterval;
private HeartbeatRunnable heartbeatRunnable;
private final Lock heartbeatLock = new ReentrantLock();
private final Condition heartbeatCondition = heartbeatLock.newCondition();
/**
* Construct a heartbeat listener.
*
* @param jedisPool The Jedis pool to use for Jedis instances/connections.
* @param heartbeatInterval The interval for heartbeats in seconds.
* @param heartbeatExpire The expiration time for a heartbeat. The heartbeat is removed from
* redis after this amount of time.
*/
public HeartbeatListener(JedisPool jedisPool, int heartbeatInterval, int heartbeatExpire) {
this.jedisPool = jedisPool;
this.heartbeatInterval = heartbeatInterval;
this.heartbeatExpire = heartbeatExpire;
heartbeatRunnable = null;
}
/**
* Construct a heartbeat listener with a heartbeat interval of 1 second and an
* expiration time of 10 seconds.
*/
public HeartbeatListener(JedisPool jedisPool) {
this(jedisPool, 1, 10);
}
@Override
public void onEvent(WorkerEvent event, Worker worker, String queue, Job job, Object runner, Object result, Exception ex) {
if (event == WorkerEvent.WORKER_START) {
heartbeatRunnable = new HeartbeatRunnable(Thread.currentThread(), worker.getName());
heartbeatLock.lock();
try {
new Thread(heartbeatRunnable).start();
// Assure that we have a heartbeat before we continue...
//
// Note: do we want to use a timeout plus throw an exception if there was no heartbeat,
// or just wait forever?
heartbeatCondition.await();
} catch (InterruptedException e) {
heartbeatRunnable.stop();
} finally {
heartbeatLock.unlock();
}
} else if (event == WorkerEvent.WORKER_STOP) {
if (heartbeatRunnable != null) {
heartbeatRunnable.stop();
heartbeatRunnable = null;
}
}
}
private class HeartbeatRunnable implements Runnable {
private String workerName;
private Boolean stopped;
private Thread workerThread;
private HeartbeatRunnable(Thread workerThread, String workerName) {
this.workerThread = workerThread;
this.workerName = workerName;
stopped = false;
}
@Override
public void run() {
Preconditions.checkNotNull(workerName, "Worker name cannot be null");
stopped = false;
while (!stopped) {
if (!workerThread.isAlive()) {
LOGGER.warn("Worker thread is not alive: {}", workerThread);
break;
}
heartbeatLock.lock();
try {
beat();
heartbeatCondition.signalAll();
} finally {
heartbeatLock.unlock();
}
try {
Thread.sleep(heartbeatInterval * 1000);
} catch (InterruptedException e) {
break;
}
}
}
private void beat() {
Jedis jedis = jedisPool.getResource();
try {
JedisUtils.ensureJedisConnection(jedis);
jedis.setex(String.format("heartbeat:%s", workerName), heartbeatExpire, "1");
} finally {
jedisPool.returnResource(jedis);
}
}
public void stop() {
stopped = true;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment