Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Last active July 16, 2020 16:26
Show Gist options
  • Save nsivabalan/a60789c97e7d67bb743a64ea3396a163 to your computer and use it in GitHub Desktop.
Save nsivabalan/a60789c97e7d67bb743a64ea3396a163 to your computer and use it in GitHub Desktop.
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
public abstract class SimpleClientInterceptor<ReqT, RespT> extends InterceptorChannel {
public SimpleClientInterceptor(ManagedChannel managedChannel,
InterceptorChannel next) {
super(managedChannel, next);
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel channel) {
return new ClientCallHandler<>(
next.newCall(method, callOptions), this, next, method, callOptions);
}
/**
* Invoked with outgoing request headers.
* @param requestHeaders request headers that are sent as part of the request.
*/
protected abstract void onRequestHeaders(Metadata requestHeaders);
/**
* Invoked when a message is sent out from client to server.
* @param message message being sent out.
*/
protected abstract void onRequestMessage(ReqT message);
/**
* Invoked with response headers.
* @param responseHeaders response headers that are received from the server.
*/
protected abstract void onResponseHeaders(Metadata responseHeaders);
/**
* Invoked with message received from the server.
* @param msg message being received.
*/
protected abstract void onResponseMessage(RespT msg);
/**
* Invoked when the call is closed.
* @param status
* @param trailers
*/
protected void onClose(Status status, Metadata trailers) {}
}
import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import javax.annotation.Nullable;
public class ClientCallHandler <ReqT, RespT> extends ClientCall<ReqT, RespT> {
private volatile ClientCall<ReqT, RespT> clientCall;
private SimpleClientInterceptor parentInterceptor;
@Nullable
private volatile InternalListener internalListener;
private MethodDescriptor<ReqT, RespT> method;
private CallOptions callOptions;
private InterceptorChannel next;
@Nullable
private Metadata headers;
public ClientCallHandler(
ClientCall<ReqT, RespT> clientCall,
SimpleClientInterceptor parentInterceptor,
InterceptorChannel next,
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
this.clientCall = clientCall;
this.parentInterceptor = parentInterceptor;
this.callOptions = callOptions;
this.method = method;
this.next = next;
}
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
this.headers = headers;
parentInterceptor.onRequestHeaders(headers);
internalListener = new InternalListener(parentInterceptor, responseListener, this);
clientCall.start((ClientCall.Listener<RespT>) internalListener, headers);
}
@Override
public void request(int numMessages) {
clientCall.request(numMessages);
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
clientCall.cancel(message, cause);
}
@Override
public void halfClose() {
clientCall.halfClose();
}
@Override
public void sendMessage(ReqT message) {
parentInterceptor.onRequestMessage(message);
clientCall.sendMessage(message);
}
@Override
public boolean isReady() {
return clientCall.isReady();
}
@Override
public Attributes getAttributes() {
return clientCall.getAttributes();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", clientCall).toString();
}
}
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Status;
public class InternalListener<T> extends ClientCall.Listener<T> {
SimpleClientInterceptor parentInterceptor;
ClientCall.Listener<T> prevListener;
ClientCallHandler clientCallHandler;
public InternalListener(
SimpleClientInterceptor parentInterceptor,
ClientCall.Listener<T> prevListener,
ClientCallHandler clientCallHandler) {
this.parentInterceptor = parentInterceptor;
this.prevListener = prevListener;
this.clientCallHandler = clientCallHandler;
}
@Override
public void onHeaders(Metadata headers) {
parentInterceptor.onResponseHeaders(headers);
prevListener.onHeaders(headers);
}
@Override
public void onMessage(T message) {
parentInterceptor.onResponseMessage(message);
prevListener.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
parentInterceptor.onClose(status, trailers);
prevListener.onClose(status, trailers);
}
}
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import java.util.concurrent.TimeUnit;
public abstract class InterceptorChannel extends ManagedChannel implements ClientInterceptor {
private final ManagedChannel managedChannel;
protected final InterceptorChannel next;
public InterceptorChannel(ManagedChannel managedChannel, InterceptorChannel next) {
this.managedChannel = managedChannel;
this.next = next;
}
public abstract <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel channel);
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
if (next == null) {
return managedChannel.newCall(methodDescriptor, callOptions);
} else {
return this.interceptCall(methodDescriptor, callOptions, managedChannel);
}
}
@Override
public String authority() {
if (next != null) {
return next.authority();
} else {
return managedChannel.authority();
}
}
@Override
public ManagedChannel shutdown() {
if (next != null) {
return next.shutdown();
} else {
return managedChannel.shutdown();
}
}
@Override
public boolean isShutdown() {
if (next != null) {
return next.isShutdown();
} else {
return managedChannel.isShutdown();
}
}
@Override
public boolean isTerminated() {
if (next != null) {
return next.isTerminated();
} else {
return managedChannel.isTerminated();
}
}
@Override
public ManagedChannel shutdownNow() {
if (next != null) {
return next.shutdownNow();
} else {
return managedChannel.shutdownNow();
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (next != null) {
return next.awaitTermination(timeout, unit);
} else {
return managedChannel.awaitTermination(timeout, unit);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment