Created
June 22, 2012 04:29
-
-
Save Randgalt/2970233 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.curator.framework.recipes.locks; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
import com.netflix.curator.framework.CuratorFramework; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.data.Stat; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.Closeable; | |
import java.io.IOException; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.DelayQueue; | |
import java.util.concurrent.Delayed; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
public class Reaper implements Closeable | |
{ | |
private final Logger log = LoggerFactory.getLogger(getClass()); | |
private final CuratorFramework client; | |
private final ExecutorService executor; | |
private final int reapingThresholdMs; | |
private final DelayQueue<PathHolder> queue = new DelayQueue<PathHolder>(); | |
private volatile Future<Void> task; | |
private static final int REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES); | |
private static class PathHolder implements Delayed | |
{ | |
private final String path; | |
private final long expirationMs; | |
private PathHolder(String path, int delayMs) | |
{ | |
this.path = path; | |
this.expirationMs = System.currentTimeMillis() + delayMs; | |
} | |
@Override | |
public long getDelay(TimeUnit unit) | |
{ | |
return unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public int compareTo(Delayed o) | |
{ | |
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); | |
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); | |
} | |
} | |
public Reaper(CuratorFramework client) | |
{ | |
this(client, Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Reaper-%d").build()), REAPING_THRESHOLD_MS); | |
} | |
public Reaper(CuratorFramework client, ExecutorService executor, int reapingThresholdMs) | |
{ | |
this.client = client; | |
this.executor = executor; | |
this.reapingThresholdMs = reapingThresholdMs; | |
} | |
public void addPath(String path) | |
{ | |
queue.add(new PathHolder(path, reapingThresholdMs)); | |
} | |
public void start() throws Exception | |
{ | |
task = executor.submit | |
( | |
new Callable<Void>() | |
{ | |
@Override | |
public Void call() throws Exception | |
{ | |
try | |
{ | |
while ( !Thread.currentThread().isInterrupted() ) | |
{ | |
reap(queue.take().path); | |
} | |
} | |
catch ( InterruptedException e ) | |
{ | |
Thread.currentThread().interrupt(); | |
} | |
return null; | |
} | |
} | |
); | |
} | |
@Override | |
public void close() throws IOException | |
{ | |
try | |
{ | |
queue.clear(); | |
task.cancel(true); | |
} | |
catch ( Exception e ) | |
{ | |
log.error("Canceling task", e); | |
} | |
} | |
private void reap(String path) | |
{ | |
try | |
{ | |
Stat stat = client.checkExists().forPath(path); | |
if ( stat != null ) // otherwise already deleted | |
{ | |
if ( stat.getNumChildren() == 0 ) | |
{ | |
try | |
{ | |
client.delete().forPath(path); | |
log.info("Reaping path: " + path); | |
} | |
catch ( KeeperException.NoNodeException ignore ) | |
{ | |
// ignore | |
} | |
} | |
else | |
{ | |
addPath(path); // it has children, check again later | |
} | |
} | |
} | |
catch ( Exception e ) | |
{ | |
log.error("Trying to reap: " + path, e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment