Skip to content

Instantly share code, notes, and snippets.

@ankurcha
Last active August 29, 2015 14:07
Show Gist options
  • Select an option

  • Save ankurcha/002fba62fbd87b90e59f to your computer and use it in GitHub Desktop.

Select an option

Save ankurcha/002fba62fbd87b90e59f to your computer and use it in GitHub Desktop.
Account-level QoSFilter for concurrent connections
package analytics.api;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Servlet filter that limits the number of concurrently processed requests per account.
 *
 * <p>The account is taken from the request path (see {@link #extractAccount(ServletRequest)}).
 * Each account gets a fair {@link Semaphore} with {@link #MAX_CONNECTIONS_PER_ACCOUNT} permits.
 * A request that cannot obtain a permit within {@code waitMs} milliseconds is suspended through
 * a Jetty {@link Continuation} with a timeout of {@code suspendMs} milliseconds. Requests whose
 * path does not identify an account are passed through unthrottled.
 *
 * <p>Semaphores are kept in a map bounded to a weighted capacity of 2000 entries.
 * NOTE(review): if an account's entry is evicted while one of its requests still holds a permit,
 * a later request for that account gets a fresh semaphore and is not limited by the in-flight
 * one -- confirm this is acceptable for the expected number of accounts.
 */
public class QoSFilter implements Filter {

    static final Logger logger = LoggerFactory.getLogger(QoSFilter.class);

    /** Default time, in milliseconds, to wait for a permit before suspending the request. */
    final static int DEFAULT_WAIT_MS = 50;
    /** Default continuation timeout, in milliseconds, for a suspended request. */
    final static long DEFAULT_SUSPEND_MS = 1000;
    /** Number of permits each per-account semaphore is created with. */
    final static int MAX_CONNECTIONS_PER_ACCOUNT = 1;
    /** Name of the init parameter that overrides {@link #DEFAULT_WAIT_MS}. */
    final static String MAX_WAIT_INIT_PARAM = "waitMs";
    /** Name of the init parameter that overrides {@link #DEFAULT_SUSPEND_MS}. */
    final static String SUSPEND_INIT_PARAM = "suspendMs";
    /**
     * Matches paths containing an {@code account/<id>/} or {@code accounts/<id>/} segment that is
     * preceded by at least two other path segments; {@code <id>} is either digits or {@code all}
     * and is captured as group 1.
     */
    final static Pattern reportURLPattern = Pattern.compile("\\/.+\\/.+\\/accounts?\\/(\\d+|all)\\/.*");

    private long waitMs;
    private long suspendMs;

    /** Per-account semaphores, bounded so the map cannot grow without limit. */
    private final ConcurrentLinkedHashMap<String, Semaphore> passes =
            new ConcurrentLinkedHashMap.Builder<String, Semaphore>()
                    .maximumWeightedCapacity(2000).build();

    /**
     * Extracts the account identifier from the request's path info.
     *
     * @param request the incoming request; only {@link HttpServletRequest} instances are inspected
     * @return the account id (digits or {@code "all"}), or {@code null} when the request is not an
     *         HTTP request, has no path info, or its path does not match {@link #reportURLPattern}
     */
    public static String extractAccount(ServletRequest request) {
        if (!(request instanceof HttpServletRequest)) {
            return null;
        }
        try {
            String path = ((HttpServletRequest) request).getPathInfo();
            if (path == null) {
                // No extra path information: nothing to match against.
                return null;
            }
            Matcher m = reportURLPattern.matcher(path);
            return m.matches() ? m.group(1) : null;
        } catch (RuntimeException e) {
            // Best effort: a failure to read the path must not fail the request.
            logger.debug("error while extracting account", e);
            return null;
        }
    }

    /**
     * Reads the optional {@code waitMs} and {@code suspendMs} init parameters, falling back to
     * {@link #DEFAULT_WAIT_MS} and {@link #DEFAULT_SUSPEND_MS} when they are absent.
     *
     * @param filterConfig the filter configuration supplied by the container
     * @throws NumberFormatException if a configured value is not a valid integer
     */
    @Override
    public void init(FilterConfig filterConfig) {
        waitMs = readLongParam(filterConfig, MAX_WAIT_INIT_PARAM, DEFAULT_WAIT_MS);
        suspendMs = readLongParam(filterConfig, SUSPEND_INIT_PARAM, DEFAULT_SUSPEND_MS);
    }

    /** Returns the named init parameter parsed as a long, or {@code fallback} when it is not set. */
    private static long readLongParam(FilterConfig filterConfig, String name, long fallback) {
        String raw = filterConfig.getInitParameter(name);
        return raw == null ? fallback : Long.parseLong(raw);
    }

    /**
     * Runs the rest of the chain while holding the account's permit, or suspends the request when
     * no permit becomes available within {@code waitMs} milliseconds.
     *
     * @param request  the incoming request
     * @param response the response; receives a 503 if the thread is interrupted while waiting
     * @param chain    the remaining filter chain
     * @throws IOException      propagated from the chain or from sending the error
     * @throws ServletException propagated from the chain, or raised when the wait is interrupted
     *                          and the response is not an HTTP response
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        String account = extractAccount(request);
        int max = MAX_CONNECTIONS_PER_ACCOUNT;
        if (account == null || max <= 0) {
            chain.doFilter(request, response);
            return;
        }

        // Keep a reference to the semaphore we tried to insert: re-reading it with get() could
        // return null if the bounded map evicted the entry in between.
        Semaphore fresh = new Semaphore(max, true);
        Semaphore existing = passes.putIfAbsent(account, fresh);
        Semaphore pass = existing != null ? existing : fresh;

        boolean acquired;
        try {
            acquired = pass.tryAcquire(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so the container's thread handling can still observe the interrupt.
            Thread.currentThread().interrupt();
            if (response instanceof HttpServletResponse) {
                ((HttpServletResponse) response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                return;
            }
            throw new ServletException("Interrupted while waiting for a request permit", e);
        }

        if (!acquired) {
            // NOTE(review): presumably the container re-dispatches the request through this
            // filter once the continuation times out, giving it another attempt -- confirm.
            Continuation continuation = ContinuationSupport.getContinuation(request);
            logger.warn("Exceeded the max requests limit. Suspending {}", continuation);
            continuation.setTimeout(suspendMs);
            continuation.suspend();
            return;
        }

        try {
            chain.doFilter(request, response);
        } finally {
            pass.release();
        }
    }

    /** No resources to release. */
    @Override
    public void destroy() {
    }
}
<!-- web.xml fragment: registers analytics.api.QoSFilter and maps it to every request path. -->
<filter>
<filter-name>QoSFilter</filter-name>
<filter-class>analytics.api.QoSFilter</filter-class>
<!-- waitMs: milliseconds the filter waits for a per-account permit before suspending the request. -->
<init-param>
<param-name>waitMs</param-name>
<param-value>50</param-value>
</init-param>
<!-- suspendMs: timeout, in milliseconds, set on the continuation of a suspended request. -->
<init-param>
<param-name>suspendMs</param-name>
<param-value>1000</param-value>
</init-param>
</filter>
<!-- Apply the filter to all URLs; paths without an account segment pass through unthrottled. -->
<filter-mapping>
<filter-name>QoSFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment