Skip to content

Instantly share code, notes, and snippets.

@samhendley
Last active September 1, 2022 16:33
Show Gist options
  • Save samhendley/6030565 to your computer and use it in GitHub Desktop.
Save samhendley/6030565 to your computer and use it in GitHub Desktop.
FutureWatcher utility class as an alternative to Observable.from(future)
import com.sensus.util.threading.DelayedWorkExecutor;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Func1;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
/**
 * Asynchronous alternative to {@code Observable.from(Future)}.
 *
 * <p>The default {@code Observable.from(Future)} implementation effectively makes the first subscriber
 * block until the future is ready. This alternative implementation uses more threads to make this truly
 * asynchronous and allow outside code to really be async without any accidental blocking: a single daemon
 * thread polls every registered future and, once a future is done and has an observer attached, submits
 * the delivery of its result to the executor that was passed to {@link #from}.
 *
 * <p>Limitations visible in this class: each observable returned by {@link #from} keeps only the most
 * recently subscribed observer, and the result is delivered at most once.
 */
public class FutureWatcher {

    /** Pause between two polling passes over the pending futures, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    private final Thread watcher;

    /** Futures that have not been handed to their executor yet; iterated by the watcher thread. */
    private final List<ObservablePumper<?>> pendingFutures = new CopyOnWriteArrayList<ObservablePumper<?>>();

    /** Creates the watcher and immediately starts its daemon polling thread. */
    public FutureWatcher() {
        watcher = new Thread(new ThreadWatcher());
        watcher.setDaemon(true);
        watcher.start();
    }

    /** Polling loop: fires every pumper whose future is done and which has an observer. */
    private class ThreadWatcher implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                for (ObservablePumper<?> pumper : pendingFutures) {
                    if (pumper.isDone()) {
                        // Remove before dispatching so the pumper is handed over exactly once.
                        pendingFutures.remove(pumper);
                        dispatch(pumper);
                    }
                }
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag; the loop condition then ends the thread instead of spinning.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Hands one ready pumper to its executor. A failure of a single pumper must not terminate the
         * watcher thread, otherwise every other pending future would never be delivered.
         */
        private void dispatch(ObservablePumper<?> pumper) {
            try {
                pumper.fire();
            } catch (RuntimeException submitFailure) {
                try {
                    pumper.fail(submitFailure);
                } catch (RuntimeException observerFailure) {
                    // The observer itself failed while being told about the error; report and keep polling.
                    observerFailure.printStackTrace();
                }
            }
        }
    }

    /**
     * Wraps a future in an observable whose result is delivered through {@code executor}.
     *
     * @param future   the future to watch
     * @param executor receives the delivery task once the future is done and an observer has subscribed
     * @param <T>      the value type of the future
     * @return an observable that emits the future's value followed by completion, or an error
     */
    public <T> Observable<T> from(Future<T> future, DelayedWorkExecutor executor) {
        ObservablePumper<T> pumper = new ObservablePumper<T>(future, executor);
        Observable<T> observable = Observable.create(pumper);
        pendingFutures.add(pumper);
        return observable;
    }

    /**
     * Couples one future with the observer that subscribed to it. It is both the subscribe function of
     * the observable and the task submitted to the executor to push the result to the observer.
     */
    private static class ObservablePumper<T> implements Runnable, Func1<Observer<T>, Subscription> {
        private final Future<T> future;
        private final DelayedWorkExecutor executor;

        // volatile: written by the subscribing thread in call(), read without the lock by the watcher
        // thread in isDone().
        private volatile Observer<T> observer;

        // Guarded by "this"; belongs to the observer currently stored in the field above.
        private BooleanSubscription subscription;

        private ObservablePumper(Future<T> future, DelayedWorkExecutor executor) {
            this.future = future;
            this.executor = executor;
        }

        /** @return true once an observer has subscribed and the future has finished */
        public boolean isDone() {
            return observer != null && future.isDone();
        }

        /** Submits this pumper to the executor; only valid once {@link #isDone()} is true. */
        public void fire() {
            assert observer != null;
            assert future.isDone();
            executor.submit(this);
        }

        /**
         * Reports an error directly on the calling thread. Used when the delivery task could not be
         * submitted to the executor, so the observer is not left waiting forever.
         */
        synchronized void fail(RuntimeException error) {
            if (observer == null || isUnsubscribed()) {
                return;
            }
            observer.onError(error);
        }

        /** Subscribe function: remembers the observer and returns the subscription that can cancel it. */
        @Override
        public synchronized Subscription call(Observer<T> tObservable) {
            BooleanSubscription current = new BooleanSubscription();
            this.subscription = current;
            this.observer = tObservable;
            return current;
        }

        /** Delivers the value and completion, or the failure, unless the observer has unsubscribed. */
        @Override
        public synchronized void run() {
            assert observer != null;
            assert future.isDone();
            if (isUnsubscribed()) {
                return;
            }
            try {
                observer.onNext(future.get());
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }
        }

        // Caller must hold the lock on "this".
        private boolean isUnsubscribed() {
            return subscription != null && subscription.isUnsubscribed();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment