Skip to content

Instantly share code, notes, and snippets.

@debop
Created January 11, 2013 10:01
Show Gist options
  • Save debop/4509435 to your computer and use it in GitHub Desktop.
Save debop/4509435 to your computer and use it in GitHub Desktop.
병렬 실행
/**
 * Applies {@code function} to every element of {@code elements} on a fixed-size thread pool and
 * returns the results in the same order as the input.
 *
 * <p>The input is copied into a list and split into consecutive partitions; one task is submitted
 * per partition. The pool size and the partition size are both derived from
 * {@code getProcessCount()}. The pool is always shut down before this method returns.
 *
 * @param elements the items to process; must not be null
 * @param function the transformation applied to each item; must not be null
 * @param <T>      input element type
 * @param <V>      result element type
 * @return a new list holding one result per input element, in input order (empty for empty input)
 * @throws RuntimeException wrapping the underlying cause if any task fails or if the calling
 *                          thread is interrupted while waiting for the tasks
 */
public static <T, V> List<V> run(final Iterable<T> elements, final Function1<T, V> function) {
    shouldNotBeNull(elements, "elements");
    shouldNotBeNull(function, "function");

    // Read once so the pool size, the log line and the partition size all agree.
    final int processCount = getProcessCount();
    final ExecutorService executor = Executors.newFixedThreadPool(processCount);

    if (log.isDebugEnabled()) {
        log.debug("작업을 병렬로 수행합니다. 작업 스레드 수=[{}]", processCount);
    }

    try {
        final List<T> elemList = Lists.newArrayList(elements);
        if (elemList.isEmpty()) {
            // Nothing to process; returning here keeps empty input independent of how
            // getPartitionSize treats an element count of zero.
            return Lists.newArrayListWithCapacity(0);
        }

        final int partitionSize = getPartitionSize(elemList.size(), processCount);
        final List<List<T>> partitions = Lists.partition(elemList, partitionSize);

        final List<Callable<List<V>>> tasks = Lists.newArrayListWithCapacity(partitions.size());
        for (final List<T> partition : partitions) {
            tasks.add(new Callable<List<V>>() {
                @Override
                public List<V> call() throws Exception {
                    // Each task fills its own list, so workers never write to a shared collection.
                    final List<V> localResult = Lists.newArrayListWithCapacity(partition.size());
                    for (final T element : partition) {
                        localResult.add(function.execute(element));
                    }
                    return localResult;
                }
            });
        }

        // invokeAll returns the futures in task order, so concatenating them preserves input
        // order. Calling get() is what surfaces a failure inside a task; without it the
        // exception would be lost and a truncated result returned.
        final List<V> results = Lists.newArrayListWithCapacity(elemList.size());
        for (final java.util.concurrent.Future<List<V>> future : executor.invokeAll(tasks)) {
            results.addAll(future.get());
        }

        if (log.isDebugEnabled()) {
            log.debug("모든 작업을 병렬로 완료했습니다. partition size=[{}]", partitions.size());
        }
        return results;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        log.error("데이터에 대한 병렬 작업 중 예외가 발생했습니다.", e);
        throw new RuntimeException(e);
    } catch (Exception e) {
        log.error("데이터에 대한 병렬 작업 중 예외가 발생했습니다.", e);
        throw new RuntimeException(e);
    } finally {
        executor.shutdown();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment