Created
June 28, 2014 00:03
-
-
Save NiteshKant/5280bf5149e891f688ba to your computer and use it in GitHub Desktop.
async filter chain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.karyon.transport.interceptor; | |
import com.netflix.karyon.transport.RequestRouter; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.subscriptions.SerialSubscription; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
/** | |
* A utility class to execute a chain of interceptors defined by {@link InterceptorSupport} | |
* | |
* @author Nitesh Kant | |
*/ | |
public class InterceptorExecutor<I, O, C extends KeyEvaluationContext> { | |
private final List<InterceptorSupport<I, O, C>.InterceptorHolder<InboundInterceptor<I, O>>> allIn; | |
private final List<InterceptorSupport<I, O, C>.InterceptorHolder<OutboundInterceptor<O>>> allOut; | |
private final RequestRouter<I, O> router; | |
public InterceptorExecutor(InterceptorSupport<I, O, C> support, RequestRouter<I, O> router) { | |
this.router = router; | |
support.finish(); | |
allIn = support.getInboundInterceptors(); | |
allOut = support.getOutboundInterceptors(); | |
} | |
/** | |
* Executes the interceptor chain for the passed request and response. | |
* | |
* @param request Request to be executed. | |
* @param response Response to be populated. | |
* @param keyEvaluationContext The context for {@link InterceptorKey} evaluation. | |
* | |
* @return The final result of execution after executing all the inbound and outbound interceptors and the router. | |
*/ | |
public Observable<Void> execute(final I request, final O response, C keyEvaluationContext) { | |
final ExecutionContext context = new ExecutionContext(request, keyEvaluationContext); | |
InboundInterceptor<I, O> nextIn = context.nextIn(request); | |
Observable<Void> startingPoint; | |
if (null != nextIn) { | |
startingPoint = nextIn.in(request, response); | |
} else if (context.invokeRouter()){ | |
startingPoint = router.route(request, response); | |
} else { | |
return Observable.error(new IllegalStateException("No router defined.")); // No router defined. | |
} | |
return startingPoint.lift(new Observable.Operator<Void, Void>() { | |
@Override | |
public Subscriber<? super Void> call(Subscriber<? super Void> child) { | |
SerialSubscription subscription = new SerialSubscription(); | |
return new ChainSubscriber(subscription, context, request, response, child); | |
} | |
}); | |
} | |
private enum NextExecutionState { NotStarted, NextInHolder, NextInInterceptor, Router, NextOutInterceptor, End} | |
private class ExecutionContext { | |
private final C keyEvaluationContext; | |
private NextExecutionState nextExecutionState = NextExecutionState.NextInHolder; | |
/** | |
* This list is eagerly created by evaluating keys of all out interceptors as we do not want to hold the | |
* request (for key evaluation) till the outbound interceptor execution starts. | |
*/ | |
private List<OutboundInterceptor<O>> applicableOutInterceptors; | |
private int currentHolderIndex; | |
private int currentInterceptorIndex; | |
public ExecutionContext(I request, C keyEvaluationContext) { | |
this.keyEvaluationContext = keyEvaluationContext; | |
applicableOutInterceptors = new ArrayList<OutboundInterceptor<O>>(); // Execution is not multi-threaded. | |
for (InterceptorSupport<I, O, C>.InterceptorHolder<OutboundInterceptor<O>> holder : allOut) { | |
switch (keyEvaluationContext.getEvaluationResult(holder.getKey())) { // Result is cached. | |
case Apply: | |
Collections.addAll(applicableOutInterceptors, holder.getInterceptors()); | |
break; | |
case Skip: | |
break; | |
case NotExecuted: | |
boolean apply = holder.getKey().apply(request, keyEvaluationContext); | |
keyEvaluationContext.updateKeyEvaluationResult(holder.getKey(), apply); | |
if (apply) { | |
Collections.addAll(applicableOutInterceptors, holder.getInterceptors()); | |
} | |
break; | |
} | |
} | |
} | |
public InboundInterceptor<I, O> nextIn(I request) { | |
switch (nextExecutionState) { | |
case NotStarted: | |
nextExecutionState = NextExecutionState.NextInInterceptor; // Index is 0, so we can skip the NextInHolder state. | |
return nextIn(request); | |
case NextInHolder: | |
++currentHolderIndex; | |
currentInterceptorIndex = 0; | |
return nextIn(request); | |
case NextInInterceptor: | |
if (currentHolderIndex >= allIn.size()) { | |
nextExecutionState = NextExecutionState.Router; | |
return null; | |
} else { | |
InterceptorSupport<I, O, C>.InterceptorHolder<InboundInterceptor<I, O>> holder = | |
allIn.get( currentHolderIndex); | |
switch (keyEvaluationContext.getEvaluationResult(holder.getKey())) { // Result is cached. | |
case Apply: | |
return returnNextInterceptor(request, holder); | |
case Skip: | |
nextExecutionState = NextExecutionState.NextInHolder; | |
return nextIn(request); | |
case NotExecuted: | |
boolean apply = holder.getKey().apply(request, keyEvaluationContext); | |
keyEvaluationContext.updateKeyEvaluationResult(holder.getKey(), apply); | |
return returnNextInterceptor(request, holder); | |
} | |
} | |
} | |
return null; | |
} | |
public OutboundInterceptor<O> nextOut() { | |
switch (nextExecutionState) { | |
case NextOutInterceptor: | |
if (currentInterceptorIndex >= applicableOutInterceptors.size()) { | |
nextExecutionState = NextExecutionState.End; | |
return null; | |
} else { | |
return applicableOutInterceptors.get(currentInterceptorIndex); | |
} | |
} | |
return null; | |
} | |
private InboundInterceptor<I, O> returnNextInterceptor(I request, | |
InterceptorSupport<I, O, C>.InterceptorHolder<InboundInterceptor<I, O>> holder) { | |
InboundInterceptor<I, O>[] interceptors = holder.getInterceptors(); | |
if (currentInterceptorIndex >= interceptors.length) { | |
nextExecutionState = NextExecutionState.NextInHolder; | |
return nextIn(request); | |
} | |
return interceptors[currentInterceptorIndex++]; | |
} | |
public boolean invokeRouter() { | |
if (NextExecutionState.Router == nextExecutionState) { | |
currentHolderIndex = 0; | |
nextExecutionState = NextExecutionState.NextOutInterceptor; | |
return true; | |
} else { | |
return false; | |
} | |
} | |
} | |
private class ChainSubscriber extends Subscriber<Void> { | |
private final SerialSubscription subscription; | |
private final ExecutionContext context; | |
private final I request; | |
private final O response; | |
private final Subscriber<? super Void> child; | |
public ChainSubscriber(SerialSubscription subscription, ExecutionContext context, I request, O response, | |
Subscriber<? super Void> child) { | |
this.subscription = subscription; | |
this.context = context; | |
this.request = request; | |
this.response = response; | |
this.child = child; | |
} | |
@Override | |
public void onCompleted() { | |
InboundInterceptor<I, O> nextIn = context.nextIn(request); | |
OutboundInterceptor<O> nextOut; | |
if (null != nextIn) { | |
Observable<Void> interceptorResult = nextIn.in(request, response); | |
handleResult(interceptorResult); | |
} else if (context.invokeRouter()) { | |
handleResult(router.route(request, response)); | |
} else if (null != (nextOut = context.nextOut())) { | |
handleResult(nextOut.out(response)); | |
} else { | |
child.onCompleted(); | |
} | |
} | |
private void handleResult(Observable<Void> aResult) { | |
ChainSubscriber nextSubscriber = new ChainSubscriber(subscription, context, request, response, | |
child); | |
subscription.set(nextSubscriber); | |
aResult.unsafeSubscribe(nextSubscriber); | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(Void aVoid) { | |
child.onNext(aVoid); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment