Last active
August 29, 2015 14:10
-
-
Save vbezhenar/cc49928995c03c1289bb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.ArrayDeque; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Queue; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
/** | |
* This class provides interface similar to ScheduledExecutorService. An important difference is | |
* that each submitted command has an identifier. GroupingScheduler guarantees that commands with | |
* the same identifier will be executed sequentially in correct order. Commands with different identifiers | |
* will be executed in parallel. | |
* | |
* schedule/submit methods returns Cancellable so user is able to cancel submitted task. Task can be cancelled only | |
* before it started to execute. Cancelling task which is executing is not supported. | |
* | |
* @param <TId> type of the identifier. Must provide correct equals/hashCode methods. | |
*/ | |
public class GroupingScheduler<TId> { | |
private static final Logger logger = LoggerFactory.getLogger(GroupingScheduler.class); | |
public interface Cancellable { | |
void cancel(); | |
} | |
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); | |
private final ExecutorService executor = Executors.newCachedThreadPool(); | |
private final ConcurrentMapQueue<TId, CancellableTask> tasks = new ConcurrentMapQueue<>(); | |
public Cancellable schedule(TId id, Runnable command, long delay, TimeUnit unit) { | |
CancellableTask task = new CancellableTask(id, command); | |
SchedulerRunnable schedulerRunnable = new SchedulerRunnable(task); | |
scheduler.schedule(schedulerRunnable, delay, unit); | |
return task; | |
} | |
public Cancellable submit(TId id, Runnable command) { | |
CancellableTask task = new CancellableTask(id, command); | |
tasks.putAndIfWasEmpty(id, task, () -> GroupingScheduler.this.executor.execute(task)); | |
return task; | |
} | |
public void shutdown() { | |
scheduler.shutdown(); | |
executor.shutdown(); | |
} | |
private class SchedulerRunnable implements Runnable { | |
private final CancellableTask task; | |
private SchedulerRunnable(CancellableTask task) { | |
this.task = task; | |
} | |
@Override | |
public void run() { | |
tasks.putAndIfWasEmpty(task.id, task, () -> GroupingScheduler.this.executor.execute(task)); | |
} | |
} | |
private class CancellableTask implements Cancellable, Runnable { | |
private final TId id; | |
private final Runnable command; | |
private volatile boolean cancelled = false; | |
private CancellableTask(TId id, Runnable command) { | |
this.id = id; | |
this.command = command; | |
} | |
@Override | |
public void cancel() { | |
cancelled = true; | |
} | |
@Override | |
public void run() { | |
if (!cancelled) { | |
try { | |
command.run(); | |
} catch (Exception e) { | |
GroupingScheduler.logger.warn(e.getMessage(), e); | |
} | |
} | |
GroupingScheduler.this.tasks.removeAndTryPeek(id, GroupingScheduler.this.executor::execute); | |
} | |
} | |
private static class ConcurrentMapQueue<K, V> { | |
private final Map<K, Queue<V>> map = new HashMap<>(); | |
public void removeAndTryPeek(K key, Consumer<V> consumer) { | |
synchronized (map) { | |
Queue<V> values = map.get(key); | |
if (values == null) { | |
throw new IllegalStateException("No values for key " + key); | |
} | |
values.remove(); | |
if (values.isEmpty()) { | |
map.remove(key); | |
} else { | |
consumer.accept(values.element()); | |
} | |
} | |
} | |
public void putAndIfWasEmpty(K key, V value, Runnable commandIfEmpty) { | |
synchronized (map) { | |
boolean wasEmpty; | |
Queue<V> values = map.get(key); | |
if (values == null) { | |
values = new ArrayDeque<>(); | |
map.put(key, values); | |
wasEmpty = true; | |
} else { | |
wasEmpty = false; | |
} | |
values.add(value); | |
if (wasEmpty) { | |
commandIfEmpty.run(); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment