Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Created June 9, 2021 07:30
Show Gist options
  • Save saswata-dutta/52864b7375aa7a5b0ed14548fe70db89 to your computer and use it in GitHub Desktop.
Save saswata-dutta/52864b7375aa7a5b0ed14548fe70db89 to your computer and use it in GitHub Desktop.
Emulating delayed scheduled thread executor
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
class PqItem {
public final long ts;
public final Runnable action;
PqItem(long _ts, Runnable _action) {
ts = _ts;
action = _action;
}
}
class TaskScheduler {
private final long period;
private final PriorityBlockingQueue<PqItem> tasks;
private final Thread daemon;
private final ExecutorService executor;
TaskScheduler(long granularityMs, int poolSize) {
period = granularityMs;
tasks = new PriorityBlockingQueue<>(16, Comparator.comparingLong(a -> a.ts));
executor = Executors.newFixedThreadPool(poolSize);
daemon = new Thread(this::daemonTask);
}
private static long TS() {
return System.currentTimeMillis();
}
public void submit(Runnable task, long delayMs) {
long ts = delayMs + TS();
PqItem item = new PqItem(ts, task);
System.out.println("....Pending for " + ts);
tasks.add(item);
}
public void start() {
daemon.start();
}
private void daemonTask() {
System.out.println("....Daemon started");
while (true) {
try {
long ts = TS();
System.out.println("....Daemon checking " + ts);
checkTasks();
Thread.sleep(period);
} catch (InterruptedException e) {
System.out.println("....Daemon stopped");
return;
}
}
}
public void shutDown() {
executor.shutdown();
daemon.interrupt();
}
private void checkTasks() {
if (tasks.isEmpty()) return;
PqItem pqItem = tasks.peek();
while (pqItem != null && TS() >= pqItem.ts) {
pqItem = tasks.poll();
System.out.println("....Running Item " + pqItem.ts);
runTask(pqItem.action);
pqItem = tasks.peek();
}
}
private void checkTasksRecursive() {
PqItem pqItem = tasks.poll();
if (pqItem == null) return;
if (TS() >= pqItem.ts) {
System.out.println("....Running Item " + pqItem.ts);
runTask(pqItem.action);
checkTasksRecursive();
} else {
tasks.offer(pqItem);
}
}
private void runTask(Runnable action) {
executor.submit(action);
}
public static void main(String[] args) throws InterruptedException {
TaskScheduler scheduler = new TaskScheduler(500, 10);
scheduler.submit(() -> System.out.println("hi2"), 3_000L);
scheduler.submit(() -> System.out.println("hi1"), 1_000L);
scheduler.submit(() -> System.out.println("bye1"), 2_000L);
scheduler.submit(() -> System.out.println("bye2"), 2_000L);
scheduler.start();
Thread.sleep(5_000L);
scheduler.shutDown();
Thread.sleep(1_000L);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment