Last active
May 16, 2025 07:54
-
-
Save mengdd/d8e51d858e3c0f30018e10ed73686569 to your computer and use it in GitHub Desktop.
Batch size issue demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| data class PendingRequest( | |
| val request: String, | |
| ) { | |
| val deferred = CompletableDeferred<String>() | |
| } | |
| class BatchHttpInterceptor constructor( | |
| private val batchIntervalMillis: Long = 10, | |
| private val maxBatchSize: Int = 10, | |
| ) { | |
| private val startMark = markNow() | |
| @OptIn(ExperimentalCoroutinesApi::class) | |
| private val dispatcher = Dispatchers.Default.limitedParallelism(1) | |
| private val scope = CoroutineScope(dispatcher) | |
| private val mutex = Mutex() | |
| private val pendingRequests = mutableListOf<PendingRequest>() | |
| suspend fun intercept(request: String): String { | |
| val pendingRequest = PendingRequest(request) | |
| val sendNow = mutex.withLock { | |
| println("add request $pendingRequest") | |
| pendingRequests.add(pendingRequest) | |
| pendingRequests.size >= maxBatchSize | |
| } | |
| if (sendNow) { | |
| println("send now!") | |
| executePendingRequests() | |
| } else { | |
| scope.launch { | |
| delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1) | |
| executePendingRequests() | |
| } | |
| } | |
| return pendingRequest.deferred.await() | |
| } | |
| private suspend fun executePendingRequests() { | |
| println("executePendingRequests isLocked: ${mutex.isLocked}") | |
| val pending = mutex.withLock { // due to the lock could not be acquired here, the requests accumulates | |
| println("sender acquired") | |
| val copy = pendingRequests.toList() | |
| pendingRequests.clear() | |
| copy | |
| } | |
| if (pending.isEmpty()) { | |
| return | |
| } | |
| println("---> sending pending request size: ${pending.size}") | |
| println(" sending requests: ${pending.map { it.request }}") | |
| } | |
| } | |
| fun main() = runBlocking { | |
| val interceptor = BatchHttpInterceptor(batchIntervalMillis = 10, maxBatchSize = 10) | |
| List(100) { index -> | |
| async(Dispatchers.IO) { | |
| val url = "url $index" | |
| interceptor.intercept(url) | |
| } | |
| }.awaitAll() | |
| println("------") | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class BatchHttpInterceptor2 constructor( | |
| private val batchIntervalMillis: Long = 10, | |
| private val maxBatchSize: Int = 10, | |
| ) { | |
| private val startMark = markNow() | |
| @OptIn(ExperimentalCoroutinesApi::class) | |
| private val dispatcher = Dispatchers.Default.limitedParallelism(1) | |
| private val scope = CoroutineScope(dispatcher) | |
| private val mutex = Mutex() | |
| private val pendingRequests = mutableListOf<PendingRequest>() | |
| suspend fun intercept(request: String): String { | |
| val pendingRequest = PendingRequest(request) | |
| val sendNow = mutex.withLock { | |
| println("add request $pendingRequest") | |
| pendingRequests.add(pendingRequest) | |
| val batchFull = pendingRequests.size >= maxBatchSize | |
| if (batchFull) { | |
| executePendingRequests(false) | |
| } | |
| batchFull | |
| } | |
| if (!sendNow) { | |
| scope.launch { | |
| delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1) | |
| executePendingRequests(needLock = true) | |
| } | |
| } | |
| return pendingRequest.deferred.await() | |
| } | |
| private suspend fun executePendingRequests(needLock: Boolean) { | |
| println("executePendingRequests isLocked: ${mutex.isLocked}") | |
| val pending = if (needLock) { | |
| mutex.withLock { | |
| println("sender acquired") | |
| val copy = pendingRequests.toList() | |
| pendingRequests.clear() | |
| copy | |
| } | |
| } else { | |
| val copy = pendingRequests.toList() | |
| pendingRequests.clear() | |
| copy | |
| } | |
| if (pending.isEmpty()) { | |
| return | |
| } | |
| println("---> sending pending request size: ${pending.size}") | |
| println(" sending requests: ${pending.map { it.request }}") | |
| } | |
| } | |
| fun main() = runBlocking { | |
| val interceptor = BatchHttpInterceptor2(batchIntervalMillis = 10, maxBatchSize = 10) | |
| List(100) { index -> | |
| async(Dispatchers.IO) { | |
| val url = "url $index" | |
| interceptor.intercept(url) | |
| } | |
| }.awaitAll() | |
| println("------") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment