Skip to content

Instantly share code, notes, and snippets.

@Centaur
Created April 29, 2016 11:16
Show Gist options
  • Save Centaur/1a2fe4dc032c31f0c4a67d43be65bab6 to your computer and use it in GitHub Desktop.
Save Centaur/1a2fe4dc032c31f0c4a67d43be65bab6 to your computer and use it in GitHub Desktop.
truncate on subscribe
package org.snippets
import rx.lang.scala._
import rx.lang.scala.subjects.ReplaySubject
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Demo of "truncate on subscribe": capture what a `ReplaySubject` replays at
 * subscription time, collect it into a single list, and publish that list
 * through a `Promise`.
 */
object MergedSubject {

  /**
   * Seeds a replay subject with 1, 2, 3, subscribes to a snapshot of it on a
   * new thread, pushes one more element (4) from the calling thread, and prints
   * the collected list.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local imports keep this block self-contained; the file-level imports are untouched.
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val r = ReplaySubject[Int]()
    r.onNext(1)
    r.onNext(2)
    r.onNext(3)

    val promise = Promise[List[Int]]()

    Observable.defer(Observable[Int] { observer =>
      // Forward whatever `r` delivers while we are subscribed.
      // NOTE(review): assumes ReplaySubject delivers its cached items synchronously
      // inside subscribe() — confirm against the RxJava version in use.
      val replaySubscription = r.doOnNext { e =>
        println(s"next $e")
        observer.onNext(e)
      }.subscribe()
      // Detach from `r` before completing. The original kept this subscription
      // alive, so later elements were still printed and pushed to `observer`
      // after onCompleted() had been called.
      replaySubscription.unsubscribe()
      observer.onCompleted()
    })
      .subscribeOn(schedulers.NewThreadScheduler()).toList
      .subscribe(
        l => {
          println(s"total $l")
          promise.success(l)
        },
        // Fail the promise instead of leaving its future pending forever.
        e => promise.failure(e)
      )

    val printed = promise.future.map(l => println(l))

    // Emitted from the calling thread while the subscription above is started on
    // another thread, so whether 4 is part of the snapshot depends on timing.
    r.onNext(4)

    // Wait for the pipeline to finish (bounded by the original 2 second budget)
    // instead of sleeping for a fixed time; failures and timeouts surface here.
    Await.result(printed, 2.seconds)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment