Last active
April 19, 2017 22:02
-
-
Save creeefs/b00f7b96b232af23684428f668a682fd to your computer and use it in GitHub Desktop.
Testing toFutureOpt method in RxUtil
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mlbam.betljus.core.util | |
| import mlbam.betljus.core.util.RxUtil._ | |
| import org.scalatest.FlatSpec | |
| import org.scalatest.concurrent.ScalaFutures._ | |
| import rx.Observable | |
| class RxUtilTest extends FlatSpec { | |
| // Verifies that an error raised inside the observable surfaces as a failed Future. | |
| "toFutureOpt" should "return an error if the observable fails" in { | |
| val f = Observable | |
| .just(1) | |
| .doOnNext(i => if (i == 1) throw new RuntimeException("uhoh")) | |
| .toFutureOpt | |
| // whenReady only hands the value of a *successful* future to the block and fails the | |
| // test otherwise; project the failure with `.failed` so the exception is the value under test. | |
| whenReady(f.failed) { | |
| ex => assert(ex.isInstanceOf[RuntimeException]) | |
| } | |
| } | |
| } | |
creeefs
commented
Apr 19, 2017
Author
Author
package mlbam.betljus.core.util
import scala.concurrent.{ Future, Promise }
import org.reactivestreams.{ Publisher, Subscriber }
import akka.NotUsed
import akka.stream.scaladsl.Source
import rx.Observable
/**
 * Extensions to convert rx.Observable into Futures and akka Streams
 */
object RxUtil {

  /**
   * Adapts an RxJava subscriber to a Reactive Streams subscriber.
   *
   * Every Rx signal is forwarded to `downstream`. Demand is only propagated to the
   * Rx source when [[requestMore]] is called, so the downstream stays in control of
   * how many elements are emitted.
   *
   * @param downstream the Reactive Streams subscriber that receives the signals
   */
  private final class RxToReactiveSubscriber[T](downstream: Subscriber[_ >: T])
      extends rx.Subscriber[T] {

    // An rx.Subscriber requests an unbounded amount by default; requesting 0 up front
    // replaces that default so elements flow only on explicit downstream demand.
    override def onStart(): Unit = request(0)

    override def onError(e: Throwable): Unit = downstream.onError(e)

    override def onCompleted(): Unit = downstream.onComplete()

    override def onNext(t: T): Unit = downstream.onNext(t)

    /** Exposes the protected `rx.Subscriber.request` to the Reactive Streams subscription. */
    def requestMore(n: Long): Unit = request(n)
  }

  implicit class ObservableExtensions[T](o: Observable[T]) {

    /**
     * Converts an Observable that emits exactly one element into a Future.
     *
     * The Observable is narrowed with `single()`, so the Future fails when the
     * Observable is empty, emits more than one element, or signals an error.
     *
     * @return a Future for a single-element Observable
     */
    def toFuture: Future[T] = {
      val p = Promise[T]
      o.single()
        .subscribe(p.success, p.failure)
      p.future
    }

    /**
     * Converts an Observable that emits at most one element into a Future.
     *
     * The Observable is narrowed with `singleOrDefault`, so an Observable that emits
     * more than one element fails the Future rather than yielding its first element.
     *
     * @return `Some(element)` for a single-element Observable, `None` if the observable
     *         is empty; a failed Future if the observable errors or emits several elements
     */
    def toFutureOpt: Future[Option[T]] = {
      val p = Promise[Option[T]]
      o.map[Option[T]](Some(_))
        .singleOrDefault(None)
        .subscribe(p.success, p.failure)
      p.future
    }

    /**
     * Exposes this Observable as an akka-stream Source through a Reactive Streams Publisher.
     *
     * The Observable is subscribed to once per subscriber of the Publisher. Downstream
     * demand and cancellation are forwarded to the Rx subscription.
     *
     * NOTE(review): Observables that do not honour RxJava backpressure may still emit more
     * than was requested; callers may need an `onBackpressure*` operator upstream - confirm
     * against the sources used.
     *
     * @return An akka-stream version of this observable
     */
    def toAkka: Source[T, NotUsed] = {
      val pub = new Publisher[T] {
        override def subscribe(s: Subscriber[_ >: T]): Unit = {
          // Reactive Streams rule 1.9: a null subscriber must be rejected with a NullPointerException.
          if (s == null) throw new NullPointerException("Subscriber must not be null")

          val rxSub = new RxToReactiveSubscriber[T](s)

          // Rule 1.9: onSubscribe must be signalled before any other signal, so hand the
          // subscription over before subscribing to the Observable.
          s.onSubscribe(new org.reactivestreams.Subscription {
            override def request(n: Long): Unit =
              if (n > 0) rxSub.requestMore(n)
              else {
                // Rule 3.9: non-positive demand is signalled as an IllegalArgumentException.
                rxSub.unsubscribe()
                s.onError(new IllegalArgumentException(
                  s"Reactive Streams rule 3.9: requested amount must be positive, was $n"))
              }

            override def cancel(): Unit = rxSub.unsubscribe()
          })

          o.subscribe(rxSub)
        }
      }
      Source.fromPublisher(pub)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment