Last active
January 26, 2016 16:02
-
-
Save jcmm33/b46cc76aca2643d475bc to your computer and use it in GitHub Desktop.
Reactive Extensions retry with condition. Using Materialize could possibly reduce some of the legwork here, but it seems to work for my scenario anyway. Similarly, there may be issues with observables that just complete without emitting any values, but that isn't a scenario we have.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class RetryWithConditionMixins | |
{ | |
/// <summary>
/// Projects each element of <paramref name="source"/> through <paramref name="selector"/>, running the
/// selector again for the same element whenever an attempt fails and <paramref name="retryObservable"/>
/// signals that another attempt should be made.
/// </summary>
/// <remarks>
/// The per-element retry sequences are concatenated, so elements are processed one at a time and results
/// keep the order of <paramref name="source"/>. When the last attempt for an element ends in failure, the
/// resulting sequence terminates with that exception. An attempt that completes without producing a value
/// contributes nothing to the output.
/// </remarks>
/// <typeparam name="TIn">Element type of the source sequence.</typeparam>
/// <typeparam name="TOut">Element type produced by the selector.</typeparam>
/// <param name="source">The sequence whose elements are handed to the selector.</param>
/// <param name="selector">The selector function which potentially can be retried.</param>
/// <param name="retryObservable">
/// Invoked with the exception of a failed attempt; the observable it returns indicates whether another
/// attempt should be performed (<c>true</c>) or the failure should be reported (<c>false</c>).
/// </param>
/// <returns>The selector results, or an error once a failed attempt is not retried.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="selector"/> or <paramref name="retryObservable"/> is null.
/// </exception>
public static IObservable<TOut> Retry<TIn, TOut>(this IObservable<TIn> source,
    Func<TIn, IObservable<TOut>> selector, Func<Exception, IObservable<bool>> retryObservable)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (selector == null)
    {
        throw new ArgumentNullException(nameof(selector));
    }

    if (retryObservable == null)
    {
        throw new ArgumentNullException(nameof(retryObservable));
    }

    return source
        .Select(sourceItem => DoRetryStep(sourceItem, selector, retryObservable))
        // Concat (rather than Merge) so one element's retries finish before the next element starts.
        .Concat()
        // An attempt that merely completed carries no value; do not surface default(TOut) for it.
        .Where(attempt => attempt.Kind != AttemptResult<TOut>.AttemptKind.OnCompleted)
        // Hand a failure to OnError as-is instead of re-throwing it, which would overwrite its stack trace.
        .Select(attempt => attempt.Kind == AttemptResult<TOut>.AttemptKind.OnError
            ? Observable.Throw<TOut>(attempt.Exception)
            : Observable.Return(attempt.Value))
        .Concat();
}
/// <summary>
/// Retry with condition, where the decision to retry is made by a synchronous predicate.
/// </summary>
/// <remarks>
/// Convenience overload: the predicate's answer is wrapped in a single-value observable and the call is
/// forwarded to the overload taking a <c>Func&lt;Exception, IObservable&lt;bool&gt;&gt;</c>.
/// </remarks>
/// <typeparam name="TIn">Element type of the source sequence.</typeparam>
/// <typeparam name="TOut">Element type produced by the selector.</typeparam>
/// <param name="source">The sequence whose elements are handed to the selector.</param>
/// <param name="selector">The selector function which potentially can be retried.</param>
/// <param name="retry">
/// Invoked with the exception of a failed attempt; returns <c>true</c> when another attempt should be performed.
/// </param>
/// <returns>The observable produced by the observable-based <c>Retry</c> overload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="retry"/> is null.</exception>
public static IObservable<TOut> Retry<TIn, TOut>(this IObservable<TIn> source,
    Func<TIn, IObservable<TOut>> selector, Func<Exception, bool> retry)
{
    // Fail fast, matching the argument validation of the observable-based overload; without this a null
    // predicate would only surface later as a NullReferenceException inside the retry pipeline.
    if (retry == null)
    {
        throw new ArgumentNullException(nameof(retry));
    }

    return source.Retry(selector, e => Observable.Return(retry(e)));
}
/// <summary>
/// Perform the retry sequence for a single source item, always returning an observable reflecting the
/// outcome of the last attempt instead of raising an error.
/// </summary>
/// <typeparam name="TIn">Type of the item handed to the selector.</typeparam>
/// <typeparam name="TOut">Element type produced by the selector.</typeparam>
/// <param name="sourceItem">The item every attempt invokes the selector with.</param>
/// <param name="selector">The selector function which potentially can be retried.</param>
/// <param name="retryObservable">
/// Invoked with the exception of a failed attempt; a <c>true</c> from the returned observable starts another
/// attempt, a <c>false</c> ends the sequence with the failed result.
/// </param>
/// <returns>An observable that yields the result of the final attempt and then completes.</returns>
private static IObservable<AttemptResult<TOut>> DoRetryStep<TIn, TOut>(TIn sourceItem,
    Func<TIn, IObservable<TOut>> selector, Func<Exception, IObservable<bool>> retryObservable)
{
    // Built per subscription so that every subscriber gets its own decision queue.
    return Observable.Create<AttemptResult<TOut>>(observer =>
    {
        // Effectively our queue of "should we try (again)?" decision streams; Switch below only ever
        // listens to the most recent one.
        var decisions = new Subject<IObservable<bool>>();

        var subscription = decisions
            .Switch()
            .TakeWhile(shouldTry => shouldTry) // whilst we should try again
            .Select(unused => RetrySelector(sourceItem, selector)) // run the selector for this attempt
            .Switch()
            .Do(attempt =>
            {
                if (attempt.Kind == AttemptResult<TOut>.AttemptKind.OnError)
                {
                    // We errored: queue the retry decision for this exception, which determines
                    // whether another attempt is started.
                    decisions.OnNext(retryObservable(attempt.Exception));
                }
                else
                {
                    // We did not error, so no further attempts are needed.
                    decisions.OnCompleted();
                }
            })
            .TakeLast(1) // the last result seen is the actual outcome
            .Subscribe(observer);

        // Prime the pump only AFTER the pipeline is subscribed: if the first attempt reports its failure
        // synchronously, the retry decision is then pushed into a subject that is already being observed
        // rather than being lost (which would leave the sequence waiting forever).
        decisions.OnNext(Observable.Return(true));

        return subscription;
    });
}
/// <summary>
/// Utility method that runs a selector which may fail and reports whatever happens first as a single
/// <see cref="AttemptResult{TOut}"/> followed by completion, so the returned observable itself never errors.
/// </summary>
/// <typeparam name="TIn">Type of the value handed to the selector.</typeparam>
/// <typeparam name="TOut">Element type produced by the selector.</typeparam>
/// <param name="value">The value to invoke the selector with.</param>
/// <param name="selector">The selector to run.</param>
/// <returns>
/// An observable producing one result: the first value, the error, or the completion of the selector's
/// sequence. An exception thrown while invoking or subscribing to the selector is reported as an error result.
/// </returns>
private static IObservable<AttemptResult<TOut>> RetrySelector<TIn, TOut>(TIn value,
    Func<TIn, IObservable<TOut>> selector)
{
    return Observable.Create<AttemptResult<TOut>>(observer =>
    {
        // Every outcome is delivered the same way: one result, then completion.
        Action<AttemptResult<TOut>> finish = outcome =>
        {
            observer.OnNext(outcome);
            observer.OnCompleted();
        };

        IDisposable subscription = null;
        try
        {
            subscription = selector(value).Subscribe(
                item => finish(AttemptResult<TOut>.OnNext(item)),
                error => finish(AttemptResult<TOut>.OnError(error)),
                () => finish(AttemptResult<TOut>.OnCompleted()));
        }
        catch (Exception failure)
        {
            // The selector (or subscribing to its result) threw: treat it like an error notification.
            finish(AttemptResult<TOut>.OnError(failure));
        }

        // Remains null when no subscription could be established.
        return subscription;
    });
}
/// <summary>
/// Outcome of a single selector attempt: a value, a plain completion, or an error.
/// </summary>
/// <typeparam name="TOut">Type of the value produced by the selector.</typeparam>
public class AttemptResult<TOut>
{
    /// <summary>
    /// The kinds of outcome an attempt can have.
    /// </summary>
    public enum AttemptKind
    {
        OnNext,
        OnCompleted,
        OnError
    };

    /// <summary>
    /// Gets which kind of outcome this result represents.
    /// </summary>
    public AttemptKind Kind { get; private set; }

    /// <summary>
    /// Gets the value produced by the selector; only set by <see cref="OnNext"/>.
    /// </summary>
    public TOut Value { get; private set; }

    /// <summary>
    /// Gets the exception of the attempt; only set by <see cref="OnError"/>.
    /// </summary>
    public Exception Exception { get; private set; }

    /// <summary>
    /// Creates a result carrying a value.
    /// </summary>
    /// <param name="value">The value produced by the selector.</param>
    /// <returns>A result of kind <see cref="AttemptKind.OnNext"/>.</returns>
    public static AttemptResult<TOut> OnNext(TOut value)
    {
        return Create(AttemptKind.OnNext, value, null);
    }

    /// <summary>
    /// Creates a result carrying neither a value nor an exception.
    /// </summary>
    /// <returns>A result of kind <see cref="AttemptKind.OnCompleted"/>.</returns>
    public static AttemptResult<TOut> OnCompleted()
    {
        return Create(AttemptKind.OnCompleted, default(TOut), null);
    }

    /// <summary>
    /// Creates a result carrying an exception.
    /// </summary>
    /// <param name="exception">The exception of the attempt.</param>
    /// <returns>A result of kind <see cref="AttemptKind.OnError"/>.</returns>
    public static AttemptResult<TOut> OnError(Exception exception)
    {
        return Create(AttemptKind.OnError, default(TOut), exception);
    }

    // Single place where instances are populated.
    private static AttemptResult<TOut> Create(AttemptKind kind, TOut value, Exception exception)
    {
        var result = new AttemptResult<TOut>();
        result.Kind = kind;
        result.Value = value;
        result.Exception = exception;
        return result;
    }
}
} |
You could have a sequence that emits no values and simply completes.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In `AttemptResult`, what is the difference between `OnNext` and `OnCompleted`? In other words, when would `OnCompleted` tick and why is it useful?