Skip to content

Instantly share code, notes, and snippets.

@chefhoobajoob
Last active October 27, 2015 21:58
Show Gist options
  • Save chefhoobajoob/eb473f26f7ad342be28f to your computer and use it in GitHub Desktop.
Save chefhoobajoob/eb473f26f7ad342be28f to your computer and use it in GitHub Desktop.
This gist provides a JUnit test showing that retryWhen() does not retry its immediate parent Observable, but instead retries the root observable in the chain.
package ot.publishing.vertx.service.publisher.tests;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import org.junit.Test;
import rx.Observable;
public class RetryWhenRetriesRootObservable {
@Test
public void retryWhenUsesRoot() {
String[] rootResult = { "root-stuff" };
String[] childResult = { "child-stuff" };
final class Counter {
private int _value = 0;
public int next() { return ++_value; }
public int current() { return _value; }
}
Counter rootUsageCounter = new Counter();
Counter childUsageCounter = new Counter();
Counter childFailureCounter = new Counter();
Observable.defer( () -> {
rootUsageCounter.next();
System.out.println("Creating a root observable");
return Observable.from(rootResult);
})
.retry()
.flatMap( result -> {
if ( result.equals("root-stuff") ) {
return Observable.defer( () -> {
childUsageCounter.next();
System.out.println("Creating a child observable");
if ( childFailureCounter.next() < 3 ) {
System.out.println("Child will issue a recoverable failure");
return Observable.error( new RuntimeException() );
}
else {
System.out.println("Child will issue a result");
return Observable.from(childResult);
}
});
}
else {
System.out.println("Child will issue an unrecoverable failure");
return Observable.error(new IllegalStateException());
}
})
.retryWhen( attempts -> attempts.flatMap( error -> {
if ( error instanceof RuntimeException ) {
System.out.println("retryWhen() returning a timer observable");
return Observable.timer(1, TimeUnit.SECONDS);
}
else {
System.out.println("retryWhen() returning an error observable");
return Observable.error(error);
}
}))
.toBlocking()
.forEach( result -> System.out.println("Got a result: " + result));
// If the retryWhen() operator resubscribes to its parent Observable,
// the root observable will only be subscribed-to once, and this test
// will fail. If retryWhen() resubscribes to the root Observable, then
// this test will succeed.
assertTrue( rootUsageCounter.current() > 1 );
}
}
@chefhoobajoob
Copy link
Author

Standard output for this test is:

Creating a root observable
Creating a child observable
Child will issue a recoverable failure
retryWhen() returning a timer observable <-- retryWhen() invoked
Creating a root observable <-- retryWhen() re-subscribes to the root and not the child
Creating a child observable
Child will issue a recoverable failure
retryWhen() returning a timer observable
Creating a root observable
Creating a child observable
Child will issue a result
Got a result: child-stuff

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment