Last active
January 27, 2019 03:18
-
-
Save rmichela/470880b2d67858700dad7dc244a81ad8 to your computer and use it in GitHub Desktop.
gRPC backpressure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package servicelibs; | |
option java_package = "servicelibs"; | |
option java_outer_classname = "NumberProto"; | |
import "google/protobuf/empty.proto"; | |
service Numbers { | |
rpc RequestPressure (stream Number) returns (google.protobuf.Empty) {} | |
} | |
message Number { | |
int32 number = 1; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.protobuf.Empty; | |
import io.grpc.Channel; | |
import io.grpc.Server; | |
import io.grpc.inprocess.InProcessChannelBuilder; | |
import io.grpc.inprocess.InProcessServerBuilder; | |
import io.grpc.stub.ClientCallStreamObserver; | |
import io.grpc.stub.ClientResponseObserver; | |
import io.grpc.stub.ServerCallStreamObserver; | |
import io.grpc.stub.StreamObserver; | |
import java.util.Iterator; | |
public class BackpressureTest { | |
public static void main(String[] args) throws Exception { | |
final Object lock = new Object(); | |
Server server = InProcessServerBuilder.forName("svc").addService(new NumbersImpl()).build().start(); | |
Channel channel = InProcessChannelBuilder.forName("svc").build(); | |
NumbersGrpc.NumbersStub stub = NumbersGrpc.newStub(channel); | |
final Iterator<Integer> seq = new Sequence(50).iterator(); | |
StreamObserver<NumberProto.Number> requestObserver = stub.requestPressure(new ClientResponseObserver<NumberProto.Number, Empty>() { | |
public void beforeStart(final ClientCallStreamObserver<NumberProto.Number> requestStream) { | |
requestStream.setOnReadyHandler(new Runnable() { | |
public void run() { | |
if (seq.hasNext()) { | |
int i = seq.next(); | |
System.out.println(i + " -->"); | |
requestStream.onNext(asNum(i)); | |
} else { | |
requestStream.onCompleted(); | |
} | |
} | |
}); | |
} | |
public void onNext(Empty value) { | |
System.out.println("Got empty"); | |
} | |
public void onError(Throwable t) { | |
t.printStackTrace(); | |
} | |
public void onCompleted() { | |
synchronized (lock) { | |
lock.notify(); | |
} | |
} | |
}); | |
synchronized (lock) { | |
lock.wait(); | |
} | |
} | |
private static class NumbersImpl extends NumbersGrpc.NumbersImplBase { | |
@Override | |
public StreamObserver<NumberProto.Number> requestPressure(final StreamObserver<Empty> responseObserver) { | |
final ServerCallStreamObserver<Empty> serverCallStreamObserver = (ServerCallStreamObserver<Empty>) responseObserver; | |
serverCallStreamObserver.disableAutoInboundFlowControl(); | |
serverCallStreamObserver.request(1); | |
return new StreamObserver<NumberProto.Number>() { | |
public void onNext(NumberProto.Number value) { | |
try { Thread.sleep(500); } catch (InterruptedException e) {} | |
System.out.println(" --> " + value.getNumber()); | |
serverCallStreamObserver.request(1); | |
} | |
public void onError(Throwable t) { | |
} | |
public void onCompleted() { | |
responseObserver.onNext(Empty.getDefaultInstance()); | |
responseObserver.onCompleted(); | |
} | |
}; | |
} | |
} | |
private static NumberProto.Number asNum(int i) { | |
return NumberProto.Number.newBuilder().setNumber(i).build(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Iterator; | |
public class Sequence implements Iterable<Integer> { | |
private final int max; | |
public Sequence(int max) { | |
this.max = max; | |
} | |
public Iterator<Integer> iterator() { | |
return new Iterator<Integer>() { | |
int i = 0; | |
public void remove() { | |
throw new UnsupportedOperationException(); | |
} | |
public boolean hasNext() { | |
return i < max; | |
} | |
public Integer next() { | |
try { Thread.sleep(250); } catch (InterruptedException e) {} | |
return i++; | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment