Skip to content

Instantly share code, notes, and snippets.

@afeinberg
Created January 4, 2010 23:23
Show Gist options
  • Save afeinberg/268966 to your computer and use it in GitHub Desktop.
Save afeinberg/268966 to your computer and use it in GitHub Desktop.
diff --git a/src/java/voldemort/server/VoldemortServer.java b/src/java/voldemort/server/VoldemortServer.java
index f69cd5e..302c5de 100644
--- a/src/java/voldemort/server/VoldemortServer.java
+++ b/src/java/voldemort/server/VoldemortServer.java
@@ -62,8 +62,7 @@ public class VoldemortServer extends AbstractService {
private static final Logger logger = Logger.getLogger(VoldemortServer.class.getName());
public static final long DEFAULT_PUSHER_POLL_MS = 60 * 1000;
-
- private final static int ASYNC_REQUEST_THREADS = 8;
+
private final static int ASYNC_REQUEST_CACHE_SIZE = 64;
private final Node identityNode;
@@ -71,7 +70,7 @@ public class VoldemortServer extends AbstractService {
private final StoreRepository storeRepository;
private final VoldemortConfig voldemortConfig;
private final MetadataStore metadata;
- private final AsyncOperationRunner asyncRunner;
+ private AsyncOperationRunner asyncRunner;
public VoldemortServer(VoldemortConfig config) {
super(ServiceType.VOLDEMORT);
@@ -80,7 +79,6 @@ public class VoldemortServer extends AbstractService {
this.metadata = MetadataStore.readFromDirectory(new File(this.voldemortConfig.getMetadataDirectory()),
voldemortConfig.getNodeId());
this.identityNode = metadata.getCluster().getNodeById(voldemortConfig.getNodeId());
- this.asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE);
this.services = createServices();
}
@@ -95,7 +93,7 @@ public class VoldemortServer extends AbstractService {
metadataInnerEngine.put(MetadataStore.CLUSTER_KEY,
new Versioned<String>(new ClusterMapper().writeCluster(cluster)));
this.metadata = new MetadataStore(metadataInnerEngine, voldemortConfig.getNodeId());
- this.asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE);
+
this.services = createServices();
}
@@ -113,9 +111,12 @@ public class VoldemortServer extends AbstractService {
metadata,
scheduler,
voldemortConfig);
+
+ asyncRunner = new AsyncOperationRunner(scheduler, ASYNC_REQUEST_CACHE_SIZE);
+
services.add(storageService);
services.add(scheduler);
- services.add(this.asyncRunner);
+ services.add(asyncRunner);
if(voldemortConfig.isHttpServerEnabled())
services.add(new HttpService(this,
diff --git a/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java b/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java
index b4c9ed1..4fabf73 100644
--- a/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java
+++ b/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java
@@ -12,6 +12,8 @@ import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.server.AbstractService;
import voldemort.server.ServiceType;
+import voldemort.server.scheduler.SchedulerService;
+import voldemort.utils.SystemTime;
/**
* @author afeinberg
@@ -21,36 +23,17 @@ import voldemort.server.ServiceType;
public class AsyncOperationRunner extends AbstractService {
private final Map<Integer, AsyncOperation> operations;
- private final ExecutorService executor;
private final AtomicInteger lastOperationId = new AtomicInteger(0);
+ private final SchedulerService scheduler;
private final static Logger logger = Logger.getLogger(AsyncOperationRunner.class);
- /**
- * TODO: Unify this with {@link voldemort.server.scheduler.SchedulerService}
- */
- private static final ThreadFactory threadFactory = new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName(r.getClass().getName());
- thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-
- public void uncaughtException(Thread t, Throwable e) {
- logger.error("Asynchronous operation failed!", e);
- }
- });
-
- return thread;
- }
- };
-
@SuppressWarnings("unchecked")
// apache commons collections aren't updated for 1.5 yet
- public AsyncOperationRunner(int poolSize, int cacheSize) {
+ public AsyncOperationRunner(SchedulerService scheduler, int cacheSize) {
super(ServiceType.ASYNC_SCHEDULER);
operations = Collections.synchronizedMap(new AsyncOperationRepository(cacheSize));
- executor = Executors.newFixedThreadPool(poolSize, threadFactory);
+ this.scheduler = scheduler;
}
/**
@@ -66,7 +49,7 @@ public class AsyncOperationRunner extends AbstractService {
+ " already submitted to the system");
this.operations.put(requestId, operation);
- executor.submit(operation);
+ scheduler.scheduleNow(operation);
logger.debug("Handling async operation " + requestId);
}
diff --git a/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java b/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java
index 5cb5429..6818fce 100644
--- a/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java
+++ b/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java
@@ -72,7 +72,13 @@ public class AsyncOperationTest extends TestCase {
}
operations.put("foo5", completeLater);
+
assertTrue(operations.containsKey("foo5"));
+
+ for (int i = 0; i < 10; i++) {
+ operations.put("foo" + 5 + i, completeLater);
+ }
+
assertFalse("Actually does LRU heuristics", operations.containsKey("foo2"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment