Created
August 10, 2012 20:37
-
-
Save jamie-allen/3317661 to your computer and use it in GitHub Desktop.
Simple example of how to use Akka IOManager Iteratee and exporting work to another actor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util._ | |
import akka.util.duration._ | |
import scala.util.control.Exception._ | |
/** | |
* To test, execute this code and use this command in a shell: "telnet localhost 8080" | |
* At the prompt, type in numbers and press enter, and they will be accumulated, returning | |
* the total value each time. | |
*/ | |
object IoManagerBootstrap extends App { | |
class Accumulator extends Actor { | |
var total = 0 | |
def receive = { | |
case i: Int => total += i; sender ! total | |
} | |
} | |
class ServerActor extends Actor { | |
// Note - in a real world impl, we'd use a different dispatcher for the accumulator actor | |
val accumulator = context.actorOf(Props[Accumulator]) | |
def receive = { | |
case IO.Listening(server, address) => | |
println("The server is listening on socket " + address) | |
case IO.Connected(socket, address) => | |
println("Successfully connected to " + address) | |
case IO.NewClient(server) => | |
println("New incoming connection on server") | |
val socket = server.accept() | |
case IO.Read(socket, bytes) => | |
implicit val timeout: Timeout = 2 seconds | |
// Data arrives with CR/LF chars at end. Strip them off, but re-add them to the ByteString return value | |
val futTotal = accumulator ? (catching(classOf[NumberFormatException]) (bytes.utf8String.dropRight(2)).toInt) | |
futTotal map { res => socket.asWritable.write(ByteString(res.toString) ++ ByteString(13, 10)) } | |
case IO.Closed(socket: IO.SocketHandle, cause) => | |
println("Socket has closed, cause: " + cause) | |
case IO.Closed(server: IO.ServerHandle, cause) => | |
println("Server socket has closed, cause: " + cause) | |
} | |
} | |
val actorSystem = ActorSystem() | |
val serverActor = actorSystem.actorOf(Props[ServerActor]) | |
val socket = IOManager(actorSystem).listen("localhost", 8080)(serverActor) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment