Created
December 4, 2014 12:17
-
-
Save vbezhenar/14a3009843a3f8e8fad2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.f5group.eauckz.util; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.*; | |
import java.util.concurrent.*; | |
public class GroupingScheduler<TId> { | |
private static final Logger logger = LoggerFactory.getLogger(GroupingScheduler.class); | |
public interface Cancellable { | |
void cancel(); | |
} | |
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); | |
private final ExecutorService executor = Executors.newCachedThreadPool(); | |
private final ConcurrentMapQueue<TId, CancellableTask> tasks = new ConcurrentMapQueue<>(); | |
public Cancellable schedule(TId id, Runnable command, long delay, TimeUnit unit) { | |
CancellableTask task = new CancellableTask(id, command); | |
SchedulerRunnable schedulerRunnable = new SchedulerRunnable(task); | |
scheduler.schedule(schedulerRunnable, delay, unit); | |
return task; | |
} | |
public void shutdown() { | |
scheduler.shutdown(); | |
executor.shutdown(); | |
} | |
private class SchedulerRunnable implements Runnable { | |
private final CancellableTask task; | |
private SchedulerRunnable(CancellableTask task) { | |
this.task = task; | |
} | |
@Override | |
public void run() { | |
tasks.putAndIfWasEmpty(task.id, task, () -> GroupingScheduler.this.executor.execute(task)); | |
} | |
} | |
private class CancellableTask implements Cancellable, Runnable { | |
private final TId id; | |
private final Runnable command; | |
private volatile boolean cancelled = false; | |
private CancellableTask(TId id, Runnable command) { | |
this.id = id; | |
this.command = command; | |
} | |
@Override | |
public void cancel() { | |
cancelled = true; | |
} | |
@Override | |
public void run() { | |
if (!cancelled) { | |
try { | |
command.run(); | |
} catch (Exception e) { | |
GroupingScheduler.logger.warn(e.getMessage(), e); | |
} | |
} | |
Optional<CancellableTask> nextTask = GroupingScheduler.this.tasks.removeAndPeek(id); | |
if (nextTask.isPresent()) { | |
GroupingScheduler.this.executor.execute(nextTask.get()); | |
} | |
} | |
} | |
private static class ConcurrentMapQueue<K, V> { | |
private final Map<K, Queue<V>> map = new HashMap<>(); | |
public Optional<V> removeAndPeek(K key) { | |
synchronized (map) { | |
Queue<V> values = map.get(key); | |
if (values == null) { | |
throw new IllegalStateException("No values for key " + key); | |
} | |
values.remove(); | |
if (values.isEmpty()) { | |
map.remove(key); | |
return Optional.empty(); | |
} | |
V value = values.element(); | |
return Optional.of(value); | |
} | |
} | |
public void putAndIfWasEmpty(K key, V value, Runnable commandIfEmpty) { | |
synchronized (map) { | |
boolean wasEmpty; | |
Queue<V> values = map.get(key); | |
if (values == null) { | |
values = new ArrayDeque<>(); | |
map.put(key, values); | |
wasEmpty = true; | |
} else { | |
wasEmpty = false; | |
} | |
values.add(value); | |
if (wasEmpty) { | |
commandIfEmpty.run(); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment