Skip to content

Instantly share code, notes, and snippets.

@seanparsons
Last active December 15, 2015 08:19
Show Gist options
  • Save seanparsons/5229917 to your computer and use it in GitHub Desktop.
Save seanparsons/5229917 to your computer and use it in GitHub Desktop.
Quick attempt at timeout support for Scalaz Futures.
import scalaz._
import Scalaz._
import scalaz.concurrent._
import scala.annotation.tailrec
import java.util.concurrent.locks.ReentrantReadWriteLock
trait Timeout
object Timeout extends Timeout
case class Timer(timeoutTickMs: Int = 100, workerName: String = "TimeoutContextWorker") {
val safeTickMs = if (timeoutTickMs > 0) timeoutTickMs else 1
private[this] val nondeterminism = Nondeterminism[Future]
@volatile private[this] var continueRunning: Boolean = true
@volatile private[this] var lastNow: Long = System.currentTimeMillis
private[this] val lock = new ReentrantReadWriteLock()
private[this] var futures: Vector[(Long, () => Unit)] = Vector.empty
private[this] val workerRunnable = new Runnable() {
def run() {
@tailrec
def innerRun() {
// Deal with stuff to expire.
lastNow = System.currentTimeMillis
futures.headOption match {
case Some((time, _)) if (time < lastNow) => {
val expiredFutures: Vector[(Long, () => Unit)] = withWrite{
val (past, future) = futures.span(pair => pair._1 < lastNow)
futures = future
past
}
expiredFutures.foreach(call => call._2())
}
case _ => ()
}
// Should we keep running?
if (continueRunning) {
Thread.sleep(safeTickMs)
innerRun()
}
}
innerRun()
}
}
private[this] val workerThread = new Thread(workerRunnable, workerName)
workerThread.start()
def stop() {
continueRunning = false
}
private[this] def withWrite[T](expression: => T): T = {
lock.writeLock().lock()
try {
expression
} finally {
lock.writeLock().unlock()
}
}
def valueWait[T](value: T, waitMs: Long): Future[T] = {
val listen: (T => Unit) => Unit = callback => withWrite{
val waitTime = lastNow + (if (waitMs < 0) 0 else waitMs)
// Lazy implementation for now.
val timedCallback = () => callback(value)
futures = (futures :+ (waitTime, timedCallback)).sortBy(_._1)
}
Future.async[T](listen)
}
def withTimeout[T](future: Future[T], timeout: Long): Future[Timeout \/ T] = {
val timeoutFuture = valueWait(Timeout, timeout)
nondeterminism.choose(timeoutFuture, future).map(_.fold(_._1.left, _._2.right))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment