Created
October 23, 2015 23:44
-
-
Save kirked/a81ae4b2e52d8b89a9cc to your computer and use it in GitHub Desktop.
Akka worker pool; backpressure configurable via dispatcher configuration.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import akka.actor._ | |
import akka.pattern.ask | |
import scala.collection.immutable.Queue | |
import scala.concurrent.duration._ | |
import scala.util.{Success, Failure} | |
/**
 * Public entry point of the worker pool: the request envelope understood by
 * the pool and a factory for the `Props` of the pool's managing actor.
 */
object WorkerPool {

  /** Timeout used for requests that do not specify one of their own. */
  val defaultTimeout: FiniteDuration = 30.seconds

  /**
   * Request envelope for the worker pool; the response will be the worker's
   * response to `msg`.
   *
   * @param msg            payload handed to a worker
   * @param expectResponse whether a reply from the worker is expected
   * @param timeout        optional per-request timeout; `None` falls back to
   *                       [[defaultTimeout]] when the pool handles the request
   */
  case class Request(
    msg: Any,
    expectResponse: Boolean = true,
    timeout: Option[FiniteDuration] = Some(defaultTimeout)
  )

  /**
   * Builds the `Props` for a pool manager.
   *
   * @param workerProps recipe used to create each worker actor
   * @param workerCount number of workers in the pool
   */
  def newPool(workerProps: Props, workerCount: Int): Props =
    WorkRequesting.WorkManager.props(workerProps, workerCount)
}
object WorkRequesting { | |
/** Messages exchanged only between the pool manager and its workers. */
private object PrivateMessages {

  /** Sent by a worker to the manager to announce it can take more work. */
  case object GimmeWork

  /**
   * A unit of work handed from the manager to a worker.
   *
   * @param msg            payload to deliver to the wrapped actor
   * @param requestor      actor that originally sent the payload to the pool
   * @param expectResponse whether the requestor expects a reply
   * @param timeout        how long to wait for the reply when one is expected
   */
  case class WorkRequest(
    msg: Any,
    requestor: ActorRef,
    expectResponse: Boolean,
    timeout: FiniteDuration
  )
}
object WorkManager extends ActorGenerator2[WorkManager, Props, Int]

/**
 * Pool manager: queues incoming work and hands each item to an idle worker.
 *
 * A worker is removed from the idle set when it is given work and returns to
 * it by sending `GimmeWork`. Every worker is death-watched so that a stopped
 * worker is replaced; `Terminated` is only delivered for watched actors.
 *
 * NOTE(review): work that was in flight on a worker when it terminated is not
 * re-queued, so its requestor receives no reply from the pool.
 *
 * @param workerProps recipe for the actor wrapped by each pool worker
 * @param workerCount number of workers created up front
 */
class WorkManager(workerProps: Props, workerCount: Int)
  extends Actor
  with ActorLogging
  with ErrorLogging {

  import PrivateMessages._
  import WorkerPool._

  // Every live worker, busy or idle; needed to recognise a busy worker's death.
  private var allWorkers: Set[ActorRef] = Seq.fill(workerCount)(makeWorker()).toSet

  // Subset of `allWorkers` currently waiting for work.
  private var idleWorkers: Set[ActorRef] = allWorkers

  // Work accepted but not yet handed to a worker, in arrival order.
  private var workQueue = Queue.empty[WorkRequest]

  def receive = {
    case GimmeWork =>
      val worker = sender()
      log.debug("GimmeWork from {}", worker)
      // A worker's ask callback may fire after that worker has stopped and
      // been replaced; only workers still in the pool may rejoin the idle set.
      if (allWorkers contains worker) {
        idleWorkers = idleWorkers + worker
        routeWork()
      }

    case Terminated(actor) if allWorkers contains actor =>
      log.debug("{} TERMINATED", actor)
      val replacement = makeWorker()
      allWorkers = allWorkers - actor + replacement
      idleWorkers = idleWorkers - actor + replacement
      routeWork()

    case Request(msg, responseExpected, timeout) =>
      log.debug("REQUEST ({}, {}, {}) from {}", msg, responseExpected, timeout, sender())
      workQueue = workQueue.enqueue(
        WorkRequest(msg, sender(), responseExpected, timeout getOrElse defaultTimeout))
      routeWork()

    // Any other message not originating from one of our own workers is treated
    // as a request that expects a response within the default timeout.
    case msg if !allWorkers.contains(sender()) =>
      log.debug("RAW {} from {}", msg, sender())
      workQueue = workQueue.enqueue(WorkRequest(msg, sender(), true, defaultTimeout))
      routeWork()
  }

  /** Pairs queued work with idle workers until either runs out. */
  private def routeWork(): Unit = {
    while (idleWorkers.nonEmpty && workQueue.nonEmpty) {
      val worker = idleWorkers.head
      val (work, remaining) = workQueue.dequeue
      idleWorkers = idleWorkers - worker
      workQueue = remaining
      worker ! work
    }
  }

  /** Creates a pool worker as a child of this actor and death-watches it. */
  private def makeWorker(): ActorRef =
    context.watch(context.actorOf(Worker.props(self, workerProps)))
}
object Worker extends ActorGenerator2[Worker, ActorRef, Props]

/**
 * Pool worker: wraps one actor created from `workerProps`, feeds it one
 * `WorkRequest` at a time and tells the manager when it is ready for more.
 *
 * @param manager     pool manager to notify with `GimmeWork`
 * @param workerProps recipe for the wrapped actor that does the real work
 */
class Worker(manager: ActorRef, workerProps: Props)
  extends Actor
  with ActorLogging
  with ErrorLogging {

  import PrivateMessages._

  // The wrapped actor; replaced with a fresh instance if it terminates.
  var worker: ActorRef = spawnWorker()

  def receive = {
    case WorkRequest(msg, requestor, true, timeout) =>
      log.debug("WorkRequest({}, {}, true, {})", msg, requestor, timeout)
      import context.dispatcher
      // The callback runs outside the actor's message loop; it only touches
      // the immutable `requestor` and `manager` references.
      worker.ask(msg)(akka.util.Timeout(timeout)) andThen { case outcome =>
        // NOTE(review): a failure (e.g. ask timeout) is forwarded as the bare
        // Throwable, as before; a requestor using `ask` therefore sees it as a
        // successful reply. Consider akka.actor.Status.Failure — confirm that
        // no caller matches on the raw exception first.
        val reply = outcome match {
          case Success(result) => result
          case Failure(cause)  => cause
        }
        requestor ! reply
        manager ! GimmeWork
      }

    case WorkRequest(msg, requestor, _, _) =>
      log.debug("WorkRequest({}, {}, false)", msg, requestor)
      // Fire-and-forget: any reply from the wrapped actor goes straight to the
      // requestor, and this worker is immediately available again.
      worker.tell(msg, requestor)
      manager ! GimmeWork

    case Terminated(actor) if actor == worker =>
      log.info("worker {} TERMINATED; creating new", actor)
      worker = spawnWorker()

    case _ if sender() == worker =>
      log.debug("asking for work")
      manager ! GimmeWork
  }

  /**
   * Creates the wrapped actor and death-watches it; without the watch the
   * `Terminated` case above would never be delivered.
   */
  private def spawnWorker(): ActorRef =
    context.watch(context.actorOf(workerProps))
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For example, a bounded pool of image processors, so as not to overload a machine.
Pool Creation
Concurrency Control
Fine-grained concurrency control is provided by configuring the Akka deployment for the actor, as well as the number of workers in the pool. The above example specifies 5 workers, so we could create a separate dispatcher for the pool:
Messaging
Any Message
Sending a non-pool message to the pool forwards it as a request expecting a response with a timeout of 30 seconds.
Pool Request Message
For more control, send a `WorkerPool.Request` message, which lets you send a message for which no response is expected and specify your own timeout.