-
-
Save Diego81/9887105 to your computer and use it in GitHub Desktop.
import akka.actor.{ Actor, ActorRef, Props, ActorSystem } | |
case class ProcessStringMsg(string: String) | |
case class StringProcessedMsg(words: Integer) | |
class StringCounterActor extends Actor { | |
def receive = { | |
case ProcessStringMsg(string) => { | |
val wordsInLine = string.split(" ").length | |
sender ! StringProcessedMsg(wordsInLine) | |
} | |
case _ => println("Error: message not recognized") | |
} | |
} | |
case class StartProcessFileMsg() | |
class WordCounterActor(filename: String) extends Actor { | |
private var running = false | |
private var totalLines = 0 | |
private var linesProcessed = 0 | |
private var result = 0 | |
private var fileSender: Option[ActorRef] = None | |
def receive = { | |
case StartProcessFileMsg() => { | |
if (running) { | |
// println just used for example purposes; | |
// Akka logger should be used instead | |
println("Warning: duplicate start message received") | |
} else { | |
running = true | |
fileSender = Some(sender) // save reference to process invoker | |
import scala.io.Source._ | |
fromFile(filename).getLines.foreach { line => | |
context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) | |
totalLines += 1 | |
} | |
} | |
} | |
case StringProcessedMsg(words) => { | |
result += words | |
linesProcessed += 1 | |
if (linesProcessed == totalLines) { | |
fileSender.map(_ ! result) // provide result to process invoker | |
} | |
} | |
case _ => println("message not recognized!") | |
} | |
} | |
object Sample extends App { | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
import akka.pattern.ask | |
import akka.dispatch.ExecutionContexts._ | |
implicit val ec = global | |
override def main(args: Array[String]) { | |
val system = ActorSystem("System") | |
val actor = system.actorOf(Props(new WordCounterActor(args(0)))) | |
implicit val timeout = Timeout(25 seconds) | |
val future = actor ? StartProcessFileMsg() | |
future.map { result => | |
println("Total number of words " + result) | |
system.shutdown | |
} | |
} | |
} |
Hi there! Beautiful learning example! THANK YOU so much for sharing this!
Couple followup questions, and welcome to direct me to further reference Links for me to find out:
-
Not totally understanding this line L37:
context.actorOf(Props[StringCounterActor]) ,,,
ie. How is Context Initialized? -- Are you using some external config property file that's not in this GitHub repo?
How is Props initialized? -- Are you using some external config property file that's not in this GitHub repo? -
What's the best-practice for checking for duplicate messages in Actors --
i.e. Why are you checking for that in the WordCounter Actor; but not in the StringCounter Actor? -
What's the best-practice for Parent delegation to Child Actors --
i.e. Should the WordCounter Parent Actor be saving references to all of its created Children; and control how many
instances get created?
Should WordCounter be messaging its Children to shutdown; instead of calling a system.shutdown?
THANKS in advance for any followup info!
Dagny T
(Kibitzing on the questions, as requested on akka-user.)
How is Context Initialized?
It isn't -- the ActorContext is part and parcel of the Actor itself, and is automatically built by the system without application intervention. (Or do you mean the ExecutionContext? If so, that's pulled in implicitly on line 59.)
How is Props initialized?
Ah -- that signature just isn't obvious. That's actually a constructor call, which is creating a Props object. Since the Actor doesn't take any constructor parameters, the Props call doesn't need any params, and can omit the parentheses. It's a little confusing when you're learning, though.
Why are you checking for that in the WordCounter Actor; but not in the StringCounter Actor?
Hmm. Good question -- it isn't obvious why WordCounter is checking for duplication. In general, Akka is "at most once delivery", so messages can get dropped (particularly when sending between nodes); if you need a higher degree of reliability, you sometimes need to introduce timeouts and retries at the sending end, and that usually results in needing duplication checks at the receiving end.
But that doesn't appear to be the case here, so I suspect this is just defensively coded against a case that doesn't actually happen here.
Should the WordCounter Parent Actor be saving references to all of its created Children?
Doesn't need to -- parents can always access their children. (That's built into ActorContext.)
control how many instances get created?
In a more realistic example, probably so.
Should WordCounter be messaging its Children to shutdown; instead of calling a system.shutdown?
In a longer-lived app, yes -- this example just isn't showing that. But yes, in a more realistic example, you might well have the parent shutting down its children, or simply shutting itself down (which will kill its children): the latter would be done by adding
context.stop(self)
after line 46.
The preferred approach depends on the requirements of the application; for example, in my application the children are mostly in charge of their own lifespans, and if they don't get any traffic for a certain period of time they work with their parents to shut down cleanly. But there is no one-size-fits-all answer -- it depends entirely on how your app works.
Thanks for the tutorial! Though it didn't run as expected for me. I would recommend giving the precise invocation that would run this code, along with the scala version etc.
I am running with scala 2.11.8, and I get an NPE in the main method. I see the following in App's documentation:
@deprecatedOverriding( message = "main should not be overridden" , since = "2.11.0" )
I modified the SampleApp class to look like this:
import scala.concurrent.duration._
import akka.actor.{ActorSystem, Props}
import akka.dispatch.ExecutionContexts._
import akka.pattern.ask
import akka.util.Timeout
object Sample extends App {
implicit val ec = global
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor(args(0))))
implicit val timeout = Timeout(25.seconds)
val future = actor ? StartProcessFileMsg()
future.map { result =>
println("Total number of words " + result)
system.shutdown
}
}
It also isn't obvious that the program needs an argument, the file whose words will be counted.
Finally, there's a deprecation warning I haven't yet tracked down:
[warn] The -
command is deprecated in favor of onFailure
and will be removed in 0.14.0
Again, thanks for the tutorial! Still going over how the program works.
Sunil
I had to move
implicit val ec = global
inside the main method to make it work :)https://www.toptal.com/scala/concurrency-and-fault-tolerance-made-easy-an-intro-to-akka#comment-1776147740