Created
June 9, 2021 07:30
-
-
Save saswata-dutta/52864b7375aa7a5b0ed14548fe70db89 to your computer and use it in GitHub Desktop.
Emulating delayed scheduled thread executor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
/**
 * Immutable priority-queue entry: an action paired with the timestamp at
 * which it becomes due.
 *
 * <p>{@code TaskScheduler} orders these entries by {@link #ts} and computes
 * the timestamp from {@code System.currentTimeMillis()} plus the requested
 * delay.
 */
class PqItem {
    /** Due time of the action; the action is eligible once the clock reaches this value. */
    public final long ts;

    /** Work to execute once the entry is due; never {@code null}. */
    public final Runnable action;

    /**
     * Creates a queue entry.
     *
     * @param _ts     timestamp at which the action becomes due
     * @param _action work to run; must not be {@code null}
     * @throws NullPointerException if {@code _action} is {@code null}
     */
    PqItem(long _ts, Runnable _action) {
        ts = _ts;
        // Fail fast here: a null action would otherwise only surface later,
        // when it is handed to the executor on the dispatcher thread.
        action = Objects.requireNonNull(_action, "action");
    }
}
class TaskScheduler { | |
private final long period; | |
private final PriorityBlockingQueue<PqItem> tasks; | |
private final Thread daemon; | |
private final ExecutorService executor; | |
TaskScheduler(long granularityMs, int poolSize) { | |
period = granularityMs; | |
tasks = new PriorityBlockingQueue<>(16, Comparator.comparingLong(a -> a.ts)); | |
executor = Executors.newFixedThreadPool(poolSize); | |
daemon = new Thread(this::daemonTask); | |
} | |
private static long TS() { | |
return System.currentTimeMillis(); | |
} | |
public void submit(Runnable task, long delayMs) { | |
long ts = delayMs + TS(); | |
PqItem item = new PqItem(ts, task); | |
System.out.println("....Pending for " + ts); | |
tasks.add(item); | |
} | |
public void start() { | |
daemon.start(); | |
} | |
private void daemonTask() { | |
System.out.println("....Daemon started"); | |
while (true) { | |
try { | |
long ts = TS(); | |
System.out.println("....Daemon checking " + ts); | |
checkTasks(); | |
Thread.sleep(period); | |
} catch (InterruptedException e) { | |
System.out.println("....Daemon stopped"); | |
return; | |
} | |
} | |
} | |
public void shutDown() { | |
executor.shutdown(); | |
daemon.interrupt(); | |
} | |
private void checkTasks() { | |
if (tasks.isEmpty()) return; | |
PqItem pqItem = tasks.peek(); | |
while (pqItem != null && TS() >= pqItem.ts) { | |
pqItem = tasks.poll(); | |
System.out.println("....Running Item " + pqItem.ts); | |
runTask(pqItem.action); | |
pqItem = tasks.peek(); | |
} | |
} | |
private void checkTasksRecursive() { | |
PqItem pqItem = tasks.poll(); | |
if (pqItem == null) return; | |
if (TS() >= pqItem.ts) { | |
System.out.println("....Running Item " + pqItem.ts); | |
runTask(pqItem.action); | |
checkTasksRecursive(); | |
} else { | |
tasks.offer(pqItem); | |
} | |
} | |
private void runTask(Runnable action) { | |
executor.submit(action); | |
} | |
public static void main(String[] args) throws InterruptedException { | |
TaskScheduler scheduler = new TaskScheduler(500, 10); | |
scheduler.submit(() -> System.out.println("hi2"), 3_000L); | |
scheduler.submit(() -> System.out.println("hi1"), 1_000L); | |
scheduler.submit(() -> System.out.println("bye1"), 2_000L); | |
scheduler.submit(() -> System.out.println("bye2"), 2_000L); | |
scheduler.start(); | |
Thread.sleep(5_000L); | |
scheduler.shutDown(); | |
Thread.sleep(1_000L); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment