Last active
February 1, 2024 20:55
-
-
Save upeter/05529866459c7d49eefab544ab30165b to your computer and use it in GitHub Desktop.
Test demonstrating that Periodik blocks Dispatcher Thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.akif.periodik | |
import dev.akif.periodik | |
import kotlinx.coroutines.* | |
import org.junit.jupiter.api.Timeout | |
import org.slf4j.Logger | |
import org.slf4j.LoggerFactory | |
import kotlin.random.Random | |
import kotlin.reflect.KProperty | |
import kotlin.test.Test | |
import kotlin.test.assertEquals | |
import kotlin.time.Duration | |
import kotlin.time.Duration.Companion.milliseconds | |
/**
 * Compares the hand-rolled [ReloadDelegate] with the Periodik-backed property of [Service]
 * when both are driven by the same single-threaded dispatcher.
 *
 * Each test samples `ratesWorking` three times, one second apart, and expects three distinct
 * snapshots because the delegate is configured to refresh every 500 milliseconds.
 */
class PeriodikBrokenTest {
    @Test
    @Timeout(5)
    fun `should show that simple ReloadDelegate implementation does not block Dispatcher Thread `() = runBlocking {
        val service = Service(singleThreadDispather)
        val results = sampleRatesWorking(service) { service.ratesWorking }
        //expect rates to be updated every 500 millis, so fetching them 3 times every second, they should be changed 3 times
        assertEquals(SAMPLE_COUNT, results.toSet().size)
    }

    @Test
    @Timeout(5)
    fun `should show that periodiek blocks Dispatcher Thread permanently`() = runBlocking {
        val service = Service(singleThreadDispather)
        // NOTE(review): the sampled value is still `ratesWorking`; only the logged value touches the
        // Periodik-backed property. This looks intentional (reading it is what triggers Periodik on the
        // shared dispatcher) - confirm.
        val results = sampleRatesWorking(service) { service.ratesWithPeriodiek }
        //expect rates to be updated every 500 millis, so fetching them 3 times every second, they should be changed 3 times
        assertEquals(SAMPLE_COUNT, results.toSet().size)
    }

    /**
     * Reads `service.ratesWorking` [SAMPLE_COUNT] times; after each read it logs whatever [logged]
     * returns and then waits one second.
     *
     * @param service the service under test
     * @param logged supplies the value to print after each read (evaluated once per sample)
     * @return the sampled `ratesWorking` snapshots, in read order
     */
    private suspend fun sampleRatesWorking(
        service: Service,
        logged: () -> Any?
    ): List<Map<String, Map<String, Double>>> =
        (1..SAMPLE_COUNT).map {
            service.ratesWorking.also {
                log.info("Rates: ${logged()}")
                delay(1000)
            }
        }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(PeriodikBrokenTest::class.java)

        /** Number of samples taken per test, and therefore the expected number of distinct snapshots. */
        private const val SAMPLE_COUNT = 3

        // NOTE(review): both tests share this dispatcher. If the Periodik test really leaves its thread
        // blocked, the other test may be affected when it runs afterwards - consider one dispatcher per test.
        val singleThreadDispather = newSingleThreadContext("single-thread")
    }
}
/**
 * Working multithreaded minimal implementation of a self-refreshing property delegate.
 *
 * The first value is fetched synchronously while the delegate is constructed. Afterwards one
 * coroutine on the given dispatcher waits [delay], runs [updateFun], stores the outcome and repeats.
 * Reading the delegated property never suspends: it returns the most recently stored value, or
 * rethrows the exception of the most recent fetch if that fetch failed.
 *
 * @param dispatcher dispatcher on which the periodic refresh coroutine runs
 * @property delay pause between the end of one refresh and the start of the next
 * @property updateFun suspending supplier of the value
 */
class ReloadDelegate<T>(
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    val delay: Duration,
    val updateFun: suspend () -> T
) {
    //for the initial, which must be fetched right away, Dispatchers.IO is a safe choice, could also be made configurable
    // @Volatile: written by the refresh coroutine, read by whichever thread accesses the property.
    @Volatile
    private var backingField: Result<T> = runCatching { runBlocking(Dispatchers.IO) { updateFun() } }

    private val schedulerScope = CoroutineScope(dispatcher)

    init {
        scheduleTasks()
    }

    /** Starts the single refresh loop; it runs for as long as [schedulerScope] is active. */
    private fun scheduleTasks() {
        schedulerScope.launch {
            while (isActive) {
                delay(delay)
                val outcome = runCatching { updateFun() }
                // If this coroutine was cancelled while updating, stop here instead of
                // publishing the CancellationException as the property's value.
                ensureActive()
                backingField = outcome
            }
        }
    }

    /**
     * Returns the latest stored value.
     *
     * @throws Throwable the exception thrown by the most recent fetch, if it failed
     */
    operator fun getValue(thisRef: Any, property: KProperty<*>): T = backingField.getOrThrow()
}
/**
 * Dummy implementation using Periodik and simple delegate.
 *
 * Exposes the same rate table twice: once refreshed by [ReloadDelegate] and once by a Periodik
 * property, both configured for a 500 millisecond interval on the supplied dispatcher.
 */
class Service(coroutineDispatcher: CoroutineDispatcher) {
    val rateApi = RateApi()

    /** Rate table kept up to date by [ReloadDelegate]. */
    val ratesWorking: Map<String, Map<String, Double>> by ReloadDelegate(
        dispatcher = coroutineDispatcher,
        delay = 500.milliseconds,
        updateFun = ::fetchParallel,
    )

    /** Rate table kept up to date by Periodik. */
    val ratesWithPeriodiek: Map<String, Map<String, Double>> by periodik()
        .on(Schedule.every(500.milliseconds))
        .initializeLazily()
        .loggingWithSlf4j()
        .buildSuspending(coroutineDispatcher) { fetchParallel() }

    /**
     * Requests the rates of every supported currency concurrently and returns them keyed by
     * source currency; each inner map is restricted to supported target currencies.
     */
    suspend fun fetchParallel() = coroutineScope {
        supportedCurrencies
            .map { currency ->
                async {
                    val supportedRates = rateApi.ratesFor(currency).filterKeys { it in supportedCurrencies }
                    currency to supportedRates
                }
            }
            .awaitAll()
            .toMap()
    }

    /**
     * Looks up the exchange rate between two currencies in [ratesWithPeriodiek].
     *
     * @return a rate of 1.0 when [from] equals [to]; otherwise the stored rate, or `null` when
     * either currency is missing from the table
     */
    suspend fun get(from: String, to: String): ExchangeRate? {
        if (from == to) {
            return ExchangeRate(from, to, 1.0)
        }
        log.info("Getting exchange rate from $from to $to")
        val rate = ratesWithPeriodiek[from]?.get(to) ?: return null
        return ExchangeRate(from, to, rate)
    }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(Service::class.java)
        val supportedCurrencies = setOf("USD", "EUR", "TRY", "GBP")
    }
}
/**
 * Fake rate source: after a 300 millisecond suspension it returns a freshly randomised rate
 * for each of EUR, USD and TRY.
 */
class RateApi {
    /**
     * Logs the request, suspends for 300 milliseconds and returns random rates.
     *
     * @param currency currency the rates are requested for; it is only logged
     * @return a map with the keys EUR, USD and TRY, each holding a random value in the
     * half-open range from 0.2 to 50.0
     */
    suspend fun ratesFor(currency: String): Map<String, Double> {
        log.info("Getting rates for {}", currency)
        delay(300.milliseconds)
        return QUOTED_CURRENCIES.associateWith { Random.nextDouble(MIN_RATE, MAX_RATE) }
    }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(RateApi::class.java)
        private const val MIN_RATE: Double = 0.2
        private const val MAX_RATE: Double = 50.0

        // Order matters only for keeping the sequence of random draws stable.
        private val QUOTED_CURRENCIES = listOf("EUR", "USD", "TRY")
    }
}
/**
 * A single conversion rate between two currencies.
 *
 * @property from source currency code
 * @property to target currency code
 * @property rate value stored for converting [from] into [to]
 */
data class ExchangeRate(
    val from: String,
    val to: String,
    val rate: Double,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment