Skip to content

Instantly share code, notes, and snippets.

@klepto
Created September 14, 2016 16:03
Show Gist options
  • Select an option

  • Save klepto/703c711bac06afc6821d9c413f2fb0cb to your computer and use it in GitHub Desktop.

Select an option

Save klepto/703c711bac06afc6821d9c413f2fb0cb to your computer and use it in GitHub Desktop.
Continuations implementation in Java using Thread Parking
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;
/**
* Executor implementation with pausable {@link Task} execution. Imitates single-threaded behavior
* by handing control between threads with {@link LockSupport} park/unpark calls.
*
* @author kLeptO <http://www.rune-server.org/members/klepto/>
*/
public class Executor {
// Thread that most recently called execute(List); the worker unparks it once the queue is empty.
private Thread executorThread;
// Tasks still to be run in the current cycle; filled by execute(List), polled by the worker.
private Queue<Task> tasksQueue = new LinkedList<Task>();
// Worker currently responsible for draining tasksQueue, or null when a new one must be started.
private Thread workerThread;
// Id of the thread that is allowed to leave park(); that thread resets it to -1 when it wakes.
private volatile long unparkId;
// Supplies worker threads. NOTE(review): nothing in this class ever shuts this pool down.
private final ExecutorService threadPool = Executors.newCachedThreadPool();
// Maps each paused task to the thread that is parked in the middle of executing it.
private final Map<Task, Thread> pausedTasks = new HashMap<Task, Thread>();
/**
* Executes a list of {@link Task}s. Any of these {@link Task}s may call
* {@link #pauseTask(Task)} in order to pause itself until the next call of this method.
* <p>
* The calling thread is recorded as the executor thread and is parked until a worker unparks
* it, which the worker does when it finds the task queue empty.
*
* @param tasks
* A list of tasks to be executed.
*/
public void execute(List<Task> tasks) {
// invoked by the scheduler (Scheduler#run)
// remember the calling thread so the worker can wake it up when the cycle is done
executorThread = Thread.currentThread();
// queue up every task for this cycle
tasksQueue.addAll(tasks);
// hand control to the worker and park until the queue has been drained
continueWorkerThread(true);
}
/**
* Hands control to the worker thread. If no worker thread is set, a new one is started from
* the thread pool; otherwise the existing worker is unparked.
*
* @param parkSelf
* Indicates if the caller thread should be parked after the hand-off.
*/
private void continueWorkerThread(boolean parkSelf) {
if (workerThread == null) {
// there is no worker thread set, so ask the thread pool to run a new one
threadPool.execute(new Runnable() {
@Override
public void run() {
// the pool thread registers itself as the active worker before draining the queue
workerThread = Thread.currentThread();
executeRemainingTasks();
}
});
} else {
// there is an active worker thread, wake it up
unpark(workerThread);
}
if (parkSelf) {
// park the caller thread if required
park();
}
}
/**
* Executes remaining tasks. This method should only be invoked from a worker thread.
* <p>
* The loop only ends when the current thread finds that it is no longer the active worker;
* in that case it wakes the active worker and returns.
*/
private void executeRemainingTasks() {
while (true) {
// if another worker is dealing with the remaining tasks, wake it and let this thread finish
if (workerThread != Thread.currentThread()) {
continueWorkerThread(false);
break;
}
// get next task
Task task = tasksQueue.poll();
// if there are no tasks available, return control to the executor thread
if (task == null) {
unpark(executorThread);
// stay parked until something unparks this worker again (execute(List) does so next cycle)
park();
// we want to continue executing tasks after waking up
continue;
}
if (pausedTasks.get(task) == null) {
// this task is not paused, execute it on the current thread
// NOTE(review): there is no try/catch here; if a task throws, nothing in this method
// unparks the executor thread - confirm tasks are not expected to throw
task.execute(this);
} else {
// this task is paused, switch to the thread that is parked inside it
unpark(pausedTasks.get(task));
park();
// we want to grab a new task since another thread deals with this one
continue;
}
// the task returned from execute on this thread, so if it was registered as paused
// it is not paused anymore and its entry can be dropped
if (pausedTasks.get(task) != null) {
pausedTasks.remove(task);
}
}
}
/**
* Pauses {@link Task} execution. Should only be called by the currently executed task.
* <p>
* The calling thread hands control to the worker thread (starting a new one if the caller was
* the worker itself) and is parked until it is unparked again.
*
* @param task
* The task to be paused.
*/
public void pauseTask(Task task) {
if (workerThread == Thread.currentThread()) {
// the task is being paused on the active worker: remember which thread holds it
pausedTasks.put(task, workerThread);
// this worker is about to park, indicate that a new one is needed from the thread pool
workerThread = null;
}
// hand over to the (new or existing) worker and park this thread
continueWorkerThread(true);
}
/**
* Parks the current thread until {@link #unpark(Thread)} has been called for it.
*/
private void park() {
// guard against spurious wake-ups: only proceed once unparkId names this thread
while (Thread.currentThread().getId() != unparkId) {
LockSupport.park();
}
// reset the marker now that this wake-up has been consumed
unparkId = -1;
}
/**
* Unparks the specified thread and records its id so that its {@link #park()} loop can exit.
*
* @param thread
* The thread to be unparked.
*/
private void unpark(Thread thread) {
// have to keep track of which thread needs to wake up since threads may wake up spuriously
unparkId = thread.getId();
LockSupport.unpark(thread);
}
}
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Scheduled {@link Task} executor.
*
* @author kLeptO <http://www.rune-server.org/members/klepto/>
*/
public class Scheduler implements Runnable {

    /**
     * The interval in milliseconds between each scheduler cycle.
     */
    public static final int INTERVAL_IN_MILLIS = 600;

    /** Runs the active tasks once per cycle. */
    private final Executor executor = new Executor();

    /** Tasks that are handed to the executor on every cycle until they are canceled. */
    private final List<Task> tasks = new LinkedList<Task>();

    /** Tasks submitted through {@link #schedule(Task)} that have not joined a cycle yet. */
    private final Queue<Task> newTasks = new LinkedList<Task>();

    /**
     * Starts the scheduler by registering it with a new single-threaded scheduled executor that
     * invokes {@link #run()} immediately and then every {@link #INTERVAL_IN_MILLIS} milliseconds.
     * This method should only be invoked after the initial tasks have been added. The scheduler is
     * intended for a single-thread environment and is not thread-safe.
     */
    public void init() {
        // the scheduled executor is not retained, so it cannot be shut down through this class
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this, 0, INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs one scheduler cycle: admits newly submitted tasks, executes every active task and
     * finally drops the tasks that have been canceled. Invoked periodically by the underlying
     * scheduled executor.
     */
    @Override
    public void run() {
        admitNewTasks();
        executor.execute(tasks);
        dropCanceledTasks();
    }

    /**
     * Submits a new {@link Task} to the scheduler. The {@link Task} will begin its execution at
     * the next scheduler cycle.
     *
     * @param task
     *            The task to be executed.
     *
     * @return True if the {@link Task} was added successfully.
     */
    public boolean schedule(Task task) {
        final boolean accepted = newTasks.add(task);
        return accepted;
    }

    /**
     * Moves every pending task, in submission order, into the list of active tasks.
     */
    private void admitNewTasks() {
        for (;;) {
            if (newTasks.isEmpty()) {
                return;
            }
            tasks.add(newTasks.poll());
        }
    }

    /**
     * Removes every task that reports itself as canceled from the list of active tasks.
     */
    private void dropCanceledTasks() {
        for (Iterator<Task> cursor = tasks.iterator(); cursor.hasNext();) {
            if (cursor.next().isCanceled()) {
                cursor.remove();
            }
        }
    }
}
/**
* Abstract task implementation.
*
* @author kLeptO <http://www.rune-server.org/members/klepto/>
*/
public abstract class Task {

    /** Set by {@link #cancel()}; this class never resets it. */
    private boolean canceled;

    /** Executor passed to the most recent {@link #execute(Executor)} call. */
    private Executor executor;

    /**
     * Remembers the given executor as the one running this task, then runs the task body.
     *
     * @param executor
     *            The task executor.
     */
    protected void execute(Executor executor) {
        this.executor = executor;
        this.execute();
    }

    /**
     * Executes the task. Subclasses provide the actual work here.
     */
    public abstract void execute();

    /**
     * Pauses the task for a given number of {@link Scheduler} cycles by asking the executor to
     * pause this task once per cycle. A delay of zero or less returns immediately without
     * touching the executor.
     *
     * @param delay
     *            The amount of scheduler cycles.
     */
    public void pause(int delay) {
        int cyclesLeft = delay;
        while (cyclesLeft > 0) {
            executor.pauseTask(this);
            cyclesLeft--;
        }
    }

    /**
     * Marks the task as canceled so that it is not executed in the upcoming {@link Scheduler}
     * cycles.
     */
    public void cancel() {
        this.canceled = true;
    }

    /**
     * Indicates if this task is canceled.
     *
     * @return True if this task was canceled.
     */
    public boolean isCanceled() {
        return this.canceled;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment