Last active
June 28, 2017 18:31
-
-
Save n1chre/efc8da727a3ca623821ae4d470f82a7c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hrucc; | |
import java.util.LinkedList; | |
import java.util.Queue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.function.Consumer; | |
/** | |
* This class should be used when some object is producing stuff for some consumer and consume() is | |
* much slower than produce(). That's why elements are queued and processed when they can be processed. | |
* | |
* To use this class, simply wrap your Consumer into this class. | |
* | |
* !!! TODO this class will leave a thread waiting if null isn't consumed after consuming relevant objects. | |
* Or call finishConsuming() when done. | |
* | |
* Created by fhrenic on 28/06/2017. | |
*/ | |
public class ProxyConsumer<T> implements Consumer<T> { | |
public static void main(String[] args) throws InterruptedException { | |
ProxyConsumer<Integer> proxy = new ProxyConsumer<>(number -> { | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
System.out.println("can't sleep"); | |
return; | |
} | |
System.out.println("got number " + number); | |
}); | |
for (int i = 0; i < 10; i++) { | |
proxy.accept(i); | |
} | |
TimeUnit.SECONDS.sleep(20); // wait here | |
proxy.accept(420); | |
TimeUnit.SECONDS.sleep(5); | |
proxy.accept(null); | |
} | |
private final Consumer<T> consumer; | |
private final Queue<T> elements; | |
private AtomicBoolean noMoreElements; | |
public ProxyConsumer(Consumer<T> consumer) { | |
this.consumer = consumer; | |
elements = new LinkedList<>(); | |
noMoreElements = new AtomicBoolean(false); | |
new Thread(this::startProcessing).start(); | |
} | |
public synchronized void accept(T element) { | |
if (element != null) { | |
add(element); | |
} else { | |
noMoreElements.set(true); | |
} | |
notify(); | |
} | |
public void finishConsuming() { | |
noMoreElements.set(true); | |
} | |
private synchronized void startProcessing() { | |
while (true) { | |
while (hasElements()) { | |
consumer.accept(take()); | |
} | |
if (noMoreElements.get()) { | |
return; // notify but no elements added | |
} | |
try { | |
wait(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
return; | |
} | |
} | |
} | |
// queue operations | |
private synchronized void add(T element) { | |
elements.add(element); | |
} | |
private synchronized boolean hasElements() { | |
return !elements.isEmpty(); | |
} | |
private synchronized T take() { | |
return elements.poll(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment