Skip to content

Instantly share code, notes, and snippets.

@rponte
Last active November 29, 2021 14:52
Show Gist options
  • Select an option

  • Save rponte/790fbda2a77307137b531c21905e0af0 to your computer and use it in GitHub Desktop.

Select an option

Save rponte/790fbda2a77307137b531c21905e0af0 to your computer and use it in GitHub Desktop.
Processamento em Lote concorrente com Java - API Concurrent: ExecutorService, ForkJoinPool (thread pool)
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class HugeCalculationInMultiThread {
public static void main(String[] args) throws Exception {
new TimeWatch().watch(new UnitOfWork() {
@Override
public void work() throws Exception {
int NUMBER_OF_THREADS = 8; // Runtime.getRuntime().availableProcessors();
List<Sum> subranges = new Sum(0, 10000000000L).split(NUMBER_OF_THREADS);
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
List<Future<Long>> results = executor.invokeAll(subranges);
executor.shutdown();
long bigNumber = 0;
for (Future<Long> result : results) {
bigNumber += result.get();
}
System.out.println("Result: " + bigNumber);
}
});
}
}
import java.util.concurrent.ForkJoinPool;
public class HugeCalculationInMultiThreadWithForkJoin {
public static void main(String[] args) throws Exception {
new TimeWatch().watch(new UnitOfWork() {
@Override
public void work() throws Exception {
ForkJoinPool pool = new ForkJoinPool();
Long result = pool.invoke(new Sum(0, 10_000_000_000L));
System.out.println("Result: " + result);
}
});
}
}
public class HugeCalculationInSingleThread {
public static void main(String[] args) throws Exception {
new TimeWatch().watch(new UnitOfWork() {
@Override
public void work() throws Exception {
Sum sum = new Sum(0, 10000000000L);
Long result = sum.call();
System.out.println("Result: " + result);
}
});
}
}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.RecursiveTask;
public class Sum extends RecursiveTask<Long> implements Callable<Long> {
private static final long serialVersionUID = 1L;
private static final int THRESHOLD = 1000000;
private final long from;
private final long to;
public Sum(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public Long call() throws Exception {
long acc = 0;
for (long i = from; i <= to; i++) {
acc = acc + i;
}
return acc;
}
@Override
protected Long compute() {
boolean tooSmallComputation = (to - from) < THRESHOLD;
if (tooSmallComputation) {
try {
return call();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
long result = 0;
Collection<Sum> results = invokeAll(split(4));
for (Sum sum : results) {
result += sum.join();
}
return result;
}
public List<Sum> split(int numberOfRanges) {
List<Sum> subranges = new ArrayList<>();
long totalLength = to - from;
long subrangeLength = totalLength / numberOfRanges;
long currentStart = from;
for (int i = 0; i < (numberOfRanges - 1); ++i) {
subranges.add(new Sum(currentStart, (currentStart + subrangeLength)));
currentStart += subrangeLength + 1;
}
subranges.add(new Sum(currentStart, to));
return subranges;
}
@Override
public String toString() {
return String.format("[%d, %d]", from, to);
}
public static void main(String[] args) {
int processors = Runtime.getRuntime().availableProcessors();
System.out.println(Integer.toString(processors) + " processor" + (processors != 1 ? "s are " : " is ")
+ "available");
Sum bigRange = new Sum(0, 10000000000L);
List<Sum> subranges = bigRange.split(1);
for (Sum sum : subranges) {
System.out.println(sum.toString());
}
}
}
import static java.util.concurrent.TimeUnit.*;
public class TimeWatch {
public void watch(UnitOfWork job) throws Exception {
long starts = System.nanoTime();
job.work();
long ends = System.nanoTime();
long timeInMilliseconds = MILLISECONDS.convert((ends-starts), NANOSECONDS);
System.out.println("Elapse time: " + timeInMilliseconds);
}
}
public interface UnitOfWork {
public void work() throws Exception;
}
@rponte
Copy link
Author

rponte commented Mar 12, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment