Last active
April 22, 2021 09:14
-
-
Save SCP002/4b5a65aff312d9edcd79cdc4a58df028 to your computer and use it in GitHub Desktop.
Java 8: concurrent filter function example using a worker pool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.*; | |
abstract class ConcurrentFilter {

    /**
     * Concurrently filters the given list: {@link #keepFn(int, Object)} is evaluated for every
     * element on a worker thread, and the elements for which it returned {@code true} are
     * returned in their original order.
     *
     * <p>{@code null} elements are passed to {@code keepFn} like any other value and are kept
     * whenever it returns {@code true}.
     *
     * @param inp List to filter. It is not modified.
     * @param <T> Type of objects in the list to filter.
     * @return New list holding the kept elements in input order.
     * @throws InterruptedException If the calling thread is interrupted while waiting for the
     *                              tasks to finish.
     * @throws ExecutionException   If {@code keepFn} threw for any element; the original
     *                              exception is available as the cause.
     */
    @SuppressWarnings("SameParameterValue")
    <T> List<T> filter(List<T> inp) throws InterruptedException, ExecutionException {
        // Snapshot the input so tasks and result collection see the same elements.
        List<T> items = new ArrayList<>(inp);

        // Create one task per element. Each task reports only the keep/drop decision, so a null
        // element is never confused with "dropped".
        List<Callable<Boolean>> tasks = new ArrayList<>(items.size());
        for (int i = 0; i < items.size(); ++i) {
            int idx = i; // Effectively final copy for use in the lambda expression.
            T val = items.get(idx);
            Callable<Boolean> task = () -> {
                // TODO: Printing status can be safely removed, used only for demonstration purposes.
                System.out.println("Started " + val);
                boolean keep = keepFn(idx, val);
                System.out.println("Done " + val);
                return keep;
            };
            tasks.add(task);
        }

        // Run tasks. invokeAll blocks until every task has completed (normally or exceptionally),
        // so no additional awaitTermination is required afterwards.
        ExecutorService threadPool = Executors.newCachedThreadPool();
        List<Future<Boolean>> futures;
        try {
            futures = threadPool.invokeAll(tasks);
        } finally {
            // Always release the worker threads, even if the wait above was interrupted.
            threadPool.shutdown();
        }

        // Collect results. invokeAll returns futures in the same order as the submitted tasks,
        // so position i in futures corresponds to position i in items.
        List<T> out = new ArrayList<>();
        for (int i = 0; i < futures.size(); ++i) {
            if (futures.get(i).get()) {
                out.add(items.get(i));
            }
        }
        return out;
    }

    /**
     * Method used by {@link #filter(List)} to decide whether to keep an input element.
     * It is invoked from worker threads, potentially for many elements at the same time.
     *
     * @param idx Index of the element in the input list.
     * @param val Value of the element in the input list.
     * @param <T> Type of list element.
     * @return True to keep the element, false to remove it.
     * @throws InterruptedException If the worker thread is interrupted while deciding.
     */
    abstract <T> boolean keepFn(int idx, T val) throws InterruptedException;
}
public class FilterExample { | |
public static void main(String[] args) throws ExecutionException, InterruptedException { | |
final List<String> input = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); | |
ConcurrentFilter cf = new ConcurrentFilter() { | |
@Override | |
<T> boolean keepFn(int idx, T val) throws InterruptedException { | |
// Sleep for 2 seconds to simulate heavy task. | |
Thread.sleep(2000); | |
// Keep every even element. | |
return idx % 2 != 0; | |
} | |
}; | |
long start = System.currentTimeMillis(); | |
// Launch filter. | |
List<String> output = cf.filter(input); | |
long end = System.currentTimeMillis(); | |
long elapsed = end - start; | |
System.out.println("Filtering took: " + elapsed + " ms"); | |
System.out.println("Result: " + output); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment