Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created August 21, 2020 14:33
Show Gist options
  • Save nsivabalan/870d0748d8cb1dc8be8e4f9475f01505 to your computer and use it in GitHub Desktop.
Save nsivabalan/870d0748d8cb1dc8be8e4f9475f01505 to your computer and use it in GitHub Desktop.
/**
 * Adaptor for Grpc which does all the translation from generic uber types to grpc entities and
 * places the call to a grpc ManagedChannel. This class manages multiple channels and routes new
 * calls based on host fetched from UberInternalCallOptions.
 *
 * <p>Channels are kept in a map keyed by {@link HostPortPair}. The adaptor also remembers the
 * host/port pair of the most recently created or used channel; {@link #authority()}, {@link
 * #shutdown()}, {@link #shutdownNow()}, {@link #isShutdown()}, {@link #isTerminated()} and {@link
 * #awaitTermination(long, TimeUnit)} all operate on that "current" channel only.
 *
 * <p>All access to the channel map and the current host/port pair is guarded by this instance's
 * monitor.
 */
public class UbergRPCAdaptor implements UberChannel {
  private static final Logger LOGGER = Logger.getLogger(UbergRPCAdaptor.class.getName());

  private static final String NO_CURRENT_CHANNEL_MESSAGE =
      "No channel found for current host and port";

  // Lazily created singleton; guarded by the class monitor in getInstance().
  @Nullable private static UbergRPCAdaptor singleInstance;

  // Guarded by "this" (constructors excepted, since the instance is not yet published).
  private final Map<HostPortPair, ManagedChannel> channelMap = new HashMap<>();
  private HostPortPair curHostPortPair;

  /**
   * Creates an adaptor with a single freshly built channel for the given host/port pair.
   *
   * @param hostPortPair host and port of the initial channel.
   */
  private UbergRPCAdaptor(HostPortPair hostPortPair) {
    createAndAddChannel(hostPortPair);
  }

  /**
   * Creates an adaptor around an existing channel, registered under the default user-defined host
   * and port of {@link UberInternalCallOptions}.
   *
   * @param channel the channel to register as the current channel.
   */
  @VisibleForTesting
  public UbergRPCAdaptor(ManagedChannel channel) {
    curHostPortPair =
        new HostPortPair(
            UberInternalCallOptions.DEFAULT_USER_DEFINED_HOST,
            UberInternalCallOptions.DEFAULT_USER_DEFINED_PORT);
    channelMap.put(curHostPortPair, channel);
  }

  /**
   * Builds a plaintext channel for the given host/port pair, stores it in the channel map
   * (replacing any previous entry for that pair) and makes the pair the current one.
   *
   * @param hostPortPair host and port to connect to.
   * @return the newly built channel.
   */
  private ManagedChannel createAndAddChannel(HostPortPair hostPortPair) {
    // TODO: integrate the SSL socket factory and hostname verifier (e.g. through
    // OkHttpChannelBuilder#sslSocketFactory / #hostnameVerifier) in a separate patch.
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress(hostPortPair.getHost(), hostPortPair.getPort())
            .usePlaintext()
            .build();
    channelMap.put(hostPortPair, channel);
    curHostPortPair = hostPortPair;
    return channel;
  }

  /**
   * Returns the channel registered for the current host/port pair.
   *
   * @return the current channel, never {@code null}.
   * @throws IllegalStateException if no channel is registered for the current host/port pair.
   */
  private synchronized ManagedChannel currentChannel() {
    ManagedChannel channel = channelMap.get(curHostPortPair);
    if (channel == null) {
      throw new IllegalStateException(NO_CURRENT_CHANNEL_MESSAGE);
    }
    return channel;
  }

  /**
   * Instantiates {@link UbergRPCAdaptor}.
   *
   * <p>The instance is created on the first call only; on later calls the argument is ignored and
   * the existing instance is returned. The method is synchronized so that concurrent first calls
   * cannot create more than one instance.
   *
   * @param hostPortPair initial hostport pair to initialize {@link UbergRPCAdaptor} with.
   * @return the singleton instance of {@link UbergRPCAdaptor}.
   */
  public static synchronized UbergRPCAdaptor getInstance(HostPortPair hostPortPair) {
    if (singleInstance == null) {
      singleInstance = new UbergRPCAdaptor(hostPortPair);
    }
    return singleInstance;
  }

  /**
   * Creates a new call on the channel that matches the host and port of the given call options.
   *
   * <p>If no channel exists for that host/port pair, or the existing one has terminated, a new
   * channel is built first. The pair then becomes the current host/port pair.
   *
   * @param methodInfo method information whose transport method info is used as the grpc method
   *     descriptor.
   * @param uberInternalCallOptions call options carrying the target host, port and grpc call
   *     options.
   * @return the new call wrapped in an {@link UberClientCall}.
   * @throws IllegalStateException if no channel could be obtained for the requested host/port.
   */
  @Override
  public synchronized <RequestT, ResponseT, T> UberClientCall<RequestT, ResponseT> newCall(
      MethodInfo<T> methodInfo, UberInternalCallOptions uberInternalCallOptions) {
    HostPortPair newHostPortPair =
        new HostPortPair(uberInternalCallOptions.getHost(), uberInternalCallOptions.getPort());

    ManagedChannel channel = channelMap.get(newHostPortPair);
    if (channel == null || channel.isTerminated()) {
      channel = createAndAddChannel(newHostPortPair);
    }
    if (channel == null) {
      throw new IllegalStateException(
          "Channel should have been created for new host " + newHostPortPair);
    }
    curHostPortPair = newHostPortPair;

    // The request/response types of the transport method info are not visible through
    // MethodInfo<T>; callers are responsible for supplying a matching descriptor.
    @SuppressWarnings("unchecked")
    MethodDescriptor<RequestT, ResponseT> methodDescriptor =
        (MethodDescriptor<RequestT, ResponseT>) methodInfo.getTransportMethodInfo();
    return new UberClientCall<>(
        channel.newCall(methodDescriptor, uberInternalCallOptions.getCallOptions()));
  }

  /**
   * Returns the authority of the current channel.
   *
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  public String authority() {
    return currentChannel().authority();
  }

  /**
   * Initiates shutdown of the current channel only; other channels held by this adaptor are not
   * affected.
   *
   * @return the channel on which shutdown was requested.
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  @Nullable
  public ManagedChannel shutdown() {
    return currentChannel().shutdown();
  }

  /**
   * Returns whether the current channel is shut down.
   *
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  public boolean isShutdown() {
    return currentChannel().isShutdown();
  }

  /**
   * Returns whether the current channel is terminated.
   *
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  public boolean isTerminated() {
    return currentChannel().isTerminated();
  }

  /**
   * Initiates a forceful shutdown of the current channel only; other channels held by this adaptor
   * are not affected.
   *
   * @return the channel on which shutdown was requested.
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  @Nullable
  public ManagedChannel shutdownNow() {
    return currentChannel().shutdownNow();
  }

  /**
   * Waits for the current channel to terminate, for at most the given timeout.
   *
   * <p>If the calling thread is interrupted while waiting, its interrupt status is restored and
   * the channel's termination state at that moment is returned.
   *
   * @param timeout maximum time to wait.
   * @param unit unit of {@code timeout}.
   * @return {@code true} if the channel is terminated, {@code false} otherwise.
   * @throws IllegalStateException if there is no channel for the current host and port.
   */
  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) {
    // Look the channel up under the lock, but wait outside of it so that a long wait does not
    // block newCall() and the other accessors.
    ManagedChannel channel = currentChannel();
    try {
      return channel.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return channel.isTerminated();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment