Skip to content

Instantly share code, notes, and snippets.

@gstraymond
Last active February 16, 2023 16:55
Show Gist options
  • Save gstraymond/1805d2f5c1b98c2cd2d9e70f3d32ec1e to your computer and use it in GitHub Desktop.
Save gstraymond/1805d2f5c1b98c2cd2d9e70f3d32ec1e to your computer and use it in GitHub Desktop.
Converts a MongoDB observable to a ZIO stream
import org.mongodb.scala.{Observable, Observer}
import zio.stream._
import zio.{IO, ZIO}
object ZioMongoSupport {
implicit class ObsConverter[T](observable: Observable[T]) {
def toStream: Stream[Throwable, T] = toStream(identity)
def toStream[E](handleError: Throwable => E): Stream[E, T] =
Stream.effectAsync { f =>
observable.subscribe {
new Observer[T] {
override def onNext(result: T): Unit = f(ZIO.succeed(result))
override def onError(e: Throwable): Unit = f(ZIO.fail(Some(handleError(e))))
override def onComplete(): Unit = f(ZIO.fail(None))
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment