Skip to content

Instantly share code, notes, and snippets.

@matsu-chara
Last active February 21, 2016 05:28
Show Gist options
  • Save matsu-chara/6f99e1800395aa9f9f5a to your computer and use it in GitHub Desktop.
Save matsu-chara/6f99e1800395aa9f9f5a to your computer and use it in GitHub Desktop.
NonBlockingにしないとこうなる
package com.example
import akka.actor._
import akka.pattern._
import com.example.DbActor.Message.{BlockingRequest, NonBlockingRequest, Result}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Entry point of the first sample: a single database actor that can be driven
 * either with blocking or with non-blocking requests.
 */
object BlockingNonBlockingSample {

  /**
   * Starts the "mofu" actor system, sends the integers 1 to 1000 to a
   * handler actor, waits two seconds for replies and shuts the system down.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mofu")
    try {
      // The lookup fails with an exception when "db-dispatcher" is missing from
      // the configuration; the finally block makes sure the already started
      // actor system is still shut down in that case.
      val ec = system.dispatchers.lookup("db-dispatcher")
      val dbActorProp = DbActor.props(ec)
      val handler = system.actorOf(HandleActor.props(dbActorProp))
      (1 to 1000).foreach { handler ! _ }
      // Give the actors a fixed two-second window to work before stopping.
      Thread.sleep(2000)
    } finally {
      system.shutdown()
    }
  }
}
/**
 * Receives integers, hands each one to a child actor created from
 * `dbActorProps`, and sums up the `Result` values that come back.
 *
 * @param dbActorProps props used to create the child named "db-actor"
 */
class HandleActor(dbActorProps: Props) extends Actor {

  /** Child actor that performs the (simulated) database work. */
  val dbActor: ActorRef = context.actorOf(dbActorProps, name = "db-actor")

  /** Running sum of all `Result` values received so far. */
  var total: Int = 0

  /**
   * With BlockingRequest only about 20 requests get processed.
   * With NonBlockingRequest all requests get processed (the throughput varies
   * with the dispatcher settings in application.conf).
   */
  def receive: Receive = {
    // case n: Int => dbActor ! BlockingRequest(n)
    case n: Int =>
      dbActor ! NonBlockingRequest(n)

    case Result(x) =>
      total += x
      println(s"res = $x total = $total")
  }
}
/** Factory for [[HandleActor]] props. */
object HandleActor {

  /**
   * Builds the `Props` for a [[HandleActor]].
   *
   * @param dbActorProps props passed to the actor's constructor
   * @return props that instantiate `HandleActor` with `dbActorProps`
   */
  def props(dbActorProps: Props): Props =
    Props(classOf[HandleActor], dbActorProps)
}
/**
 * Simulates a database call that takes 100 ms, either directly inside
 * `receive` or inside a `Future` run on `dbExecutionContext`.
 *
 * @param dbExecutionContext execution context on which the `Future` for a
 *                           `NonBlockingRequest` is run
 */
class DbActor(dbExecutionContext: ExecutionContext) extends Actor {

  def receive: Receive = {
    case BlockingRequest(x) =>
      // The sleep happens inside receive, so this actor handles nothing else meanwhile.
      Thread.sleep(100) // IO wait
      sender() ! Result(x)

    case NonBlockingRequest(x) =>
      // Resolve the sender while still processing this message.
      val replyTo = sender()
      val pending = Future {
        Thread.sleep(100) // IO wait
        Result(x)
      }(dbExecutionContext)
      // Deliver the future's outcome to the requester once it completes.
      pipe(pending)(context.dispatcher).to(replyTo)
  }
}
/** Factory and message protocol for [[DbActor]]. */
object DbActor {

  /**
   * Builds the `Props` for a [[DbActor]].
   *
   * @param dbBlockExecutionContext execution context passed to the actor's constructor
   * @return props that instantiate `DbActor` with `dbBlockExecutionContext`
   */
  def props(dbBlockExecutionContext: ExecutionContext): Props =
    Props(classOf[DbActor], dbBlockExecutionContext)

  /** Messages understood or emitted by [[DbActor]]. */
  object Message {

    /** Common parent of all messages in this protocol. */
    sealed trait Message

    /** Request handled with the wait inside the actor's `receive`. */
    case class BlockingRequest(x: Int) extends Message

    /** Request handled with the wait inside a `Future`. */
    case class NonBlockingRequest(x: Int) extends Message

    /** Reply carrying the value of the request it answers. */
    case class Result(x: Int) extends Message
  }
}
package com.example2
import akka.actor._
import akka.routing.RoundRobinGroup
import com.example2.DbActor.Message.{BlockingRequest, Result}
/**
 * Entry point of the second sample: blocking requests spread over a group of
 * database actors.
 */
object BlockingNonBlockingSample2 {

  /**
   * Starts the "mofu" actor system, sends the integers 1 to 1000 to a
   * handler actor, waits two seconds and shuts the system down.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mofu")
    val handler = system.actorOf(HandleActor.props(DbActor.props))

    for (n <- 1 to 1000) handler ! n

    // Fixed two-second window for the actors to work before stopping.
    Thread.sleep(2000)
    system.shutdown()
  }
}
/**
 * Receives integers, sends each one as a `BlockingRequest` to a round-robin
 * group of 100 child actors, and sums up the `Result` values that come back.
 *
 * @param dbActorProps props used to create every child worker
 */
class HandleActor(dbActorProps: Props) extends Actor {

  /** Router named "worker-router" in front of the children "db-actor-1" to "db-actor-100". */
  val dbGroup: ActorRef = {
    // Every worker is created with the "db-dispatcher" dispatcher.
    val workerProps = dbActorProps.withDispatcher("db-dispatcher")
    val routeePaths = for (i <- 1 to 100) yield {
      val worker = context.actorOf(workerProps, name = s"db-actor-$i")
      worker.path.toStringWithoutAddress
    }
    context.actorOf(RoundRobinGroup(routeePaths).props(), name = "worker-router")
  }

  /** Running sum of all `Result` values received so far. */
  var total: Int = 0

  def receive: Receive = {
    case n: Int =>
      dbGroup ! BlockingRequest(n)

    case Result(x) =>
      total += x
      println(s"res = $x total = $total")
  }
}
/** Factory for [[HandleActor]] props. */
object HandleActor {

  /**
   * Builds the `Props` for a [[HandleActor]].
   *
   * @param dbActorProps props passed to the actor's constructor
   * @return props that instantiate `HandleActor` with `dbActorProps`
   */
  def props(dbActorProps: Props): Props =
    Props(classOf[HandleActor], dbActorProps)
}
/**
 * Simulates a database call: waits 100 ms inside `receive` and then answers
 * the sender with a `Result` carrying the requested value.
 */
class DbActor extends Actor {

  def receive: Receive = {
    case BlockingRequest(x) =>
      Thread.sleep(100) // IO wait
      val reply = Result(x)
      sender() ! reply
  }
}
/** Factory and message protocol for [[DbActor]]. */
object DbActor {

  /** Builds the `Props` for a [[DbActor]], which takes no constructor arguments. */
  def props: Props = Props(classOf[DbActor])

  /** Messages understood or emitted by [[DbActor]]. */
  object Message {

    /** Common parent of all messages in this protocol. */
    sealed trait Message

    /** Request handled with the wait inside the actor's `receive`. */
    case class BlockingRequest(x: Int) extends Message

    /** Reply carrying the value of the request it answers. */
    case class Result(x: Int) extends Message
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment