Last active
March 25, 2019 02:55
-
-
Save Dico200/2879fcbd205a31d1adcf3bab1aea6882 to your computer and use it in GitHub Desktop.
BehaviorSubjectChannel - assuming that means buffering the last element in a broadcast channel for new subscriptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.* | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.coroutines.EmptyCoroutineContext | |
class BehaviorSubjectChannel<E>(val del: BroadcastChannel<E>, context: CoroutineContext? = null) : BroadcastChannel<E> by del { | |
@Volatile | |
private var hasElem: Boolean = false | |
@Volatile | |
private var lastElem: E? = null | |
private val scope = CoroutineScope((context ?: EmptyCoroutineContext) + SupervisorJob(context?.get(Job))) | |
init { | |
scope.createBufferJob() | |
} | |
override fun openSubscription(): ReceiveChannel<E> { | |
val result = del.openSubscription() | |
if (hasElem) { | |
scope.launch(start = CoroutineStart.UNDISPATCHED) { | |
@Suppress("UNCHECKED_CAST") | |
(result as SendChannel<E>).send(lastElem as E) | |
} | |
} | |
return result | |
} | |
private fun CoroutineScope.createBufferJob() { | |
launch { | |
val channel = del.openSubscription() | |
try { | |
lastElem = channel.receive() | |
hasElem = true | |
for (elem in channel) { | |
lastElem = elem | |
} | |
} catch (e: ClosedReceiveChannelException) { | |
} finally { | |
channel.cancel() | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment