Created
December 5, 2013 12:11
-
-
Save ngocdaothanh/d054baad8fb1a4f93283 to your computer and use it in GitHub Desktop.
reactive.3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nodescala | |
import scala.language.postfixOps | |
import com.sun.net.httpserver._ | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
import scala.async.Async.{async, await} | |
import scala.collection._ | |
import scala.collection.JavaConversions._ | |
import java.util.concurrent.{Executor, ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} | |
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} | |
import java.net.InetSocketAddress | |
/** Contains utilities common to the NodeScala© framework. | |
*/ | |
trait NodeScala { | |
import NodeScala._ | |
def port: Int | |
def createListener(relativePath: String): Listener | |
/** Uses the response object to respond to the write the response back. | |
* The response should be written back in parts, and the method should | |
* occasionally check that server was not stopped, otherwise a very long | |
* response may take very long to finish. | |
* | |
* @param exchange the exchange used to write the response back | |
* @param token the cancellation token for | |
* @param body the response to write back | |
*/ | |
private def respond(exchange: Exchange, token: CancellationToken, response: Response): Unit = { | |
while (token.nonCancelled && response.hasNext()) { | |
val line = response.next() | |
exchange.write(line) | |
} | |
exchange.close() | |
} | |
/** A server: | |
* 1) creates and starts an http listener | |
* 2) creates a cancellation token (hint: use one of the `Future` companion methods) | |
* 3) as long as the token is not cancelled and there is a request from the http listener | |
* asynchronously process that request using the `respond` method | |
* | |
* @param relativePath a relative path on which to start listening on | |
* @param handler a function mapping a request to a response | |
* @return a subscription that can stop the server and all its asynchronous operations *entirely*. | |
*/ | |
def start(relativePath: String)(handler: Request => Response): Subscription = { | |
val listener = createListener(relativePath) | |
val lsub = listener.start() | |
val ssub = CancellationTokenSource() | |
val stoken = ssub.cancellationToken | |
def nextRequest() { | |
if (stoken.nonCancelled) { | |
val f = listener.nextRequest() | |
f.onSuccess { case (req, ex) => | |
val res = handler(req) | |
// Server should cancel a long-running or infinite response | |
val rsub = CancellationTokenSource() | |
val rtoken = ssub.cancellationToken | |
val delay = Future.delay(1 seconds) | |
delay.onComplete { case _ => rsub.unsubscribe() } | |
respond(ex, rtoken, res) | |
} | |
f.onComplete { case _ => | |
nextRequest() | |
} | |
} | |
} | |
nextRequest() | |
Subscription(lsub, ssub); | |
} | |
} | |
object NodeScala { | |
/** A request is a multimap of headers, where each header is a key-value pair of strings. | |
*/ | |
type Request = Map[String, List[String]] | |
/** A response consists of a potentially long string (e.g. a data file). | |
* To be able to process this string in parts, the response is encoded | |
* as an iterator over a subsequences of the response string. | |
*/ | |
type Response = Iterator[String] | |
/** Used to write the response to the request. | |
*/ | |
trait Exchange { | |
/** Writes to the output stream of the exchange. | |
*/ | |
def write(s: String): Unit | |
/** Communicates that the response has ended and that there | |
* will be no further writes. | |
*/ | |
def close(): Unit | |
def request: Request | |
} | |
object Exchange { | |
def apply(exchange: HttpExchange) = new Exchange { | |
val os = exchange.getResponseBody() | |
exchange.sendResponseHeaders(200, 0L) | |
def write(s: String) = os.write(s.getBytes) | |
def close() = os.close() | |
def request: Request = { | |
val headers = for ((k, vs) <- exchange.getRequestHeaders) yield (k, vs.toList) | |
immutable.Map() ++ headers | |
} | |
} | |
} | |
trait Listener { | |
def port: Int | |
def relativePath: String | |
def start(): Subscription | |
def createContext(handler: Exchange => Unit): Unit | |
def removeContext(): Unit | |
/** Given a relative path: | |
* 1) constructs an uncompleted promise | |
* 2) installs an asynchronous request handler using `createContext` | |
* that completes the promise with a request when it arrives | |
* and then deregisters itself using `removeContext` | |
* 3) returns the future with the request | |
* | |
* @param relativePath the relative path on which we want to listen to requests | |
* @return the promise holding the pair of a request and an exchange object | |
*/ | |
def nextRequest(): Future[(Request, Exchange)] = { | |
val p = Promise[(Request, Exchange)]() | |
createContext { ex => | |
p.success((ex.request, ex)) | |
removeContext() | |
} | |
p.future | |
} | |
} | |
object Listener { | |
class Default(val port: Int, val relativePath: String) extends Listener { | |
private val s = HttpServer.create(new InetSocketAddress(port), 0) | |
private val executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue) | |
s.setExecutor(executor) | |
def start() = { | |
s.start() | |
new Subscription { | |
def unsubscribe() = { | |
s.stop(0) | |
executor.shutdown() | |
} | |
} | |
} | |
def createContext(handler: Exchange => Unit) = s.createContext(relativePath, new HttpHandler { | |
def handle(httpxchg: HttpExchange) = handler(Exchange(httpxchg)) | |
}) | |
def removeContext() = s.removeContext(relativePath) | |
} | |
} | |
/** The standard server implementation. | |
*/ | |
class Default(val port: Int) extends NodeScala { | |
def createListener(relativePath: String) = new Listener.Default(port, relativePath) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.language.postfixOps | |
import scala.util._ | |
import scala.util.control.NonFatal | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
import scala.async.Async.{async, await} | |
import scala.actors.threadpool.TimeUnit | |
/** Contains basic data types, data structures and `Future` extensions. | |
*/ | |
package object nodescala { | |
class FulfilPromiseAfter(p: Promise[Unit], t: Duration) extends Runnable { | |
def run() { | |
blocking { | |
Thread.sleep(t.toMillis) | |
p.success() | |
} | |
} | |
} | |
/** Adds extensions methods to the `Future` companion object. | |
*/ | |
implicit class FutureCompanionOps[T](val f: Future.type) extends AnyVal { | |
/** Returns a future that is always completed with `value`. | |
*/ | |
def always[T](value: T): Future[T] = { | |
Future.successful(value) | |
} | |
/** Returns a future that is never completed. | |
* | |
* This future may be useful when testing if timeout logic works correctly. | |
*/ | |
def never[T]: Future[T] = { | |
val p = Promise[T]() | |
p.future | |
} | |
/** Given a list of futures `fs`, returns the future holding the list of values of all the futures from `fs`. | |
* The returned future is completed only once all of the futures in `fs` have been completed. | |
* The values in the list are in the same order as corresponding futures `fs`. | |
* If any of the futures `fs` fails, the resulting future also fails. | |
*/ | |
def all[T](fs: List[Future[T]]): Future[List[T]] = { | |
Future.sequence(fs) | |
} | |
/** Given a list of futures `fs`, returns the future holding the value of the future from `fs` that completed first. | |
* If the first completing future in `fs` fails, then the result is failed as well. | |
* | |
* E.g.: | |
* | |
* Future.any(List(Future { 1 }, Future { 2 }, Future { throw new Exception })) | |
* | |
* may return a `Future` succeeded with `1`, `2` or failed with an `Exception`. | |
*/ | |
def any[T](fs: List[Future[T]]): Future[T] = { | |
Future.firstCompletedOf(fs) | |
} | |
/** Returns a future with a unit value that is completed after time `t`. | |
*/ | |
def delay(t: Duration): Future[Unit] = { | |
val p = Promise[Unit]() | |
val th = new Thread(new FulfilPromiseAfter(p, t)) | |
th.start() | |
p.future | |
} | |
/** Completes this future with user input. | |
*/ | |
def userInput(message: String): Future[String] = Future { | |
readLine(message) | |
} | |
/** Creates a cancellable context for an execution and runs it. | |
*/ | |
def run()(f: CancellationToken => Future[Unit]): Subscription = { | |
val ret = CancellationTokenSource() | |
f(ret.cancellationToken) | |
ret | |
} | |
} | |
/** Adds extension methods to future objects. | |
*/ | |
implicit class FutureOps[T](val f: Future[T]) extends AnyVal { | |
/** Returns the result of this future if it is completed now. | |
* Otherwise, throws a `NoSuchElementException`. | |
* | |
* Note: This method does not wait for the result. | |
* It is thus non-blocking. | |
* However, it is also non-deterministic -- it may throw or return a value | |
* depending on the current state of the `Future`. | |
*/ | |
def now: T = { | |
if (f.isCompleted) { | |
f.value match { | |
case None => throw new NoSuchElementException | |
case Some(tri) => tri.get | |
} | |
} else { | |
throw new NoSuchElementException | |
} | |
} | |
/** Continues the computation of this future by taking the current future | |
* and mapping it into another future. | |
* | |
* The function `cont` is called only after the current future completes. | |
* The resulting future contains a value returned by `cont`. | |
*/ | |
def continueWith[S](cont: Future[T] => S): Future[S] = { | |
f.map { v => cont(f) } | |
} | |
/** Continues the computation of this future by taking the result | |
* of the current future and mapping it into another future. | |
* | |
* The function `cont` is called only after the current future completes. | |
* The resulting future contains a value returned by `cont`. | |
*/ | |
def continue[S](cont: Try[T] => S): Future[S] = { | |
f.transform({ _ => cont(f.value.get) }, { t => t }) | |
} | |
} | |
/** Subscription objects are used to be able to unsubscribe | |
* from some event source. | |
*/ | |
trait Subscription { | |
def unsubscribe(): Unit | |
} | |
object Subscription { | |
/** Given two subscriptions `s1` and `s2` returns a new composite subscription | |
* such that when the new composite subscription cancels both `s1` and `s2` | |
* when `unsubscribe` is called. | |
*/ | |
def apply(s1: Subscription, s2: Subscription) = new Subscription { | |
def unsubscribe() { | |
s1.unsubscribe() | |
s2.unsubscribe() | |
} | |
} | |
} | |
/** Used to check if cancellation was requested. | |
*/ | |
trait CancellationToken { | |
def isCancelled: Boolean | |
def nonCancelled = !isCancelled | |
} | |
/** The `CancellationTokenSource` is a special kind of `Subscription` that | |
* returns a `cancellationToken` which is cancelled by calling `unsubscribe`. | |
* | |
* After calling `unsubscribe` once, the associated `cancellationToken` will | |
* forever remain cancelled -- its `isCancelled` will return `false. | |
*/ | |
trait CancellationTokenSource extends Subscription { | |
def cancellationToken: CancellationToken | |
} | |
/** Creates cancellation token sources. | |
*/ | |
object CancellationTokenSource { | |
/** Creates a new `CancellationTokenSource`. | |
*/ | |
def apply(): CancellationTokenSource = new CancellationTokenSource { | |
val p = Promise[Unit]() | |
val cancellationToken = new CancellationToken { | |
def isCancelled = p.future.value != None | |
} | |
def unsubscribe() { | |
p.trySuccess(()) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment