Created
November 18, 2011 13:37
-
-
Save jgyonzo/1376482 to your computer and use it in GitHub Desktop.
ForkApiCall
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.ml; | |
| import grails.util.GrailsUtil; | |
| import groovy.lang.Closure; | |
| import java.util.Map; | |
| import java.util.Map.Entry; | |
| import java.util.concurrent.ConcurrentHashMap; | |
| import java.util.concurrent.ConcurrentMap; | |
| import java.util.concurrent.CountDownLatch; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| import org.apache.log4j.Logger; | |
| import com.google.common.base.Preconditions; | |
| public class ForkApiCall{ | |
| static final int MAX_THREADS = 200; | |
| static ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); | |
| static final Logger log = Logger.getLogger(ForkApiCall.class); | |
| public static final Boolean NULL_OBJECT = new Boolean(false); | |
| /** | |
| * Does not return null keys or values. If a closure execution returned null, it returns NULL_OBJECT. | |
| * If an exception is thrown during closure execution, the exception is returned as value in the return map | |
| * | |
| * @param apiCalls map with closures to be executed parallel, no null keys nor values are allowed | |
| * @return map with same key and the closure execution result | |
| */ | |
| @Deprecated | |
| public static Map<String,Object> fork(final Map<String,Closure> apiCalls){ | |
| Preconditions.checkNotNull(apiCalls, "apiCalls map cannot be null"); | |
| final CountDownLatch ready = new CountDownLatch(apiCalls.size()); | |
| final CountDownLatch start = new CountDownLatch(1); | |
| final CountDownLatch done = new CountDownLatch(apiCalls.size()); | |
| final ConcurrentMap<String,Object> results = new ConcurrentHashMap<String,Object>(); | |
| for (final Entry<String,Closure> apiCall : apiCalls.entrySet()) { | |
| Preconditions.checkNotNull(apiCall.getKey(),"keys cannot be null"); | |
| Preconditions.checkNotNull(apiCall.getValue(),"values cannot be null"); | |
| executor.execute(new Runnable(){ | |
| public void run() { | |
| ready.countDown(); // Tell timer we're ready | |
| long startI = 0; | |
| try { | |
| start.await(); // Wait till peers are ready | |
| startI = System.currentTimeMillis(); | |
| Object closureResult = apiCall.getValue().call(); | |
| if(closureResult != null){ | |
| results.putIfAbsent(apiCall.getKey(),closureResult); | |
| }else{ | |
| log.info(String.format("Call of key=%s returned null, putting NULL_OBJECT",apiCall.getKey())); | |
| results.putIfAbsent(apiCall.getKey(),NULL_OBJECT); | |
| } | |
| } catch (Exception e) { | |
| log.error(String.format("error in key %s ",String.valueOf(apiCall.getKey())),GrailsUtil.sanitize(e)); | |
| results.putIfAbsent(apiCall.getKey(),e); //tell the caller that this apiCall threw exception | |
| Thread.currentThread().interrupt(); | |
| } finally { | |
| long end = System.currentTimeMillis() - startI; | |
| log.info(String.format("%s finished in %d",String.valueOf(apiCall.getKey()), end)); | |
| done.countDown(); // Tell timer we're done | |
| } | |
| } | |
| }); | |
| } | |
| try { | |
| ready.await(); | |
| // Wait for all workers to be ready | |
| long startMillis = System.currentTimeMillis(); | |
| start.countDown(); // And they're off! | |
| log.info("start and wait for done"); | |
| done.await(); | |
| // Wait for all workers to finish | |
| log.info("done! waiting time: " + (System.currentTimeMillis() - startMillis)); | |
| return results; | |
| } catch (Exception e) { | |
| throw new RuntimeException("callable",e); | |
| } | |
| } | |
| /** | |
| * Does not return null keys or values. If a closure execution returned null, it returns NULL_OBJECT. | |
| * If an exception is thrown during closure execution, it is put in errors Map as value with the key associated to the closure | |
| * | |
| * @param apiCalls map with closures to be executed parallel, no null keys nor values are allowed | |
| * @param errors here we return the closure executions that threw exception | |
| * @return map with same key and the closure execution result | |
| */ | |
| public static Map<String,Object> fork(final Map<String,Closure> apiCalls, final ConcurrentMap<String,Exception> errors){ | |
| Preconditions.checkNotNull(apiCalls, "apiCalls map cannot be null"); | |
| Preconditions.checkNotNull(errors, "errors map cannot be null"); | |
| final CountDownLatch ready = new CountDownLatch(apiCalls.size()); | |
| final CountDownLatch start = new CountDownLatch(1); | |
| final CountDownLatch done = new CountDownLatch(apiCalls.size()); | |
| final ConcurrentMap<String,Object> results = new ConcurrentHashMap<String,Object>(); | |
| for (final Entry<String,Closure> apiCall : apiCalls.entrySet()) { | |
| Preconditions.checkNotNull(apiCall.getKey(),"keys cannot be null"); | |
| Preconditions.checkNotNull(apiCall.getValue(),"values cannot be null"); | |
| executor.execute(new Runnable(){ | |
| public void run() { | |
| ready.countDown(); // Tell timer we're ready | |
| long startI = 0; | |
| try { | |
| start.await(); // Wait till peers are ready | |
| startI = System.currentTimeMillis(); | |
| Object closureResult = apiCall.getValue().call(); | |
| if(closureResult != null){ | |
| results.putIfAbsent(apiCall.getKey(),closureResult); | |
| }else{ | |
| log.info(String.format("Call of key=%s returned null, putting NULL_OBJECT",apiCall.getKey())); | |
| results.putIfAbsent(apiCall.getKey(),NULL_OBJECT); | |
| } | |
| } catch (Exception e) { | |
| log.error(String.format("error in key %s ",String.valueOf(apiCall.getKey())),GrailsUtil.sanitize(e)); | |
| errors.putIfAbsent(apiCall.getKey(),e); //tell the caller that this apiCall threw exception | |
| Thread.currentThread().interrupt(); | |
| } finally { | |
| long end = System.currentTimeMillis() - startI; | |
| log.info(String.format("%s finished in %d",String.valueOf(apiCall.getKey()), end)); | |
| done.countDown(); // Tell timer we're done | |
| } | |
| } | |
| }); | |
| } | |
| try { | |
| ready.await(); | |
| // Wait for all workers to be ready | |
| long startMillis = System.currentTimeMillis(); | |
| start.countDown(); // And they're off! | |
| log.info("start and wait for done"); | |
| done.await(); | |
| // Wait for all workers to finish | |
| log.info("done! waiting time: " + (System.currentTimeMillis() - startMillis)); | |
| return results; | |
| } catch (Exception e) { | |
| throw new RuntimeException("callable",e); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment