Created
January 17, 2011 22:49
-
-
Save derekjw/783646 to your computer and use it in GitHub Desktop.
diff of Akka's Dispatchers.scala using case classes for config
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala | |
index 56dbc11..8b32235 100644 | |
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala | |
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala | |
@@ -7,10 +7,9 @@ package akka.dispatch | |
import akka.actor.{Actor, ActorRef} | |
import akka.actor.newUuid | |
import akka.config.Config._ | |
+import akka.config.DefaultDispatcherConfig | |
import akka.util.{Duration, Logging} | |
-import net.lag.configgy.ConfigMap | |
- | |
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} | |
import java.util.concurrent.TimeUnit | |
@@ -47,19 +46,15 @@ import java.util.concurrent.TimeUnit | |
* @author <a href="http://jonasboner.com">Jonas Bonér</a> | |
*/ | |
object Dispatchers extends Logging { | |
- val THROUGHPUT = config.getInt("akka.actor.throughput", 5) | |
- val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). | |
- map(time => Duration(time, TIME_UNIT)). | |
- getOrElse(Duration(1000,TimeUnit.MILLISECONDS)) | |
- val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) | |
- val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) | |
- val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT) | |
+ val THROUGHPUT = config.actor.throughput | |
+ val DEFAULT_SHUTDOWN_TIMEOUT = Duration(config.actor.dispatcherShutdownTimeout, TIME_UNIT) | |
+ val MAILBOX_CAPACITY = config.actor.defaultDispatcher.mailboxCapacity | |
+ val MAILBOX_PUSH_TIME_OUT = Duration(config.actor.defaultDispatcher.mailboxPushTimeout, TIME_UNIT) | |
+ val THROUGHPUT_DEADLINE_TIME = Duration(config.actor.throughputDeadlineTime, TIME_UNIT) | |
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt | |
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox() | |
- lazy val defaultGlobalDispatcher = { | |
- config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) | |
- } | |
+ lazy val defaultGlobalDispatcher = from(config.actor.defaultDispatcher) | |
object globalHawtDispatcher extends HawtDispatcher | |
@@ -147,8 +142,8 @@ object Dispatchers extends Logging { | |
* Utility function that tries to load the specified dispatcher config from the akka.conf | |
* or else use the supplied default dispatcher | |
*/ | |
- def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = | |
- config getConfigMap key flatMap from getOrElse default | |
+ //def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = | |
+ //config getConfigMap key flatMap from getOrElse default | |
/* | |
* Creates or obtains a dispatcher from a ConfigMap according to the format below | |
@@ -172,48 +167,48 @@ object Dispatchers extends Logging { | |
* Returns: None if "type" isn't specified in the config | |
* Throws: IllegalArgumentException if the value of "type" is not valid | |
*/ | |
- def from(cfg: ConfigMap): Option[MessageDispatcher] = { | |
- lazy val name = cfg.getString("name", newUuid.toString) | |
+ def from(cfg: DefaultDispatcherConfig): MessageDispatcher = { | |
+ lazy val name = newUuid.toString // is this used in akka.conf? cfg.getString("name", newUuid.toString) | |
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { | |
import ThreadPoolConfigDispatcherBuilder.conf_? | |
//Apply the following options to the config if they are present in the cfg | |
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( | |
- conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), | |
- conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), | |
- conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), | |
- conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), | |
- conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), | |
- conf_?(cfg getString "rejection-policy" map { | |
+ _.setKeepAliveTime(Duration(cfg.keepAliveTime, TIME_UNIT)), | |
+ _.setCorePoolSizeFromFactor(cfg.corePoolSizeFactor), | |
+ _.setMaxPoolSizeFromFactor(cfg.maxPoolSizeFactor), | |
+ _.setExecutorBounds(cfg.executorBounds), | |
+ _.setAllowCoreThreadTimeout(cfg.allowCoreTimeout), | |
+ _.setRejectionPolicy(cfg.rejectionPolicy match { | |
case "abort" => new AbortPolicy() | |
case "caller-runs" => new CallerRunsPolicy() | |
case "discard-oldest" => new DiscardOldestPolicy() | |
case "discard" => new DiscardPolicy() | |
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) | |
- })(policy => _.setRejectionPolicy(policy))) | |
+ })) | |
} | |
lazy val mailboxType: MailboxType = { | |
- val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY) | |
+ val capacity = cfg.mailboxCapacity | |
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'. | |
if (capacity < 0) UnboundedMailbox() | |
- else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) | |
+ else BoundedMailbox(false, capacity, Duration(cfg.mailboxPushTimeout, TIME_UNIT)) //bug? | |
} | |
- cfg.getString("type") map { | |
+ cfg.dispatcher match { | |
case "ExecutorBasedEventDriven" => | |
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( | |
name, | |
- cfg.getInt("throughput", THROUGHPUT), | |
- cfg.getInt("throughput-deadline", THROUGHPUT_DEADLINE_TIME_MILLIS), | |
+ cfg.throughput, | |
+ cfg.throughputDeadlineTime, //bug? | |
mailboxType, | |
threadPoolConfig)).build | |
case "ExecutorBasedEventDrivenWorkStealing" => | |
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build | |
- case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true)) | |
+ case "Hawt" => new HawtDispatcher(cfg.aggregate) | |
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher | |
case "GlobalHawt" => globalHawtDispatcher | |
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment