Skip to content

Instantly share code, notes, and snippets.

@creeefs
Last active April 19, 2017 22:02
Show Gist options
  • Select an option

  • Save creeefs/b00f7b96b232af23684428f668a682fd to your computer and use it in GitHub Desktop.

Select an option

Save creeefs/b00f7b96b232af23684428f668a682fd to your computer and use it in GitHub Desktop.
Testing toFutureOpt method in RxUtil
package mlbam.betljus.core.util
import mlbam.betljus.core.util.RxUtil._
import org.scalatest.FlatSpec
import org.scalatest.concurrent.ScalaFutures._
import rx.Observable
class RxUtilTest extends FlatSpec {
"toFutureOpt" should "return an error if the observable fails" in {
val f = Observable
.just(1)
.doOnNext(i => if (i == 1) throw new RuntimeException("uhoh"))
.toFutureOpt
whenReady(f) {
ex => assert(ex.isInstanceOf[RuntimeException])
}
}
}
@creeefs

creeefs commented Apr 19, 2017

Copy link
Copy Markdown
Author
package mlbam.betljus.core.util

import mlbam.betljus.core.util.RxUtil._
import org.scalatest.AsyncFlatSpec
import rx.Observable

class RxUtilTest extends AsyncFlatSpec {
  "toFutureOpt" should "return an error if the observable fails" in {
    recoverToSucceededIf[RuntimeException] {
      Observable
        .just(1)
        .doOnNext(i => if (i == 1) throw new RuntimeException("uhoh"))
        .toFutureOpt
    }
  }
}

@creeefs

creeefs commented Apr 19, 2017

Copy link
Copy Markdown
Author
package mlbam.betljus.core.util

import scala.concurrent.{ Future, Promise }

import org.reactivestreams.{ Publisher, Subscriber }

import akka.NotUsed
import akka.stream.scaladsl.Source
import rx.Observable

/**
 * Extensions to convert rx.Observable into Futures and akka Streams
 */
object RxUtil {
  implicit class ObservableExtensions[T](o: Observable[T]) {

    /**
     * @return a Future for a single-element Observable
     */
    def toFuture: Future[T] = {
      val p = Promise[T]
      o.single()
        .subscribe(p.success, p.failure)
      p.future
    }

    /**
     * @return the first element of the observable, or None if the observable is empty.
     */
    def toFutureOpt: Future[Option[T]] = {
      val p = Promise[Option[T]]
      o.map[Option[T]](Some(_))
        .singleOrDefault(None)
        .subscribe(p.success, p.failure)
      p.future
    }

    /**
     * @return An akka-stream version of this observable
     */
    def toAkka: Source[T, NotUsed] = {
      val pub = new Publisher[T] {
        override def subscribe(s: Subscriber[_ >: T]): Unit = {
          val rxSub = new rx.Subscriber[T]() {
            override def onError(e: Throwable): Unit = s.onError(e)

            override def onCompleted(): Unit = s.onComplete()

            override def onNext(t: T): Unit = s.onNext(t)
          }
          o.subscribe(rxSub)
        }
      }
      Source.fromPublisher(pub)
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment