Skip to content

Instantly share code, notes, and snippets.

@mindscratch
Forked from MustafaHaddara/DebouncedRunnable.java
Last active July 16, 2021 14:40
Show Gist options
  • Save mindscratch/b2661c89acf06d504e091e18ac2d3396 to your computer and use it in GitHub Desktop.
Save mindscratch/b2661c89acf06d504e091e18ac2d3396 to your computer and use it in GitHub Desktop.
Java Debouncer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * DebouncedRunnable provides a "debounced" version of a given Runnable.
 * <p>
 * Calling {@link #run()} does not necessarily invoke the wrapped operation right away; what happens is decided by
 * the configured {@link Strategy}. All state transitions happen while holding this object's monitor, and the
 * wrapped operation is itself invoked under that monitor, so invocations of the operation never overlap.
 * <p>
 * Each instance owns a single-threaded scheduler; call {@link #close()} when the instance is no longer needed.
 */
public class DebouncedRunnable implements Runnable, Closeable {

    private static final Logger LOGGER = LoggerFactory.getLogger(DebouncedRunnable.class);

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final Runnable operation;
    private final String name;
    private final long delayMillis;
    private final Strategy strategy;

    // state -- only read or written while holding this object's monitor
    /** Time of the most recent invocation of the operation, or -1 if it has never been invoked. */
    private long lastRunTime = -1;
    /** True while a delayed invocation is pending. */
    private boolean isQueued = false;
    /** Handle of the pending delayed invocation, if one was scheduled through the real scheduler. */
    private ScheduledFuture<?> task = null;
    /**
     * Incremented every time a delayed invocation is queued. A delayed invocation only runs the operation if the
     * ticket it was created with still equals this value, which turns superseded invocations into no-ops.
     */
    private long queueGeneration = 0;

    /**
     * Creates a thread-safe "debounced" version of the given Runnable using
     * {@link Strategy#RUN_NOW_OR_QUEUE_IF_EMPTY}. This means that when the client calls `run()`
     * it will perform the `strategy` that has been configured.
     * <p>
     * Note that `Runnable` accepts no params, meaning each invocation should be interchangeable
     * If we want to extend this mechanism to support calls with args, we will need to decide which params get used
     * when we end up invoking the Runnable (the first set? the last?)
     *
     * @param operation   the operation to debounce; must not be null
     * @param name        label used in log messages
     * @param delayMillis debounce window in milliseconds
     */
    public DebouncedRunnable(Runnable operation, String name, long delayMillis) {
        this(operation, name, delayMillis, Strategy.RUN_NOW_OR_QUEUE_IF_EMPTY);
    }

    /**
     * Creates a thread-safe "debounced" version of the given Runnable with an explicit strategy.
     *
     * @param operation   the operation to debounce; must not be null
     * @param name        label used in log messages
     * @param delayMillis debounce window in milliseconds
     * @param strategy    how calls to {@link #run()} are handled; must not be null
     * @throws NullPointerException if {@code operation} or {@code strategy} is null
     */
    public DebouncedRunnable(Runnable operation, String name, long delayMillis, Strategy strategy) {
        // Fail at construction time instead of with an anonymous NullPointerException on the first run().
        if (operation == null) {
            throw new NullPointerException("operation");
        }
        if (strategy == null) {
            throw new NullPointerException("strategy");
        }
        this.operation = operation;
        this.name = name;
        this.delayMillis = delayMillis;
        this.strategy = strategy;
    }

    /**
     * Shuts down the internal scheduler so that no further delayed invocations are accepted. Calling
     * {@link #run()} afterwards may fail with a rejection from the scheduler when it tries to queue an invocation.
     *
     * @throws IOException never thrown by this implementation; declared to keep the existing signature
     */
    @Override
    public void close() throws IOException {
        scheduler.shutdown();
    }

    /**
     * Requests an invocation of the wrapped operation; the configured strategy decides whether it runs now, is
     * queued to run after the delay, or is dropped.
     */
    @Override
    public synchronized void run() {
        switch (strategy) {
            case DELAYED_RUN:
                delayedRun();
                break;
            case RUN_NOW_OR_QUEUE_MOST_RECENT:
                runNowOrQueueMostRecent();
                break;
            case RUN_NOW_OR_QUEUE_IF_EMPTY:
                runNowOrQueueIfEmpty();
                break;
            default:
                LOGGER.warn("unsupported strategy {} for {}", strategy, name);
        }
    }

    /**
     * Hands {@code call} to the scheduler and remembers the resulting future so it can be cancelled later.
     * <p>
     * package-private for unit testing purposes
     */
    void schedule(Runnable call, long delayMillis) {
        this.task = scheduler.schedule(call, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the current time in milliseconds.
     * <p>
     * package-private for unit testing purposes
     */
    long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Cancels the remembered future (without interrupting it if it already started) and forgets it. This does not
     * reset the "queued" flag; the internal callers queue a replacement immediately afterwards.
     * <p>
     * package-private for unit testing purposes
     */
    synchronized void cancelQueuedTask() {
        if (this.task != null) {
            this.task.cancel(false);
            this.task = null;
        }
    }

    /**
     * Performs a task after waiting the `delayMillis` regardless if another task is queued already or not.
     * <p>
     * - if task is already queued, then cancel it
     * - schedule the new task to run in `delayMillis`
     * </p>
     */
    private void delayedRun() {
        if (isQueued) {
            // we've already got a call queued, cancel it and add the new one
            cancelPreviouslyQueued();
        }
        queueScheduledRun();
    }

    /**
     * Performs a task now, or queues it if the operation ran recently, replacing any task that is already queued.
     * <p>
     * - if there is a call queued, we cancel that call and queue up the current call, so the most recent request
     * is never ignored
     * - otherwise, if it hasn't been called within the past `delayMillis` ms, the wrapped Runnable gets called
     * immediately
     * - otherwise we queue up the current call to be called after `delayMillis` ms pass
     * </p>
     */
    private void runNowOrQueueMostRecent() {
        long currentTime = getCurrentTimeMillis();
        if (isQueued) {
            cancelPreviouslyQueued();
            queueScheduledRun();
            return;
        }
        runNowOrQueue(currentTime);
    }

    /**
     * Performs a task now or queues it if the queue is empty, if the queue is not empty then drop the new task.
     * <p>
     * - if there is a call queued, we drop the current call
     * - otherwise, if it hasn't been called within the past `delayMillis` ms, the wrapped Runnable gets called
     * immediately
     * - otherwise we queue up the current call to be called after `delayMillis` ms pass
     * </p>
     */
    private void runNowOrQueueIfEmpty() {
        long currentTime = getCurrentTimeMillis();
        if (isQueued) {
            // we've already got a call queued, ignore this current one
            LOGGER.debug("dropping {} because it is already queued", name);
            return;
        }
        runNowOrQueue(currentTime);
    }

    /** Shared tail of the two "run now or queue" strategies, used when nothing is queued. */
    private void runNowOrQueue(long currentTime) {
        if (shouldRunNow(currentTime)) {
            // never called before, or the last call is old enough: call it now
            lastRunTime = currentTime;
            LOGGER.debug("calling {} immediately", name);
            operation.run();
        } else {
            // we've called it recently, which suggests that we might have more of these incoming
            queueScheduledRun();
        }
    }

    /** Logs and cancels the pending delayed invocation. */
    private void cancelPreviouslyQueued() {
        LOGGER.debug("cancelling previously queued {} ", name);
        cancelQueuedTask();
    }

    /** Queues a delayed invocation to run in `delayMillis` ms, superseding any earlier one. */
    private void queueScheduledRun() {
        LOGGER.debug("queueing {} to be called in {} ms", name, delayMillis);
        final long ticket = ++queueGeneration;
        // Only flag the invocation as queued once the scheduler has accepted it; if schedule() throws (for
        // example after close()) the debouncer must not be left believing that something is pending.
        isQueued = false;
        schedule(() -> scheduledRun(ticket), delayMillis);
        isQueued = true;
    }

    /**
     * Body of a delayed invocation.
     *
     * @param ticket value of {@code queueGeneration} at the time this invocation was queued
     */
    private synchronized void scheduledRun(long ticket) {
        if (ticket != queueGeneration) {
            // This invocation was replaced after it had already started waiting for the monitor, so cancel(false)
            // could not stop it. Running it would clear the queued flag while the replacement is still pending.
            LOGGER.debug("skipping superseded queued task {}", name);
            return;
        }
        LOGGER.debug("calling queued task {} after waiting {} ms", name, delayMillis);
        lastRunTime = getCurrentTimeMillis();
        isQueued = false;
        task = null;
        operation.run();
    }

    /**
     * Should run now if we've never run it before or we've run it more than `delayMillis` ms in the past
     */
    private boolean shouldRunNow(long currentTime) {
        // Compare the elapsed time instead of computing lastRunTime + delayMillis, which overflows for very
        // large delays and would then always report "run now".
        return lastRunTime == -1 || currentTime - lastRunTime > delayMillis;
    }

    /**
     * Strategy defines different behavior that can be performed by the debouncer.
     */
    public enum Strategy {
        /** Always wait `delayMillis` before running; a newer call replaces the queued one. */
        DELAYED_RUN,
        /** Run immediately when idle, otherwise queue; a newer call replaces the queued one. */
        RUN_NOW_OR_QUEUE_MOST_RECENT,
        /** Run immediately when idle, otherwise queue; calls arriving while one is queued are dropped. */
        RUN_NOW_OR_QUEUE_IF_EMPTY;
    }
}
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
/**
 * Unit tests for the default strategy of {@link DebouncedRunnable}.
 * <p>
 * The debouncer is wrapped in a Mockito spy so that the clock and the scheduler can be replaced: scheduled
 * Runnables are collected in a list instead of being executed, and the current time is whatever the test sets.
 */
public class DebouncedRunnableTest {

    private final AtomicInteger callCount = new AtomicInteger(0);
    private final DebouncedRunnable debouncedIncrement =
            Mockito.spy(new DebouncedRunnable(callCount::incrementAndGet, "mock", 10));
    private final List<Runnable> queued = new ArrayList<>();

    @Before
    public void setup() {
        // Instead of scheduling anything for real, remember every Runnable handed to schedule().
        doAnswer(invocation -> {
            Runnable scheduled = invocation.getArgument(0);
            queued.add(scheduled);
            return null;
        }).when(debouncedIncrement).schedule(any(), anyLong());
    }

    @Test
    public void testDebounceQueueing() {
        setCurrentTime(0);
        debouncedIncrement.run(); // t=0: first call ever, runs immediately
        debouncedIncrement.run(); // t=0: too soon after the first call, gets queued
        assertCallsAndQueued(1, 1);

        // Move the clock forward and fire the queued Runnable by hand.
        setCurrentTime(15);
        Runnable pending = queued.remove(0);
        pending.run();
        assertEquals(2, callCount.get());

        debouncedIncrement.run(); // t=15: the queued call just ran at t=15, so this one is queued again
        assertCallsAndQueued(2, 1);
    }

    @Test
    public void testDebounceBigDelay() {
        setCurrentTime(0);
        debouncedIncrement.run(); // t=0: first call ever, runs immediately

        // Jump well past the debounce window.
        setCurrentTime(50);
        debouncedIncrement.run(); // t=50: last call was at t=0, runs immediately

        assertCallsAndQueued(2, 0);
    }

    @Test
    public void testDebounceDrop() {
        setCurrentTime(0);
        debouncedIncrement.run(); // t=0: first call ever, runs immediately
        debouncedIncrement.run(); // t=0: gets queued
        debouncedIncrement.run(); // t=0: something is already queued, gets dropped

        assertCallsAndQueued(1, 1);
    }

    @Test
    public void testDebounceDelayedDrop() {
        setCurrentTime(0);
        debouncedIncrement.run(); // t=0: first call ever, runs immediately
        debouncedIncrement.run(); // t=0: gets queued
        assertCallsAndQueued(1, 1);

        // Move the clock past the window WITHOUT firing the queued Runnable.
        setCurrentTime(11);
        debouncedIncrement.run(); // t=11: last call was at t=0, but a call is still queued, so this is ignored
        assertCallsAndQueued(1, 1);
    }

    /** Checks how often the operation has run and how many Runnables are waiting in the fake queue. */
    private void assertCallsAndQueued(int expectedCalls, int expectedQueued) {
        assertEquals(expectedCalls, callCount.get());
        assertEquals(expectedQueued, queued.size());
    }

    /** Makes the spied debouncer report {@code currentTime} as the current time. */
    private void setCurrentTime(long currentTime) {
        doReturn(currentTime).when(debouncedIncrement).getCurrentTimeMillis();
    }
}
// Logger shared by all instances of the class.
private static final Logger LOGGER = LoggerFactory.getLogger(DebouncedRunnable.class);
// Scheduler with a pool size of one, used for delayed invocations.
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// The wrapped operation that is being debounced.
private final Runnable operation;
// Label for this debouncer (the run() excerpt uses it in log messages).
private final String name;
// Debounce window in milliseconds.
private final long delayMillis;
// state
// Time of the last invocation; -1 is the initial "no invocation yet" value.
private long lastRunTime = -1;
// Whether a delayed invocation is pending.
private boolean isQueued = false;
/**
 * Wraps {@code operation} in a debouncer.
 *
 * @param operation   the operation to debounce
 * @param name        label for this debouncer
 * @param delayMillis debounce window in milliseconds
 */
public DebouncedRunnable(Runnable operation, String name, long delayMillis) {
    this.delayMillis = delayMillis;
    this.name = name;
    this.operation = operation;
}
// Counts how many times the debounced operation has actually been invoked.
private final AtomicInteger callCount = new AtomicInteger(0);
// Debouncer under test (10 ms window), wrapped in a Mockito spy so individual methods can be stubbed.
private final DebouncedRunnable debouncedIncrement = Mockito.spy(new DebouncedRunnable(callCount::incrementAndGet, "mock", 10));
// Collects Runnables for the tests (the setup() excerpt adds every Runnable passed to schedule()).
private final List<Runnable> queued = new ArrayList<>();
/**
 * Example service whose {@link #volunteer()} call is debounced: callers may invoke it as often as they like and
 * the underlying request is throttled by a {@code DebouncedRunnable}.
 * <p>
 * The debouncer owns a scheduler, so the service is {@link AutoCloseable}; call {@link #close()} when the service
 * is no longer needed.
 */
public class MyService implements AutoCloseable {

    private static final long VOLUNTEER_DELAY_MILLIS = 1000L;

    // Kept as DebouncedRunnable (not plain Runnable) so that close() can release its scheduler.
    private final DebouncedRunnable debouncedVolunteer;

    public MyService() {
        this.debouncedVolunteer = new DebouncedRunnable(
                this::volunteer_yesIKnowWhatImDoing,
                "VOLUNTEER",
                VOLUNTEER_DELAY_MILLIS
        );
    }

    /** Requests the volunteer operation; the debouncer decides whether it runs now, later, or is dropped. */
    public void volunteer() {
        this.debouncedVolunteer.run();
    }

    /**
     * Releases the scheduler held by the debouncer.
     *
     * @throws java.io.IOException if closing the debouncer fails
     */
    @Override
    public void close() throws java.io.IOException {
        this.debouncedVolunteer.close();
    }

    @Deprecated // DO NOT CALL THIS YOURSELF! YOU ALMOST CERTAINLY WANT `volunteer()`
    private void volunteer_yesIKnowWhatImDoing() {
        // same API request as volunteer() above
    }
}
/**
 * Minimal form of the example service: volunteer() has no debouncing in this version.
 */
public class MyService {
/** Placeholder for the API request; the body is intentionally left empty in this example. */
public void volunteer() {
// make API request
}
}
/**
 * Should run now if we've never run it before or we've run it more than `delayMillis` ms in the past.
 *
 * @param currentTime the current time in milliseconds
 * @return true if the operation may be invoked immediately
 */
private boolean shouldRunNow(long currentTime) {
    // Compare the elapsed time rather than lastRunTime + delayMillis: the sum overflows for very large delays
    // and would then always report "run now".
    return lastRunTime == -1 || currentTime - lastRunTime > delayMillis;
}
/**
 * Hands the given Runnable to the scheduler to be executed after delayMillis milliseconds.
 * Not private, which lets the test spy stub it.
 */
void schedule(Runnable call, long delayMillis) {
scheduler.schedule(call, delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Returns the current wall-clock time in milliseconds.
 * Not private, which lets the test spy stub it with a fixed time.
 */
long getCurrentTimeMillis() {
return System.currentTimeMillis();
}
/**
 * Requests an invocation of the wrapped operation.
 * <p>
 * - a call arriving while another one is queued is dropped
 * - otherwise the operation runs immediately if it has not run within the last `delayMillis` ms
 * - otherwise the call is queued to run after `delayMillis` ms
 */
public synchronized void run() {
    final long now = getCurrentTimeMillis();

    if (isQueued) {
        // something is already waiting to run; this call adds nothing
        LOGGER.debug("dropping {} because it is already queued", name);
        return;
    }

    if (!shouldRunNow(now)) {
        // ran recently, so more calls are probably on their way: defer this one and let
        // later calls be dropped until it has fired
        LOGGER.debug("queueing {} to be called in {} ms", name, delayMillis);
        isQueued = true;
        schedule(this::scheduledRun, delayMillis);
        return;
    }

    // never ran before, or the last run is old enough
    lastRunTime = now;
    LOGGER.debug("calling {} immediately", name);
    operation.run();
}
/**
 * Body of a queued invocation; run() passes this method to schedule() when it defers a call.
 */
private synchronized void scheduledRun() {
LOGGER.debug("calling queued task {} after waiting {} ms", name, delayMillis);
// Record the run and clear the queued flag before invoking the operation, so the state is already
// updated even if the operation throws.
lastRunTime = getCurrentTimeMillis();
isQueued = false;
operation.run();
}
/** Stubs the spied debouncer so that getCurrentTimeMillis() returns the given value. */
private void setCurrentTime(long currentTime) {
doReturn(currentTime).when(debouncedIncrement).getCurrentTimeMillis();
}
/** Replaces schedule() on the spy so that scheduled Runnables are collected instead of executed. */
@Before
public void setup() {
    doAnswer(invocation -> {
        // first argument of schedule(...) is the Runnable that would have been queued
        Runnable scheduled = invocation.getArgument(0);
        queued.add(scheduled);
        return null;
    }).when(debouncedIncrement).schedule(any(), anyLong());
}
/** A call made after the window has passed is still ignored while an earlier call is queued. */
@Test
public void testDebounceDelayedDrop() {
    final int expectedCalls = 1;
    final int expectedQueued = 1;

    setCurrentTime(0);
    debouncedIncrement.run(); // t=0: first call ever, runs immediately
    debouncedIncrement.run(); // t=0: gets queued

    assertEquals(expectedCalls, callCount.get());
    assertEquals(expectedQueued, queued.size());

    // Move the clock past the window WITHOUT firing the queued Runnable.
    setCurrentTime(11);
    debouncedIncrement.run(); // t=11: last call was at t=0, but a call is still queued, so this is ignored

    // nothing changed: still one call made and one call waiting
    assertEquals(expectedCalls, callCount.get());
    assertEquals(expectedQueued, queued.size());
}
/** A call made shortly after a previous one is queued, and firing it opens a new debounce window. */
@Test
public void testDebounceQueueing() {
    setCurrentTime(0);
    debouncedIncrement.run(); // t=0: first call ever, runs immediately
    debouncedIncrement.run(); // t=0: gets queued

    // one call made, one call waiting
    assertEquals(1, callCount.get());
    assertEquals(1, queued.size());

    // Move the clock forward and fire the queued Runnable by hand.
    setCurrentTime(15);
    Runnable pending = queued.remove(0);
    pending.run();
    assertEquals(2, callCount.get());

    debouncedIncrement.run(); // t=15: last call time was t=15, so this one is queued

    // no additional call made, one call waiting again
    assertEquals(2, callCount.get());
    assertEquals(1, queued.size());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment