Skip to content

Instantly share code, notes, and snippets.

@Szer
Last active September 29, 2021 18:17
Show Gist options
  • Select an option

  • Save Szer/44da3ffa570a713437aae9e69ec401af to your computer and use it in GitHub Desktop.

Select an option

Save Szer/44da3ffa570a713437aae9e69ec401af to your computer and use it in GitHub Desktop.
thread-safe ObjectPool with growth, shrinking and limits
package com.thriveglobal.reset
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.util.concurrent.LinkedBlockingDeque
import kotlin.time.Duration
data class PoolOptions(
val minInstances: Int,
val maxInstances: Int,
val evictTimeout: Duration,
val clock: Clock,
val scope: CoroutineScope,
// needed to synchronize unit tests
internal val syncer: suspend () -> Unit = {},
) {
companion object {
fun default() = PoolOptions(
minInstances = 1,
maxInstances = 2,
evictTimeout = Duration.minutes(1),
clock = Clock.System,
scope = CoroutineScope(Dispatchers.Default),
)
}
}
interface ObjectPool<T> : Closeable {
suspend fun borrow(action: suspend (T) -> Unit)
}
// This is object pool which should support borrowing resources from it in thread-safe manner
// it will grow linearly up to maxInstances
// and shrink linearly down to minInstances if resource hasn't been used in evictTimeout
class PoolWithEviction<T>(
private val spawnFunction: () -> T,
private val poolOptions: PoolOptions = PoolOptions.default()
) : Closeable, ObjectPool<T> {
private var closed = false
// main channel for server event loop
private val serverCh = Channel<Msg>()
private val resources = LinkedBlockingDeque<ResourceInfo<T>>()
private val logger = LoggerFactory.getLogger(this::class.java)
// current amount of created resources
// Borrowing won't decrease that count
// Will be increased on growth
// Will be decreased on shrink
private var capacity = 0
// indicates whether new resource creation is happening
private var spawnJob: Job? = null
// list of requests who can't find free resource to work
private val waitList = mutableListOf<Channel<T?>>()
val currentCapacity: Int get() = capacity
override suspend fun borrow(action: suspend (T) -> Unit) {
var resource: T?
do {
val replyCh = Channel<T?>()
serverCh.send(Msg.TryBorrow(replyCh))
logger.debug("Client - trying to get resource")
resource = replyCh.receive()
} while (resource == null)
try {
logger.debug("Client - got resource, doing something useful with it")
action(resource)
} finally {
logger.debug("Client - returning resource to the server")
serverCh.send(Msg.Return(resource))
}
}
private fun cleanupStaleResources() {
if (closed) throw Exception("Pool has been closed already")
val now = poolOptions.clock.now()
val item = resources.peekLast()
if (item == null) {
logger.debug("Cleanup - No items in the pool to evict ${this::class.qualifiedName}")
return
}
if (item.lastTimeUsed.plus(poolOptions.evictTimeout) >= now) {
logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} has been returned back to ${this::class.qualifiedName}")
} else {
if (currentCapacity > poolOptions.minInstances) {
capacity -= 1
resources.removeLast()
logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} has been removed from ${this::class.qualifiedName}")
} else {
logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} is stale, but min capacity reached. It has been returned back to ${this::class.qualifiedName}")
}
}
}
private suspend fun signalWaitListToRetry() {
waitList.forEach {
it.send(null)
}
waitList.clear()
}
// background loop which will try to find stale resource every evictTimeout
private val cleanupLoop = poolOptions.scope.launch {
while (!closed) {
poolOptions.syncer()
serverCh.send(Msg.Cleanup)
delay(poolOptions.evictTimeout)
}
}
// main event loop which requests server channel for messages
// it should not block or even await on anything beside sending replies to channels
// this loop MUST be very responsive
@Suppress("UNCHECKED_CAST")
private val serverLoop = poolOptions.scope.launch {
while (!closed) {
when (val msg = serverCh.receive()) {
// time to check for stale resources
is Msg.Cleanup -> cleanupStaleResources()
// new resource finished its initialization. we should signal those who are waiting
Msg.SpawnJobFinished -> {
logger.debug("Server - spawn job has finished, signaling all waiters to retry")
signalWaitListToRetry()
spawnJob = null
}
// resource has been returned, we should signal those who are waiting
is Msg.Return<*> -> {
logger.debug("Server - resource has been returned, signaling all waiters to retry")
resources.addFirst(ResourceInfo((msg.resource as T), lastTimeUsed = poolOptions.clock.now()))
signalWaitListToRetry()
}
// request for borrowing resource
is Msg.TryBorrow<*> -> {
when (val resource = resources.pollFirst()) {
null -> // we don't have free resource available
if (currentCapacity < poolOptions.maxInstances) {
// can create more ...
if (spawnJob != null) {
// ... but resource already being created. put request on hold
logger.debug("Server - No free resources in pool ${this::class.qualifiedName}, but spawn job already there, put reply on wait")
waitList.add(msg.replyCh as Channel<T?>)
} else {
// ... so let's create more! put request on hold
logger.debug("Server - No free resources in pool ${this::class.qualifiedName}, started spawn job, put reply on wait")
spawnJob = launch {
val newResource = spawnFunction()
if (closed) throw Exception("Pool has been closed already")
capacity += 1
resources.addFirst(ResourceInfo(newResource, poolOptions.clock.now()))
serverCh.send(Msg.SpawnJobFinished)
}
waitList.add(msg.replyCh as Channel<T?>)
}
} else {
// can't create more resources
logger.debug("Server - No free resources in pool ${this::class.qualifiedName} but we are at max capacity. put reply on wait")
waitList.add(msg.replyCh as Channel<T?>)
}
else -> { // if we have free resource, just send it back
logger.debug("Server - free resource found to borrow. Sending it back to requester")
(msg.replyCh as Channel<T?>).send(resource.value)
}
}
}
}
}
}
init {
serverLoop.start()
cleanupLoop.start()
logger.info("DisposablePool has started")
}
override fun close() {
closed = true
waitList.clear()
capacity = 0
serverLoop.cancel("Pool has been closed")
cleanupLoop.cancel("Pool has been closed")
logger.info("DisposablePool has been closed")
}
// Our client-server protocol
private sealed interface Msg {
data class TryBorrow<T>(val replyCh: Channel<T?>) : Msg
data class Return<T>(val resource: T) : Msg
object Cleanup : Msg
object SpawnJobFinished : Msg
}
private data class ResourceInfo<T>(val value: T, val lastTimeUsed: Instant)
}
package com.thriveglobal.reset
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration
class PoolWithEvictionTests {
// just gives you back integers
private fun spawnFunction(): () -> Int {
val i = AtomicInteger(0)
return {
i.getAndIncrement()
}
}
// tricky function to synchronize awaits inside Pool and outside with controllable time
// idea is that somebody should call tick() and wait for tock(). Tock() will also change clock time by amount specified in duration
// Strictly for controllable unit tests
private fun tickingClocks(duration: Duration = Duration.minutes(1)): Triple<Clock, suspend () -> Unit, suspend () -> Unit> {
var now = Clock.System.now()
// capacity = 1 is important. Tick/Tock functions will always work sequentially because of that
val ch = Channel<Unit>(capacity = 1)
val tick: (suspend () -> Unit) = {
try {
ch.send(Unit)
} catch (e: Exception) {
}
}
val tock: (suspend () -> Unit) = {
// changing our time on tock
now = now.plus(duration)
try {
ch.receive()
} catch (e: Exception) {
}
}
val clock = object : Clock {
// return of this function will be changed on each tock()
override fun now(): Instant = now
}
return Triple(clock, tick, tock)
}
@Test
fun `should be able to borrow from pool`() {
PoolWithEviction(spawnFunction()).use { pool ->
runBlocking {
assertEquals(0, pool.currentCapacity)
pool.borrow { assertEquals(0, it) }
assertEquals(1, pool.currentCapacity)
}
}
}
@Test
fun `should be able to borrow from pool twice consecutively`() {
PoolWithEviction(spawnFunction()).use { pool ->
runBlocking {
assertEquals(0, pool.currentCapacity)
pool.borrow { zero ->
assertEquals(0, zero)
}
pool.borrow { zero ->
assertEquals(0, zero)
}
assertEquals(1, pool.currentCapacity)
}
}
}
@Test
fun `should be able to borrow from pool twice embedded`() {
PoolWithEviction(spawnFunction()).use { pool ->
runBlocking {
assertEquals(0, pool.currentCapacity)
pool.borrow { zero ->
assertEquals(0, zero)
pool.borrow { one ->
assertEquals(1, one)
}
}
assertEquals(2, pool.currentCapacity)
pool.borrow { zero ->
assertEquals(0, zero)
pool.borrow { one ->
assertEquals(1, one)
}
}
assertEquals(2, pool.currentCapacity)
}
}
}
@Test
fun `should evict stale workers`() {
runBlockingTest {
val (clock, tick, tock) = tickingClocks()
val opts = PoolOptions(
minInstances = 0,
maxInstances = 10,
evictTimeout = Duration.minutes(2),
clock = clock,
scope = this,
syncer = tick,
)
PoolWithEviction(spawnFunction(), opts).use { pool ->
assertEquals(0, pool.currentCapacity)
pool.borrow { zero ->
assertEquals(0, zero)
}
tock() // 0min pass
assertEquals(1, pool.currentCapacity)
tock() // 1min pass
assertEquals(1, pool.currentCapacity)
tock() // 2min pass should be evicted now
assertEquals(0, pool.currentCapacity)
tock() // 3min pass should be evicted now
assertEquals(0, pool.currentCapacity)
}
}
}
@Test
fun `should not evict stale workers less than min capacity`() {
runBlockingTest {
val (clock, tick, tock) = tickingClocks()
val opts = PoolOptions(
minInstances = 1,
maxInstances = 10,
evictTimeout = Duration.minutes(2),
clock = clock,
scope = this,
syncer = tick,
)
PoolWithEviction(spawnFunction(), opts).use { pool ->
assertEquals(0, pool.currentCapacity)
pool.borrow { pool.borrow { pool.borrow { } } }
tock() // 0min pass
assertEquals(3, pool.currentCapacity)
tock() // 1min pass
assertEquals(3, pool.currentCapacity)
tock() // 2min pass, one instance should be evicted now
assertEquals(2, pool.currentCapacity)
tock() // 3min pass, one more instance should be evicted now
assertEquals(1, pool.currentCapacity)
tock() // 4min pass, no more eviction, because minInstance = 1
assertEquals(1, pool.currentCapacity)
}
}
}
@Test
fun `should not create new workers higher than max capacity`() {
runBlocking {
val opts = PoolOptions(
minInstances = 1,
maxInstances = 3,
evictTimeout = Duration.minutes(2),
clock = Clock.System,
scope = this,
)
PoolWithEviction(spawnFunction(), opts).use { pool ->
val requests = (1..100).map {
async {
pool.borrow { }
}
}
for (req in requests) {
req.await()
}
assertEquals(3, pool.currentCapacity)
}
}
}
@Test
fun `should evict stale workers even under load`() {
runBlockingTest {
val (clock, tick, tock) = tickingClocks()
val opts = PoolOptions(
minInstances = 0,
maxInstances = 2,
evictTimeout = Duration.minutes(2),
clock = clock,
scope = this,
syncer = tick,
)
PoolWithEviction(spawnFunction(), opts).use { pool ->
assertEquals(0, pool.currentCapacity)
pool.borrow { pool.borrow { } }
tock() // 0min pass
assertEquals(2, pool.currentCapacity)
pool.borrow { }
tock() // 1min pass
assertEquals(2, pool.currentCapacity)
pool.borrow { }
tock() // 2min pass, one instance should be evicted now
assertEquals(1, pool.currentCapacity)
pool.borrow { }
tock() // 3min pass, no more eviction, because we are using one worker constantly
assertEquals(1, pool.currentCapacity)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment