Skip to content

Instantly share code, notes, and snippets.

@SCP002
Last active April 22, 2021 09:14
Show Gist options
  • Save SCP002/4b5a65aff312d9edcd79cdc4a58df028 to your computer and use it in GitHub Desktop.
Save SCP002/4b5a65aff312d9edcd79cdc4a58df028 to your computer and use it in GitHub Desktop.
Java 8: concurrent Filter funtion example. Using Worker Pool.
package org.example;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
abstract class ConcurrentFilter {
/**
* Concurrently removes elements from the given list based on {@link #keepFn(int, T)}.
* @param inp List to filter.
* @param <T> Type of objects in the list to filter.
* @return Filtered list.
* @throws InterruptedException {@inheritDoc}
* @throws ExecutionException {@inheritDoc}
*/
@SuppressWarnings("SameParameterValue")
<T> List<T> filter(List<T> inp) throws InterruptedException, ExecutionException {
// Create tasks.
List<Callable<T>> tasks = new ArrayList<>();
for (int i = 0; i < inp.size(); ++i) {
int idx = i; // Re-assign to use in lambda expression.
T val = inp.get(idx);
Callable<T> task = () -> {
// TODO: Printing status can be safely removed, used only for demonstration purposes.
System.out.println("Started " + val);
boolean keep = keepFn(idx, val);
System.out.println("Done " + val);
return (keep) ? val : null;
};
tasks.add(task);
}
// Run tasks.
ExecutorService threadPool = Executors.newCachedThreadPool();
List<Future<T>> futures = threadPool.invokeAll(tasks);
// Wait for all tasks to complete.
threadPool.shutdown();
//noinspection ResultOfMethodCallIgnored
threadPool.awaitTermination(60, TimeUnit.SECONDS);
// Collect results.
List<T> out = new ArrayList<>();
for (Future<T> fut : futures) {
T elm = fut.get();
if (elm != null) {
out.add(elm);
}
}
return out;
}
/**
* Method used by {@link #filter(List)} to decide keep input element or not.
* @param idx List element index to optionally filter basing on it.
* @param val List element value to optionally filter basing on it.
* @param <T> Type of list element.
* @return True to keep element, false to remove.
* @throws InterruptedException {@inheritDoc}
*/
abstract <T> boolean keepFn(int idx, T val) throws InterruptedException;
}
public class FilterExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final List<String> input = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
ConcurrentFilter cf = new ConcurrentFilter() {
@Override
<T> boolean keepFn(int idx, T val) throws InterruptedException {
// Sleep for 2 seconds to simulate heavy task.
Thread.sleep(2000);
// Keep every even element.
return idx % 2 != 0;
}
};
long start = System.currentTimeMillis();
// Launch filter.
List<String> output = cf.filter(input);
long end = System.currentTimeMillis();
long elapsed = end - start;
System.out.println("Filtering took: " + elapsed + " ms");
System.out.println("Result: " + output);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment