Skip to content

Instantly share code, notes, and snippets.

@Samehadar
Last active August 5, 2020 07:44
Show Gist options
  • Save Samehadar/dd181f3fab7784604584687ddc90887c to your computer and use it in GitHub Desktop.
Save Samehadar/dd181f3fab7784604584687ddc90887c to your computer and use it in GitHub Desktop.
package xxx;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Slf4j
public class SetBlockingQueue<T> extends LinkedBlockingQueue<T> {
private final Set<T> set = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ReentrantLock addLock = new ReentrantLock();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
() -> log.info("Queue size is " + set.size()),
5L,
1L,
TimeUnit.SECONDS);
@Override
public boolean add(T t) {
ReentrantLock lock = this.addLock;
lock.lock();
try {
if (set.contains(t)) {
return false;
} else {
set.add(t);
return super.add(t);
}
} finally {
lock.unlock();
}
}
@Override
@SneakyThrows
public T take() {
T t = super.take();
set.remove(t);
return t;
}
public boolean isEmpty() {
return set.isEmpty() && super.isEmpty();
}
@SneakyThrows
public synchronized void clear() {
super.clear();
set.clear();
scheduledFuture.cancel(true);
}
public int size() {
return set.size();
}
public void forEach(Consumer<? super T> action) {
super.forEach(action);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment