Created
June 23, 2016 00:49
-
-
Save terabyte/fdbb6469703da51cf605daf650cefb72 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.cloudera; | |
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import misc1.commons.ExceptionUtils;
import misc1.commons.options.OptionsFragment;
import misc1.commons.options.OptionsLibrary;
import misc1.commons.options.OptionsResults;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.RedirectLocations;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qbt.HelpTier;
import qbt.QbtCommand;
import qbt.QbtCommandName;
import qbt.QbtCommandOptions;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class VerifyUrls extends QbtCommand<VerifyUrls.Options> { | |
private static final Logger LOGGER = LoggerFactory.getLogger(VerifyUrls.class); | |
private static final String ETAG_HEADER_NAME = "ETag"; | |
private static final Pattern PARENT_DIR_REGEX = Pattern.compile(".*<a.*[Hh][Rr][Ee][Ff]=\"[^\"]+\".*>Parent Directory</a>.*"); | |
private static final Pattern FILENAME_REGEX = Pattern.compile(".*<a.*[Hh][Rr][Ee][Ff]=\"([^\"]+)\".*>\\1</a>.*"); | |
private static final String MISSING_ETAG_STR = "MISSING ETAG: "; | |
private static final String MISMATCH_ETAG_STR = "DIFFERENT CONTENT: "; | |
private static final String SRC_ONLY_STR = "SOURCE ONLY: "; | |
private static final String DEST_ONLY_STR = "DESTINATION ONLY: "; | |
private static final String DEFAULT_TIMEOUT = "86400"; // 1 day | |
private static final Integer EXECUTOR_SERVICE_TIMEOUT_MILLIS = 1000; | |
@QbtCommandName("verifyUrls") | |
public static interface Options extends QbtCommandOptions { | |
public static final OptionsLibrary<Options> o = OptionsLibrary.of(); | |
//public static final OptionsFragment<Options, Integer> timeout = o.oneArg("timeout", "t").transform(o.singleton(DEFAULT_TIMEOUT)).transform(o.parseInt()).helpDesc("Timeout before harun time in seconds before giving up (Default " + DEFAULT_TIMEOUT + ")"); | |
public static final OptionsFragment<Options, Integer> parallelism = o.oneArg("parallelism", "j").transform(o.singleton(null)).transform(o.parseInt()).helpDesc("Parallelize up to this width"); | |
public static final OptionsFragment<Options, Boolean> infiniteParallelism = o.zeroArg("infinite-parallelism", "J").transform(o.flag()).helpDesc("Parallelize as widely as possible"); | |
public static final OptionsFragment<Options, URI> sourceUrl = o.oneArg("src").transform(o.singleton()).transform((String h, String s) -> { return URI.create(s); }).helpDesc("Source URL to compare"); | |
public static final OptionsFragment<Options, URI> destUrl = o.oneArg("dest").transform(o.singleton()).transform((String h, String s) -> { return URI.create(s); }).helpDesc("Destination URL to compare"); | |
} | |
@Override | |
public Class<Options> getOptionsClass() { | |
return Options.class; | |
} | |
@Override | |
public HelpTier getHelpTier() { | |
return HelpTier.COMMON; | |
} | |
@Override | |
public String getDescription() { | |
return "Compare two URLs contents"; | |
} | |
@Override | |
public int run(OptionsResults<? extends Options> options) throws IOException { | |
ExecutorService es = configureExecutorService(options); | |
URI src = options.get(Options.sourceUrl); | |
URI dest = options.get(Options.destUrl); | |
LOGGER.debug("Verifying URL " + src + " content matches URL " + dest); | |
LinkedList<String> differences = new LinkedList<>(); | |
LinkedList<Future<?>> futures = new LinkedList<>(); | |
try (CloseableHttpClient hc = HttpClients.createDefault()) { | |
Future<?> f = es.submit(() -> processPath(src, dest, differences, futures, es, hc, getLastPathComponent(src))); | |
synchronized (futures) { | |
futures.add(f); | |
futures.notifyAll(); | |
} | |
try { | |
synchronized (futures) { | |
while(true) { | |
boolean stillRunning = false; | |
ListIterator<Future<?>> listIterator = futures.listIterator(); | |
LOGGER.debug("futures size: " + futures.size()); | |
while(listIterator.hasNext()) { | |
if (!listIterator.next().isDone()) { | |
stillRunning = true; | |
continue; | |
} | |
// job is done, don't care anymore | |
listIterator.remove(); | |
} | |
LOGGER.debug("Still Running? " + stillRunning); | |
if (!stillRunning) { | |
break; | |
} | |
futures.wait(EXECUTOR_SERVICE_TIMEOUT_MILLIS); | |
} | |
} | |
es.shutdown(); | |
es.awaitTermination(EXECUTOR_SERVICE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); | |
} catch (InterruptedException e) { | |
ExceptionUtils.commute(e); | |
} | |
} | |
if(!differences.isEmpty()) { | |
differences.forEach(LOGGER::error); | |
return differences.size(); | |
} | |
return 0; | |
} | |
private static String getLastPathComponent(URI path) { | |
String pathStr = path.getPath(); | |
if (pathStr.isEmpty()) { | |
return ""; | |
} | |
return ImmutableList.copyOf(pathStr.split("/")).reverse().get(0); | |
} | |
private static ImmutableSet<String> getPathsFromDirectoryListing(String path, InputStream input) { | |
LOGGER.debug("Getting directory listing for path " + path); | |
BufferedReader br = new BufferedReader(new InputStreamReader(input)); | |
ImmutableSet.Builder<String> b = ImmutableSet.builder(); | |
String line; | |
try { | |
// Ok, I know this is MEGA stupid. It's gonna be fine for now, but what we should really do here is put in | |
// an HTML parser. | |
boolean dirListingStarted = false; | |
while((line = br.readLine()) != null) { | |
if (!dirListingStarted) { | |
// looking for start of listing | |
if (PARENT_DIR_REGEX.matcher(line).matches()) { | |
dirListingStarted = true; | |
} | |
continue; | |
} | |
// listing has started | |
Matcher m = FILENAME_REGEX.matcher(line); | |
if (m.find()) { | |
String newPath = path + "/" + m.group(1); | |
if (newPath.endsWith("/")) { | |
// we detect directories by noticing that "http://example.com/foo" gets redirected to | |
// "http://example.com/foo/", so we need to remove all trailing '/'. | |
newPath = newPath.substring(0, newPath.length() - 1); | |
} | |
LOGGER.trace("Adding path " + newPath); | |
b.add(newPath); | |
continue; | |
} | |
// after end of listing, we'll just have a bunch of lines not match. | |
// The main danger with this approach is if the "footer" contains a URL that happens to match: | |
// <a href="$X">$X</a> for some value of $X. This definitely doesn't happen for the servers I've looked | |
// at so far. | |
} | |
} catch (IOException e) { | |
LOGGER.error("Error while reading stream", e); | |
} | |
return b.build(); | |
} | |
private void processPath(URI src, URI dest, LinkedList<String> differences, LinkedList<Future<?>> futures, ExecutorService es, CloseableHttpClient hc, String path) { | |
LOGGER.debug("Processing path " + src.resolve(path)); | |
HttpHead srcHead = new HttpHead(src.resolve(path)); | |
HttpHead destHead = new HttpHead(dest.resolve(path)); | |
HttpContext srcContext = HttpClientContext.create(); | |
HttpContext destContext = HttpClientContext.create(); | |
try ( | |
CloseableHttpResponse srcRes = hc.execute(srcHead, srcContext); | |
CloseableHttpResponse destRes = hc.execute(destHead, destContext); | |
) { | |
RedirectLocations rl = (RedirectLocations) srcContext.getAttribute(HttpClientContext.REDIRECT_LOCATIONS); | |
if (rl != null) { | |
if (!rl.getAll().isEmpty()) { | |
LOGGER.trace("Redirect path => " + rl.getAll().get(0).getPath()); | |
if (rl.getAll().get(0).getPath().endsWith("/")) { | |
// if there was a redirect and the last redirect URL ends in '/' assume it is a directory | |
LOGGER.debug("Found directory for path " + path); | |
// instead of HEAD, we need to do a full get to get the directory contents | |
HttpGet srcGet = new HttpGet(src.resolve(path)); | |
HttpGet destGet = new HttpGet(dest.resolve(path)); | |
try ( | |
CloseableHttpResponse srcDirRes = hc.execute(srcGet); | |
CloseableHttpResponse destDirRes = hc.execute(destGet); | |
) { | |
ImmutableSet<String> srcPaths = getPathsFromDirectoryListing(path, srcDirRes.getEntity().getContent()); | |
ImmutableSet<String> destPaths = getPathsFromDirectoryListing(path, destDirRes.getEntity().getContent()); | |
for (String newPath : Sets.union(srcPaths, destPaths)) { | |
LOGGER.debug("Adding job for path " + newPath); | |
es.execute(() -> processPath(src, dest, differences, futures, es, hc, newPath)); | |
Future<?> f = es.submit(() -> processPath(src, dest, differences, futures, es, hc, newPath)); | |
synchronized (futures) { | |
futures.add(f); | |
futures.notifyAll(); | |
} | |
} | |
} | |
return; | |
} | |
} | |
} | |
LOGGER.debug("Found file for path " + path); | |
Header srcETag = srcRes.getFirstHeader(ETAG_HEADER_NAME); | |
Header destETag = destRes.getFirstHeader(ETAG_HEADER_NAME); | |
if (srcETag == null || destETag == null) { | |
LOGGER.debug("Missing ETag for path " + path); | |
synchronized (differences) { | |
differences.add(MISSING_ETAG_STR + path); | |
} | |
return; | |
} | |
if (!srcETag.getValue().equals(destETag.getValue())) { | |
LOGGER.debug("ETag mismatch for path " + path); | |
synchronized (differences) { | |
differences.add(MISMATCH_ETAG_STR + path); | |
} | |
return; | |
} | |
LOGGER.debug("ETags match: " + srcETag.getValue()); | |
} catch (IOException e) { | |
ExceptionUtils.commute(e); | |
} | |
} | |
private ExecutorService configureExecutorService(OptionsResults<? extends Options> options) { | |
if (options.get(Options.parallelism) != null) { | |
return Executors.newFixedThreadPool(options.get(Options.parallelism)); | |
} | |
if (options.get(Options.infiniteParallelism)) { | |
return Executors.newCachedThreadPool(); | |
} | |
// we do a lot of network I/O so let's default to nCpu * 2 threads | |
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment