Created
May 12, 2012 09:54
-
-
Save viktorklang/2665603 to your computer and use it in GitHub Desktop.
A StripedExecutor in Scala using only JDK primitives, to serialize execution per stripe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ©2012 Viktor Klang | |
// 4,192 bytes jarred | |
package java.klang | |
import java.util.concurrent. { Executor, ConcurrentHashMap, ConcurrentLinkedQueue } | |
import java.util.concurrent.atomic. { AtomicBoolean } | |
import scala.annotation.tailrec | |
/**
 * Mix-in that tags a `Runnable` with the stripe it belongs to.
 *
 * The self-type restricts this trait to classes that are also `Runnable`.
 * `StripedExecutor` inspects submitted runnables for this trait and uses
 * [[stripeId]] to pick the per-stripe queue.
 */
trait Striped { this: Runnable =>

  /**
   * Key identifying the stripe. `StripedExecutor` hands a runnable whose
   * stripe id is `null` straight to its parent executor.
   */
  def stripeId: AnyRef
}
/**
 * An `Executor` that serializes the execution of runnables sharing a stripe.
 *
 * Runnables that mix in `Striped` and return a non-null `stripeId` are routed
 * to one `ExecutorStripe` per stripe id; everything else (including striped
 * runnables whose id is `null`) goes straight to `parent`.
 *
 * NOTE(review): stripe entries are only ever added to the internal map, never
 * removed, so the map grows with the number of distinct stripe ids seen.
 *
 * @param parent the executor that ultimately runs the work
 */
class StripedExecutor(val parent: Executor) extends Executor {
  // Values are typed as ExecutorStripe so lookups need no unchecked type-pattern matches.
  private val stripes = new ConcurrentHashMap[AnyRef, ExecutorStripe]

  /**
   * Submits `runnable`, routing it through its stripe when it is `Striped`.
   *
   * @param runnable the task to run
   */
  override final def execute(runnable: Runnable): Unit =
    runnable match {
      case r: Striped with Runnable => executeStripe(r, r.stripeId)
      case other => parent execute other
    }

  /**
   * Picks the executor for `stripe` (creating the stripe on first use) and
   * submits `runnable` to it. A `null` stripe bypasses striping entirely.
   */
  private def executeStripe(runnable: Runnable, stripe: AnyRef): Unit = {
    val target: Executor =
      if (stripe eq null) parent
      else stripes.get(stripe) match {
        case null =>
          val fresh = new ExecutorStripe(parent)
          // putIfAbsent returns the previously mapped value, or null when ours
          // was installed; on a lost race use the winner's stripe.
          stripes.putIfAbsent(stripe, fresh) match {
            case null => fresh
            case winner => winner
          }
        case existing => existing
      }
    target execute runnable
  }
}
/**
 * A single stripe: a queue of runnables that is itself a `Runnable`.
 *
 * `execute` enqueues a task and, if the stripe is not already scheduled,
 * submits the stripe itself to `parent`. When the stripe runs it drains the
 * queue one task at a time. The `scheduled` flag is claimed with a
 * compare-and-set before every submission, so at most one submission of this
 * stripe to `parent` is outstanding at any time.
 *
 * @param parent the executor on which this stripe is scheduled
 */
class ExecutorStripe(private val parent: Executor) extends ConcurrentLinkedQueue[Runnable] with Runnable with Executor {
  // true while this stripe has been handed to `parent` and has not finished its run
  private val scheduled = new AtomicBoolean(false)

  /**
   * Enqueues `runnable` and makes sure the stripe is scheduled on `parent`.
   *
   * @param runnable the task to append to this stripe
   * @throws AssertionError if the queue refuses the element
   */
  final override def execute(runnable: Runnable): Unit = {
    // Deliberately not `assert(offer(runnable))`: Predef.assert is @elidable, so
    // compiling with assertions elided would drop the enqueue itself.
    if (!offer(runnable)) throw new AssertionError("assertion failed")
    trySchedule()
  }

  /**
   * Drains the queue, running each task in turn. Whether the drain completes
   * or a task throws, the `scheduled` flag is cleared and the stripe is
   * rescheduled if tasks remain.
   */
  final override def run(): Unit =
    try {
      @tailrec def runNext(): Unit = poll() match {
        case null => ()
        case task =>
          task.run()
          runNext()
      }
      runNext()
    } finally {
      scheduled.set(false)
      // Picks up tasks that were enqueued while the flag was still set.
      trySchedule()
    }

  /**
   * Submits this stripe to `parent` if there is queued work and no submission
   * is currently outstanding. If `parent` rejects the submission the flag is
   * released and the failure is rethrown; queued tasks stay in the queue.
   */
  private final def trySchedule(): Unit =
    if (!isEmpty && scheduled.compareAndSet(false, true)) {
      try parent execute this
      catch {
        // Catches everything on purpose: the flag must be released before rethrowing.
        case e: Throwable =>
          scheduled.set(false)
          throw e
      }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment