Skip to content

Instantly share code, notes, and snippets.

@jgyonzo
Created November 18, 2011 13:37
Show Gist options
  • Select an option

  • Save jgyonzo/1376482 to your computer and use it in GitHub Desktop.

Select an option

Save jgyonzo/1376482 to your computer and use it in GitHub Desktop.
ForkApiCall
package com.ml;
import grails.util.GrailsUtil;
import groovy.lang.Closure;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
public class ForkApiCall{
static final int MAX_THREADS = 200;
static ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
static final Logger log = Logger.getLogger(ForkApiCall.class);
public static final Boolean NULL_OBJECT = new Boolean(false);
/**
* Does not return null keys or values. If a closure execution returned null, it returns NULL_OBJECT.
* If an exception is thrown during closure execution, the exception is returned as value in the return map
*
* @param apiCalls map with closures to be executed parallel, no null keys nor values are allowed
* @return map with same key and the closure execution result
*/
@Deprecated
public static Map<String,Object> fork(final Map<String,Closure> apiCalls){
Preconditions.checkNotNull(apiCalls, "apiCalls map cannot be null");
final CountDownLatch ready = new CountDownLatch(apiCalls.size());
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(apiCalls.size());
final ConcurrentMap<String,Object> results = new ConcurrentHashMap<String,Object>();
for (final Entry<String,Closure> apiCall : apiCalls.entrySet()) {
Preconditions.checkNotNull(apiCall.getKey(),"keys cannot be null");
Preconditions.checkNotNull(apiCall.getValue(),"values cannot be null");
executor.execute(new Runnable(){
public void run() {
ready.countDown(); // Tell timer we're ready
long startI = 0;
try {
start.await(); // Wait till peers are ready
startI = System.currentTimeMillis();
Object closureResult = apiCall.getValue().call();
if(closureResult != null){
results.putIfAbsent(apiCall.getKey(),closureResult);
}else{
log.info(String.format("Call of key=%s returned null, putting NULL_OBJECT",apiCall.getKey()));
results.putIfAbsent(apiCall.getKey(),NULL_OBJECT);
}
} catch (Exception e) {
log.error(String.format("error in key %s ",String.valueOf(apiCall.getKey())),GrailsUtil.sanitize(e));
results.putIfAbsent(apiCall.getKey(),e); //tell the caller that this apiCall threw exception
Thread.currentThread().interrupt();
} finally {
long end = System.currentTimeMillis() - startI;
log.info(String.format("%s finished in %d",String.valueOf(apiCall.getKey()), end));
done.countDown(); // Tell timer we're done
}
}
});
}
try {
ready.await();
// Wait for all workers to be ready
long startMillis = System.currentTimeMillis();
start.countDown(); // And they're off!
log.info("start and wait for done");
done.await();
// Wait for all workers to finish
log.info("done! waiting time: " + (System.currentTimeMillis() - startMillis));
return results;
} catch (Exception e) {
throw new RuntimeException("callable",e);
}
}
/**
* Does not return null keys or values. If a closure execution returned null, it returns NULL_OBJECT.
* If an exception is thrown during closure execution, it is put in errors Map as value with the key associated to the closure
*
* @param apiCalls map with closures to be executed parallel, no null keys nor values are allowed
* @param errors here we return the closure executions that threw exception
* @return map with same key and the closure execution result
*/
public static Map<String,Object> fork(final Map<String,Closure> apiCalls, final ConcurrentMap<String,Exception> errors){
Preconditions.checkNotNull(apiCalls, "apiCalls map cannot be null");
Preconditions.checkNotNull(errors, "errors map cannot be null");
final CountDownLatch ready = new CountDownLatch(apiCalls.size());
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(apiCalls.size());
final ConcurrentMap<String,Object> results = new ConcurrentHashMap<String,Object>();
for (final Entry<String,Closure> apiCall : apiCalls.entrySet()) {
Preconditions.checkNotNull(apiCall.getKey(),"keys cannot be null");
Preconditions.checkNotNull(apiCall.getValue(),"values cannot be null");
executor.execute(new Runnable(){
public void run() {
ready.countDown(); // Tell timer we're ready
long startI = 0;
try {
start.await(); // Wait till peers are ready
startI = System.currentTimeMillis();
Object closureResult = apiCall.getValue().call();
if(closureResult != null){
results.putIfAbsent(apiCall.getKey(),closureResult);
}else{
log.info(String.format("Call of key=%s returned null, putting NULL_OBJECT",apiCall.getKey()));
results.putIfAbsent(apiCall.getKey(),NULL_OBJECT);
}
} catch (Exception e) {
log.error(String.format("error in key %s ",String.valueOf(apiCall.getKey())),GrailsUtil.sanitize(e));
errors.putIfAbsent(apiCall.getKey(),e); //tell the caller that this apiCall threw exception
Thread.currentThread().interrupt();
} finally {
long end = System.currentTimeMillis() - startI;
log.info(String.format("%s finished in %d",String.valueOf(apiCall.getKey()), end));
done.countDown(); // Tell timer we're done
}
}
});
}
try {
ready.await();
// Wait for all workers to be ready
long startMillis = System.currentTimeMillis();
start.countDown(); // And they're off!
log.info("start and wait for done");
done.await();
// Wait for all workers to finish
log.info("done! waiting time: " + (System.currentTimeMillis() - startMillis));
return results;
} catch (Exception e) {
throw new RuntimeException("callable",e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment