// Source: GitHub gist Groostav/6406b161f381979a454bb6e5214df6c5 by @Groostav, created April 19, 2018 23:47.
// A sequentializing ExecutorService implementation.
package com.empowerops.common
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.future.await
import java.time.Duration
import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import kotlin.reflect.KClass
/**
 * Closed set of execution-intent markers.
 *
 * Each marker is a singleton whose string form is a fixed upper-case label.
 */
sealed class ExecutionIntent

/** Execution-intent marker rendered as `BLOCKABLE`. */
object Blockable : ExecutionIntent() {
    override fun toString(): String {
        return "BLOCKABLE"
    }
}

/** Execution-intent marker rendered as `WORKER`. */
object Worker : ExecutionIntent() {
    override fun toString(): String {
        return "WORKER"
    }
}

/** Execution-intent marker rendered as `SEQUENTIAL`. */
object Sequential : ExecutionIntent() {
    override fun toString(): String {
        return "SEQUENTIAL"
    }
}
/**
 * Broker for sequential access to the blockable thread pool.
 *
 * This class is used to create an executor that mimics a single thread for tasks that must be executed in some kind
 * of thread-confined environment. This makes it a good choice to preserve the message-based actor
 * property where message mediators can run in a confined environment, providing some amount
 * of optimistic concurrency and parallelism-protection.
 *
 * Submitted jobs are appended to an atomically updated [State]. When a job is appended to an idle broker, the broker
 * is offered to a shared queue; a dispatcher thread takes brokers from that queue and schedules a task on the shared
 * backing pool which runs the broker's jobs one at a time, in submission order, until its state is idle again.
 *
 * NOTE(review): `invokeAll`/`invokeAny` are not overridden here, and the lifecycle methods other than [isShutdown]
 * are `TODO()` placeholders that throw `NotImplementedError` when called.
 */
class SequentialPoolBroker : ExecutorService {

    // Pending jobs of this broker; every transition goes through an atomic update.
    private val state = AtomicReference<State>(Idle)
    private val backingExecutor = BackingThreadPool
    // NOTE(review): not referenced anywhere else inside this class.
    private val backingDispatcher = backingExecutor.asCoroutineDispatcher()

    /** Queues [command] to run after every job previously queued on this broker. */
    override fun execute(command: Runnable) {
        enqueue(command)
    }

    /** Queues [task]; the returned future completes with [result] once the task has run. */
    override fun <T : Any?> submit(task: Runnable, result: T): CompletableFuture<T> = doSubmit { task.run(); result }

    /** Queues [task]; the returned future completes with the value the task returns. */
    override fun <T : Any?> submit(task: Callable<T>): CompletableFuture<T> = doSubmit { task.call() }

    /** Queues [task]; the returned future completes with [Unit] once the task has run. */
    override fun submit(task: Runnable): CompletableFuture<Unit> = doSubmit { task.run() }

    /**
     * Wraps [task] in a job that reports its outcome through the returned future and queues that job.
     *
     * A future that is already cancelled when the job is reached causes the task to be skipped.
     * Any throwable raised by the task completes the future exceptionally. Throwables that are not
     * [Exception]s are additionally rethrown so they still reach the runner's exception handling,
     * instead of leaving the future uncompleted forever.
     */
    private inline fun <T> doSubmit(crossinline task: () -> T): CompletableFuture<T> {
        val resultFuture = CompletableFuture<T>()
        val newJob = Runnable {
            if (!resultFuture.isCancelled) {
                try {
                    resultFuture.complete(task())
                } catch (ex: Throwable) {
                    resultFuture.completeExceptionally(ex)
                    if (ex !is Exception) throw ex
                }
            }
        }
        enqueue(newJob)
        return resultFuture
    }

    /**
     * Appends [job] to this broker's state and, if the broker was idle beforehand, announces the broker
     * to the dispatcher thread. A non-idle broker is not re-announced; the append alone is enough because
     * the running drain loop keeps taking jobs until it observes the idle state.
     */
    private fun enqueue(job: Runnable) {
        val previousState = state.getAndUpdate { it.appendJob(job) }
        if (previousState is Idle) anxiousExecutors.offer(this)
    }

    // Lifecycle management is not implemented; these throw NotImplementedError when called.
    override fun isTerminated(): Boolean = TODO()
    override fun shutdown() = TODO()
    override fun shutdownNow(): List<Runnable> = TODO()
    override fun awaitTermination(timeout: Long, unit: TimeUnit?) = TODO()

    /** Reports the shutdown status of the shared backing pool, not of this individual broker. */
    override fun isShutdown(): Boolean = backingExecutor.isShutdown

    companion object {
        // Brokers that went from idle to having work and are waiting for a drain task to be scheduled.
        private val anxiousExecutors: LinkedBlockingQueue<SequentialPoolBroker> = LinkedBlockingQueue()

        val CorePoolThreadCount by envParam(default = 4)

        //see Executors.newCachedThreadPool, which looks very similar
        private val backingPool = ThreadPoolExecutor(
            CorePoolThreadCount,
            /*max pool size*/ Int.MAX_VALUE,
            /*keep alive time*/ 5, TimeUnit.SECONDS,
            SynchronousQueue(),
            ThreadSource(true, Sequential)
        )

        /**
         * Creates a fresh [SequentialPoolBroker] and returns it wrapped in a
         * `ThreadRenamingDelegatingExecutorService` configured with [Sequential], [callerType] and [callerID].
         */
        fun makeNewSequentialExecutor(callerType: KClass<*>, callerID: Int): ExecutorService {
            val actualExecutor = SequentialPoolBroker()
            return ThreadRenamingDelegatingExecutorService(actualExecutor, Sequential, callerType, callerID)
        }
    }

    /**
     * The shared pool every broker runs on; all [ExecutorService] calls are delegated to the companion's pool.
     *
     * Initializing this object starts the dispatcher thread, which blocks on the queue of announced brokers
     * and submits one drain task to this pool per announcement.
     */
    object BackingThreadPool : ExecutorService by backingPool {
        init {
            val threadName = "${SequentialPoolBroker::class.simpleName}.${this::class.simpleName}-dispatcher"
            Thread(threadName, daemon = true) {
                while (true) {
                    val executor = anxiousExecutors.take()
                    this@BackingThreadPool.submit { drain(executor) }
                }
            }.start()
        }

        /** Runs [executor]'s queued jobs one after another until its state has been moved back to idle. */
        private fun drain(executor: SequentialPoolBroker) {
            do {
                val (newState, nextJob) = executor.state.updateLeftAndGet { it.takeJob() }
                when (newState) {
                    Idle -> { /*nothing to do except return.*/ }
                    is Running -> runGuarded(nextJob)
                }
            }
            while (newState != Idle)
        }

        /**
         * Runs [job], handing anything it throws to the current thread's exception handling.
         * If that handling itself throws, the stack trace is printed and the process exits with status -1.
         */
        private fun runGuarded(job: Runnable) {
            try { job.run() }
            catch (ex: Throwable) {
                try { Thread.currentThread().handleException(ex) }
                // exceptions can occur here if there is some kind of overflow,
                // in this event there is literally nothing we can do.
                catch (fatal: Throwable) {
                    try { fatal.printStackTrace() }
                    finally { System.exit(-1) }
                }
            }
        }
    }
}
/**
 * Immutable description of the jobs waiting on one broker.
 *
 * Both operations return a new state instead of mutating the receiver. They are invoked from
 * compare-and-set retry loops, so they may be called several times for one logical transition
 * and must not have side effects.
 */
private sealed class State {
    /** Returns the state that has [job] queued behind everything already pending. */
    abstract fun appendJob(job: Runnable): State //implementations must be ref-transparent

    /** Returns the follow-up state paired with the job that should run next. */
    abstract fun takeJob(): Pair<State, Runnable> //implementations must be ref-transparent
}

/** No jobs are queued. */
private object Idle : State() {
    override fun appendJob(job: Runnable): State = Running(listOf(job))

    /**
     * There is nothing to take from an idle state. The job-running loop in this file stops as soon as
     * it observes [Idle], so reaching this is an invariant violation and fails with an explicit message.
     *
     * @throws IllegalStateException always
     */
    override fun takeJob(): Pair<State, Runnable> =
        error("takeJob() was called on the Idle state, which has no queued job to take")
}

/** Holds the queued [jobs] in submission order; an empty list means the last job has been handed out. */
private class Running(val jobs: List<Runnable>) : State() {
    override fun appendJob(job: Runnable): State = Running(jobs + job)

    /**
     * Hands out the oldest queued job together with a [Running] state holding the rest.
     * When nothing is left, transitions to [Idle] and returns the no-op [NULL_JOB] as a placeholder.
     */
    override fun takeJob(): Pair<State, Runnable> {
        val head = jobs.firstOrNull() ?: return Idle to NULL_JOB
        return Running(jobs.drop(1)) to head
    }
}

/** Placeholder job that does nothing; paired with [Idle] when there was no real job to take. */
private object NULL_JOB : Runnable {
    override fun run() {}
}
/**
 * Atomically replaces the referenced value with the left component computed by [updateFunction]
 * and returns that new value together with the right component computed alongside it.
 *
 * The current value is read, [updateFunction] is applied to it, and the left component is installed
 * with a compare-and-set. If the reference changed in the meantime the whole step is retried, so
 * [updateFunction] may be invoked more than once and should be free of side effects.
 *
 * @param updateFunction maps the current value to (replacement value, auxiliary result)
 * @return the installed replacement paired with the auxiliary result from the successful attempt
 */
inline fun <L, R> AtomicReference<L>.updateLeftAndGet(updateFunction: (L) -> Pair<L, R>): Pair<L, R> {
    while (true) {
        val observed = get()
        val (replacement, auxiliary) = updateFunction(observed)
        if (compareAndSet(observed, replacement)) {
            return Pair(replacement, auxiliary)
        }
    }
}
// (GitHub page footer, not part of the source) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment