Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active September 11, 2019 15:22
Show Gist options
  • Save alexandru/3b6947c734a53f4212b404d7ec460ab0 to your computer and use it in GitHub Desktop.
Save alexandru/3b6947c734a53f4212b404d7ec460ab0 to your computer and use it in GitHub Desktop.
import monix.execution.{Cancelable, Scheduler}
import monix.execution.atomic.Atomic
import monix.reactive.Observable
import monix.reactive.observables.ConnectableObservable
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.PublishSubject
/** Observable that holds back the events of `source` until `awaitCount`
  * subscribers are registered at the same time, and only then connects the
  * underlying `ConnectableObservable`.
  *
  * Internally the source is wrapped with
  * `ConnectableObservable.cacheUntilConnect` over a `PublishSubject`, so
  * subscribers attached to the subject before `connect()` is called receive
  * the events from the start, while subscribers arriving after the
  * connection was made are plain late subscribers of the subject.
  *
  * A subscriber that cancels before the threshold is reached is removed
  * from the count. Once the connection is made the state never goes back
  * to counting.
  *
  * NOTE(review): the `Cancelable` returned by `conn.connect()` is discarded,
  * so this class offers no way of cancelling the connection to `source`.
  *
  * @param source     the observable whose events are held back
  * @param awaitCount number of simultaneously registered subscribers needed
  *                   to trigger the connection; for values below 1 the first
  *                   subscriber triggers it
  * @param sc         scheduler passed to `cacheUntilConnect`
  */
final class AwaitSubscribersObservable[A](source: Observable[A], awaitCount: Int)
  (implicit sc: Scheduler) extends Observable[A] {

  // Sentinel stored in `count` once the connection has been triggered.
  // Regular values of `count` are never negative, so -1 cannot collide.
  private[this] val Active = -1

  // Number of currently registered subscribers, or `Active`.
  private[this] val count = Atomic(0)

  private[this] val conn =
    ConnectableObservable.cacheUntilConnect(source, PublishSubject[A]())(sc)

  override def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {
    // Attach to the subject BEFORE publishing the new count. If the count
    // were published first, another thread could observe it, reach the
    // threshold and connect while this subscriber is still detached, making
    // it miss the initial events.
    val subscription = conn.unsafeSubscribeFn(subscriber)
    val previous = register()

    if (previous == Active) {
      // Connection was already made; nothing to count or to undo on cancel.
      subscription
    } else {
      // This registration is the one that reached the threshold.
      if (nextState(previous) == Active) conn.connect()
      Cancelable { () =>
        subscription.cancel()
        // Give the slot back only while still counting (n > 0); when the
        // state is `Active` (or already zero) it is left untouched.
        count.transformAndGet(n => if (n > 0) n - 1 else n)
      }
    }
  }

  /** State that follows `current` after one more registration. */
  private[this] def nextState(current: Int): Int =
    if (current + 1 >= awaitCount) Active else current + 1

  /** Atomically registers one more subscriber, retrying on contention.
    *
    * @return the state observed right before the registration; `Active`
    *         means the connection was already made and nothing was changed
    */
  @scala.annotation.tailrec
  private[this] def register(): Int = {
    val current = count.get()
    if (current == Active || count.compareAndSet(current, nextState(current))) current
    else register() // lost the race against a concurrent update, retry
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment