Last active
December 12, 2021 12:36
-
-
Save Diego81/9887105 to your computer and use it in GitHub Desktop.
Simple Word Counter implemented using Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ Actor, ActorRef, Props, ActorSystem } | |
/** Request sent to a [[StringCounterActor]]: count the words contained in `string`. */
case class ProcessStringMsg(string: String)

/**
 * Reply produced for one [[ProcessStringMsg]].
 *
 * @param words number of words found in the processed string
 */
case class StringProcessedMsg(words: Int)
/**
 * Worker actor that counts the words in a single string.
 *
 * For every [[ProcessStringMsg]] it receives, it replies to the sender with a
 * [[StringProcessedMsg]] holding the word count. Any other message is reported
 * on standard output and otherwise ignored.
 */
class StringCounterActor extends Actor {
  def receive = {
    case ProcessStringMsg(string) =>
      // Split on runs of whitespace and ignore empty tokens, so that a blank
      // line counts as 0 words and leading/repeated spaces or tabs do not
      // inflate the result (a plain split(" ") reports 1 for an empty line).
      val wordsInLine = string.split("\\s+").count(_.nonEmpty)
      sender ! StringProcessedMsg(wordsInLine)
    case _ => println("Error: message not recognized")
  }
}
/** Message asking a [[WordCounterActor]] to start counting the words of its file. */
case class StartProcessFileMsg()
/**
 * Counts the words of a text file by delegating each line to its own
 * [[StringCounterActor]] child and summing the partial counts.
 *
 * Protocol:
 *  - [[StartProcessFileMsg]] starts the run; a second start message only
 *    prints a warning.
 *  - [[StringProcessedMsg]] replies from the children are accumulated; once
 *    every line has been accounted for, the total (an `Int`) is sent to the
 *    actor that issued the start message.
 *
 * @param filename path of the file whose words are counted
 */
class WordCounterActor(filename: String) extends Actor {
  private var running = false
  private var totalLines = 0
  private var linesProcessed = 0
  private var result = 0
  private var fileSender: Option[ActorRef] = None

  def receive = {
    case StartProcessFileMsg() =>
      if (running) {
        // println just used for example purposes;
        // Akka logger should be used instead
        println("Warning: duplicate start message received")
      } else {
        running = true
        fileSender = Some(sender) // save reference to process invoker
        val source = scala.io.Source.fromFile(filename)
        try {
          source.getLines().foreach { line =>
            context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
            totalLines += 1
          }
        } finally {
          // Release the file handle even if reading fails part-way through.
          source.close()
        }
        // An empty file dispatches no work, so no StringProcessedMsg would
        // ever arrive; answer right away instead of letting the asker time out.
        if (totalLines == 0) reportResult()
      }
    case StringProcessedMsg(words) =>
      result += words
      linesProcessed += 1
      if (linesProcessed == totalLines) reportResult()
    case _ => println("message not recognized!")
  }

  /** Sends the total to the process invoker and stops the per-line workers. */
  private def reportResult(): Unit = {
    fileSender.foreach(_ ! result)
    // Every child has already answered its single request; stop them so they
    // do not linger for the lifetime of this actor.
    context.children.foreach(child => context.stop(child))
  }
}
/**
 * Command-line entry point: counts the words of the file given as the first
 * argument and prints the total.
 *
 * This is deliberately a plain object with an explicit `main` rather than an
 * `App`: overriding `main` on an `App` skips the delayed initialisation of the
 * object body, which left the implicit execution context `null` and caused a
 * NullPointerException when the future was mapped.
 */
object Sample {
  import akka.util.Timeout
  import scala.concurrent.duration._
  import akka.pattern.ask
  import akka.dispatch.ExecutionContexts._
  import scala.util.{ Failure, Success }

  /**
   * @param args `args(0)` must be the path of the file to process
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      Console.err.println("Usage: Sample <file>")
    } else {
      implicit val ec: scala.concurrent.ExecutionContext = global
      implicit val timeout = Timeout(25.seconds)
      val system = ActorSystem("System")
      val actor = system.actorOf(Props(new WordCounterActor(args(0))))
      val future = actor ? StartProcessFileMsg()
      future.onComplete { outcome =>
        outcome match {
          case Success(result) => println("Total number of words " + result)
          case Failure(error) => println("Word count failed: " + error)
        }
        // Shut down on failure or timeout too; otherwise the actor system's
        // threads keep the JVM alive.
        system.shutdown()
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the tutorial! Though it didn't run as expected for me. I would recommend giving the precise invocation that would run this code, along with the scala version etc.
I am running with scala 2.11.8, and I get an NPE in the main method. I see the following in App's documentation:
@deprecatedOverriding( message = "main should not be overridden" , since = "2.11.0" )
I modified the SampleApp class to look like this:
It also isn't obvious that the program needs an argument, the file whose words will be counted.
Finally, there's a deprecation warning I haven't yet tracked down:
[warn] The `-` command is deprecated in favor of `onFailure` and will be removed in 0.14.0
Again, thanks for the tutorial! Still going over how the program works.
Sunil