Created
August 21, 2020 14:33
-
-
Save nsivabalan/870d0748d8cb1dc8be8e4f9475f01505 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Adaptor for Grpc which does all the translation from generic uber types to grpc entities and | |
* places the call to a grpc ManagedChannel. This class manages multiple channels and routes new | |
* calls based on host fetched from UberInternalCallOptions. | |
*/ | |
public class UbergRPCAdaptor implements UberChannel { | |
private static final Logger LOGGER = Logger.getLogger(UbergRPCAdaptor.class.getName()); | |
// static variable singleInstance of type Singleton | |
@Nullable private static UbergRPCAdaptor singleInstance; | |
private final Map<HostPortPair, ManagedChannel> channelMap = new HashMap<>(); | |
private HostPortPair curHostPortPair; | |
private UbergRPCAdaptor(HostPortPair hostPortPair) { | |
createAndAddChannel(hostPortPair); | |
} | |
@VisibleForTesting | |
public UbergRPCAdaptor(ManagedChannel channel) { | |
curHostPortPair = | |
new HostPortPair( | |
UberInternalCallOptions.DEFAULT_USER_DEFINED_HOST, | |
UberInternalCallOptions.DEFAULT_USER_DEFINED_PORT); | |
channelMap.put(curHostPortPair, channel); | |
} | |
private void createAndAddChannel(HostPortPair hostPortPair) { | |
ManagedChannel channel = | |
ManagedChannelBuilder.forAddress(hostPortPair.getHost(), hostPortPair.getPort()) | |
.usePlaintext() | |
.build(); | |
channelMap.put(hostPortPair, channel); | |
curHostPortPair = hostPortPair; | |
/* | |
Yet to integrate ssl socket factory and hostname verifier. Will do it in a different patch. | |
ManagedChannel channel = OkHttpChannelBuilder.forAddress(host, port) | |
.sslSocketFactory(factory) | |
.hostnameVerifier(hostnameVerifier) | |
.build();*/ | |
} | |
/** | |
* Instantiates {@link UbergRPCAdaptor}. | |
* | |
* @param hostPortPair initial hostport pair to initialize {@link UbergRPCAdaptor} with. | |
* @return the singleton instance of {@link UbergRPCAdaptor}. | |
*/ | |
public static UbergRPCAdaptor getInstance(HostPortPair hostPortPair) { | |
if (singleInstance == null) { | |
singleInstance = new UbergRPCAdaptor(hostPortPair); | |
} | |
return singleInstance; | |
} | |
@Override | |
public synchronized <RequestT, ResponseT, T> UberClientCall<RequestT, ResponseT> newCall( | |
MethodInfo<T> methodInfo, UberInternalCallOptions uberInternalCallOptions) { | |
HostPortPair newHostPortPair = | |
new HostPortPair(uberInternalCallOptions.getHost(), uberInternalCallOptions.getPort()); | |
if (!channelMap.containsKey(newHostPortPair) | |
|| channelMap.get(newHostPortPair).isTerminated()) { | |
createAndAddChannel(newHostPortPair); | |
} | |
ManagedChannel channel = channelMap.get(newHostPortPair); | |
if (channel != null) { | |
curHostPortPair = newHostPortPair; | |
return new UberClientCall<>( | |
channel.newCall( | |
(MethodDescriptor<RequestT, ResponseT>) methodInfo.getTransportMethodInfo(), | |
uberInternalCallOptions.getCallOptions())); | |
} else { | |
throw new IllegalStateException( | |
"Channel should have been created for new host " + newHostPortPair); | |
} | |
} | |
@Override | |
public String authority() { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.authority(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
@Override | |
@Nullable | |
public ManagedChannel shutdown() { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.shutdown(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
@Override | |
public boolean isShutdown() { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.isShutdown(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
@Override | |
public boolean isTerminated() { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.isTerminated(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
@Override | |
@Nullable | |
public ManagedChannel shutdownNow() { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.shutdownNow(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
@Override | |
public boolean awaitTermination(long timeout, TimeUnit unit) { | |
ManagedChannel channel = channelMap.get(curHostPortPair); | |
if (channel != null) { | |
return channel.isTerminated(); | |
} else { | |
throw new IllegalStateException("No channel found for current host and port"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment