Last active
September 1, 2022 16:33
-
-
Save samhendley/6030565 to your computer and use it in GitHub Desktop.
FutureWatcher utility class as an alternative to Observable.from(future)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.sensus.util.threading.DelayedWorkExecutor; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.subscriptions.BooleanSubscription; | |
import rx.util.functions.Func1; | |
import java.util.List; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.Future; | |
/** | |
* The default Observerable.from(Future) implementation effectively makes the first subscriber block until | |
* the future is ready. This alternative implementation uses more threads to make this truly asynchronous | |
* and allow outside code to really be async without any accidental blocking. | |
*/ | |
public class FutureWatcher { | |
private final Thread watcher; | |
private final List<ObservablePumper<?>> pendingFutures = new CopyOnWriteArrayList<ObservablePumper<?>>(); | |
public FutureWatcher() { | |
watcher = new Thread(new ThreadWatcher()); | |
watcher.setDaemon(true); | |
watcher.start(); | |
} | |
private class ThreadWatcher implements Runnable { | |
@Override | |
public void run() { | |
while (true) { | |
try { | |
for (ObservablePumper<?> f : pendingFutures) { | |
if (f.isDone()) { | |
pendingFutures.remove(f); | |
f.fire(); | |
} | |
} | |
Thread.sleep(10); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
public <T> Observable<T> from(Future<T> future, DelayedWorkExecutor executor) { | |
ObservablePumper<T> pumper = new ObservablePumper<T>(future, executor); | |
Observable<T> observable = Observable.create(pumper); | |
pendingFutures.add(pumper); | |
return observable; | |
} | |
private static class ObservablePumper<T> implements Runnable, Func1<Observer<T>, Subscription> { | |
private final Future<T> future; | |
private final DelayedWorkExecutor executor; | |
private Observer<T> observer; | |
private ObservablePumper(Future<T> future, DelayedWorkExecutor executor) { | |
this.future = future; | |
this.executor = executor; | |
} | |
public boolean isDone() { | |
return observer != null && future.isDone(); | |
} | |
public void fire() { | |
assert observer != null; | |
assert future.isDone(); | |
executor.submit(this); | |
} | |
@Override | |
public synchronized Subscription call(Observer<T> tObservable) { | |
this.observer = tObservable; | |
return new BooleanSubscription(); | |
} | |
@Override | |
public synchronized void run() { | |
assert observer != null; | |
assert future.isDone(); | |
try { | |
observer.onNext(future.get()); | |
observer.onCompleted(); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment