Last active
February 16, 2023 16:55
-
-
Save gstraymond/1805d2f5c1b98c2cd2d9e70f3d32ec1e to your computer and use it in GitHub Desktop.
Converts a MongoDB observable to a ZIO stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import org.mongodb.scala.{Observable, Observer, Subscription}
import zio.stream._
import zio.{IO, ZIO}
/** Syntax for consuming MongoDB `Observable`s as ZIO streams. */
object ZioMongoSupport {

  /** Adds `toStream` to any MongoDB [[Observable]].
    *
    * @param observable the observable to convert
    * @tparam T the element type emitted by the observable
    */
  implicit class ObsConverter[T](observable: Observable[T]) {

    /** Converts the observable to a stream that fails with the raw `Throwable`
      * reported by the observable.
      */
    def toStream: Stream[Throwable, T] = toStream(identity)

    /** Converts the observable to a stream, mapping any error reported by the
      * observable through `handleError`.
      *
      * The observable is subscribed to when the stream is run. If the stream is
      * interrupted or stops being consumed before the observable completes, the
      * underlying subscription is cancelled so that it does not keep producing
      * results nobody reads.
      *
      * @param handleError maps the observable's `Throwable` into the stream's error type
      * @tparam E the error type of the resulting stream
      * @return a stream emitting every element of the observable, in order
      */
    def toStream[E](handleError: Throwable => E): Stream[E, T] =
      ZStream.effectAsyncInterrupt[Any, E, T] { emit =>
        // The subscription is handed over by the driver through a callback and
        // the canceler may run on another thread, hence the atomics.
        val subscriptionRef = new AtomicReference[Option[Subscription]](None)
        val cancelled       = new AtomicBoolean(false)

        observable.subscribe {
          new Observer[T] {
            override def onSubscribe(subscription: Subscription): Unit = {
              subscriptionRef.set(Some(subscription))
              // If cancellation won the race against subscription, unsubscribe
              // right away; otherwise request everything, which is what the
              // driver's default `onSubscribe` does.
              if (cancelled.get) subscription.unsubscribe()
              else subscription.request(Long.MaxValue)
            }

            override def onNext(result: T): Unit = emit(ZIO.succeed(result))

            override def onError(e: Throwable): Unit = emit(ZIO.fail(Some(handleError(e))))

            // A failure carrying `None` is the end-of-stream signal.
            override def onComplete(): Unit = emit(ZIO.fail(None))
          }
        }

        // Canceler: run when the stream is interrupted or finalized early.
        Left(ZIO.effectTotal {
          cancelled.set(true)
          subscriptionRef.get.foreach(_.unsubscribe())
        })
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment