Created
January 1, 2018 00:19
-
-
Save assiotis/a613815c6b67b01ed12e8a91e46ac3ac to your computer and use it in GitHub Desktop.
An example of how to translate between gRPC and akka streams server-side using the reactive-grpc project
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ExecutorService | |
import akka.stream.Materializer | |
import akka.stream.scaladsl.{Flow, Sink, Source} | |
import com.salesforce.reactivegrpccommon.{ReactivePublisherBackpressureOnReadyHandler, ReactiveStreamObserverPublisher} | |
import io.grpc.internal.{GrpcUtil, SerializingExecutor} | |
import io.grpc.stub.{CallStreamObserver, ServerCallStreamObserver, StreamObserver} | |
import io.grpc.{Status, StatusException, StatusRuntimeException} | |
import scala.concurrent.ExecutionContext | |
import scala.util.{Failure, Success} | |
object ServerCalls { | |
def oneToOne[TReq, TResp](req: TReq, obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): Unit = { | |
Source | |
.single(req) | |
.via(delegate) | |
.filterNot(_ => obs.asInstanceOf[ServerCallStreamObserver[TResp]].isCancelled) | |
.runForeach(obs.onNext) | |
.onComplete { | |
case Success(_) => obs.onCompleted() | |
case Failure(t: StatusException) => obs.onError(t) | |
case Failure(t: StatusRuntimeException) => obs.onError(t) | |
case Failure(t: Throwable) => obs.onError(Status.fromThrowable(t).asException()) | |
}(ReactiveExecutionContext.getExecutionContext) | |
} | |
def oneToMany[TReq, TResp](req: TReq, obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): Unit = { | |
val subscriber = new ReactivePublisherBackpressureOnReadyHandler(obs.asInstanceOf[ServerCallStreamObserver[TResp]]) | |
Source | |
.single(req) | |
.via(delegate) | |
.runWith(Sink.fromSubscriber(subscriber)) | |
} | |
def manyToOne[TReq, TResp](obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): StreamObserver[TReq] = { | |
val publisher = new ReactiveStreamObserverPublisher[TReq](obs.asInstanceOf[CallStreamObserver[TResp]]) | |
Source | |
.fromPublisher(publisher) | |
.via(delegate) | |
.limit(1) | |
.filterNot(_ => publisher.isCanceled) | |
.runForeach(obs.onNext) | |
.onComplete { | |
case Success(_) => obs.onCompleted() | |
case Failure(t: StatusException) => obs.onError(t) | |
case Failure(t: StatusRuntimeException) => obs.onError(t) | |
case Failure(t: Throwable) => obs.onError(Status.fromThrowable(t).asException()) | |
}(ReactiveExecutionContext.getExecutionContext) | |
new StreamObserver[TReq] { | |
override def onError(t: Throwable): Unit = publisher.onError(t) | |
override def onCompleted(): Unit = publisher.onCompleted() | |
override def onNext(value: TReq): Unit = publisher.onNext(value) | |
} | |
} | |
def manyToMany[TReq, TResp](obs: StreamObserver[TResp], delegate: Flow[TReq, TResp, _])(implicit mat: Materializer): StreamObserver[TReq] = { | |
val publisher = new ReactiveStreamObserverPublisher[TReq](obs.asInstanceOf[CallStreamObserver[TResp]]) | |
val subscriber = new ReactivePublisherBackpressureOnReadyHandler(obs.asInstanceOf[ServerCallStreamObserver[TResp]]) | |
Source | |
.fromPublisher(publisher) | |
.via(delegate) | |
.filterNot(_ => publisher.isCanceled) | |
.runWith(Sink.fromSubscriber(subscriber)) | |
new StreamObserver[TReq] { | |
override def onError(t: Throwable): Unit = publisher.onError(t) | |
override def onCompleted(): Unit = publisher.onCompleted() | |
override def onNext(value: TReq): Unit = publisher.onNext(value) | |
} | |
} | |
} | |
object ReactiveExecutionContext { | |
lazy val executor: ExecutorService = GrpcUtil.SHARED_CHANNEL_EXECUTOR.create() | |
def getExecutionContext = ExecutionContext.fromExecutor(new SerializingExecutor(executor)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@assiotis Do you have example of SBT project which compile
proto
files usingRxGrpcGenerator
? I'm having problems with it :( salesforce/reactive-grpc#94