Skip to content

Instantly share code, notes, and snippets.

@aaronlifton3
Created October 10, 2014 13:05
Show Gist options
  • Save aaronlifton3/1b2cd68a1157e16dd8d8 to your computer and use it in GitHub Desktop.
Save aaronlifton3/1b2cd68a1157e16dd8d8 to your computer and use it in GitHub Desktop.
import akka.actor._
import akka.pattern.{after, ask, pipe}
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent._
import scala.concurrent.duration._
class Job(jobTracker: ActorRef) extends Actor {
var count = 0
def perform {
count += 1
println("doing work")
}
def receive = {
case JobRequestMessage =>
perform
jobTracker ! JobDoneMessage
case JobStatusMessage =>
val c = count
jobTracker ! JobStatusMessage(s"count is $c")
case JobStopMessage =>
context.stop(self)
case JobReq(worktimes, timeout, async) => {
val searchFutures = worktimes map { worktime =>
doWork(worktime)
val fallback = after(timeout, context.system.scheduler) {
Future successful s"$worktime ms > $timeout"
}
Future firstCompletedOf Seq(searchFuture, fallback)
}
// Pipe future results to sender
(Future sequence searchFutures) pipeTo sender
}
}
class JobTracker extends Actor {
def receive = {
case JobStatusMessage(m) =>
println(s"job status: $m")
case JobDoneMessage =>
println("--job done")
sender ! JobStopMessage
context.stop(self)
}
}
object JobTest {
// extends App
def run = {
println("running")
val system = ActorSystem("JobSystem")
val jobTracker = system.actorOf(Props[JobTracker], name = "jobTracker")
val job = system.actorOf(Props(new Job(jobTracker)), name = "job")
job ! JobRequestMessage
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment