Skip to content

Instantly share code, notes, and snippets.

@assiotis
Created January 1, 2018 00:19
Show Gist options
  • Save assiotis/a613815c6b67b01ed12e8a91e46ac3ac to your computer and use it in GitHub Desktop.
Save assiotis/a613815c6b67b01ed12e8a91e46ac3ac to your computer and use it in GitHub Desktop.
An example of how to translate between gRPC and akka streams server-side using the reactive-grpc project
import java.util.concurrent.ExecutorService
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.salesforce.reactivegrpccommon.{ReactivePublisherBackpressureOnReadyHandler, ReactiveStreamObserverPublisher}
import io.grpc.internal.{GrpcUtil, SerializingExecutor}
import io.grpc.stub.{CallStreamObserver, ServerCallStreamObserver, StreamObserver}
import io.grpc.{Status, StatusException, StatusRuntimeException}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
/**
 * Server-side adapters that let a gRPC call be implemented as an Akka Streams
 * `Flow[TReq, TResp, _]`.
 *
 * There is one entry point per gRPC call shape (unary, server-streaming,
 * client-streaming, bidirectional). Each one materializes a stream that feeds
 * the request(s) through `delegate` and writes the results to the response
 * observer supplied by gRPC.
 *
 * All methods cast the response observer to `CallStreamObserver` /
 * `ServerCallStreamObserver`, so they are only usable with observers of those
 * runtime types.
 */
object ServerCalls {

  /**
   * Unary call: runs the single request through `delegate` and forwards the
   * result(s) to `obs`.
   *
   * @param req      the request message
   * @param obs      response observer; must be a `ServerCallStreamObserver` at runtime
   * @param delegate flow implementing the call
   * @param mat      materializer used to run the stream
   */
  def oneToOne[TReq, TResp](req: TReq, obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): Unit = {
    val responses = Source
      .single(req)
      .via(delegate)
      // The cast stays inside the predicate so that a wrong observer type fails
      // the stream (and is reported through obs.onError) instead of throwing here.
      .filterNot(_ => obs.asInstanceOf[ServerCallStreamObserver[TResp]].isCancelled)
    runAndSignal(responses, obs)
  }

  /**
   * Server-streaming call: runs the single request through `delegate` and hands
   * the resulting stream to a `ReactivePublisherBackpressureOnReadyHandler`
   * wrapping `obs`, which is responsible for writing to the call.
   *
   * @param req      the request message
   * @param obs      response observer; must be a `ServerCallStreamObserver` at runtime
   * @param delegate flow implementing the call
   * @param mat      materializer used to run the stream
   */
  def oneToMany[TReq, TResp](req: TReq, obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): Unit = {
    val subscriber = new ReactivePublisherBackpressureOnReadyHandler(obs.asInstanceOf[ServerCallStreamObserver[TResp]])
    Source
      .single(req)
      .via(delegate)
      .runWith(Sink.fromSubscriber(subscriber))
  }

  /**
   * Client-streaming call: exposes the incoming requests as a stream, runs them
   * through `delegate`, and forwards the result to `obs`.
   *
   * @param obs      response observer; must be a `CallStreamObserver` at runtime
   * @param delegate flow implementing the call
   * @param mat      materializer used to run the stream
   * @return the observer gRPC should feed incoming request messages into
   */
  def manyToOne[TReq, TResp](obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): StreamObserver[TReq] = {
    val publisher = new ReactiveStreamObserverPublisher[TReq](obs.asInstanceOf[CallStreamObserver[TResp]])
    val responses = Source
      .fromPublisher(publisher)
      .via(delegate)
      // At most one response is allowed; Akka's `limit` fails the stream if the
      // delegate emits more, which is then reported through obs.onError.
      .limit(1)
      .filterNot(_ => publisher.isCanceled)
    runAndSignal(responses, obs)
    forwardingObserver(publisher)
  }

  /**
   * Bidirectional-streaming call: exposes the incoming requests as a stream,
   * runs them through `delegate`, and hands the resulting stream to a
   * `ReactivePublisherBackpressureOnReadyHandler` wrapping `obs`.
   *
   * @param obs      response observer; must be a `ServerCallStreamObserver` at runtime
   * @param delegate flow implementing the call
   * @param mat      materializer used to run the stream
   * @return the observer gRPC should feed incoming request messages into
   */
  def manyToMany[TReq, TResp](obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): StreamObserver[TReq] = {
    val publisher = new ReactiveStreamObserverPublisher[TReq](obs.asInstanceOf[CallStreamObserver[TResp]])
    val subscriber = new ReactivePublisherBackpressureOnReadyHandler(obs.asInstanceOf[ServerCallStreamObserver[TResp]])
    Source
      .fromPublisher(publisher)
      .via(delegate)
      .filterNot(_ => publisher.isCanceled)
      .runWith(Sink.fromSubscriber(subscriber))
    forwardingObserver(publisher)
  }

  /**
   * Runs `responses`, pushing every element to `obs.onNext`, then signals the
   * outcome: `onCompleted` on success, `onError` on failure. gRPC status
   * exceptions are passed through unchanged; any other throwable is converted
   * with `Status.fromThrowable` first.
   */
  private def runAndSignal[TResp](responses: Source[TResp, _], obs: StreamObserver[TResp])(implicit mat: Materializer): Unit =
    responses
      .runForeach(obs.onNext)
      .onComplete {
        case Success(_) => obs.onCompleted()
        case Failure(t: StatusException) => obs.onError(t)
        case Failure(t: StatusRuntimeException) => obs.onError(t)
        case Failure(t) => obs.onError(Status.fromThrowable(t).asException())
      }(ReactiveExecutionContext.getExecutionContext)

  /** Builds the request-side observer that relays every signal to `publisher`. */
  private def forwardingObserver[TReq](publisher: ReactiveStreamObserverPublisher[TReq]): StreamObserver[TReq] =
    new StreamObserver[TReq] {
      override def onError(t: Throwable): Unit = publisher.onError(t)
      override def onCompleted(): Unit = publisher.onCompleted()
      override def onNext(value: TReq): Unit = publisher.onNext(value)
    }
}
/**
 * Supplies the execution context on which `ServerCalls` runs its stream
 * completion callbacks.
 */
object ReactiveExecutionContext {

  /** Backing executor, created on first access from gRPC's `SHARED_CHANNEL_EXECUTOR` resource. */
  lazy val executor: ExecutorService = GrpcUtil.SHARED_CHANNEL_EXECUTOR.create()

  /**
   * Returns a new execution context on every call: a fresh `SerializingExecutor`
   * wrapped around the shared [[executor]].
   *
   * The result type is spelled out (fully qualified, so no extra import is
   * required) so the public API does not depend on type inference.
   */
  def getExecutionContext: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(new SerializingExecutor(executor))
}
@unoexperto
Copy link

@assiotis Do you have an example of an SBT project that compiles proto files using RxGrpcGenerator? I'm having problems with it :( salesforce/reactive-grpc#94

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment