Skip to content

Instantly share code, notes, and snippets.

@ngocdaothanh
Created December 5, 2013 12:11
Show Gist options
  • Save ngocdaothanh/d054baad8fb1a4f93283 to your computer and use it in GitHub Desktop.
Save ngocdaothanh/d054baad8fb1a4f93283 to your computer and use it in GitHub Desktop.
reactive.3
package nodescala
import scala.language.postfixOps
import com.sun.net.httpserver._
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import scala.collection._
import scala.collection.JavaConversions._
import java.util.concurrent.{Executor, ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue}
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress
/** Contains utilities common to the NodeScala© framework.
 *
 *  Implementations provide the `port` and a way to create a listener;
 *  `start` then serves requests until the returned subscription is cancelled.
 */
trait NodeScala {
  import NodeScala._

  /** Port used by this server. */
  def port: Int

  /** Creates the listener that receives requests for `relativePath`. */
  def createListener(relativePath: String): Listener

  /** Writes `response` back through `exchange`, one part at a time.
   *
   *  The token is checked before every part is written, so a cancelled response
   *  stops early instead of draining a very long (or infinite) iterator.
   *  The exchange is closed in every case, including when the iterator or the
   *  write throws.
   *
   *  @param exchange the exchange used to write the response back
   *  @param token    the cancellation token that aborts the response once cancelled
   *  @param response the response to write back
   */
  private def respond(exchange: Exchange, token: CancellationToken, response: Response): Unit =
    try {
      while (token.nonCancelled && response.hasNext) {
        exchange.write(response.next())
      }
    } finally {
      // Always terminate the exchange, otherwise the client would wait forever.
      exchange.close()
    }

  /** A server:
   *  1) creates and starts an http listener
   *  2) creates a cancellation token for the whole server
   *  3) as long as the token is not cancelled and there is a request from the http listener,
   *     asynchronously processes that request using the `respond` method
   *
   *  Every response additionally gets its own timeout: one second after the handler
   *  produced the response, the response is cancelled even if the server keeps running.
   *
   *  @param relativePath a relative path on which to start listening
   *  @param handler      a function mapping a request to a response
   *  @return a subscription that can stop the server and all its asynchronous operations *entirely*.
   */
  def start(relativePath: String)(handler: Request => Response): Subscription = {
    val listener = createListener(relativePath)
    val listenerSubscription = listener.start()
    val serverSource = CancellationTokenSource()
    val serverToken = serverSource.cancellationToken
    val responseTimeout = 1.second

    def serveNext(): Unit =
      if (serverToken.nonCancelled) {
        val pending = listener.nextRequest()
        pending.onSuccess { case (request, exchange) =>
          val response =
            try handler(request)
            catch {
              // A failing handler must not leave the client hanging on an open exchange.
              case scala.util.control.NonFatal(e) =>
                exchange.close()
                throw e
            }
          // Server should cancel a long-running or infinite response.
          val responseSource = CancellationTokenSource()
          // The response stops when EITHER the server is stopped OR its own timeout fires.
          val responseToken = new CancellationToken {
            def isCancelled: Boolean =
              serverToken.isCancelled || responseSource.cancellationToken.isCancelled
          }
          Future.delay(responseTimeout).onComplete { case _ => responseSource.unsubscribe() }
          respond(exchange, responseToken, response)
        }
        // Keep listening whether or not the previous request could be obtained.
        pending.onComplete { case _ => serveNext() }
      }

    serveNext()
    Subscription(listenerSubscription, serverSource)
  }
}
object NodeScala {
/** A request is a multimap of headers, where each header is a key-value pair of strings:
 * a header name maps to the list of values given for that header.
 */
type Request = Map[String, List[String]]
/** A response consists of a potentially long string (e.g. a data file).
 * To be able to process this string in parts, the response is encoded
 * as an iterator over subsequences of the response string.
 */
type Response = Iterator[String]
/** Used to write the response to the request.
 *
 * An exchange also gives access to the request it answers, see `request`.
 */
trait Exchange {
/** Writes to the output stream of the exchange.
 *
 * @param s the next part of the response body
 */
def write(s: String): Unit
/** Communicates that the response has ended and that there
 * will be no further writes.
 */
def close(): Unit
/** The request belonging to this exchange, as a multimap of its headers. */
def request: Request
}
object Exchange {
  /** Wraps a JDK `HttpExchange` as an [[Exchange]].
   *
   *  The response headers (status `200`, response length `0`) are sent as soon as the
   *  wrapper is created, and only then is the response body stream obtained: the JDK
   *  documentation requires `sendResponseHeaders` to be called before `getResponseBody`.
   *
   *  @param exchange the underlying JDK exchange
   *  @return an exchange whose `write` emits UTF-8 encoded text and whose `request`
   *          exposes the request headers as an immutable map
   */
  def apply(exchange: HttpExchange): Exchange = new Exchange {
    exchange.sendResponseHeaders(200, 0L)
    private val os = exchange.getResponseBody()

    // Encode explicitly: the no-argument getBytes depends on the platform default charset.
    def write(s: String): Unit = os.write(s.getBytes("UTF-8"))

    def close(): Unit = os.close()

    def request: Request = {
      // Explicit conversions instead of the implicit JavaConversions views.
      import scala.collection.JavaConverters._
      exchange.getRequestHeaders.asScala
        .map { case (name, values) => name -> values.asScala.toList }
        .toMap
    }
  }
}
/** Receives http requests for one relative path, one request at a time. */
trait Listener {
  /** Port this listener serves on. */
  def port: Int

  /** Relative path this listener serves. */
  def relativePath: String

  /** Starts the listener.
   *
   *  @return a subscription used to stop the listener again
   */
  def start(): Subscription

  /** Installs `handler` as the request handler for `relativePath`. */
  def createContext(handler: Exchange => Unit): Unit

  /** Removes the handler installed by `createContext`. */
  def removeContext(): Unit

  /** Waits for the next request on `relativePath`:
   *  1) constructs an uncompleted promise
   *  2) installs an asynchronous request handler using `createContext`
   *     that deregisters itself using `removeContext` and then completes
   *     the promise with the request that arrived
   *  3) returns the future with the request
   *
   *  The handler is removed BEFORE the promise is completed. Callbacks on the returned
   *  future typically call `nextRequest()` again, which installs a new handler for the
   *  same path; completing first would let that registration race with the removal,
   *  which could then delete the new handler instead of the old one.
   *
   *  @return the future holding the pair of a request and an exchange object
   */
  def nextRequest(): Future[(Request, Exchange)] = {
    val p = Promise[(Request, Exchange)]()
    createContext { ex =>
      // Complete the promise even if deregistration fails, so the request is not lost.
      try removeContext()
      finally p.success((ex.request, ex))
    }
    p.future
  }
}
object Listener {
  /** Listener backed by the JDK's built-in `HttpServer`.
   *
   *  The server is created (bound to `port`) when the instance is constructed and
   *  dispatches its requests on a pool with exactly one thread.
   *
   *  @param port         the port the http server is bound to
   *  @param relativePath the path for which handlers are installed and removed
   */
  class Default(val port: Int, val relativePath: String) extends Listener {
    private val server = HttpServer.create(new InetSocketAddress(port), 0)
    private val workers =
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
    server.setExecutor(workers)

    /** Starts the http server; unsubscribing stops it immediately and shuts the pool down. */
    def start(): Subscription = {
      server.start()
      new Subscription {
        def unsubscribe(): Unit = {
          server.stop(0)
          workers.shutdown()
        }
      }
    }

    /** Registers `handler` for `relativePath`, adapting each JDK exchange to an `Exchange`. */
    def createContext(handler: Exchange => Unit): Unit = {
      val adapter = new HttpHandler {
        def handle(httpxchg: HttpExchange): Unit = handler(Exchange(httpxchg))
      }
      server.createContext(relativePath, adapter)
    }

    /** Unregisters whatever handler is installed for `relativePath`. */
    def removeContext(): Unit = server.removeContext(relativePath)
  }
}
/** The standard server implementation: serves on `port` and
 *  creates a `Listener.Default` for every relative path.
 */
class Default(val port: Int) extends NodeScala {
  /** Builds the default listener for `relativePath` on this server's port. */
  def createListener(relativePath: String) =
    new Listener.Default(port, relativePath)
}
}
import scala.language.postfixOps
import scala.util._
import scala.util.control.NonFatal
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import scala.actors.threadpool.TimeUnit
/** Contains basic data types, data structures and `Future` extensions.
*/
package object nodescala {
/** A task that sleeps for the duration `t` and then completes `p` with the unit value.
 *
 *  @param p the promise to fulfil; it must not be completed by anyone else,
 *           because `success` throws on an already completed promise
 *  @param t how long to sleep before fulfilling `p`; a negative duration is treated
 *           as zero. NOTE(review): an infinite duration is presumably rejected by
 *           `toMillis` -- confirm before passing one.
 */
class FulfilPromiseAfter(p: Promise[Unit], t: Duration) extends Runnable {
  def run(): Unit = {
    // Thread.sleep rejects negative arguments, which would leave `p` uncompleted forever.
    val millis = math.max(0L, t.toMillis)
    blocking {
      Thread.sleep(millis)
    }
    // Pass the unit value explicitly instead of relying on empty-argument adaptation.
    p.success(())
  }
}
/** Adds extension methods to the `Future` companion object.
 */
implicit class FutureCompanionOps[T](val f: Future.type) extends AnyVal {
  /** Returns a future that is always completed with `value`.
   */
  def always[T](value: T): Future[T] = Future.successful(value)

  /** Returns a future that is never completed.
   *
   *  This future may be useful when testing if timeout logic works correctly.
   */
  def never[T]: Future[T] = Promise[T]().future

  /** Given a list of futures `fs`, returns the future holding the list of values of all the futures from `fs`.
   *  The returned future is completed only once all of the futures in `fs` have been completed.
   *  The values in the list are in the same order as corresponding futures `fs`.
   *  If any of the futures `fs` fails, the resulting future also fails.
   */
  def all[T](fs: List[Future[T]]): Future[List[T]] = Future.sequence(fs)

  /** Given a list of futures `fs`, returns the future holding the value of the future from `fs` that completed first.
   *  If the first completing future in `fs` fails, then the result is failed as well.
   *
   *  E.g.:
   *
   *      Future.any(List(Future { 1 }, Future { 2 }, Future { throw new Exception }))
   *
   *  may return a `Future` succeeded with `1`, `2` or failed with an `Exception`.
   */
  def any[T](fs: List[Future[T]]): Future[T] = Future.firstCompletedOf(fs)

  /** Returns a future with a unit value that is completed after time `t`.
   *
   *  The waiting happens on a dedicated thread started for this call.
   */
  def delay(t: Duration): Future[Unit] = {
    val p = Promise[Unit]()
    new Thread(new FulfilPromiseAfter(p, t)).start()
    p.future
  }

  /** Completes this future with user input.
   *
   *  @param message the prompt shown to the user; it is printed literally
   */
  def userInput(message: String): Future[String] = Future {
    // Reading the console blocks the thread, so tell the execution context about it.
    blocking {
      // `readLine` treats its first argument as a printf format string; routing the
      // message through "%s" keeps a '%' inside the prompt from being interpreted.
      readLine("%s", message)
    }
  }

  /** Creates a cancellable context for an execution and runs it.
   *
   *  @param f the computation to run; it receives the token it should poll for cancellation
   *  @return a subscription whose `unsubscribe` cancels the token handed to `f`
   */
  def run()(f: CancellationToken => Future[Unit]): Subscription = {
    val source = CancellationTokenSource()
    f(source.cancellationToken)
    source
  }
}
/** Adds extension methods to future objects.
 */
implicit class FutureOps[T](val f: Future[T]) extends AnyVal {
  /** Returns the result of this future if it is completed now.
   *  Otherwise, throws a `NoSuchElementException`.
   *  If the future is completed with a failure, that failure is rethrown.
   *
   *  Note: This method does not wait for the result.
   *  It is thus non-blocking.
   *  However, it is also non-deterministic -- it may throw or return a value
   *  depending on the current state of the `Future`.
   */
  def now: T = f.value match {
    case Some(result) => result.get
    case None => throw new NoSuchElementException("the future is not completed yet")
  }

  /** Continues the computation of this future by taking the current future
   *  and mapping it into another future.
   *
   *  The function `cont` is called only after the current future completes,
   *  whether it completed successfully or with a failure.
   *  The resulting future contains the value returned by `cont`, or the
   *  exception thrown by `cont`.
   */
  def continueWith[S](cont: Future[T] => S): Future[S] = {
    val p = Promise[S]()
    // onComplete (unlike map) also fires for a failed future, so `cont` always runs.
    f.onComplete { _ => p.complete(Try(cont(f))) }
    p.future
  }

  /** Continues the computation of this future by taking the result
   *  of the current future and mapping it into another future.
   *
   *  The function `cont` is called only after the current future completes,
   *  and receives either the successful value or the failure.
   *  The resulting future contains the value returned by `cont`, or the
   *  exception thrown by `cont`.
   */
  def continue[S](cont: Try[T] => S): Future[S] = {
    val p = Promise[S]()
    f.onComplete { result => p.complete(Try(cont(result))) }
    p.future
  }
}
/** Subscription objects are used to be able to unsubscribe
 *  from some event source.
 */
trait Subscription {
  /** Stops the subscription. */
  def unsubscribe(): Unit
}

object Subscription {
  /** Given two subscriptions `s1` and `s2`, returns a new composite subscription
   *  that cancels both `s1` and `s2` when its `unsubscribe` is called.
   *
   *  `s2` is unsubscribed even when unsubscribing `s1` throws; the exception
   *  is propagated to the caller afterwards.
   */
  def apply(s1: Subscription, s2: Subscription): Subscription = new Subscription {
    def unsubscribe(): Unit =
      try s1.unsubscribe()
      finally s2.unsubscribe()
  }
}
/** Read-only view used to find out whether cancellation was requested.
 */
trait CancellationToken {
  /** `true` once cancellation has been requested. */
  def isCancelled: Boolean

  /** The negation of `isCancelled`. */
  def nonCancelled: Boolean = !isCancelled
}
/** A `CancellationTokenSource` is a `Subscription` that hands out a
 *  `cancellationToken`; calling `unsubscribe` cancels that token.
 *
 *  Once `unsubscribe` has been called, the associated `cancellationToken`
 *  stays cancelled forever -- its `isCancelled` returns `true` from then on.
 */
trait CancellationTokenSource extends Subscription {
  /** The token that is cancelled by this source. */
  def cancellationToken: CancellationToken
}

/** Factory for cancellation token sources.
 */
object CancellationTokenSource {
  /** Creates a fresh, not yet cancelled `CancellationTokenSource`.
   */
  def apply(): CancellationTokenSource = new CancellationTokenSource {
    // The promise is completed exactly when cancellation has been requested.
    val p = Promise[Unit]()

    val cancellationToken = new CancellationToken {
      def isCancelled = p.future.isCompleted
    }

    // trySuccess makes repeated calls harmless.
    def unsubscribe(): Unit = {
      p.trySuccess(())
      ()
    }
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment