Last active
August 29, 2015 14:03
-
-
Save aztecrex/267ab17b0a57a8ac1506 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package demo; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import rx.Observable; | |
import rx.Observable.OnSubscribe; | |
import rx.Subscriber; | |
import rx.functions.Func0; | |
import rx.util.async.Async; | |
/** | |
* <p>
* Demonstrate attempts to get RxJava retry for asynchronous work chain. The use | |
* case that exposed this problem is reading and writing data with versioning | |
* for optimistic concurrency. The work is a series of async I/O operations that | |
* must be re-assembled from scratch if a stale version is detected on write. | |
* </p> | |
* | |
* <p> | |
* Four cases are demonstrated in this class: | |
* </p> | |
* <ul> | |
* <li>Case 1: perform the work and naively apply a retry operator to the
* asynchronous work. This fails because the work itself is not retried on
* re-subscribe.</li>
* <li>Case 2: wrap the work in an observable that performs it on every
* subscription. A retry operator applied to the wrapper correctly re-attempts
* the work on failure. However, every subsequent subscriber to the result
* causes the work to be performed again.</li>
* <li>Case 3: Apply the cache operator to the result of the retry operator. | |
* This performs as desired.</li> | |
* <li>Case 4: Generalize the approach of case 3 and encapsulate it in an | |
* observable generator method. This shows that it is difficult to generalize | |
* this behavior because each retry operator form (number, predicate, perpetual) | |
* will require its own generator method.</li> | |
* </ul> | |
* | |
* <p> | |
* NOTE: this code does not work if compiled by the Eclipse (Kepler) compiler
* for Java 8. I have to compile with javac for it to work. There is some
* problem with lambda class naming in the code generated by Eclipse.
* </p> | |
* | |
* | |
*/ | |
public class AsyncRetryDemo { | |
public static void main(final String[] args) throws Exception { | |
new AsyncRetryDemo().case1(); | |
new AsyncRetryDemo().case2(); | |
new AsyncRetryDemo().case3(); | |
new AsyncRetryDemo().case4(); | |
// output is: | |
// | |
// case 1, sub 1: fail (max retries, called=1) | |
// case 1, sub 2: fail (max retries, called=1) | |
// case 2, sub 1: pass (called=2) | |
// case 2, sub 2: fail (called=3) | |
// case 3, sub 1: pass (called=2) | |
// case 3, sub 2: pass (called=2) | |
// case 4, sub 1: pass (called=2) | |
// case 4, sub 2: pass (called=2) | |
} | |
private static <R> Observable<R> retryAndCache( | |
final Func0<Observable<R>> binder, final int retries) { | |
return Observable.create(new OnSubscribe<R>() { | |
@Override | |
public void call(final Subscriber<? super R> child) { | |
binder.call().subscribe(child); | |
} | |
}) | |
.retry(retries) | |
.cache(); | |
} | |
private final AtomicInteger called = new AtomicInteger(); | |
private final CountDownLatch done = new CountDownLatch(2); | |
/** | |
* This represents a sequence of interdependent asynchronous operations that | |
* might fail in a way that prescribes a retry. In this case, all we are | |
* doing is squaring an integer asynchronously. | |
* | |
* @param input | |
* to the process. | |
* | |
* @return promise to perform the work and produce either a result or a | |
* suggestion to retry (e.g. a stale version error). | |
*/ | |
private Observable<Integer> canBeRetried(final int a) { | |
final Observable<Integer> rval; | |
if (this.called.getAndIncrement() == 0) { | |
rval = Observable.error(new RuntimeException( | |
"we always fail the first time")); | |
} else { | |
rval = Async.start(() -> a * a); | |
} | |
return rval; | |
} | |
private void case1() throws InterruptedException { | |
/* | |
* In this case, we invoke the observable-creator to get the async | |
* promise. Of course, if it fails, any retry will fail as well because | |
* the failed result is computed one time and pushed to all subscribers | |
* forever. | |
* | |
* Thus this case fails because the first invocation of canBeRetried(..) | |
* always fails. | |
*/ | |
final Observable<Integer> o = canBeRetried(2) | |
.retry(2); | |
check("case 1", o); | |
this.done.await(); | |
} | |
private void case2() throws InterruptedException { | |
/* | |
* In this case, we wrap canBeRetried(..) inside an observer that | |
* invokes it on every subscription. So, we get past the retry problem. | |
* But every new subscriber after the retry succeeds causes the work to | |
* restart. | |
*/ | |
final Observable<Integer> o = Observable.create( | |
new OnSubscribe<Integer>() { | |
@Override | |
public void call(final Subscriber<? super Integer> child) { | |
canBeRetried(2).subscribe(child); | |
} | |
}) | |
.retry(2); | |
check("case 2", o); | |
this.done.await(); | |
} | |
private void case3() throws InterruptedException { | |
/* | |
* In this case, we wrap canBeRetried(..) inside an observer that | |
* invokes it on every subscription. So, we get past the retry problem. | |
* We cache the result of the retry to solve the extra work problem. | |
*/ | |
final Observable<Integer> o = Observable.create( | |
new OnSubscribe<Integer>() { | |
@Override | |
public void call(final Subscriber<? super Integer> child) { | |
canBeRetried(2).subscribe(child); | |
} | |
}) | |
.retry(2) | |
.cache(); | |
check("case 3", o); | |
this.done.await(); | |
} | |
private void case4() throws InterruptedException { | |
/* | |
* Same as case 3 but we use the retryAndCache(..) to do the work for | |
* us. | |
*/ | |
final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2); | |
check("case 4", o); | |
this.done.await(); | |
} | |
private void check(final String label, final Observable<Integer> promise) { | |
// does the work get retried on failure? | |
promise.subscribe( | |
v -> { | |
System.out.println(label + ", sub 1: " | |
+ (this.called.get() == 2 ? "pass" : "fail") | |
+ " (called=" + this.called.get() + ")"); | |
}, | |
x -> { | |
System.out.println(label | |
+ ", sub 1: fail (max retries, called=" | |
+ this.called.get() + ")"); | |
this.done.countDown(); | |
}, () -> { | |
this.done.countDown(); | |
}); | |
// do subsequent subscribers avoid invoking the work again? | |
promise.subscribe( | |
v -> { | |
System.out.println(label + ", sub 2: " | |
+ (this.called.get() == 2 ? "pass" : "fail") | |
+ " (called=" + this.called.get() + ")"); | |
}, | |
x -> { | |
System.out.println(label | |
+ ", sub 2: fail (max retries, called=" | |
+ this.called.get() + ")"); | |
this.done.countDown(); | |
}, () -> { | |
this.done.countDown(); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment