Created
December 12, 2023 05:25
-
-
Save krmahadevan/df96ffb180cb92f670b38d531f5a8949 to your computer and use it in GitHub Desktop.
Sample that demonstrates how to use CompletableFuture, with callbacks that listen for task completion, on a ThreadPoolExecutor backed by a PriorityBlockingQueue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
// Works
public class CompletableFuturePriorityQueueExample { | |
static class Task implements Comparable<Task> { | |
private final String name; | |
private final int priority; | |
public Task(String name, int priority) { | |
this.name = name; | |
this.priority = priority; | |
} | |
public String getName() { | |
return name; | |
} | |
public int getPriority() { | |
return priority; | |
} | |
@Override | |
public int compareTo(Task other) { | |
return Integer.compare(this.priority, other.priority); | |
} | |
} | |
static class MyRunnable implements Runnable, Comparable<MyRunnable> { | |
private final Task task; | |
public MyRunnable(Task task) { | |
this.task = task; | |
} | |
@Override | |
public void run() { | |
System.out.println("Processing task: " + task.getName() + " with priority " + task.getPriority() | |
+ " in thread: " + Thread.currentThread().getId()); | |
} | |
public Task getTask() { | |
return task; | |
} | |
@Override | |
public int compareTo(MyRunnable o) { | |
return task.compareTo(o.task); | |
} | |
} | |
private static Runnable extract(Object object) { | |
try { | |
Field field = object.getClass().getDeclaredField("fn"); | |
field.setAccessible(true); | |
return (Runnable) field.get(object); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public static void main(String[] args) { | |
// Create a custom executor that executes tasks in the calling thread | |
Executor sameThreadExecutor = Runnable::run; | |
// Create a priority blocking queue with a custom comparator | |
PriorityBlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>(10, Comparator.comparing(it -> ((MyRunnable) extract(it)).getTask())); | |
// Create a thread pool executor with the priority queue | |
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, priorityQueue); | |
// Create a list to store CompletableFuture instances | |
List<CompletableFuture<Void>> futures = new ArrayList<>(); | |
// Enqueue tasks with different priorities | |
List<Task> tasks = List.of( | |
new Task("Task1", 3), | |
new Task("Task2", 1), | |
new Task("Task3", 2) | |
); | |
// Create CompletableFuture and associate runAsync with each task | |
for (Task task : tasks) { | |
CompletableFuture<Void> future = CompletableFuture.runAsync(new MyRunnable(task), executor); | |
// Attach a whenCompleteAsync callback for cleanup with the same executor | |
future.whenCompleteAsync((result, throwable) -> { | |
if (throwable == null) { | |
// Simulate cleanup logic | |
System.out.println("Cleanup completed for task: " + task.getName() + " with priority " + task.getPriority() | |
+ " in thread: " + Thread.currentThread().getId()); | |
} else { | |
// Handle exceptions if any | |
throwable.printStackTrace(); | |
} | |
}, sameThreadExecutor); | |
// Add the CompletableFuture to the list | |
futures.add(future); | |
} | |
// Wait for all CompletableFuture tasks to complete | |
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); | |
allOf.join(); | |
// Shut down the executor | |
executor.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This was a result of interacting with ChatGPT. For the full conversation check here