Created
October 24, 2018 16:19
-
-
Save zach-klippenstein/ee1f96dd12ea38509fc7e870109491fe to your computer and use it in GitHub Desktop.
Sketch of what a worker pool for limiting concurrent database operations might look like.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.coroutines.coroutineContext | |
/**
 * A fixed-size pool of worker coroutines used to limit how many database
 * operations run concurrently.
 *
 * [nWorkers] workers are launched in [context]. Each worker repeatedly takes a task
 * from a rendezvous channel and runs it, so at most [nWorkers] blocks passed to
 * [withDB] execute at the same time; further callers suspend until a worker is free.
 *
 * @param nWorkers number of worker coroutines to launch; must be positive.
 * @param context context the workers run in. If it contains a [Job], the workers
 * become (indirect) children of that job.
 * @throws IllegalArgumentException if [nWorkers] is not positive.
 */
class DatabaseExecutor(
    nWorkers: Int,
    context: CoroutineContext
) {

    /** A unit of work together with the deferred that carries its outcome to the caller. */
    private class Task<R>(val runnable: suspend CoroutineScope.() -> R) {
        private val _result = CompletableDeferred<R>()

        /** Should only be called from worker. */
        suspend fun computeResult() {
            try {
                // Spin up a new coroutine scope that is a child of the worker scope
                // to delimit any coroutines started by the task. The try/catch wraps
                // the whole scope because a failure in a coroutine launched by the task
                // is thrown from coroutineScope itself, not from runnable().
                _result.complete(coroutineScope { runnable() })
            } catch (e: Throwable) {
                _result.completeExceptionally(e)
                // If the failure was caused by the worker being cancelled, stop the
                // worker. A cancellation local to the task (e.g. a timeout) leaves the
                // worker active, so this is a no-op in that case.
                coroutineContext.ensureActive()
            }
        }

        /** Suspends until the worker has run the task, then returns its value or rethrows its failure. */
        suspend fun awaitResult(): R = _result.await()
    }

    /** Rendezvous channel: `send` suspends until some worker is ready to receive the task. */
    private val taskQueue = Channel<Task<*>>()

    /**
     * Use a SupervisorJob because the failure or cancellation of one worker must not
     * cancel the other workers or the parent job (if any).
     */
    private val job = SupervisorJob(context[Job])

    init {
        // With no workers every withDB call would suspend forever.
        require(nWorkers > 0) { "nWorkers must be positive, was $nWorkers" }

        val workerScope = CoroutineScope(context + job)
        repeat(nWorkers) {
            workerScope.launch {
                // The loop ends once the channel is closed and drained.
                for (task in taskQueue) {
                    task.computeResult()
                }
            }
        }
    }

    /**
     * Runs [block] in the context passed to the constructor.
     *
     * The block's scope will have a new [Job] that is a child of any job from that context.
     *
     * Exceptions thrown from [block] will not be propagated up (i.e. they will not cancel
     * this worker or other workers), but will be re-thrown to the caller.
     */
    suspend fun <R> withDB(block: suspend CoroutineScope.() -> R): R =
        Task(block)
            .also { taskQueue.send(it) }
            .awaitResult()

    /** Waits for all queued tasks to complete, then shuts down all workers. */
    suspend fun shutdown() {
        taskQueue.close()
        // A SupervisorJob never completes on its own, even after all of its children
        // have finished; without complete() the join() below would suspend forever.
        job.complete()
        job.join()
    }
}
/**
 * Example usage: builds a ten-worker executor on [Dispatchers.IO], runs one
 * block through it, then shuts the executor down.
 */
suspend fun main() {
    val workerContext = coroutineContext + Dispatchers.IO
    val database = DatabaseExecutor(nWorkers = 10, context = workerContext)

    database.withDB {
        // suspending database calls
    }

    // …

    database.shutdown()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment