Last active
December 12, 2021 12:36
-
-
Save Diego81/9887105 to your computer and use it in GitHub Desktop.
Simple Word Counter implemented using Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ Actor, ActorRef, Props, ActorSystem } | |
case class ProcessStringMsg(string: String) | |
case class StringProcessedMsg(words: Integer) | |
class StringCounterActor extends Actor { | |
def receive = { | |
case ProcessStringMsg(string) => { | |
val wordsInLine = string.split(" ").length | |
sender ! StringProcessedMsg(wordsInLine) | |
} | |
case _ => println("Error: message not recognized") | |
} | |
} | |
case class StartProcessFileMsg() | |
class WordCounterActor(filename: String) extends Actor { | |
private var running = false | |
private var totalLines = 0 | |
private var linesProcessed = 0 | |
private var result = 0 | |
private var fileSender: Option[ActorRef] = None | |
def receive = { | |
case StartProcessFileMsg() => { | |
if (running) { | |
// println just used for example purposes; | |
// Akka logger should be used instead | |
println("Warning: duplicate start message received") | |
} else { | |
running = true | |
fileSender = Some(sender) // save reference to process invoker | |
import scala.io.Source._ | |
fromFile(filename).getLines.foreach { line => | |
context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) | |
totalLines += 1 | |
} | |
} | |
} | |
case StringProcessedMsg(words) => { | |
result += words | |
linesProcessed += 1 | |
if (linesProcessed == totalLines) { | |
fileSender.map(_ ! result) // provide result to process invoker | |
} | |
} | |
case _ => println("message not recognized!") | |
} | |
} | |
object Sample extends App { | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
import akka.pattern.ask | |
import akka.dispatch.ExecutionContexts._ | |
implicit val ec = global | |
override def main(args: Array[String]) { | |
val system = ActorSystem("System") | |
val actor = system.actorOf(Props(new WordCounterActor(args(0)))) | |
implicit val timeout = Timeout(25 seconds) | |
val future = actor ? StartProcessFileMsg() | |
future.map { result => | |
println("Total number of words " + result) | |
system.shutdown | |
} | |
} | |
} |
Thanks for the tutorial! Though it didn't run as expected for me. I would recommend giving the precise invocation that would run this code, along with the scala version etc.
I am running with scala 2.11.8, and I get an NPE in the main method. I see the following in App's documentation:
@deprecatedOverriding( message = "main should not be overridden" , since = "2.11.0" )
I modified the SampleApp class to look like this:
import scala.concurrent.duration._
import akka.actor.{ActorSystem, Props}
import akka.dispatch.ExecutionContexts._
import akka.pattern.ask
import akka.util.Timeout
object Sample extends App {
implicit val ec = global
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor(args(0))))
implicit val timeout = Timeout(25.seconds)
val future = actor ? StartProcessFileMsg()
future.map { result =>
println("Total number of words " + result)
system.shutdown
}
}
It also isn't obvious that the program needs an argument, the file whose words will be counted.
Finally, there's a deprecation warning I haven't yet tracked down:
[warn] The -
command is deprecated in favor of onFailure
and will be removed in 0.14.0
Again, thanks for the tutorial! Still going over how the program works.
Sunil
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
(Kibitzing on the questions, as requested on akka-user.)
It isn't -- the ActorContext is part and parcel of the Actor itself, and is automatically built by the system without application intervention. (Or do you mean the ExecutionContext? If so, that's pulled in implicitly on line 59.)
Ah -- that signature just isn't obvious. That's actually a constructor call, which is creating a Props object. Since the Actor doesn't take any constructor parameters, the Props call doesn't need any params, and can omit the parentheses. It's a little confusing when you're learning, though.
Hmm. Good question -- it isn't obvious why WordCounter is checking for duplication. In general, Akka is "at most once delivery", so messages can get dropped (particularly when sending between nodes); if you need a higher degree of reliability, you sometimes need to introduce timeouts and retries at the sending end, and that usually results in needing duplication checks at the receiving end.
But that doesn't appear to be the case here, so I suspect this is just defensively coded against a case that doesn't actually happen here.
Doesn't need to -- parents can always access their children. (That's built into ActorContext.)
In a more realistic example, probably so.
In a longer-lived app, yes -- this example just isn't showing that. But yes, in a more realistic example, you might well have the parent shutting down its children, or simply shutting itself down (which will kill its children): the latter would be done by adding
after line 46.
The preferred approach depends on the requirements of the application; for example, in my application the children are mostly in charge of their own lifespans, and if they don't get any traffic for a certain period of time they work with their parents to shut down cleanly. But there is no one-size-fits-all answer -- it depends entirely on how your app works.