Skip to content

Instantly share code, notes, and snippets.

@jchapuis
Last active July 28, 2017 05:33
Show Gist options
  • Save jchapuis/201889106ca214018ab1516f589aa7ad to your computer and use it in GitHub Desktop.
Save jchapuis/201889106ca214018ab1516f589aa7ad to your computer and use it in GitHub Desktop.
Adapters to convert a grpc stream to a back-pressured observable (supporting back-pressure)
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver, StreamObserver}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.observers.Subscriber
import monix.reactive.{Observable, Observer}
import scala.concurrent.{CancellationException}
import scala.util.{Failure, Success}
object MonixGrpcAdapters {
def observe[TReq, TResp](call: (TReq, StreamObserver[TResp]) => Unit,
request: TReq)(
implicit scheduler: Scheduler): Observable[TResp] = {
Observable.unsafeCreate((s: Subscriber[TResp]) => {
val grpcObserver = createBackpressuredGrpcResponseObserver(s)(scheduler)
call(request, grpcObserver)
Cancelable(() => grpcObserver.onError(new CancellationException()))
})
}
private def createBackpressuredGrpcResponseObserver[ReqT, RespT](
observer: Observer[RespT])(implicit scheduler: Scheduler) =
new ClientResponseObserver[ReqT, RespT] {
var outbound: ClientCallStreamObserver[ReqT] = _
override def beforeStart(
requestStream: ClientCallStreamObserver[ReqT]): Unit = {
outbound = requestStream
outbound.disableAutoInboundFlowControl()
}
override def onError(t: Throwable): Unit = observer.onError(t)
override def onCompleted(): Unit = observer.onComplete()
override def onNext(value: RespT): Unit =
observer
.onNext(value)
.syncOnComplete {
case Success(Continue) => outbound.request(1)
case Success(Stop) => outbound.onError(new CancellationException())
case Failure(t) => outbound.onError(t)
}(scheduler)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment