Last active
June 22, 2023 19:44
-
-
Save hkolbeck/e1d5fa50c6418c13f53576fb1c049c14 to your computer and use it in GitHub Desktop.
A sketch of a system for sharing buffers where they pass through a broadcast and are only returned to the pool once all expected consumers have handled them
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package industries.hannah.pixelblaze.sensor | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.MutableSharedFlow | |
import kotlinx.coroutines.sync.Mutex | |
import kotlinx.coroutines.sync.withLock | |
import java.io.Closeable | |
import java.util.* | |
import java.util.concurrent.ArrayBlockingQueue | |
import java.util.concurrent.ConcurrentHashMap | |
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.atomic.AtomicReference | |
/**
 * Envelope published on the shared flow for every buffer that is sent.
 *
 * Note that [buf] is an array, so the generated `equals`/`hashCode` compare it by reference,
 * not by content.
 *
 * @property buf the payload handed to each listener's handler
 * @property remainingConsumers how many listeners still have to finish with [buf]; the listener that
 *   brings it to zero offers the buffer back to the pool
 * @property generation number of the listener-set snapshot this buffer was sent under
 * @property generationMembers IDs of the listeners that belonged to that snapshot
 */
private data class BufWrapper(
    val buf: ByteArray,
    val remainingConsumers: AtomicInteger,
    val generation: ULong,
    val generationMembers: Set<UUID>,
)
/** Number of buffers pre-allocated into the shared pool (also the pool queue's capacity). */
private const val BUFFER_POOL: Int = 10

/** Size in bytes of every buffer, pooled or freshly allocated. */
private const val BUFFER_SIZE: Int = 1024
/**
 * Broadcasts byte buffers from a small pool to a changing set of listeners.
 *
 * Every change to the listener set creates a new "generation": a number plus a snapshot of the
 * listener IDs. [send] stamps each buffer with the generation that was current when it was sent and
 * with a counter equal to the number of listeners in that snapshot. Each listener decrements the
 * counter after handling the buffer, and the one that reaches zero offers the buffer back to the pool.
 *
 * Removed listeners are not cancelled right away, because buffers already in flight still count them.
 * A listener cancels itself when it sees a buffer from a newer generation that no longer lists it,
 * and [close] cancels every collector that is still running.
 *
 * @param handlerScope scope in which one collector coroutine per listener is launched
 */
class SharingIsCaring(
    private val handlerScope: CoroutineScope
) : Closeable {
    // Serializes every change to activeListeners / generationGuard; the listener loops never take it
    private val listenerGuard = Mutex(false)

    // Current listener IDs
    private val activeListeners = ConcurrentHashMap<UUID, Unit>()

    // Collector jobs that are still running, including removed listeners that have not yet seen a
    // newer-generation buffer. Entries remove themselves when the job completes.
    private val listenerJobs = ConcurrentHashMap<UUID, Job>()

    // Current generation: its number and the snapshot of listener IDs that belong to it
    private val generationGuard = AtomicReference<Pair<ULong, Set<UUID>>>(Pair(0u, setOf()))

    // The pool of buffers being shared
    private val bufPool = ArrayBlockingQueue<ByteArray>(BUFFER_POOL)

    // The multithreaded-in, multi-consumer-out flow
    private val flow = MutableSharedFlow<BufWrapper>()

    init {
        repeat(BUFFER_POOL) {
            bufPool.add(ByteArray(BUFFER_SIZE))
        }
    }

    /**
     * Registers [handler] to be called with every buffer sent from now on.
     *
     * The collector coroutine is started undispatched so that it is already subscribed to the flow
     * when the new generation is published; otherwise a concurrent [send] could count a listener
     * that never receives the buffer, and that buffer would never return to the pool.
     *
     * @param handler called with each buffer; anything it throws is printed and otherwise ignored
     * @return the ID to pass to [removeListener]
     */
    suspend fun addListener(handler: (ByteArray) -> Unit): UUID {
        val id = UUID.randomUUID()
        listenerGuard.withLock {
            activeListeners[id] = Unit
            val oldGen = generationGuard.get()
            val newGen = Pair(oldGen.first + 1u, activeListeners.keys.toSet())

            // flow.collect {} never returns until the job it's in is cancelled
            val job = handlerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                var generation = newGen.first
                flow.collect { bufWrapper ->
                    when {
                        // Only listeners in the buffer's snapshot were counted, so only they may
                        // run the handler and decrement the counter
                        id in bufWrapper.generationMembers -> {
                            if (generation < bufWrapper.generation) {
                                generation = bufWrapper.generation
                            }
                            dispatch(id, handler, bufWrapper)
                        }
                        // A newer generation without us: we're no longer wanted, cancel ourselves
                        generation < bufWrapper.generation -> coroutineContext.job.cancel()
                        // Buffer sent under a snapshot taken before we were added: not ours, skip it
                        else -> Unit
                    }
                }
            }
            listenerJobs[id] = job
            job.invokeOnCompletion { listenerJobs.remove(id) }

            // Publish the generation only now that the collector is subscribed
            generationGuard.set(newGen)
        }
        return id
    }

    /**
     * Runs [handler] on the wrapped buffer, then marks this listener as done with it.
     * Handler failures are printed and swallowed so one bad listener cannot stop the collector.
     */
    private fun dispatch(id: UUID, handler: (ByteArray) -> Unit, bufWrapper: BufWrapper) {
        try {
            handler(bufWrapper.buf)
        } catch (t: Throwable) {
            println("Exception in listener: $id: ${t.message}")
            t.printStackTrace()
        } finally {
            // If we're the last consumer expected, return the buffer to the pool
            if (bufWrapper.remainingConsumers.decrementAndGet() == 0) {
                bufPool.offer(bufWrapper.buf) // If this fails it'll be GCed
            }
        }
    }

    /**
     * Removes the listener with the given [id] from the next generation.
     *
     * The collector is deliberately not cancelled here, because buffers already in the pipe may
     * still count it. It cancels itself as soon as a newer-generation buffer reaches it.
     */
    suspend fun removeListener(id: UUID) {
        listenerGuard.withLock {
            val oldGen = generationGuard.get()
            activeListeners.remove(id)
            generationGuard.set(Pair(oldGen.first + 1u, activeListeners.keys.toSet()))
        }
    }

    /**
     * Emits one buffer to every listener in the current generation.
     *
     * Assumes all serialization produces the same data length, equal to BUFFER_SIZE.
     *
     * @param toSend the value to serialize into the buffer (serialization is still a placeholder)
     */
    suspend fun send(toSend: Any) {
        val (generationId, members) = generationGuard.get()
        if (members.isEmpty()) {
            // Nobody would ever decrement the counter, so a pooled buffer would never come back.
            // Emit a buffer-less marker instead so stale collectors still see the newer
            // generation and cancel themselves.
            flow.emit(BufWrapper(ByteArray(0), AtomicInteger(0), generationId, members))
            return
        }
        // Try to use the shared buffers, but just alloc if we can't
        val buf = bufPool.poll() ?: ByteArray(BUFFER_SIZE)
        /* Do some serialization */
        flow.emit(BufWrapper(buf, AtomicInteger(members.size), generationId, members))
    }

    /** Starts a new, empty generation; existing collectors cancel themselves when they next see it. */
    suspend fun removeAllListeners() {
        listenerGuard.withLock {
            activeListeners.clear()
            val oldGen = generationGuard.get()
            val newGen = Pair<ULong, Set<UUID>>(oldGen.first + 1u, setOf())
            generationGuard.set(newGen)
        }
    }

    /**
     * Removes all listeners and cancels every collector coroutine that is still running.
     *
     * Collectors otherwise only stop when a newer-generation buffer arrives, which may never
     * happen once the owner has closed this object.
     */
    override fun close() {
        runBlocking {
            removeAllListeners()
        }
        listenerJobs.values.forEach { it.cancel() }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment