Created
September 14, 2010 05:54
-
-
Save zentrope/578610 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A small chat server in order to work out the details of a | |
// socket server using Actors. And maybe just a little attempt | |
// to wrap TCP itself inside an Actor, kinda like Erlang does | |
// it. The whole idea is to try and make things seem simpler, | |
// ultimately, and to remove any shared state. | |
package zentrope.chat { | |
import java.net.Socket | |
import java.net.ServerSocket | |
import scala.actors.DaemonActor | |
import scala.actors.Actor.actor | |
object Util {

  // Registers `body` as a JVM shutdown hook. Because `body` is a
  // by-name parameter it is not evaluated here; it is evaluated
  // only when the hook thread's run() is invoked.
  def addShutdownHook(body: => Unit) = {
    val hook = new Thread(new Runnable {
      def run(): Unit = body
    })
    Runtime.getRuntime.addShutdownHook(hook)
  }
}
object Router extends DaemonActor {

  // Keeps the list of participating clients and relays every
  // broadcast message to each of them. The list itself lives
  // inside act(), so it is only touched while this actor is
  // processing one of the messages below.

  case class AddClient(client: Client)
  case class RemoveClient(client: Client)
  case class Broadcast(msg: String)

  // Logs the request, then queues an AddClient message to this actor.
  def add(client: Client) = {
    println (" :: router adding " + client)
    this ! AddClient(client)
  }

  // Logs the request, then queues a RemoveClient message to this actor.
  def remove(client: Client) = {
    println (" :: router removing " + client)
    this ! RemoveClient(client)
  }

  // Queues `msg` for delivery to every client currently in the list.
  def broadcast(msg: String) =
    this ! Broadcast(msg)

  def act = {
    var members: List[Client] = Nil
    loop {
      react {
        case AddClient(joined) =>
          members = joined :: members
        case RemoveClient(departed) =>
          members = members.filterNot(_ == departed)
        case Broadcast(text) =>
          for (member <- members) member.broadcast(text)
      }
    }
  }
}
//========================================================================= | |
class Client(socket: Socket, id: Int) extends DaemonActor {

  // One actor per connected socket. All socket I/O is performed in
  // short-lived helper actors (see `handle`) which report back to
  // this actor using the private messages below.

  private case class TcpLine(line: String)     // a line was read from the socket
  private case class TcpSend(line: String)     // a line should be written to the socket
  private case class TcpClose(reason: String)  // the connection should be torn down
  private case object TcpRead                  // request to read the next line

  import java.io._

  override def toString(): String =
    "<client:%d>".format(id)

  // Queues `msg` to be written to this client's socket.
  def broadcast(msg: String): Unit = {
    this ! TcpSend(msg)
  }

  // Runs `func` in a freshly spawned actor. Any failure is turned
  // into a TcpClose message for this client, so the I/O helpers
  // signal "stop" simply by throwing.
  private def handle(func: => Unit) = {
    actor {
      try {
        func
      }
      catch {
        // Deliberately broad: whatever goes wrong during I/O, the
        // connection is closed rather than left half-alive.
        case t: Throwable =>
          // getMessage may be null; fall back to the throwable's
          // own description so the close reason is never null.
          this ! TcpClose(Option(t.getMessage).getOrElse(t.toString))
      }
    }
  }

  // Reads one line in a helper actor. End of stream (null) and the
  // literal line "quit" close the connection; any other line is
  // sent back to this actor as a TcpLine.
  private def read(in: BufferedReader) = {
    handle {
      in.readLine() match {
        case null =>
          throw new Exception("Client closed socket.")
        case "quit" =>
          throw new Exception("Client quits.")
        case line =>
          this ! TcpLine(line)
      }
    }
  }

  // Writes one line in a helper actor, using the writer handed in
  // by act() instead of wrapping the socket stream again per message.
  private def write(out: PrintWriter, line: String) = {
    handle {
      out.println(line)
      // PrintWriter does not throw on I/O failure; checkError()
      // flushes and reports it, so a broken connection is noticed.
      if (out.checkError())
        throw new IOException("Write to client failed.")
    }
  }

  // Starts the actor and immediately requests the first read.
  override def start() = {
    super.start()
    this ! TcpRead
    this
  }

  def act(): Unit = {
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()))

    Router.add(this)

    var keepGoing = true

    loopWhile(keepGoing) { react {
      case TcpRead =>
        read(in)
      case TcpSend(line) =>
        write(out, line)
      case TcpLine(line) =>
        val msg = "client.%d> %s".format(id, line)
        println(msg)
        Router.broadcast(msg)
        // Ask for the next line only after this one has been handled.
        this ! TcpRead
      case TcpClose(reason) =>
        println(" :: closing client [%d] due to [%s].".format(id, reason))
        keepGoing = false
        // Deregister from the Router even if closing the socket fails,
        // so the Router does not keep broadcasting to a dead client.
        try {
          socket.close()
        }
        finally {
          Router.remove(this)
        }
    }}
  }
}
//========================================================================= | |
class ChatServer(val port: Int) {

  // Accepts incoming connections on `port` and spawns a Client
  // actor for each one.

  // Written by start() and read by stop(). Main calls stop() from a
  // shutdown-hook thread, so the field is volatile to make the
  // assignment visible across threads.
  @volatile var serverSocket: Option[ServerSocket] = None

  // Evaluates `func` repeatedly until it throws, then logs the
  // reason. Closing the server socket makes a pending accept()
  // throw, which is how the loop normally ends.
  def loopUntilSocketClosed(func: => Unit) = {
    try {
      while (true) {
        func
      }
    }
    catch {
      case (t: Throwable) => {
        println(" :: server shutting down due to: [%s]".format(t.getMessage))
      }
    }
  }

  // Starts the Router, binds the listening socket and blocks in the
  // accept loop until the socket is closed or an error occurs.
  def start() = {

    Router.start

    val listener = new ServerSocket(port)
    serverSocket = Some(listener)

    var clientId: Int = 0

    loopUntilSocketClosed {
      println(" :: waiting for a connection (aren't we all)")
      val clientSocket = listener.accept()

      println(" :: got a connection")
      val client = new Client(clientSocket, clientId)

      println(" :: starting client actor " + client)
      client.start
      clientId += 1
    }

    // The loop also ends on failures unrelated to stop(); release
    // the listening socket in that case too.
    if (!listener.isClosed)
      listener.close()

    println(" :: no longer accepting connections")
  }

  // Closes the listening socket, if start() has created one.
  def stop(): Unit = {
    println(" :: closing server socket")
    serverSocket foreach { s => s.close() }
  }
}
//========================================================================= | |
object Main {

  // Port used when no (valid) port is given on the command line.
  private val DefaultPort = 9999

  // Stops the given server, if there is one.
  def stopServer(server: Option[ChatServer]) =
    server foreach { s => s.stop() }

  // Picks the listening port: the first command-line argument when
  // it parses as an integer, otherwise DefaultPort.
  private def portFrom(args: Array[String]): Int =
    args.headOption match {
      case None =>
        DefaultPort
      case Some(text) =>
        try {
          text.toInt
        }
        catch {
          case _: NumberFormatException =>
            println(" :: ignoring invalid port [" + text + "], using " + DefaultPort)
            DefaultPort
        }
    }

  // Entry point. Usage: Main [port]
  def main(args: Array[String]): Unit = {

    println("Hello chat.")
    println(" :: starting server")

    val chat = new ChatServer(portFrom(args))

    // Close the listening socket on JVM shutdown so the blocking
    // accept loop in start() ends.
    Util.addShutdownHook {
      println("\n :: shutdown hook stopping server")
      stopServer(Some(chat))
    }

    try {
      chat.start()
    }
    catch {
      // Last-resort handler at the edge of the program: report
      // whatever went wrong and fall through to the final message.
      case t: Throwable =>
        println(" :: " + t)
    }

    println(" :: done serving")
  }
}
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A nearly featureless chat server, with some protection, but not really. It can be overwhelmed by too many connections, for instance.
Anyway, the challenge was to acquaint myself with how to write what is essentially a threaded socket server using actors. Notice there's never a mention of a thread!
The "state" of the chat server is maintained in an actor called the Router. When a client gets a line over a socket, it sends it to the router, which in turn sends a message to each client.
Part of the reason this is so large is that I was sorta going down the route of Erlang's TCP modules, which hide a lot of the read/write details from their users. For instance, I could imagine generalizing the above a little to make it a Line Oriented TCP Client of some sort, and then just registering handlers with it.
Or something.
Another adjustment would be to get the ChatServer class itself to be an Actor rather than an infinite loop. But it's late. So I'm going to leave it at that.