Skip to content

Instantly share code, notes, and snippets.

@skhatri
Created May 16, 2012 05:28
Show Gist options
  • Save skhatri/2707724 to your computer and use it in GitHub Desktop.
Save skhatri/2707724 to your computer and use it in GitHub Desktop.
Thread pool Completion service example to load a given URL
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
public class HttpFetch {
private static final String SERVICE_TO_INVOKE = "http://www-wiki/";
private final ExecutorService executor;
public HttpFetch(ExecutorService executor) {
this.executor = executor;
}
public List<DataSet> fetchData() throws Exception {
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
for (String url : getUrls()) {
Callable<String> fetchTask = new FetchHttpData(url);
completionService.submit(fetchTask);
}
List<DataSet> collectionResult = new ArrayList<DataSet>();
for (int i = 0; i < getUrls().size(); i++) {
try {
Future<String> futureResult = completionService.take();
String result = futureResult.get();
collectionResult.add(new DataSet(result));
} catch (ExecutionException ee) {
collectionResult.add(new DataSet((Exception) ee.getCause()));
} catch (Exception e) {
collectionResult.add(new DataSet(e));
}
}
return collectionResult;
}
class FetchHttpData implements Callable<String> {
private final String url;
private static final int TIMEOUT_IN_MILLIS = 100000;
public FetchHttpData(String url) {
this.url = url;
}
@Override
public String call() throws Exception {
HttpMethod get = new GetMethod(url);
System.out.println("invoking url " + url);
HttpClient client = createHttpClient();
try {
int responseCode = client.executeMethod(get);
if (responseCode != HttpStatus.SC_OK) {
throw new RuntimeException(String.format("error fetching via http, %s status: %s", url,
responseCode));
}
byte[] readBuffer = get.getResponseBody();
return new String(readBuffer, "UTF-8");
} catch (Exception e) {
throw new RuntimeException(String.format("error fetching via http, %s", e.getMessage()));
}
}
private HttpClient createHttpClient() {
HttpClientParams clientParams = new HttpClientParams();
clientParams.setConnectionManagerTimeout(TIMEOUT_IN_MILLIS);
clientParams.setSoTimeout(TIMEOUT_IN_MILLIS);
return new HttpClient(clientParams);
}
}
private List<String> getUrls() {
List<String> urlList = new ArrayList<String>();
for (int i=0; i<10; i++) {
urlList.add(SERVICE_TO_INVOKE);
}
return urlList;
}
class DataSet {
private final boolean success;
private final String data;
private final Exception cause;
public DataSet(String data) {
this(data, true, null);
}
public DataSet(Exception cause) {
this(null, false, cause);
}
private DataSet(String data, boolean success, Exception cause) {
this.data = data;
this.success = success;
this.cause = cause;
}
public boolean isSuccess() {
return success;
}
public String getData() {
return data;
}
public Exception getCause() {
return cause;
}
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(System.getProperty("thread.count", "2")));
HttpFetch fetch = new HttpFetch(executor);
List<DataSet> allResults = fetch.fetchData();
for (DataSet data: allResults) {
System.out.println(data.getData());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment