Created
September 14, 2016 16:03
-
-
Save klepto/703c711bac06afc6821d9c413f2fb0cb to your computer and use it in GitHub Desktop.
Continuations implementation in Java using Thread Parking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.HashMap; | |
| import java.util.LinkedList; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Queue; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.locks.LockSupport; | |
| /** | |
| * Executor implementation with pausable {@link Task} execution. Imitates single thread behavior | |
| * with usage of {@link LockSupport} method. | |
| * | |
| * @author kLeptO <http://www.rune-server.org/members/klepto/> | |
| */ | |
| public class Executor { | |
| private Thread executorThread; | |
| private Queue<Task> tasksQueue = new LinkedList<Task>(); | |
| private Thread workerThread; | |
| private volatile long unparkId; | |
| private final ExecutorService threadPool = Executors.newCachedThreadPool(); | |
| private final Map<Task, Thread> pausedTasks = new HashMap<Task, Thread>(); | |
| /** | |
| * Executes a list of {@link Task}s. Any of the following {@link Task}s may call | |
| * {@link #pauseTask(Task)} method in order to pause itself till the next call of this method. | |
| * | |
| * @param tasks | |
| * A list of tasks to be executed. | |
| */ | |
| public void execute(List<Task> tasks) { | |
| // gets called by scheduler | |
| // save instance of current thread | |
| executorThread = Thread.currentThread(); | |
| // construct tasks queue | |
| tasksQueue.addAll(tasks); | |
| // start tasks execution | |
| continueWorkerThread(true); | |
| } | |
| /** | |
| * Wakes-up the worker thread. If there is no worker thread available, a new one gets invoked | |
| * from the thread pool. | |
| * | |
| * @param parkSelf | |
| * Indicates if caller thread should be parked. | |
| */ | |
| private void continueWorkerThread(boolean parkSelf) { | |
| if (workerThread == null) { | |
| // if there no is set worker thread, tell thread pool to execute on a new one | |
| threadPool.execute(new Runnable() { | |
| @Override | |
| public void run() { | |
| workerThread = Thread.currentThread(); | |
| executeRemainingTasks(); | |
| } | |
| }); | |
| } else { | |
| // there is active worker thread, continue it | |
| unpark(workerThread); | |
| } | |
| if (parkSelf) { | |
| // park caller thread if required | |
| park(); | |
| } | |
| } | |
| /** | |
| * Executes remaining tasks. This method should only be invoked from a worker thread. | |
| */ | |
| private void executeRemainingTasks() { | |
| while (true) { | |
| // if another worker is dealing with remaining tasks, continue to that worker | |
| if (workerThread != Thread.currentThread()) { | |
| continueWorkerThread(false); | |
| break; | |
| } | |
| // get next task | |
| Task task = tasksQueue.poll(); | |
| // if there are no tasks available, return to executor | |
| if (task == null) { | |
| unpark(executorThread); | |
| park(); | |
| // we want to continue executing tasks after waking up | |
| continue; | |
| } | |
| if (pausedTasks.get(task) == null) { | |
| // this task is not paused, execute it normally | |
| task.execute(this); | |
| } else { | |
| // this task is paused, switch to task's worker thread | |
| unpark(pausedTasks.get(task)); | |
| park(); | |
| // we want to grab a new task since another worker deals with this one | |
| continue; | |
| } | |
| // if task was paused and we got to here, it means it's not paused anymore | |
| if (pausedTasks.get(task) != null) { | |
| pausedTasks.remove(task); | |
| } | |
| } | |
| } | |
| /** | |
| * Pauses {@link Task} execution. Should only be called by currently executed task. | |
| * | |
| * @param task | |
| * The task to be paused. | |
| */ | |
| public void pauseTask(Task task) { | |
| if (workerThread == Thread.currentThread()) { | |
| // this is newly paused task | |
| pausedTasks.put(task, workerThread); | |
| // current worker is paused, indicate that we need a new one from the thread pool | |
| workerThread = null; | |
| } | |
| // this worker is paused, continue to next one to execute remaining tasks | |
| continueWorkerThread(true); | |
| } | |
| /** | |
| * Parks the current thread. | |
| */ | |
| private void park() { | |
| // fix for spurious wake-ups | |
| while (Thread.currentThread().getId() != unparkId) { | |
| LockSupport.park(); | |
| } | |
| unparkId = -1; | |
| } | |
| /** | |
| * Unparks specified thread. | |
| * | |
| * @param thread | |
| * The thread to be unparked. | |
| */ | |
| private void unpark(Thread thread) { | |
| // have to keep track of which thread needs to wake-up since they may wake-up randomly | |
| unparkId = thread.getId(); | |
| LockSupport.unpark(thread); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Iterator; | |
| import java.util.LinkedList; | |
| import java.util.List; | |
| import java.util.Queue; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.TimeUnit; | |
| /** | |
| * Scheduled {@link Task} executor. | |
| * | |
| * @author kLeptO <http://www.rune-server.org/members/klepto/> | |
| */ | |
| public class Scheduler implements Runnable { | |
| private final Executor executor = new Executor(); | |
| private final List<Task> tasks = new LinkedList<Task>(); | |
| private final Queue<Task> newTasks = new LinkedList<Task>(); | |
| /** | |
| * Starts the scheduler. This method should only be invoked after the initial tasks are already | |
| * added to the scheduler. Scheduler is intended to be used in single-thread environment and is | |
| * not thread-safe. | |
| */ | |
| public void init() { | |
| Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this, 0, | |
| INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); | |
| } | |
| /** | |
| * Gets invoked periodically by underlying scheduled executor. | |
| */ | |
| @Override | |
| public void run() { | |
| // add new tasks to the list | |
| while (!newTasks.isEmpty()) { | |
| tasks.add(newTasks.poll()); | |
| } | |
| // execute tasks | |
| executor.execute(tasks); | |
| // remove canceled tasks | |
| Iterator<Task> it$ = tasks.iterator(); | |
| for (Task task; it$.hasNext();) { | |
| task = it$.next(); | |
| if (task.isCanceled()) { | |
| it$.remove(); | |
| } | |
| } | |
| } | |
| /** | |
| * Submits a new {@link Task} to the scheduler. The {@link Task} will begin it's execution at | |
| * the next scheduler cycle. | |
| * | |
| * @param task | |
| * The task to be executed. | |
| * | |
| * @return True if {@link Task} was added successfully. | |
| */ | |
| public boolean schedule(Task task) { | |
| return newTasks.add(task); | |
| } | |
| /** | |
| * The interval in milliseconds between each scheduler cycle. | |
| */ | |
| public static final int INTERVAL_IN_MILLIS = 600; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Abstract task implementation. | |
| * | |
| * @author kLeptO <http://www.rune-server.org/members/klepto/> | |
| */ | |
| public abstract class Task { | |
| private Executor executor; | |
| private boolean canceled; | |
| /** | |
| * Sets the current task executor and executes the task. | |
| * | |
| * @param executor | |
| * The task executor. | |
| */ | |
| protected void execute(Executor executor) { | |
| this.executor = executor; | |
| execute(); | |
| } | |
| /** | |
| * Executes the task. | |
| */ | |
| public abstract void execute(); | |
| /** | |
| * Pauses the task for a given amount of {@link Scheduler} cycles. | |
| * | |
| * @param delay | |
| * The amount of scheduler cycles. | |
| */ | |
| public void pause(int delay) { | |
| for (; delay > 0; delay--) { | |
| executor.pauseTask(this); | |
| } | |
| } | |
| /** | |
| * Cancels the task from being executed in the upcoming {@link Scheduler} cycles. | |
| */ | |
| public void cancel() { | |
| canceled = true; | |
| } | |
| /** | |
| * Indicates if this task is canceled. | |
| * | |
| * @return True if this task was canceled. | |
| */ | |
| public boolean isCanceled() { | |
| return canceled; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment