Created
September 6, 2013 16:46
-
-
Save volgar1x/6466455 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.elves | |
import scala.concurrent.{Promise, Future, ExecutionContextExecutorService} | |
import java.nio.channels.{SelectionKey, ServerSocketChannel, Selector} | |
import java.net.SocketAddress | |
import scala.collection.mutable | |
import scala.beans.BeanProperty | |
class TcpService(addr: SocketAddress, executorFactory: () => ExecutionContextExecutorService) extends Service { | |
import JavaConversions.runnable | |
private var running = false | |
private val inputExecutor = executorFactory() | |
private val outputExecutor = executorFactory() | |
private val selector = Selector.open() | |
private val requests = mutable.Queue.empty[Request[TcpSession]] | |
def startUp() { | |
running = true | |
inputExecutor.execute { () => inputWorker() } | |
outputExecutor.execute { () => outputWorker() } | |
} | |
def tearDown() { | |
running = false | |
inputExecutor.shutdown() | |
outputExecutor.shutdown() | |
} | |
private[elves] def request(req: Request[TcpSession]) { | |
requests += req | |
} | |
private def accept() { | |
} | |
private def read(key: SelectionKey) { | |
} | |
private def inputWorker() { | |
val server = ServerSocketChannel.open() | |
server.configureBlocking(false) | |
server.register(selector, SelectionKey.OP_ACCEPT) | |
server.bind(addr) | |
while (running) { | |
if (selector.select() > 0) { | |
val keys = selector.selectedKeys.iterator | |
while (keys.hasNext) { | |
val key = keys.next | |
keys.remove() | |
if (key.isAcceptable) { | |
accept() | |
} else if (key.isReadable) { | |
read(key) | |
} | |
} | |
} | |
} | |
} | |
private def outputWorker() { | |
while (running) { | |
while (running && requests.isEmpty) { | |
Thread sleep 1 | |
} | |
requests.dequeue() match { | |
case Write(session, o, callback) => | |
case Close(session, callback) => | |
} | |
} | |
} | |
} | |
class TcpSession(@BeanProperty val service: TcpService) extends Session { | |
def write(o: Any): Future[this.type] = { | |
val promise = Promise[this.type]() | |
service request Write(this, o, () => promise.success(this)) | |
promise.future | |
} | |
def close(): Future[this.type] = { | |
val promise = Promise[this.type]() | |
service request Close(this, () => promise.success(this)) | |
promise.future | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment