Skip to content

Instantly share code, notes, and snippets.

@mengdd
Last active May 16, 2025 07:54
Show Gist options
  • Save mengdd/d8e51d858e3c0f30018e10ed73686569 to your computer and use it in GitHub Desktop.
Save mengdd/d8e51d858e3c0f30018e10ed73686569 to your computer and use it in GitHub Desktop.
Batch size issue demo
data class PendingRequest(
val request: String,
) {
val deferred = CompletableDeferred<String>()
}
class BatchHttpInterceptor constructor(
private val batchIntervalMillis: Long = 10,
private val maxBatchSize: Int = 10,
) {
private val startMark = markNow()
@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = Dispatchers.Default.limitedParallelism(1)
private val scope = CoroutineScope(dispatcher)
private val mutex = Mutex()
private val pendingRequests = mutableListOf<PendingRequest>()
suspend fun intercept(request: String): String {
val pendingRequest = PendingRequest(request)
val sendNow = mutex.withLock {
println("add request $pendingRequest")
pendingRequests.add(pendingRequest)
pendingRequests.size >= maxBatchSize
}
if (sendNow) {
println("send now!")
executePendingRequests()
} else {
scope.launch {
delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1)
executePendingRequests()
}
}
return pendingRequest.deferred.await()
}
private suspend fun executePendingRequests() {
println("executePendingRequests isLocked: ${mutex.isLocked}")
val pending = mutex.withLock { // due to the lock could not be acquired here, the requests accumulates
println("sender acquired")
val copy = pendingRequests.toList()
pendingRequests.clear()
copy
}
if (pending.isEmpty()) {
return
}
println("---> sending pending request size: ${pending.size}")
println(" sending requests: ${pending.map { it.request }}")
}
}
fun main() = runBlocking {
val interceptor = BatchHttpInterceptor(batchIntervalMillis = 10, maxBatchSize = 10)
List(100) { index ->
async(Dispatchers.IO) {
val url = "url $index"
interceptor.intercept(url)
}
}.awaitAll()
println("------")
}
class BatchHttpInterceptor2 constructor(
private val batchIntervalMillis: Long = 10,
private val maxBatchSize: Int = 10,
) {
private val startMark = markNow()
@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = Dispatchers.Default.limitedParallelism(1)
private val scope = CoroutineScope(dispatcher)
private val mutex = Mutex()
private val pendingRequests = mutableListOf<PendingRequest>()
suspend fun intercept(request: String): String {
val pendingRequest = PendingRequest(request)
val sendNow = mutex.withLock {
println("add request $pendingRequest")
pendingRequests.add(pendingRequest)
val batchFull = pendingRequests.size >= maxBatchSize
if (batchFull) {
executePendingRequests(false)
}
batchFull
}
if (!sendNow) {
scope.launch {
delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1)
executePendingRequests(needLock = true)
}
}
return pendingRequest.deferred.await()
}
private suspend fun executePendingRequests(needLock: Boolean) {
println("executePendingRequests isLocked: ${mutex.isLocked}")
val pending = if (needLock) {
mutex.withLock {
println("sender acquired")
val copy = pendingRequests.toList()
pendingRequests.clear()
copy
}
} else {
val copy = pendingRequests.toList()
pendingRequests.clear()
copy
}
if (pending.isEmpty()) {
return
}
println("---> sending pending request size: ${pending.size}")
println(" sending requests: ${pending.map { it.request }}")
}
}
fun main() = runBlocking {
val interceptor = BatchHttpInterceptor2(batchIntervalMillis = 10, maxBatchSize = 10)
List(100) { index ->
async(Dispatchers.IO) {
val url = "url $index"
interceptor.intercept(url)
}
}.awaitAll()
println("------")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment