Last active
September 29, 2021 18:17
-
-
Save Szer/44da3ffa570a713437aae9e69ec401af to your computer and use it in GitHub Desktop.
thread-safe ObjectPool with growth, shrinking and limits
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.thriveglobal.reset | |
| import kotlinx.coroutines.* | |
| import kotlinx.coroutines.channels.Channel | |
| import kotlinx.datetime.Clock | |
| import kotlinx.datetime.Instant | |
| import org.slf4j.LoggerFactory | |
| import java.io.Closeable | |
| import java.util.concurrent.LinkedBlockingDeque | |
| import kotlin.time.Duration | |
| data class PoolOptions( | |
| val minInstances: Int, | |
| val maxInstances: Int, | |
| val evictTimeout: Duration, | |
| val clock: Clock, | |
| val scope: CoroutineScope, | |
| // needed to synchronize unit tests | |
| internal val syncer: suspend () -> Unit = {}, | |
| ) { | |
| companion object { | |
| fun default() = PoolOptions( | |
| minInstances = 1, | |
| maxInstances = 2, | |
| evictTimeout = Duration.minutes(1), | |
| clock = Clock.System, | |
| scope = CoroutineScope(Dispatchers.Default), | |
| ) | |
| } | |
| } | |
| interface ObjectPool<T> : Closeable { | |
| suspend fun borrow(action: suspend (T) -> Unit) | |
| } | |
| // This is object pool which should support borrowing resources from it in thread-safe manner | |
| // it will grow linearly up to maxInstances | |
| // and shrink linearly down to minInstances if resource hasn't been used in evictTimeout | |
| class PoolWithEviction<T>( | |
| private val spawnFunction: () -> T, | |
| private val poolOptions: PoolOptions = PoolOptions.default() | |
| ) : Closeable, ObjectPool<T> { | |
| private var closed = false | |
| // main channel for server event loop | |
| private val serverCh = Channel<Msg>() | |
| private val resources = LinkedBlockingDeque<ResourceInfo<T>>() | |
| private val logger = LoggerFactory.getLogger(this::class.java) | |
| // current amount of created resources | |
| // Borrowing won't decrease that count | |
| // Will be increased on growth | |
| // Will be decreased on shrink | |
| private var capacity = 0 | |
| // indicates whether new resource creation is happening | |
| private var spawnJob: Job? = null | |
| // list of requests who can't find free resource to work | |
| private val waitList = mutableListOf<Channel<T?>>() | |
| val currentCapacity: Int get() = capacity | |
| override suspend fun borrow(action: suspend (T) -> Unit) { | |
| var resource: T? | |
| do { | |
| val replyCh = Channel<T?>() | |
| serverCh.send(Msg.TryBorrow(replyCh)) | |
| logger.debug("Client - trying to get resource") | |
| resource = replyCh.receive() | |
| } while (resource == null) | |
| try { | |
| logger.debug("Client - got resource, doing something useful with it") | |
| action(resource) | |
| } finally { | |
| logger.debug("Client - returning resource to the server") | |
| serverCh.send(Msg.Return(resource)) | |
| } | |
| } | |
| private fun cleanupStaleResources() { | |
| if (closed) throw Exception("Pool has been closed already") | |
| val now = poolOptions.clock.now() | |
| val item = resources.peekLast() | |
| if (item == null) { | |
| logger.debug("Cleanup - No items in the pool to evict ${this::class.qualifiedName}") | |
| return | |
| } | |
| if (item.lastTimeUsed.plus(poolOptions.evictTimeout) >= now) { | |
| logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} has been returned back to ${this::class.qualifiedName}") | |
| } else { | |
| if (currentCapacity > poolOptions.minInstances) { | |
| capacity -= 1 | |
| resources.removeLast() | |
| logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} has been removed from ${this::class.qualifiedName}") | |
| } else { | |
| logger.debug("Cleanup - Item with lastTimeUsed: ${item.lastTimeUsed} is stale, but min capacity reached. It has been returned back to ${this::class.qualifiedName}") | |
| } | |
| } | |
| } | |
| private suspend fun signalWaitListToRetry() { | |
| waitList.forEach { | |
| it.send(null) | |
| } | |
| waitList.clear() | |
| } | |
| // background loop which will try to find stale resource every evictTimeout | |
| private val cleanupLoop = poolOptions.scope.launch { | |
| while (!closed) { | |
| poolOptions.syncer() | |
| serverCh.send(Msg.Cleanup) | |
| delay(poolOptions.evictTimeout) | |
| } | |
| } | |
| // main event loop which requests server channel for messages | |
| // it should not block or even await on anything beside sending replies to channels | |
| // this loop MUST be very responsive | |
| @Suppress("UNCHECKED_CAST") | |
| private val serverLoop = poolOptions.scope.launch { | |
| while (!closed) { | |
| when (val msg = serverCh.receive()) { | |
| // time to check for stale resources | |
| is Msg.Cleanup -> cleanupStaleResources() | |
| // new resource finished its initialization. we should signal those who are waiting | |
| Msg.SpawnJobFinished -> { | |
| logger.debug("Server - spawn job has finished, signaling all waiters to retry") | |
| signalWaitListToRetry() | |
| spawnJob = null | |
| } | |
| // resource has been returned, we should signal those who are waiting | |
| is Msg.Return<*> -> { | |
| logger.debug("Server - resource has been returned, signaling all waiters to retry") | |
| resources.addFirst(ResourceInfo((msg.resource as T), lastTimeUsed = poolOptions.clock.now())) | |
| signalWaitListToRetry() | |
| } | |
| // request for borrowing resource | |
| is Msg.TryBorrow<*> -> { | |
| when (val resource = resources.pollFirst()) { | |
| null -> // we don't have free resource available | |
| if (currentCapacity < poolOptions.maxInstances) { | |
| // can create more ... | |
| if (spawnJob != null) { | |
| // ... but resource already being created. put request on hold | |
| logger.debug("Server - No free resources in pool ${this::class.qualifiedName}, but spawn job already there, put reply on wait") | |
| waitList.add(msg.replyCh as Channel<T?>) | |
| } else { | |
| // ... so let's create more! put request on hold | |
| logger.debug("Server - No free resources in pool ${this::class.qualifiedName}, started spawn job, put reply on wait") | |
| spawnJob = launch { | |
| val newResource = spawnFunction() | |
| if (closed) throw Exception("Pool has been closed already") | |
| capacity += 1 | |
| resources.addFirst(ResourceInfo(newResource, poolOptions.clock.now())) | |
| serverCh.send(Msg.SpawnJobFinished) | |
| } | |
| waitList.add(msg.replyCh as Channel<T?>) | |
| } | |
| } else { | |
| // can't create more resources | |
| logger.debug("Server - No free resources in pool ${this::class.qualifiedName} but we are at max capacity. put reply on wait") | |
| waitList.add(msg.replyCh as Channel<T?>) | |
| } | |
| else -> { // if we have free resource, just send it back | |
| logger.debug("Server - free resource found to borrow. Sending it back to requester") | |
| (msg.replyCh as Channel<T?>).send(resource.value) | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| init { | |
| serverLoop.start() | |
| cleanupLoop.start() | |
| logger.info("DisposablePool has started") | |
| } | |
| override fun close() { | |
| closed = true | |
| waitList.clear() | |
| capacity = 0 | |
| serverLoop.cancel("Pool has been closed") | |
| cleanupLoop.cancel("Pool has been closed") | |
| logger.info("DisposablePool has been closed") | |
| } | |
| // Our client-server protocol | |
| private sealed interface Msg { | |
| data class TryBorrow<T>(val replyCh: Channel<T?>) : Msg | |
| data class Return<T>(val resource: T) : Msg | |
| object Cleanup : Msg | |
| object SpawnJobFinished : Msg | |
| } | |
| private data class ResourceInfo<T>(val value: T, val lastTimeUsed: Instant) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.thriveglobal.reset | |
| import kotlinx.coroutines.async | |
| import kotlinx.coroutines.channels.Channel | |
| import kotlinx.coroutines.runBlocking | |
| import kotlinx.coroutines.test.runBlockingTest | |
| import kotlinx.datetime.Clock | |
| import kotlinx.datetime.Instant | |
| import org.junit.jupiter.api.Assertions.* | |
| import org.junit.jupiter.api.Test | |
| import java.util.concurrent.atomic.AtomicInteger | |
| import kotlin.time.Duration | |
| class PoolWithEvictionTests { | |
| // just gives you back integers | |
| private fun spawnFunction(): () -> Int { | |
| val i = AtomicInteger(0) | |
| return { | |
| i.getAndIncrement() | |
| } | |
| } | |
| // tricky function to synchronize awaits inside Pool and outside with controllable time | |
| // idea is that somebody should call tick() and wait for tock(). Tock() will also change clock time by amount specified in duration | |
| // Strictly for controllable unit tests | |
| private fun tickingClocks(duration: Duration = Duration.minutes(1)): Triple<Clock, suspend () -> Unit, suspend () -> Unit> { | |
| var now = Clock.System.now() | |
| // capacity = 1 is important. Tick/Tock functions will always work sequentially because of that | |
| val ch = Channel<Unit>(capacity = 1) | |
| val tick: (suspend () -> Unit) = { | |
| try { | |
| ch.send(Unit) | |
| } catch (e: Exception) { | |
| } | |
| } | |
| val tock: (suspend () -> Unit) = { | |
| // changing our time on tock | |
| now = now.plus(duration) | |
| try { | |
| ch.receive() | |
| } catch (e: Exception) { | |
| } | |
| } | |
| val clock = object : Clock { | |
| // return of this function will be changed on each tock() | |
| override fun now(): Instant = now | |
| } | |
| return Triple(clock, tick, tock) | |
| } | |
| @Test | |
| fun `should be able to borrow from pool`() { | |
| PoolWithEviction(spawnFunction()).use { pool -> | |
| runBlocking { | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { assertEquals(0, it) } | |
| assertEquals(1, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should be able to borrow from pool twice consecutively`() { | |
| PoolWithEviction(spawnFunction()).use { pool -> | |
| runBlocking { | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { zero -> | |
| assertEquals(0, zero) | |
| } | |
| pool.borrow { zero -> | |
| assertEquals(0, zero) | |
| } | |
| assertEquals(1, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should be able to borrow from pool twice embedded`() { | |
| PoolWithEviction(spawnFunction()).use { pool -> | |
| runBlocking { | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { zero -> | |
| assertEquals(0, zero) | |
| pool.borrow { one -> | |
| assertEquals(1, one) | |
| } | |
| } | |
| assertEquals(2, pool.currentCapacity) | |
| pool.borrow { zero -> | |
| assertEquals(0, zero) | |
| pool.borrow { one -> | |
| assertEquals(1, one) | |
| } | |
| } | |
| assertEquals(2, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should evict stale workers`() { | |
| runBlockingTest { | |
| val (clock, tick, tock) = tickingClocks() | |
| val opts = PoolOptions( | |
| minInstances = 0, | |
| maxInstances = 10, | |
| evictTimeout = Duration.minutes(2), | |
| clock = clock, | |
| scope = this, | |
| syncer = tick, | |
| ) | |
| PoolWithEviction(spawnFunction(), opts).use { pool -> | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { zero -> | |
| assertEquals(0, zero) | |
| } | |
| tock() // 0min pass | |
| assertEquals(1, pool.currentCapacity) | |
| tock() // 1min pass | |
| assertEquals(1, pool.currentCapacity) | |
| tock() // 2min pass should be evicted now | |
| assertEquals(0, pool.currentCapacity) | |
| tock() // 3min pass should be evicted now | |
| assertEquals(0, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should not evict stale workers less than min capacity`() { | |
| runBlockingTest { | |
| val (clock, tick, tock) = tickingClocks() | |
| val opts = PoolOptions( | |
| minInstances = 1, | |
| maxInstances = 10, | |
| evictTimeout = Duration.minutes(2), | |
| clock = clock, | |
| scope = this, | |
| syncer = tick, | |
| ) | |
| PoolWithEviction(spawnFunction(), opts).use { pool -> | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { pool.borrow { pool.borrow { } } } | |
| tock() // 0min pass | |
| assertEquals(3, pool.currentCapacity) | |
| tock() // 1min pass | |
| assertEquals(3, pool.currentCapacity) | |
| tock() // 2min pass, one instance should be evicted now | |
| assertEquals(2, pool.currentCapacity) | |
| tock() // 3min pass, one more instance should be evicted now | |
| assertEquals(1, pool.currentCapacity) | |
| tock() // 4min pass, no more eviction, because minInstance = 1 | |
| assertEquals(1, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should not create new workers higher than max capacity`() { | |
| runBlocking { | |
| val opts = PoolOptions( | |
| minInstances = 1, | |
| maxInstances = 3, | |
| evictTimeout = Duration.minutes(2), | |
| clock = Clock.System, | |
| scope = this, | |
| ) | |
| PoolWithEviction(spawnFunction(), opts).use { pool -> | |
| val requests = (1..100).map { | |
| async { | |
| pool.borrow { } | |
| } | |
| } | |
| for (req in requests) { | |
| req.await() | |
| } | |
| assertEquals(3, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `should evict stale workers even under load`() { | |
| runBlockingTest { | |
| val (clock, tick, tock) = tickingClocks() | |
| val opts = PoolOptions( | |
| minInstances = 0, | |
| maxInstances = 2, | |
| evictTimeout = Duration.minutes(2), | |
| clock = clock, | |
| scope = this, | |
| syncer = tick, | |
| ) | |
| PoolWithEviction(spawnFunction(), opts).use { pool -> | |
| assertEquals(0, pool.currentCapacity) | |
| pool.borrow { pool.borrow { } } | |
| tock() // 0min pass | |
| assertEquals(2, pool.currentCapacity) | |
| pool.borrow { } | |
| tock() // 1min pass | |
| assertEquals(2, pool.currentCapacity) | |
| pool.borrow { } | |
| tock() // 2min pass, one instance should be evicted now | |
| assertEquals(1, pool.currentCapacity) | |
| pool.borrow { } | |
| tock() // 3min pass, no more eviction, because we are using one worker constantly | |
| assertEquals(1, pool.currentCapacity) | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment