Created
January 4, 2010 23:23
-
-
Save afeinberg/268966 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/java/voldemort/server/VoldemortServer.java b/src/java/voldemort/server/VoldemortServer.java | |
index f69cd5e..302c5de 100644 | |
--- a/src/java/voldemort/server/VoldemortServer.java | |
+++ b/src/java/voldemort/server/VoldemortServer.java | |
@@ -62,8 +62,7 @@ public class VoldemortServer extends AbstractService { | |
private static final Logger logger = Logger.getLogger(VoldemortServer.class.getName()); | |
public static final long DEFAULT_PUSHER_POLL_MS = 60 * 1000; | |
- | |
- private final static int ASYNC_REQUEST_THREADS = 8; | |
+ | |
private final static int ASYNC_REQUEST_CACHE_SIZE = 64; | |
private final Node identityNode; | |
@@ -71,7 +70,7 @@ public class VoldemortServer extends AbstractService { | |
private final StoreRepository storeRepository; | |
private final VoldemortConfig voldemortConfig; | |
private final MetadataStore metadata; | |
- private final AsyncOperationRunner asyncRunner; | |
+ private AsyncOperationRunner asyncRunner; | |
public VoldemortServer(VoldemortConfig config) { | |
super(ServiceType.VOLDEMORT); | |
@@ -80,7 +79,6 @@ public class VoldemortServer extends AbstractService { | |
this.metadata = MetadataStore.readFromDirectory(new File(this.voldemortConfig.getMetadataDirectory()), | |
voldemortConfig.getNodeId()); | |
this.identityNode = metadata.getCluster().getNodeById(voldemortConfig.getNodeId()); | |
- this.asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE); | |
this.services = createServices(); | |
} | |
@@ -95,7 +93,7 @@ public class VoldemortServer extends AbstractService { | |
metadataInnerEngine.put(MetadataStore.CLUSTER_KEY, | |
new Versioned<String>(new ClusterMapper().writeCluster(cluster))); | |
this.metadata = new MetadataStore(metadataInnerEngine, voldemortConfig.getNodeId()); | |
- this.asyncRunner = new AsyncOperationRunner(ASYNC_REQUEST_THREADS, ASYNC_REQUEST_CACHE_SIZE); | |
+ | |
this.services = createServices(); | |
} | |
@@ -113,9 +111,12 @@ public class VoldemortServer extends AbstractService { | |
metadata, | |
scheduler, | |
voldemortConfig); | |
+ | |
+ asyncRunner = new AsyncOperationRunner(scheduler, ASYNC_REQUEST_CACHE_SIZE); | |
+ | |
services.add(storageService); | |
services.add(scheduler); | |
- services.add(this.asyncRunner); | |
+ services.add(asyncRunner); | |
if(voldemortConfig.isHttpServerEnabled()) | |
services.add(new HttpService(this, | |
diff --git a/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java b/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java | |
index b4c9ed1..4fabf73 100644 | |
--- a/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java | |
+++ b/src/java/voldemort/server/protocol/admin/AsyncOperationRunner.java | |
@@ -12,6 +12,8 @@ import voldemort.annotations.jmx.JmxManaged; | |
import voldemort.annotations.jmx.JmxOperation; | |
import voldemort.server.AbstractService; | |
import voldemort.server.ServiceType; | |
+import voldemort.server.scheduler.SchedulerService; | |
+import voldemort.utils.SystemTime; | |
/** | |
* @author afeinberg | |
@@ -21,36 +23,17 @@ import voldemort.server.ServiceType; | |
public class AsyncOperationRunner extends AbstractService { | |
private final Map<Integer, AsyncOperation> operations; | |
- private final ExecutorService executor; | |
private final AtomicInteger lastOperationId = new AtomicInteger(0); | |
+ private final SchedulerService scheduler; | |
private final static Logger logger = Logger.getLogger(AsyncOperationRunner.class); | |
- /** | |
- * TODO: Unify this with {@link voldemort.server.scheduler.SchedulerService} | |
- */ | |
- private static final ThreadFactory threadFactory = new ThreadFactory() { | |
- public Thread newThread(Runnable r) { | |
- Thread thread = new Thread(r); | |
- thread.setDaemon(true); | |
- thread.setName(r.getClass().getName()); | |
- thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { | |
- | |
- public void uncaughtException(Thread t, Throwable e) { | |
- logger.error("Asynchronous operation failed!", e); | |
- } | |
- }); | |
- | |
- return thread; | |
- } | |
- }; | |
- | |
@SuppressWarnings("unchecked") | |
// apache commons collections aren't updated for 1.5 yet | |
- public AsyncOperationRunner(int poolSize, int cacheSize) { | |
+ public AsyncOperationRunner(SchedulerService scheduler, int cacheSize) { | |
super(ServiceType.ASYNC_SCHEDULER); | |
operations = Collections.synchronizedMap(new AsyncOperationRepository(cacheSize)); | |
- executor = Executors.newFixedThreadPool(poolSize, threadFactory); | |
+ this.scheduler = scheduler; | |
} | |
/** | |
@@ -66,7 +49,7 @@ public class AsyncOperationRunner extends AbstractService { | |
+ " already submitted to the system"); | |
this.operations.put(requestId, operation); | |
- executor.submit(operation); | |
+ scheduler.scheduleNow(operation); | |
logger.debug("Handling async operation " + requestId); | |
} | |
diff --git a/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java b/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java | |
index 5cb5429..6818fce 100644 | |
--- a/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java | |
+++ b/test/unit/voldemort/server/protocol/admin/AsyncOperationTest.java | |
@@ -72,7 +72,13 @@ public class AsyncOperationTest extends TestCase { | |
} | |
operations.put("foo5", completeLater); | |
+ | |
assertTrue(operations.containsKey("foo5")); | |
+ | |
+ for (int i = 0; i < 10; i++) { | |
+ operations.put("foo" + 5 + i, completeLater); | |
+ } | |
+ | |
assertFalse("Actually does LRU heuristics", operations.containsKey("foo2")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment