Skip to content

Instantly share code, notes, and snippets.

Last active February 11, 2017 21:08
Show Gist options
  • Save aikar/9010136 to your computer and use it in GitHub Desktop.
Save aikar/9010136 to your computer and use it in GitHub Desktop.
TaskChain Java Flow Control System - This is an archaic v1; V3 has since been released.
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import java.util.concurrent.ConcurrentLinkedQueue;
* Facilitates Control Flow for the Bukkit Scheduler to easily jump between
* Async and Sync tasks without deeply nested callbacks, passing the response of the
* previous task to the next task to use.
* Usage example: TaskChain.newChain()
* .add(new TaskChain.AsyncTask {})
* .add(new TaskChain.Task {})
* .add(new AsyncTask {})
* .execute();
public class TaskChain {
* Utility helpers for Task returns. Changes the behavior of the Chain when these are returned.
// Tells a task it will perform call back later.
public static final Object ASYNC = new Object();
// Abort executing the chain
public static final Object ABORT = new Object();
* =============================================================================================
ConcurrentLinkedQueue<BaseTask> chainQueue = new ConcurrentLinkedQueue<BaseTask>();
boolean executed = false;
Object previous = null;
boolean async;
private final Plugin plugin;
public TaskChain() {
this.plugin = MyPlugin.getInstance(); // TODO: Change to get an instance to your plugin!
this.async = !Bukkit.isPrimaryThread();
* =============================================================================================
* Starts a new chain.
* @return a newly constructed TaskChain
public static TaskChain newChain() {
return new TaskChain();
* Adds a delay to the chain execution
* @param ticks # of ticks to delay before next task (20 = 1 second)
* @return this chain, so calls can be chained
public TaskChain delay(final int ticks) {
add(new GenericTask() {
// Prevent switching between sync/async
final BaseTask peek = TaskChain.this.chainQueue.peek();
this.async = peek != null ? peek.async : TaskChain.this.async;
public void run() {
final GenericTask task = this;
task.chain.async = false;
Bukkit.getScheduler().runTaskLater(plugin, new Runnable() {
public void run() {;
}, ticks);
return this;
* Adds a step to the chain execution. Async*Task will run off of main thread,
* *Task will run sync with main thread
* @param task the step to append to the chain
* @return this chain, so calls can be chained
public TaskChain add(BaseTask task) {
synchronized (this) {
if (executed) {
throw new RuntimeException("TaskChain is executing");
return this;
* Finished adding tasks, begins executing them.
public void execute() {
synchronized (this) {
if (executed) {
throw new RuntimeException("Already executed");
executed = true;
* Fires off the next task, and switches between Async/Sync as necessary.
private void nextTask() {
final TaskChain chain = this;
final BaseTask task = chainQueue.poll();
if (task == null) {
// done!
if (task.async) {
if (async) {;
} else {
Bukkit.getScheduler().runTaskAsynchronously(plugin, new Runnable() {
public void run() {
chain.async = true;;
} else {
if (async) {
Bukkit.getScheduler().runTask(plugin, new Runnable() {
public void run() {
chain.async = false;;
} else {;
* Provides foundation of a task with what the previous task type should return
* to pass to this and what this task will return.
* @param <R> Return Type
* @param <A> Argument Type Expected
private abstract static class BaseTask<R, A> {
TaskChain chain = null;
boolean async = false;
boolean executed = false;
* Task Type classes will implement this
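* @param arg the value left by the previous task (null if there was none)
* @return the value to hand to the next task in the chain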
protected abstract R runTask(A arg);
* Called internally by Task Chain to facilitate executing the task and then the next task.
* @param chain
private void run(TaskChain chain) {
final Object arg = chain.previous;
chain.previous = null;
this.chain = chain;
R ret = this.runTask((A) arg);
if (chain.previous == null) {
chain.previous = ret;
if (chain.previous != ASYNC && chain.previous != ABORT) {
synchronized (this) {
executed = true;
* Tells the TaskChain to abort processing any more tasks.
public R abort() {
chain.previous = ABORT;
return null;
* Tells the TaskChain that you will invoke the next task manually by calling next(...).
public R async() {
chain.previous = ASYNC;
return null;
* Only to be used when paired with return this.async(); Must be called to execute the next task.
* To be used inside a callback of another operation that is performed async.
* @param resp the value to hand to the next task in the chain
public void next(R resp) {
synchronized (this) {
if (executed) {
throw new RuntimeException(
"This task has already been executed. return this.async()");
chain.async = !Bukkit.isPrimaryThread(); // We don't know where the task called this from.
chain.previous = resp;
* General abstract classes to be used for various tasks in the chain.
* First Tasks are for when you do not have or do not care about the return
* value of a previous task.
* Last Tasks are for when you do not need to use a return type.
* A Generic task neither uses the previous task's return value nor returns
* anything itself.
* Async Tasks will not run on the Minecraft Thread and should not use the
* Bukkit API unless it is thread safe.
public abstract static class Task<R, A> extends BaseTask<R, A> {
protected abstract R run(A arg);
protected R runTask(A arg) {
return run(arg);
public abstract static class GenericTask extends BaseTask<Object, Object> {
protected abstract void run();
protected Object runTask(Object arg) {
return null;
public void next() {
public abstract static class FirstTask<R> extends BaseTask<R, Object> {
protected abstract R run();
protected R runTask(Object arg) {
return run();
public abstract static class LastTask<A> extends BaseTask<Object, A> {
protected abstract void run(A arg);
protected Object runTask(A arg) {
return null;
public void next() {
// Async helpers
/**
 * Asynchronous flavour of {@link Task}. Construction sets the inherited {@code async}
 * flag to {@code true}, which the chain inspects when deciding whether this step has to
 * be moved off the main thread.
 *
 * @param <R> Return Type
 * @param <A> Argument Type Expected
 */
public abstract static class AsyncTask<R, A> extends Task<R, A> {
    public AsyncTask() {
        async = true;
    }
}
/**
 * Asynchronous flavour of {@link GenericTask}. Construction sets the inherited
 * {@code async} flag to {@code true}, which the chain inspects when deciding whether this
 * step has to be moved off the main thread.
 */
public abstract static class AsyncGenericTask extends GenericTask {
    public AsyncGenericTask() {
        async = true;
    }
}
/**
 * Asynchronous flavour of {@link FirstTask}. Construction sets the inherited
 * {@code async} flag to {@code true}, which the chain inspects when deciding whether this
 * step has to be moved off the main thread.
 *
 * @param <R> Return Type
 */
public abstract static class AsyncFirstTask<R> extends FirstTask<R> {
    public AsyncFirstTask() {
        async = true;
    }
}
/**
 * Asynchronous flavour of {@link LastTask}. Construction sets the inherited
 * {@code async} flag to {@code true}, which the chain inspects when deciding whether this
 * step has to be moved off the main thread.
 *
 * @param <A> Argument Type Expected
 */
public abstract static class AsyncLastTask<A> extends LastTask<A> {
    public AsyncLastTask() {
        async = true;
    }
}
public class Mail extends BukkitUtil.Listener {
public static void sendMail(final Player player, final String from, final String to,
final String subject, final Integer delay, final ItemStack... items) {
final Long toUserId = EmpireUser.getUserId(to);
final Long fromUserId = EmpireUser.getUserId(from);
int size = Util.roundUp(items.length, 9);
.add(Package.newPackageAsync(toUserId, subject, "MAIL", size))
.add(new AsyncTask<Package, Package>() {
public Package run(Package pkg) {
if (pkg == null) {
Util.sendMsg(player, "&cError");
returnItems(player, items);
return abort();
for (ItemStack item : items) {
if (item != null && item.getType() != Material.AIR) {
try {
EmpireDb.executeUpdate("INSERT INTO mail " +
"(user_id, package_id, sender_id, subject, send_time, open_delay) " +
"VALUES (?, ?, ?, ?, UNIX_TIMESTAMP(), ?)",
toUserId, pkg.getPackageId(), fromUserId, subject, delay);
} catch (SQLException e) {
Util.sendMsg(player, "&cDatabase Error");
returnItems(player, items);
return abort();
if (player != null) {
Util.sendMsg(player, "&aMail successfully sent!");
return pkg;
public static void readMail(final Player player, final Long mailId) {
final EmpireUser user = EmpireUser.getUser(player);
final Long userId = user.userId;
.add(new AsyncFirstTask<Message>() {
public Message run() {
try {
return Mail.getMessage(user, mailId);
} catch (SQLException e) {
Util.sendMsg(player, "&cUnknown Error");
return null;
.add(new LastTask<Message>() {
public void run(Message msg) {
if (msg == null) {
Util.sendMsg(player, MailLang.MAIL_NOT_FOUND);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment