Created
November 3, 2020 17:36
-
-
Save nsivabalan/f97862d58c2ce7c59ef164d7d59d51fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.grpc.examples.routeguide; | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.protobuf.Message; | |
import io.grpc.Channel; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.Status; | |
import io.grpc.StatusRuntimeException; | |
import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub; | |
import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub; | |
import io.grpc.stub.StreamObserver; | |
import java.io.IOException; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
/** | |
* Sample client code that makes gRPC calls to the server. | |
*/ | |
public class RouteGuideClient { | |
private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName()); | |
private final RouteGuideBlockingStub blockingStub; | |
private final RouteGuideStub asyncStub; | |
private Random random = new Random(); | |
private TestHelper testHelper; | |
/** Construct client for accessing RouteGuide server using the existing channel. */ | |
public RouteGuideClient() { | |
blockingStub = RouteGuideGrpc.newBlockingStub(); | |
asyncStub = RouteGuideGrpc.newStub(); | |
} | |
/** | |
* Blocking unary call example. Calls getFeature and prints the response. | |
*/ | |
public void getFeature(int lat, int lon) { | |
info("*** GetFeature: lat={0} lon={1}", lat, lon); | |
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build(); | |
Feature feature; | |
try { | |
feature = blockingStub.getFeature(request); | |
if (testHelper != null) { | |
testHelper.onMessage(feature); | |
} | |
} catch (StatusRuntimeException e) { | |
warning("RPC failed: {0}", e.getStatus()); | |
if (testHelper != null) { | |
testHelper.onRpcError(e); | |
} | |
return; | |
} | |
if (RouteGuideUtil.exists(feature)) { | |
info("Found feature called \"{0}\" at {1}, {2}", | |
feature.getName(), | |
RouteGuideUtil.getLatitude(feature.getLocation()), | |
RouteGuideUtil.getLongitude(feature.getLocation())); | |
} else { | |
info("Found no feature at {0}, {1}", | |
RouteGuideUtil.getLatitude(feature.getLocation()), | |
RouteGuideUtil.getLongitude(feature.getLocation())); | |
} | |
} | |
/** | |
* Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each | |
* response feature as it arrives. | |
*/ | |
public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) { | |
info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat, | |
hiLon); | |
Rectangle request = | |
Rectangle.newBuilder() | |
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) | |
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build(); | |
Iterator<Feature> features; | |
try { | |
features = blockingStub.listFeatures(request); | |
for (int i = 1; features.hasNext(); i++) { | |
Feature feature = features.next(); | |
info("Result #" + i + ": {0}", feature); | |
if (testHelper != null) { | |
testHelper.onMessage(feature); | |
} | |
} | |
} catch (StatusRuntimeException e) { | |
warning("RPC failed: {0}", e.getStatus()); | |
if (testHelper != null) { | |
testHelper.onRpcError(e); | |
} | |
} | |
} | |
/** | |
* Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code | |
* features} with a variable delay in between. Prints the statistics when they are sent from the | |
* server. | |
*/ | |
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException { | |
info("*** RecordRoute"); | |
final CountDownLatch finishLatch = new CountDownLatch(1); | |
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() { | |
@Override | |
public void onNext(RouteSummary summary) { | |
info("Finished trip with {0} points. Passed {1} features. " | |
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(), | |
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime()); | |
if (testHelper != null) { | |
testHelper.onMessage(summary); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
warning("RecordRoute Failed: {0}", Status.fromThrowable(t)); | |
if (testHelper != null) { | |
testHelper.onRpcError(t); | |
} | |
finishLatch.countDown(); | |
} | |
@Override | |
public void onCompleted() { | |
info("Finished RecordRoute"); | |
finishLatch.countDown(); | |
} | |
}; | |
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver); | |
try { | |
// Send numPoints points randomly selected from the features list. | |
for (int i = 0; i < numPoints; ++i) { | |
int index = random.nextInt(features.size()); | |
Point point = features.get(index).getLocation(); | |
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point), | |
RouteGuideUtil.getLongitude(point)); | |
requestObserver.onNext(point); | |
// Sleep for a bit before sending the next one. | |
Thread.sleep(random.nextInt(1000) + 500); | |
if (finishLatch.getCount() == 0) { | |
// RPC completed or errored before we finished sending. | |
// Sending further requests won't error, but they will just be thrown away. | |
return; | |
} | |
} | |
} catch (RuntimeException e) { | |
// Cancel RPC | |
requestObserver.onError(e); | |
throw e; | |
} | |
// Mark the end of requests | |
requestObserver.onCompleted(); | |
// Receiving happens asynchronously | |
if (!finishLatch.await(1, TimeUnit.MINUTES)) { | |
warning("recordRoute can not finish within 1 minutes"); | |
} | |
} | |
/** | |
* Bi-directional example, which can only be asynchronous. Send some chat messages, and print any | |
* chat messages that are sent from the server. | |
*/ | |
public CountDownLatch routeChat() { | |
info("*** RouteChat"); | |
final CountDownLatch finishLatch = new CountDownLatch(1); | |
StreamObserver<RouteNote> requestObserver = | |
asyncStub.routeChat(new StreamObserver<RouteNote>() { | |
@Override | |
public void onNext(RouteNote note) { | |
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() | |
.getLatitude(), note.getLocation().getLongitude()); | |
if (testHelper != null) { | |
testHelper.onMessage(note); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
warning("RouteChat Failed: {0}", Status.fromThrowable(t)); | |
if (testHelper != null) { | |
testHelper.onRpcError(t); | |
} | |
finishLatch.countDown(); | |
} | |
@Override | |
public void onCompleted() { | |
info("Finished RouteChat"); | |
finishLatch.countDown(); | |
} | |
}); | |
try { | |
RouteNote[] requests = | |
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000), | |
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)}; | |
for (RouteNote request : requests) { | |
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() | |
.getLatitude(), request.getLocation().getLongitude()); | |
requestObserver.onNext(request); | |
} | |
} catch (RuntimeException e) { | |
// Cancel RPC | |
requestObserver.onError(e); | |
throw e; | |
} | |
// Mark the end of requests | |
requestObserver.onCompleted(); | |
// return the latch while receiving happens asynchronously | |
return finishLatch; | |
} | |
/** Issues several different requests and then exits. */ | |
public static void main(String[] args) throws InterruptedException { | |
List<Feature> features; | |
try { | |
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile()); | |
} catch (IOException ex) { | |
ex.printStackTrace(); | |
return; | |
} | |
RouteGuideClient client = new RouteGuideClient(); | |
// Looking for a valid feature | |
client.getFeature(409146138, -746188906); | |
// Feature missing. | |
client.getFeature(0, 0); | |
// Looking for features between 40, -75 and 42, -73. | |
client.listFeatures(400000000, -750000000, 420000000, -730000000); | |
// Record a few randomly selected points from the features file. | |
client.recordRoute(features, 10); | |
// Send and receive some notes. | |
CountDownLatch finishLatch = client.routeChat(); | |
if (!finishLatch.await(1, TimeUnit.MINUTES)) { | |
client.warning("routeChat can not finish within 1 minutes"); | |
} | |
} | |
private void info(String msg, Object... params) { | |
logger.log(Level.INFO, msg, params); | |
} | |
private void warning(String msg, Object... params) { | |
logger.log(Level.WARNING, msg, params); | |
} | |
private RouteNote newNote(String message, int lat, int lon) { | |
return RouteNote.newBuilder().setMessage(message) | |
.setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build(); | |
} | |
/** | |
* Only used for unit test, as we do not want to introduce randomness in unit test. | |
*/ | |
@VisibleForTesting | |
void setRandom(Random random) { | |
this.random = random; | |
} | |
/** | |
* Only used for helping unit test. | |
*/ | |
@VisibleForTesting | |
interface TestHelper { | |
/** | |
* Used for verify/inspect message received from server. | |
*/ | |
void onMessage(Message message); | |
/** | |
* Used for verify/inspect error received from server. | |
*/ | |
void onRpcError(Throwable exception); | |
} | |
@VisibleForTesting | |
void setTestHelper(TestHelper testHelper) { | |
this.testHelper = testHelper; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment