Last active
February 19, 2018 14:37
-
-
Save americanstone/841b92416d72a811d543fba91e96ac7c to your computer and use it in GitHub Desktop.
OrderedParallelLoading BiFunction process list
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dealer.modules.cms.promotions.services; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.RecursiveAction; | |
import java.util.function.BiFunction; | |
/*** | |
* A recursive resultless ForkJoinTask. The BiFunction knows how to take a list of feed and context to generate a ordered results. | |
* context is shared by all child thread. caller need to make sure the context is thread safe | |
* there is example to demonstrate the usage in PromotionsService. | |
* @param <T> the type of the first argument to the function | |
* @param <U> the type of the second argument to the function | |
* @param <R> the type of the result of the function | |
*/ | |
public class OrderedParallelLoading<T, U, R> extends RecursiveAction { | |
private static Log LOG = LogFactory.getLog(OrderedParallelLoading.class); | |
private List<T> feeds; | |
private U context; | |
private R[] results; | |
private BiFunction<List<T>, U, List<R>> fn; | |
private int start; | |
private int end; | |
private int threshold; | |
OrderedParallelLoading(List<T> feeds, U context, BiFunction<List<T>, U, List<R>> fn, R[] results, int threshold){ | |
this.feeds = feeds; | |
this.context = context; | |
this.fn = fn; | |
this.results = results; | |
this.start = 0; | |
this.end = feeds.size(); | |
this.threshold = threshold == 0 ? 5 : threshold; | |
} | |
OrderedParallelLoading(List<T> feeds, U context, BiFunction<List<T>, U, List<R>> fn, int start, int end, R[] results, int threshold) { | |
this.feeds = feeds; | |
this.context = context; | |
this.fn = fn; | |
this.start = start; | |
this.end = end; | |
this.results = results; | |
this.threshold = threshold; | |
} | |
@Override | |
protected void compute(){ | |
int length = end - start; | |
if(length <= threshold ){ | |
// do the actual work based on threshold default is 5 | |
task(feeds.subList(start, end), start, end); | |
}else{ | |
// divide and conquer to small chunk | |
OrderedParallelLoading leftTask = new OrderedParallelLoading(feeds, context, fn, start, start + length/2, results, threshold); | |
leftTask.fork(); | |
OrderedParallelLoading rightTask = new OrderedParallelLoading(feeds, context, fn, start + length/2, end, results, threshold); | |
rightTask.compute(); | |
leftTask.join(); | |
} | |
} | |
private void task(List<T> feeds, int start, int end){ | |
try { | |
List<R> result = fn.apply(feeds, context); // be careful context is shared by all thread! | |
if(result != null){ | |
// thread safe and keep the order | |
for(int i = 0 ; i < result.size(); i++){ | |
results[start] = result.get(i); | |
start++; | |
} | |
} | |
}catch(Exception e){ | |
LOG.error(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment