Last active
February 16, 2016 09:56
-
-
Save chbatey/0a0d6f5ad34a548abb04 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package info.batey.examples.observables; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.common.util.concurrent.ListeningExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import info.batey.examples.cf.CompletableFutureTest; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.Observable; | |
import rx.observable.ListenableFutureObservable; | |
import rx.schedulers.Schedulers; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
public class ObservablesTest { | |
private static Logger LOG = LoggerFactory.getLogger(CompletableFutureTest.class); | |
private static ExecutorService executor = Executors.newSingleThreadExecutor(); | |
private static ListeningExecutorService lExcutor = MoreExecutors.listeningDecorator(executor); | |
@Test | |
public void creation() throws Exception { | |
List<String> names = Arrays.asList("Chris", "Alex"); | |
Observable<String> obs = Observable.from(names); | |
obs.subscribe(s -> LOG.info(s)); | |
} | |
@Test | |
public void blocking() throws Exception { | |
Future<?> future = executor.submit((Runnable) () -> { | |
fiveSeconds(); | |
}); | |
LOG.info("Creating an Observable"); | |
Observable<Object> observable = Observable.from(future); | |
LOG.info("Created an Observable"); | |
observable.subscribe(o -> { | |
LOG.info("Observable has delivered"); | |
}); | |
LOG.info("Finished subscribing"); | |
observable.toBlocking().first(); | |
LOG.info("Observable done"); | |
} | |
@Test | |
public void notSoBlockingButIStillUseAThread() throws Exception { | |
Future<?> future = executor.submit(this::fiveSeconds); | |
LOG.info("Creating an Observable"); | |
Observable<Object> observable = Observable.from(future, Schedulers.io()); | |
LOG.info("Created an Observable"); | |
observable.subscribe(o -> { | |
LOG.info("Observable has delivered"); | |
}); | |
LOG.info("Finished subscribing"); | |
observable.toBlocking().first(); | |
LOG.info("Observable done"); | |
} | |
// @Test | |
// public void listeNotSoBlocking() throws Exception { | |
// ListenableFuture<?> future = lExcutor.submit(this::fiveSeconds); | |
// LOG.info("Creating an Observable"); | |
// Observable<?> observable = ListenableFutureObservable.from(future, executor); | |
// LOG.info("Created an Observable"); | |
// observable.subscribe(o -> { | |
// LOG.info("Observable has delivered"); | |
// }); | |
// LOG.info("Finished subscribing"); | |
// observable.toBlocking().first(); | |
// LOG.info("Observable done"); | |
// } | |
private void fiveSeconds() { | |
try { | |
Thread.sleep(5000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private void block(Observable<?> o) { | |
try { | |
o.toBlocking().toFuture().get(); | |
} catch (Exception e) { | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment