-
-
Save mkristian/242836 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.abhinavsarkar.util; | |
import java.util.Iterator; | |
import java.util.concurrent.BrokenBarrierException; | |
import java.util.concurrent.CyclicBarrier; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import com.google.common.base.Preconditions; | |
import com.google.common.collect.AbstractIterator; | |
/** | |
* Provides a Python-style Generator in Java. Subclass this class and | |
* implement the run method in subclass. Use {@link Generator#yield(Object)} to | |
* return a value and {@link Generator#yield()} to receive a value. | |
* This class uses Preconditions and AbstractIterator classes from the Google | |
* Collections library. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
public abstract class Generator<T> implements Iterable<T> { | |
private final int generatorNormalExitTimeout; | |
private final int generatorAbnormalExitTimeout; | |
private final SynchronousQueue<T> queue = new SynchronousQueue<T>(); | |
private final ExecutorService executorService = | |
Executors.newSingleThreadExecutor(new DaemonThreadFactory()); | |
private volatile Throwable throwable = null; | |
private final AtomicBoolean started = new AtomicBoolean(false); | |
private final CyclicBarrier barrier = new CyclicBarrier(2); | |
private final Iterator<T> iterator = new AbstractIterator<T>() { | |
@Override | |
protected T computeNext() { | |
T next = Generator.this.get(); | |
return next != null ? next : endOfData(); | |
}}; | |
/** | |
* Creates a Generator with defaults. | |
*/ | |
public Generator() { | |
this(1, 2); | |
} | |
/** | |
* Creates a Generator. | |
* | |
* @param generatorNormalExitTimeout Timeout in seconds for the generator | |
* thread when the generator exits normally, either by finishing the run | |
* or by throwing a {@link GeneratorExit} exception. | |
* @param generatorAbnormalExitTimeout Timeout in seconds for the generator | |
* thread when the generator exits abnormally by throwing any exception | |
* other than {@link GeneratorExit} exception. | |
*/ | |
public Generator(int generatorNormalExitTimeout, | |
int generatorAbnormalExitTimeout) { | |
this.generatorNormalExitTimeout = generatorNormalExitTimeout; | |
this.generatorAbnormalExitTimeout = generatorAbnormalExitTimeout; | |
} | |
/* (non-Javadoc) | |
* @see java.lang.Iterable#iterator() | |
*/ | |
public final Iterator<T> iterator() { | |
return iterator; | |
} | |
/** | |
* Gets a value from the generator. The generator must call | |
* {@link Generator#yield(Object)} to return this value inside the | |
* {@link Generator#run()} method. | |
* | |
* @return The value yielded by the generator. | |
*/ | |
public final T get() { | |
try { | |
return next(); | |
} catch (Throwable e) { | |
executorService.shutdownNow(); | |
if (!(e instanceof GeneratorExit)) { | |
throw new RuntimeException(e); | |
} else { | |
return null; | |
} | |
} | |
} | |
/** | |
* Sends a value to the generator. The generator must call | |
* {@link Generator#yield()} to receive this value inside the | |
* {@link Generator#run()} method. | |
* | |
* @param sent The value sent to the generator. | |
*/ | |
public void send(T sent) { | |
try { | |
next(Preconditions.checkNotNull(sent)); | |
} catch (Throwable e) { | |
executorService.shutdownNow(); | |
if (e instanceof GeneratorExit) { | |
return; | |
} else { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
/** | |
* Stops the generator and frees up the background thread. | |
*/ | |
public final void stop() { | |
executorService.shutdownNow(); | |
} | |
/** | |
* Call this method inside {@link Generator#run()} method to yield a value. | |
* | |
* @param result Value to be yielded. | |
* @throws InterruptedException | |
* @throws BrokenBarrierException | |
*/ | |
protected final void yield(T result) | |
throws InterruptedException, BrokenBarrierException { | |
barrier.await(); | |
queue.put(result); | |
} | |
/** | |
* Call this method inside {@link Generator#run()} method to receive a value. | |
* | |
* @return The value sent to generator using | |
* {@link Generator#send(Object)} method. | |
* @throws InterruptedException | |
* @throws BrokenBarrierException | |
*/ | |
protected final T yield() | |
throws InterruptedException, BrokenBarrierException { | |
barrier.await(); | |
return queue.take(); | |
} | |
private T next() throws Throwable { | |
initialize(); | |
return queue.take(); | |
} | |
private void next(T sent) throws Throwable { | |
initialize(); | |
queue.put(sent); | |
} | |
private void initialize() throws Throwable { | |
if (executorService.isShutdown()) { | |
if (throwable != null) { | |
throw new IllegalStateException( | |
"Generator has exited", throwable); | |
} else { | |
throw new IllegalStateException("Generator has exited"); | |
} | |
} | |
if (!started.get()) { | |
executorService.execute(new Runnable() { | |
public void run() { | |
try { | |
Generator.this.run(); | |
throw new GeneratorExit(); | |
} catch (Exception e) { | |
Generator.this.throwable = e; | |
try { | |
if (e instanceof GeneratorExit) { | |
barrier.await(generatorNormalExitTimeout, | |
TimeUnit.SECONDS); | |
} else { | |
barrier.await(generatorAbnormalExitTimeout, | |
TimeUnit.SECONDS); | |
} | |
} catch (InterruptedException e1) { | |
return; | |
} catch (BrokenBarrierException e1) { | |
return; | |
} catch (TimeoutException e1) { | |
executorService.shutdownNow(); | |
} | |
} | |
}}); | |
started.set(true); | |
} | |
barrier.await(); | |
if (throwable != null) { | |
throw throwable; | |
} | |
} | |
/** | |
* Implement this method inside the subclass to provide the logic for the | |
* generator. | |
* | |
* @throws Exception | |
*/ | |
protected abstract void run() throws Exception; | |
private static final class DaemonThreadFactory | |
implements ThreadFactory { | |
private final ThreadFactory defaultThreadFactory = | |
Executors.defaultThreadFactory(); | |
public Thread newThread(Runnable r) { | |
Thread thread = defaultThreadFactory.newThread(r); | |
thread.setDaemon(true); | |
return thread; | |
} | |
} | |
/** | |
* The Exception thrown to signal the exit of the generator. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
public static class GeneratorExit extends RuntimeException { | |
private static final long serialVersionUID = 1L; | |
} | |
} | |
/** | |
* A (infinite) Fibonacci number generator. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
class Fibonacci extends Generator<Integer> { | |
/** | |
* Override the default constructor to set the timeout parameters. | |
*/ | |
public Fibonacci() { | |
super(1,5); | |
} | |
@Override | |
protected void run() throws Exception { | |
yield(0); | |
int i = 0; | |
int j = 1; | |
while (true) { | |
yield(j); | |
int current = i + j; | |
i = j; | |
j = current; | |
} | |
} | |
} | |
/** | |
* A (infinite) generator which prints the received values to {@link System#out} | |
* stream. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
class Printer extends Generator<Object> { | |
@Override | |
protected void run() throws Exception { | |
while (true) { | |
System.out.println(yield()); | |
} | |
} | |
} | |
/** | |
* A (infinite) generator which returns back the sent value. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
class Identity extends Generator<Object> { | |
@Override | |
protected void run() throws Exception { | |
while (true) | |
yield(yield()); | |
} | |
} | |
/** | |
* A (non-infinite) generator which generates the numbers from 0 to 4. | |
* | |
* @author Abhinav Sarkar | |
*/ | |
class FiveNumberGenerator extends Generator<Integer> { | |
@Override | |
protected void run() throws Exception { | |
for (int i = 0; i < 5; i++) { | |
yield(i); | |
} | |
}; | |
} | |
class Main { | |
public static void main(String[] args) { | |
// Use get method to get the values yielded by the generator. | |
Fibonacci fibonacci = new Fibonacci(); | |
for (int i = 0; i < 5; i++) { | |
System.out.println(fibonacci.get()); | |
} | |
fibonacci.stop(); | |
// Generator which generate values (and not only consume values) | |
// can be iterated directly. | |
Fibonacci fibonacciIter = new Fibonacci(); | |
int j = 0; | |
for (Integer integer : fibonacciIter) { | |
System.out.println(integer); | |
if (j++ > 10) break; | |
} | |
fibonacciIter.stop(); | |
// Use send method to send the value to the generator. | |
Printer printer = new Printer(); | |
for (int i = 1; i <= 5; i++) { | |
printer.send(i); | |
} | |
printer.stop(); | |
// Use both send and get methods together. Their counterparts (yield() | |
// and yield(Object) must be called in the same order as these are | |
// called here. Otherwise it will result in a deadlock. | |
Identity identity = new Identity(); | |
for (int i = 0; i < 5; i++) { | |
identity.send(i); | |
System.out.println(identity.get()); | |
} | |
identity.stop(); | |
// Non infinite generators do not need to be stopped at the end of | |
// iteration. They stop automatically. | |
FiveNumberGenerator fiveNumberGenerator = new FiveNumberGenerator(); | |
for (Integer integer : fiveNumberGenerator) { | |
System.out.println(integer); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment