Created
April 19, 2018 23:47
-
-
Save Groostav/6406b161f381979a454bb6e5214df6c5 to your computer and use it in GitHub Desktop.
A sequential-izing ExecutorService implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.common | |
import kotlinx.coroutines.experimental.* | |
import kotlinx.coroutines.experimental.future.await | |
import java.time.Duration | |
import java.util.concurrent.* | |
import java.util.concurrent.TimeUnit.MILLISECONDS | |
import java.util.concurrent.TimeUnit.NANOSECONDS | |
import java.util.concurrent.atomic.AtomicLong | |
import java.util.concurrent.atomic.AtomicReference | |
import kotlin.reflect.KClass | |
/**
 * Closed set of markers describing what kind of work a thread or executor is meant for.
 * Each marker renders as an upper-case label through [toString].
 */
sealed class ExecutionIntent

/** Marker whose label is `BLOCKABLE`. */
object Blockable : ExecutionIntent() {
    override fun toString(): String {
        return "BLOCKABLE"
    }
}

/** Marker whose label is `WORKER`. */
object Worker : ExecutionIntent() {
    override fun toString(): String {
        return "WORKER"
    }
}

/** Marker whose label is `SEQUENTIAL`. */
object Sequential : ExecutionIntent() {
    override fun toString(): String {
        return "SEQUENTIAL"
    }
}
/**
 * Broker for sequential access to the blockable thread pool.
 *
 * This class is used to create an executor that mimics a single thread for tasks that must be executed in some kind
 * of thread-confined environment. This makes it a good choice to preserve the message-based actor
 * property where message mediators can run in a confined environment, providing some amount
 * of optimistic concurrency and parallelism-protection.
 *
 * How it works: every broker keeps its pending jobs in an atomically-updated [State]. Appending a job to an
 * idle broker places the broker on a shared queue; a single dispatcher thread takes brokers off that queue and
 * hands each one to the shared backing pool, where one pool thread drains the broker's jobs one after another
 * until the broker is idle again.
 *
 * NOTE(review): no `invokeAll`/`invokeAny` overrides are visible in this class, and [isTerminated], [shutdown],
 * [shutdownNow] and [awaitTermination] are `TODO()` stubs -- confirm before treating this as a complete
 * [ExecutorService].
 */
class SequentialPoolBroker : ExecutorService {

    // Pending work for this broker; only ever replaced through atomic updates.
    private val state = AtomicReference<State>(Idle)
    private val backingExecutor = BackingThreadPool
    private val backingDispatcher = backingExecutor.asCoroutineDispatcher()

    /** Queues [command] to run after every job previously handed to this broker. */
    override fun execute(command: Runnable) {
        enqueue(command)
    }

    /** Queues [task]; the returned future completes with [result] once the task has run. */
    override fun <T : Any?> submit(task: Runnable, result: T): CompletableFuture<T> = doSubmit { task.run(); result }

    /** Queues [task]; the returned future completes with the value the task returns. */
    override fun <T : Any?> submit(task: Callable<T>): CompletableFuture<T> = doSubmit { task.call() }

    /** Queues [task]; the returned future completes with `Unit` once the task has run. */
    override fun submit(task: Runnable): CompletableFuture<Unit> = doSubmit { task.run() }

    /**
     * Wraps [task] in a job that reports its outcome through a [CompletableFuture] and queues that job.
     *
     * A future that was cancelled before its job starts causes the job to be skipped. Any throwable raised by
     * [task] completes the future exceptionally, so callers waiting on it are always released.
     */
    private inline fun <T> doSubmit(crossinline task: () -> T): CompletableFuture<T> {
        val resultFuture = CompletableFuture<T>()
        val newJob = Runnable job@ {
            if (resultFuture.isCancelled) return@job
            try {
                resultFuture.complete(task.invoke())
            } catch (ex: Exception) {
                resultFuture.completeExceptionally(ex)
            } catch (err: Throwable) {
                // Non-Exception throwables (e.g. Error) used to leave the future incomplete forever.
                // Complete it, then rethrow so the draining loop's handler still sees the failure.
                resultFuture.completeExceptionally(err)
                throw err
            }
        }
        enqueue(newJob)
        return resultFuture
    }

    /**
     * Atomically appends [job] to this broker's pending jobs. When the broker was idle beforehand,
     * it is also placed on the shared queue so the dispatcher thread schedules a drain for it.
     */
    private fun enqueue(job: Runnable) {
        val previousState = state.getAndUpdate { it.appendJob(job) }
        if (previousState is Idle) anxiousExecutors.offer(this)
    }

    override fun isTerminated(): Boolean = TODO()
    override fun shutdown() = TODO()
    override fun shutdownNow(): List<Runnable> = TODO()
    override fun awaitTermination(timeout: Long, unit: TimeUnit?) = TODO()

    /** Reports the shutdown status of the shared backing pool, not of this individual broker. */
    override fun isShutdown(): Boolean = backingExecutor.isShutdown

    companion object {

        // Brokers that went from idle to having work and are waiting for a pool thread to drain them.
        private val anxiousExecutors: LinkedBlockingQueue<SequentialPoolBroker> = LinkedBlockingQueue()

        val CorePoolThreadCount by envParam(default = 4)

        //see Executors.newCachedThreadPool, which looks very similar
        private val backingPool = ThreadPoolExecutor(
            CorePoolThreadCount,
            /*max pool size*/ Int.MAX_VALUE,
            /*keep alive time*/ 5, TimeUnit.SECONDS,
            SynchronousQueue(),
            ThreadSource(true, Sequential)
        )

        /**
         * Creates a fresh broker and wraps it in a [ThreadRenamingDelegatingExecutorService] built from
         * [callerType] and [callerID].
         */
        fun makeNewSequentialExecutor(callerType: KClass<*>, callerID: Int): ExecutorService {
            val actualExecutor = SequentialPoolBroker()
            val renamingExecutor = ThreadRenamingDelegatingExecutorService(actualExecutor, Sequential, callerType, callerID)
            return renamingExecutor
        }
    }

    /**
     * The shared pool every broker runs on. Initialising this object starts the dispatcher thread that
     * moves waiting brokers from the queue onto the pool.
     */
    object BackingThreadPool : ExecutorService by backingPool {
        init {
            val threadName = "${SequentialPoolBroker::class.simpleName}.${this::class.simpleName}-dispatcher"
            Thread(threadName, daemon = true) {
                while (true) {
                    // Blocks until some broker transitions from idle to having work.
                    val executor = anxiousExecutors.take()
                    this@BackingThreadPool.execute {
                        // Drain this broker on one pool thread until its state returns to Idle.
                        do {
                            val (newState, nextJob) = executor.state.updateLeftAndGet { it.takeJob() }
                            when (newState) {
                                Idle -> { /*nothing to do except return.*/ }
                                is Running -> {
                                    try { nextJob.run() }
                                    catch (ex: Throwable) {
                                        try { Thread.currentThread().handleException(ex) }
                                        // exceptions can occur here if there is some kind of overflow,
                                        // in this event there is literally nothing we can do.
                                        catch (ex: Throwable) {
                                            try { ex.printStackTrace() }
                                            finally { System.exit(-1) }
                                        }
                                    }
                                }
                            }
                        }
                        while (newState != Idle)
                    }
                }
            }.start()
        }
    }
}
/**
 * Immutable snapshot of a broker's pending jobs.
 *
 * Both operations return a new state rather than mutating the receiver. Implementations must be
 * referentially transparent, since callers evaluate them inside atomic update lambdas that may be retried.
 */
private sealed class State {

    /** Returns the state that additionally contains [job]. */
    abstract fun appendJob(job: Runnable): State

    /** Returns the follow-up state paired with the job that was taken out. */
    abstract fun takeJob(): Pair<State, Runnable>
}
/** State of a broker that currently holds no jobs. */
private object Idle : State() {

    /** Adding the first job moves the broker into [Running] with a single-element job list. */
    override fun appendJob(job: Runnable): State {
        return Running(listOf(job))
    }

    /** Not implemented: calling this throws the `NotImplementedError` raised by `TODO()`. */
    override fun takeJob(): Pair<State, Runnable> {
        TODO()
    }
}
/** State of a broker that has been given work; [jobs] holds the jobs not yet taken, oldest first. */
private class Running(val jobs: List<Runnable>) : State() {

    /** Returns a new [Running] whose list is [jobs] followed by [job]. */
    override fun appendJob(job: Runnable): State {
        return Running(jobs + job)
    }

    /**
     * Takes the oldest job. When no jobs remain, the follow-up state is [Idle] and the
     * accompanying job is the do-nothing [NULL_JOB].
     */
    override fun takeJob(): Pair<State, Runnable> {
        val head = jobs.firstOrNull() ?: return Idle to NULL_JOB
        val tail = jobs.drop(1)
        return Running(tail) to head
    }
}
/** Placeholder [Runnable] whose [run] does nothing. */
private object NULL_JOB : Runnable {
    override fun run() = Unit
}
/**
 * Atomically replaces the stored value with the left half of whatever [updateFunction] computes from the
 * current value, retrying until the compare-and-set succeeds.
 *
 * @param updateFunction maps the current value to a pair of (new value, side result); it may be invoked
 *   more than once when the reference changes between the read and the compare-and-set.
 * @return the new value that was stored, paired with the side result from the successful attempt.
 */
inline fun <L, R> AtomicReference<L>.updateLeftAndGet(updateFunction: (L) -> Pair<L, R>): Pair<L, R> {
    while (true) {
        val observed = get()
        val (proposed, sideResult) = updateFunction(observed)
        // Only publish when nobody replaced the value since it was read; otherwise try again.
        if (compareAndSet(observed, proposed)) return proposed to sideResult
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment