Created
January 30, 2019 22:33
-
-
Save Groostav/2976cffaee572e2cf2b3eb1c6220ea82 to your computer and use it in GitHub Desktop.
a channel that distrubutes messages from another channel, with state checks to make sure you cant loose any messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package groostav.kotlinx.exec | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.Dispatchers.Unconfined | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.consumeEach | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.atomic.AtomicReference | |
// the express purpose of this object is to block on send, | |
// adhere to all back-pressure provided by any of the subscribers! | |
// in this way we pass on any problems back up to source! | |
class SimpleInlineMulticaster<T>(val name: String) { | |
constructor(): this("anonymous${counter.getAndIncrement()}") | |
sealed class State<T> { | |
class Registration<T>(val subs: List<Channel<T>> = emptyList()): State<T>() { | |
override fun toString() = "Registration" | |
} | |
class Running<T>(val subs: List<Channel<T>> = emptyList(), val src: ReceiveChannel<T>): State<T>() { | |
override fun toString() = "Running{$src}" | |
} | |
class Closed<T>(): State<T>() { | |
override fun toString() = "Closed" | |
} | |
} | |
private val state: AtomicReference<State<T>> = AtomicReference(State.Registration()) | |
private val sourceJob = CompletableDeferred<Unit>() | |
init { | |
trace { "instanced $this" } | |
} | |
fun sinkFrom(source: ReceiveChannel<T>): Job { | |
val newState = state.updateAndGet { | |
when(it){ | |
is State.Registration -> State.Running(it.subs, source) | |
is State.Running -> throw IllegalStateException("already started") | |
is State.Closed -> throw IllegalStateException("already started") | |
} | |
} | |
if (newState !is State.Running) { | |
throw IllegalStateException("can only start syndicating once") | |
} | |
trace { "publishing src=$source to $this, locked-in subs: ${newState.subs.joinToString("\n\t", "\n\t")}" } | |
return GlobalScope.launch(Unconfined + CoroutineName([email protected]())) { | |
try { | |
source.consumeEach { next -> | |
// note: even if we have zero subs, we still want to read to completion | |
// this is because source is likely a rendezvous channel, and thus we need to block on it. | |
for (sub in newState.subs) { | |
sub.send(next) | |
// apply back-pressure from _all_ subs, | |
// suspending the upstream until all children are satisfied. | |
} | |
} | |
} | |
finally { | |
shutdown() | |
} | |
} | |
} | |
private fun shutdown(){ | |
val previous = state.getAndUpdate { | |
when(it){ | |
is State.Registration -> throw IllegalStateException() | |
is State.Running -> State.Closed() | |
is State.Closed -> it | |
} | |
} | |
if(previous is State.Running){ | |
trace { "${this@SimpleInlineMulticaster} saw EOF, closing subs" } | |
for (it in previous.subs) { it.close() } | |
sourceJob.complete(Unit) | |
trace { "all subs of ${this@SimpleInlineMulticaster} closed" } | |
} | |
} | |
fun openSubscription(description: String? = null): ReceiveChannel<T> { | |
val registered = state.updateAndGet { | |
when(it){ | |
is State.Registration<T> -> { | |
val subscription = newSubscription(it, description) | |
State.Registration(it.subs + subscription) | |
} | |
is State.Running, is State.Closed -> throw IllegalStateException("state = $it") | |
} | |
} | |
registered as? State.Registration<T> ?: throw IllegalStateException("state = $registered") | |
val subscription = registered.subs.last() | |
trace { "opened $subscription from ${this@SimpleInlineMulticaster}" } | |
return subscription | |
} | |
private fun newSubscription(registration: State.Registration<T>, description: String?): Channel<T> { | |
val subSuffix = if(description != null) "[$description]" else "" | |
val resultActual = Channel<T>(RENDEZVOUS) | |
return object: Channel<T> by resultActual { | |
val id = registration.subs.size+1 | |
override fun toString() = "sub$id$subSuffix-$name[$resultActual]" | |
} | |
} | |
// suspends until source is empty and all elements have been dispatched to all subscribers. | |
// key functional difference here vs BroadcastChannel. | |
fun asJob(): Job = sourceJob | |
override fun toString() = "caster-$name{${state.get()}}" | |
companion object { | |
private val counter = AtomicInteger(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment