Skip to content

Instantly share code, notes, and snippets.

@Dico200
Last active March 25, 2019 02:55
Show Gist options
  • Save Dico200/2879fcbd205a31d1adcf3bab1aea6882 to your computer and use it in GitHub Desktop.
Save Dico200/2879fcbd205a31d1adcf3bab1aea6882 to your computer and use it in GitHub Desktop.
BehaviorSubjectChannel - assuming that means buffering the last element in a broadcast channel for new subscriptions
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class BehaviorSubjectChannel<E>(val del: BroadcastChannel<E>, context: CoroutineContext? = null) : BroadcastChannel<E> by del {
@Volatile
private var hasElem: Boolean = false
@Volatile
private var lastElem: E? = null
private val scope = CoroutineScope((context ?: EmptyCoroutineContext) + SupervisorJob(context?.get(Job)))
init {
scope.createBufferJob()
}
override fun openSubscription(): ReceiveChannel<E> {
val result = del.openSubscription()
if (hasElem) {
scope.launch(start = CoroutineStart.UNDISPATCHED) {
@Suppress("UNCHECKED_CAST")
(result as SendChannel<E>).send(lastElem as E)
}
}
return result
}
private fun CoroutineScope.createBufferJob() {
launch {
val channel = del.openSubscription()
try {
lastElem = channel.receive()
hasElem = true
for (elem in channel) {
lastElem = elem
}
} catch (e: ClosedReceiveChannelException) {
} finally {
channel.cancel()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment