Skip to content

Instantly share code, notes, and snippets.

@luciofm
Last active November 26, 2018 18:59
Show Gist options
  • Save luciofm/b166ae2ef2d7c558b6d4e48f1cab03df to your computer and use it in GitHub Desktop.
Save luciofm/b166ae2ef2d7c558b6d4e48f1cab03df to your computer and use it in GitHub Desktop.
Batch actor with coroutines
/**
 * Creates an actor that collects the items sent to it into batches and hands each batch to [block].
 *
 * A batch is delivered when one of the following happens:
 *  - it has reached [maxSize] items;
 *  - [maxTime] milliseconds have passed since the first item of the batch was received;
 *  - the actor's channel has been closed while items were still pending (final flush).
 *
 * [block] always receives a snapshot copy of the batch, never the internal buffer.
 *
 * @param context coroutine context the actor is started in.
 * @param parent optional parent job passed through to the actor builder.
 * @param maxSize maximum number of items per batch; must be positive.
 * @param maxTime maximum time, in milliseconds, an item waits in a non-full batch before delivery.
 * @param block callback invoked with every completed batch.
 * @return the channel used to send items to the actor.
 * @throws IllegalArgumentException if [maxSize] is not positive.
 */
private inline fun <T> createBatchActor(context: CoroutineContext = CommonPool,
                                        parent: Job? = null,
                                        maxSize: Int = 100,
                                        maxTime: Int = 500,
                                        crossinline block: (List<T>) -> Unit): SendChannel<T> {
    // With maxSize == 0 the "batch is full" check is always true, so the loop would spin forever
    // delivering empty batches without ever suspending.
    require(maxSize > 0) { "maxSize must be positive, was $maxSize" }
    return actor(context, parent = parent) {
        val batch = ArrayList<T>(maxSize)
        var deadline = 0L // deadline for sending the current batch to the callback block
        var closed = false // set once the channel has been closed and drained
        while (true) {
            // When the batch is full, its deadline has passed, or the channel was closed with
            // items still pending, pass the batch to the callback block.
            val remainingTime = deadline - System.currentTimeMillis()
            if (batch.size >= maxSize || batch.isNotEmpty() && (closed || remainingTime <= 0)) {
                Timber.d("Processing batch: ${batch.size}")
                block(batch.toList())
                batch.clear()
                continue
            }
            // Nothing is pending any more and no further items can arrive.
            if (closed) break
            try {
                // Wait until an item is received or the batch deadline is reached.
                select<Unit> {
                    // When received -> add to batch.
                    channel.onReceive {
                        batch.add(it)
                        // Init the deadline when the first item is added to the batch.
                        if (batch.size == 1) deadline = System.currentTimeMillis() + maxTime
                    }
                    // When the timeout is reached just finish the select; no timeout is registered
                    // for an empty batch. remainingTime is always positive here, because a
                    // non-empty batch with remainingTime <= 0 was flushed above.
                    if (batch.isNotEmpty()) onTimeout(remainingTime) {}
                }
            } catch (e: NoSuchElementException) {
                // onReceive fails with ClosedReceiveChannelException (a NoSuchElementException)
                // once the channel is closed. Treat it as "no more input" so the pending items are
                // flushed on the next iteration instead of being lost with a failed actor.
                closed = true
            }
            if (!isActive) break
        }
    }
}
// Example usage: group User items into batches of at most 500, delivered at least once per second.
val userActor = createBatchActor<User>(
        context = activeUsersContext,
        parent = connectJob,
        maxSize = 500,
        maxTime = 1000
) { users ->
    // Do something with the batched User list
}
userActor.send(user)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment