Skip to content

Instantly share code, notes, and snippets.

@agolovenko
Last active April 15, 2022 10:08
Show Gist options
  • Save agolovenko/44024011601ccf4bad38fd836f3ccccf to your computer and use it in GitHub Desktop.
Save agolovenko/44024011601ccf4bad38fd836f3ccccf to your computer and use it in GitHub Desktop.
Future with timeout execution in a non-blocking style via akka scheduler
import akka.actor.Scheduler
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
object TimeoutFuture {
private class CancellableExecution[T](body: => T) extends Function[Unit, T] {
private val runningThread = new AtomicReference[Thread](null)
private val isCancelled = new AtomicBoolean(false)
def tryCancel(): Unit = this.synchronized {
if (!isCancelled.get && runningThread.get != null) {
isCancelled.set(true)
runningThread.get.interrupt()
}
}
override def apply(unit: Unit): T = {
this.synchronized {
if (isCancelled.get) throw new RuntimeException("The execution has been cancelled before the execution")
runningThread.set(Thread.currentThread())
}
try {
body
} catch {
case ex: InterruptedException => throw new RuntimeException(s"The execution has been cancelled: ${ex.getMessage}")
} finally {
this.synchronized {
Thread.interrupted()
runningThread.set(null)
}
}
}
}
def apply[T](scheduler: Scheduler, timeout: FiniteDuration)(body: => T)(implicit ec: ExecutionContext): Future[T] = {
val execution = new CancellableExecution(body)
val timeoutPromise = Promise[T]()
scheduler.scheduleOnce(timeout) {
timeoutPromise.failure(new RuntimeException(s"Timeout of $timeout has been hit"))
execution.tryCancel()
}
Future.firstCompletedOf(Seq(Future.unit.map(execution), timeoutPromise.future))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment