Skip to content

Instantly share code, notes, and snippets.

@americanstone
Last active February 19, 2018 14:37
Show Gist options
  • Save americanstone/841b92416d72a811d543fba91e96ac7c to your computer and use it in GitHub Desktop.
Save americanstone/841b92416d72a811d543fba91e96ac7c to your computer and use it in GitHub Desktop.
OrderedParallelLoading BiFunction process list
package com.dealer.modules.cms.promotions.services;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.function.BiFunction;
/***
* A recursive resultless ForkJoinTask. The BiFunction knows how to take a list of feed and context to generate a ordered results.
* context is shared by all child thread. caller need to make sure the context is thread safe
* there is example to demonstrate the usage in PromotionsService.
* @param <T> the type of the first argument to the function
* @param <U> the type of the second argument to the function
* @param <R> the type of the result of the function
*/
public class OrderedParallelLoading<T, U, R> extends RecursiveAction {
private static Log LOG = LogFactory.getLog(OrderedParallelLoading.class);
private List<T> feeds;
private U context;
private R[] results;
private BiFunction<List<T>, U, List<R>> fn;
private int start;
private int end;
private int threshold;
OrderedParallelLoading(List<T> feeds, U context, BiFunction<List<T>, U, List<R>> fn, R[] results, int threshold){
this.feeds = feeds;
this.context = context;
this.fn = fn;
this.results = results;
this.start = 0;
this.end = feeds.size();
this.threshold = threshold == 0 ? 5 : threshold;
}
OrderedParallelLoading(List<T> feeds, U context, BiFunction<List<T>, U, List<R>> fn, int start, int end, R[] results, int threshold) {
this.feeds = feeds;
this.context = context;
this.fn = fn;
this.start = start;
this.end = end;
this.results = results;
this.threshold = threshold;
}
@Override
protected void compute(){
int length = end - start;
if(length <= threshold ){
// do the actual work based on threshold default is 5
task(feeds.subList(start, end), start, end);
}else{
// divide and conquer to small chunk
OrderedParallelLoading leftTask = new OrderedParallelLoading(feeds, context, fn, start, start + length/2, results, threshold);
leftTask.fork();
OrderedParallelLoading rightTask = new OrderedParallelLoading(feeds, context, fn, start + length/2, end, results, threshold);
rightTask.compute();
leftTask.join();
}
}
private void task(List<T> feeds, int start, int end){
try {
List<R> result = fn.apply(feeds, context); // be careful context is shared by all thread!
if(result != null){
// thread safe and keep the order
for(int i = 0 ; i < result.size(); i++){
results[start] = result.get(i);
start++;
}
}
}catch(Exception e){
LOG.error(e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment