Created
January 17, 2013 00:32
-
-
Save viktorklang/4552423 to your computer and use it in GitHub Desktop.
Wraps an ExecutionContext into a new ExecutionContext which will execute its tasks in sequence, always.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
/**
 * Factory for an `ExecutionContext` that wraps another `ExecutionContext` and
 * drains the tasks submitted to it from a single queue, a bounded batch at a time.
 */
object SerializedExecutionContext {

  /**
   * Wraps `context` into a new `ExecutionContext` backed by a `ConcurrentLinkedQueue`.
   *
   * Submitted tasks are appended to the queue; the queue itself is the `Runnable`
   * handed to `context`. Each time it runs it polls and executes at most
   * `batchSize` tasks, then hands itself back to `context` if tasks remain.
   *
   * @param batchSize maximum number of queued tasks executed per submission to
   *                  `context`; must be greater than 0
   * @param context   the underlying context that actually runs the batches and
   *                  receives reported failures
   * @return an `ExecutionContext` that funnels all tasks through the queue
   * @throws IllegalArgumentException if `batchSize` is not greater than 0
   */
  def apply(batchSize: Int)(implicit context: ExecutionContext): ExecutionContext = {
    require(batchSize > 0, s"SerializedExecutionContext.batchSize must be greater than 0 but was $batchSize")
    new ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext {
      // Scheduling flag: 0 = not handed to `context`, 1 = handed to `context` / draining.
      private final val on = new AtomicInteger(0)

      // Polls and runs queued tasks until the queue is empty or `batchSize` tasks
      // have been executed in this batch. `done` counts the tasks run so far.
      @tailrec private final def run(done: Int): Unit = if (done < batchSize) {
        poll() match {
          case null => () // queue drained: end of batch
          case some =>
            // A non-fatal failure of one task is reported and must not stop the batch.
            try some.run() catch { case NonFatal(t) => context.reportFailure(t) }
            run(done + 1)
        }
      }

      // Enqueues the task, then makes sure the queue is scheduled on `context`.
      override def add(task: Runnable): Boolean = {
        val r = super.add(task)
        attach()
        r
      }

      // Entry point invoked by `context`: runs one batch (only if the flag says we
      // were scheduled), then always clears the flag and re-schedules if tasks remain.
      final def run(): Unit =
        try { if (on.get == 1) run(0) } finally { on.set(0); attach() }

      // Hands this queue to `context` when it is non-empty and the flag can be
      // flipped 0 -> 1; the compare-and-set lets only one caller win the submission.
      final def attach(): Unit =
        if (!isEmpty && on.compareAndSet(0, 1)) {
          // If the submission itself fails, release the flag so a later add can retry,
          // then propagate the failure to the caller.
          try context.execute(this) catch { case t: Throwable => on.set(0); throw t }
        }

      override final def execute(task: Runnable): Unit = add(task)

      // Failures are delegated to the wrapped context.
      override final def reportFailure(t: Throwable): Unit = context.reportFailure(t)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment