Skip to content

Instantly share code, notes, and snippets.

@sunng87
Created May 8, 2012 08:16
Show Gist options
  • Save sunng87/2633503 to your computer and use it in GitHub Desktop.
Save sunng87/2633503 to your computer and use it in GitHub Desktop.
A combined future impl
/**
*
*/
package info.sunng.util.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * A {@link Future} that combines several futures of the same result type into
 * one future whose result is the list of their individual results, in the
 * order the futures were supplied.
 *
 * <p>{@code null} entries are tolerated: the status methods ({@link #cancel},
 * {@link #isCancelled}, {@link #isDone}) ignore them, and both {@code get}
 * variants report {@code null} as their result.
 *
 * <p>This class adds no synchronization of its own; it only delegates to the
 * wrapped futures.
 *
 * @param <T> result type of each wrapped future
 * @author nsun
 */
public class MultiFuture<T> implements Future<List<T>> {

    /** Private copy of the wrapped futures, in the order they were supplied. */
    private final List<Future<T>> futures = new ArrayList<Future<T>>();

    /**
     * Creates a combined future over a copy of the given collection.
     *
     * @param futures the futures to combine, in iteration order
     * @throws NullPointerException if {@code futures} is {@code null}
     */
    public MultiFuture(Collection<? extends Future<T>> futures) {
        this.futures.addAll(futures);
    }

    /**
     * Creates a combined future over the given futures.
     *
     * @param futures the futures to combine, in argument order
     * @throws NullPointerException if the array itself is {@code null}
     */
    public MultiFuture(Future<T>... futures) {
        for (Future<T> f : futures) {
            this.futures.add(f);
        }
    }

    /**
     * Requests cancellation of every wrapped future.
     *
     * @param mayInterruptIfRunning forwarded unchanged to each wrapped future
     * @return {@code true} only if every wrapped future reported a successful
     *         cancellation (trivially {@code true} when there are none)
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean allCancelled = true;
        for (Future<T> f : this.futures) {
            if (f == null) {
                continue;
            }
            // Deliberately no early exit: one future refusing cancellation
            // (e.g. because it already completed) must not leave the
            // remaining ones running.
            if (!f.cancel(mayInterruptIfRunning)) {
                allCancelled = false;
            }
        }
        return allCancelled;
    }

    /**
     * @return {@code true} if every wrapped future is cancelled; also
     *         {@code true} when there are no wrapped futures
     */
    @Override
    public boolean isCancelled() {
        for (Future<T> f : this.futures) {
            if (f != null && !f.isCancelled()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@code true} if every wrapped future is done; also {@code true}
     *         when there are no wrapped futures
     */
    @Override
    public boolean isDone() {
        for (Future<T> f : this.futures) {
            if (f != null && !f.isDone()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Waits for every wrapped future in turn and collects the results.
     *
     * @return one result per wrapped future, in the original order
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting on a wrapped future
     * @throws ExecutionException if a wrapped future completed exceptionally
     */
    @Override
    public List<T> get() throws InterruptedException, ExecutionException {
        List<T> results = new ArrayList<T>(this.futures.size());
        for (Future<T> f : this.futures) {
            results.add(f == null ? null : f.get());
        }
        return results;
    }

    /**
     * Best-effort timed retrieval. The timeout is a single budget shared by all
     * wrapped futures, which are awaited in order. A future that does not
     * complete within what is left of the budget is skipped, and once the
     * budget is exhausted only futures that are already done are collected.
     *
     * <p>Because skipped futures contribute nothing, the returned list may be
     * shorter than the number of wrapped futures and its positions then no
     * longer line up with them.
     *
     * @param timeout total time to wait across all wrapped futures
     * @param unit unit of {@code timeout}
     * @return the results that became available within the budget; an empty
     *         list only when there are no wrapped futures
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting on a wrapped future
     * @throws ExecutionException if a collected future completed exceptionally
     * @throws TimeoutException if there are wrapped futures but not a single
     *         result could be collected within the budget
     */
    @Override
    public List<T> get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        List<T> results = new ArrayList<T>(this.futures.size());
        if (this.futures.isEmpty()) {
            // Nothing to wait for, so nothing can time out.
            return results;
        }

        // nanoTime is monotonic and keeps sub-millisecond budgets intact.
        // Only differences of nanoTime values are compared, so arithmetic
        // wrap-around of the deadline is harmless.
        final long deadline = System.nanoTime() + unit.toNanos(timeout);

        for (Future<T> f : this.futures) {
            if (f == null) {
                results.add(null);
                continue;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining > 0) {
                try {
                    results.add(f.get(remaining, TimeUnit.NANOSECONDS));
                } catch (TimeoutException skipped) {
                    // Budget ran out on this future: leave it out and keep
                    // going so later futures that are already done still
                    // contribute their results.
                }
            } else if (f.isDone()) {
                // Budget exhausted; a done future hands over its outcome
                // without blocking.
                results.add(f.get());
            }
        }

        if (results.isEmpty()) {
            throw new TimeoutException();
        }
        return results;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment