Skip to content

Instantly share code, notes, and snippets.

@jcmm33
Last active January 26, 2016 16:02
Show Gist options
  • Save jcmm33/b46cc76aca2643d475bc to your computer and use it in GitHub Desktop.
Save jcmm33/b46cc76aca2643d475bc to your computer and use it in GitHub Desktop.
Reactive Extensions retry with condition. Using Materialize could possibly reduce some of the legwork here, but this seems to work for my scenario anyway. Similarly, there may be issues with observables that just complete without emitting any values, but that isn't a scenario we have.
/// <summary>
/// Extension methods that project each source item through a selector and re-run that selector,
/// when it fails, for as long as a caller supplied condition asks for another attempt.
/// </summary>
public static class RetryWithConditionMixins
{
    /// <summary>
    /// Projects each source item through <paramref name="selector"/>, re-running the selector for the
    /// same item whenever it fails and <paramref name="retryObservable"/> asks for another attempt.
    /// </summary>
    /// <remarks>
    /// Items are processed strictly in source order. Only the first notification of each attempt is
    /// used: the first value, the error, or the completion. An attempt that completes without
    /// producing a value is surfaced as <c>default(TOut)</c>.
    /// </remarks>
    /// <typeparam name="TIn">The type of the source items.</typeparam>
    /// <typeparam name="TOut">The type of the values produced by the selector.</typeparam>
    /// <param name="source">The sequence whose items are handed to the selector.</param>
    /// <param name="selector">The selector function which potentially can be retried</param>
    /// <param name="retryObservable">The observable function indicating whether another retry should be performed</param>
    /// <returns>
    /// A sequence with one value per source item; it terminates with the selector's exception once
    /// a failure is not retried.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IObservable<TOut> Retry<TIn, TOut>(this IObservable<TIn> source,
        Func<TIn, IObservable<TOut>> selector, Func<Exception, IObservable<bool>> retryObservable)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (selector == null)
        {
            throw new ArgumentNullException(nameof(selector));
        }
        if (retryObservable == null)
        {
            throw new ArgumentNullException(nameof(retryObservable));
        }

        // Run the retry step per item and concat to ensure correct sequencing. A failed outcome is
        // handed to Observable.Throw rather than re-thrown with "throw exception;", because
        // re-throwing an existing exception object overwrites its original stack trace.
        return source.
            Select(sourceItem => DoRetryStep(sourceItem, selector, retryObservable)).
            Concat().
            Select(outcome => outcome.Kind == AttemptResult<TOut>.AttemptKind.OnError
                ? Observable.Throw<TOut>(outcome.Exception)
                : Observable.Return(outcome.Value)).
            Concat();
    }

    /// <summary>
    /// Projects each source item through <paramref name="selector"/>, re-running the selector for the
    /// same item whenever it fails and <paramref name="retry"/> returns <c>true</c> for the failure.
    /// </summary>
    /// <typeparam name="TIn">The type of the source items.</typeparam>
    /// <typeparam name="TOut">The type of the values produced by the selector.</typeparam>
    /// <param name="source">The sequence whose items are handed to the selector.</param>
    /// <param name="selector">The selector function which potentially can be retried</param>
    /// <param name="retry">The function indicating whether another retry should be performed</param>
    /// <returns>
    /// A sequence with one value per source item; it terminates with the selector's exception once
    /// a failure is not retried.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IObservable<TOut> Retry<TIn, TOut>(this IObservable<TIn> source,
        Func<TIn, IObservable<TOut>> selector, Func<Exception, bool> retry)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (retry == null)
        {
            // Fail at the call site instead of with a NullReferenceException on the first failure.
            throw new ArgumentNullException(nameof(retry));
        }

        // selector is validated by the observable based overload this call is forwarded to.
        return source.Retry(selector, (e) => Observable.Return(retry(e)));
    }

    /// <summary>
    /// Perform the retry sequence for a single item, always returning an observable reflecting the
    /// final outcome.
    /// </summary>
    /// <remarks>
    /// After a failed attempt the first value produced by <paramref name="retryObservable"/> decides
    /// whether the selector runs again; a decision sequence that completes without a value counts as
    /// "do not retry". The attempts are chained directly instead of being fed back through a subject,
    /// so a failure raised synchronously while subscribing (for example a selector that throws) is
    /// never lost before anything is listening for it.
    /// Every retry nests one more subscription, so attempts that all fail synchronously deepen the
    /// call stack with each retry.
    /// </remarks>
    /// <typeparam name="TIn">The type of the item handed to the selector.</typeparam>
    /// <typeparam name="TOut">The type of the values produced by the selector.</typeparam>
    /// <param name="sourceItem">The item to run the selector for.</param>
    /// <param name="selector">The selector to run, and re-run on retry.</param>
    /// <param name="retryObservable">Decides, per failure, whether another attempt is made.</param>
    /// <returns>
    /// A sequence producing the outcome of the last attempt: a success, or the failure that was
    /// not retried.
    /// </returns>
    private static IObservable<AttemptResult<TOut>> DoRetryStep<TIn, TOut>(TIn sourceItem,
        Func<TIn, IObservable<TOut>> selector, Func<Exception, IObservable<bool>> retryObservable)
    {
        return RetrySelector(sourceItem, selector).
            SelectMany(attempt =>
            {
                if (attempt.Kind != AttemptResult<TOut>.AttemptKind.OnError)
                {
                    // we didnt error, thus the attempt is the final outcome
                    return Observable.Return(attempt);
                }

                // we errored, thus ask whether to run again; without a decision, or with a negative
                // one, the failed attempt itself becomes the outcome
                return retryObservable(attempt.Exception).
                    Take(1).
                    DefaultIfEmpty(false).
                    SelectMany(shouldTry => shouldTry
                        ? DoRetryStep(sourceItem, selector, retryObservable)
                        : Observable.Return(attempt));
            });
    }

    /// <summary>
    /// Utility method, to convert a potentially exception raising selector always into an
    /// <see cref="AttemptResult{TOut}"/> output
    /// </summary>
    /// <remarks>
    /// The returned sequence produces exactly one result and then completes: the first value of the
    /// selector's observable, its error, or its completion, whichever is seen first. An exception
    /// thrown while invoking or subscribing to the selector is reported as an error result too.
    /// </remarks>
    /// <typeparam name="TIn">The type of the value handed to the selector.</typeparam>
    /// <typeparam name="TOut">The type of the values produced by the selector.</typeparam>
    /// <param name="value">The value to invoke the selector with</param>
    /// <param name="selector">The selector </param>
    /// <returns>A sequence that never errors and carries the outcome of one attempt.</returns>
    private static IObservable<AttemptResult<TOut>> RetrySelector<TIn, TOut>(TIn value,
        Func<TIn, IObservable<TOut>> selector)
    {
        return Observable.Create<AttemptResult<TOut>>((s) =>
        {
            IDisposable subscription = null;
            try
            {
                subscription = selector(value).
                    Subscribe(v =>
                        {
                            s.OnNext(AttemptResult<TOut>.OnNext(v));
                            s.OnCompleted();
                        },
                        (Exception error) =>
                        {
                            s.OnNext(AttemptResult<TOut>.OnError(error));
                            s.OnCompleted();
                        },
                        () =>
                        {
                            s.OnNext(AttemptResult<TOut>.OnCompleted());
                            s.OnCompleted();
                        }
                    );
            }
            catch (Exception ex)
            {
                // the selector (or subscribing to its result) threw: report it as a failed attempt
                s.OnNext(AttemptResult<TOut>.OnError(ex));
                s.OnCompleted();
            }

            // still null when the selector threw before a subscription existed
            return subscription;
        });
    }

    /// <summary>
    /// Represents the result of a selector being used.
    /// </summary>
    /// <typeparam name="TOut">The type of the values produced by the selector.</typeparam>
    public class AttemptResult<TOut>
    {
        /// <summary>
        /// Get the exception (if relevant)
        /// </summary>
        public Exception Exception { get; private set; }

        /// <summary>
        /// Get the selectors output value (if relevant)
        /// </summary>
        public TOut Value { get; private set; }

        /// <summary>
        /// The notification that ended an attempt.
        /// </summary>
        public enum AttemptKind
        {
            /// <summary>The selector's observable produced a value.</summary>
            OnNext,

            /// <summary>The selector's observable completed without producing a value.</summary>
            OnCompleted,

            /// <summary>The selector threw, or its observable signalled an error.</summary>
            OnError
        }

        /// <summary>
        /// Get the Kind of result seen from the selector
        /// </summary>
        public AttemptKind Kind { get; private set; }

        /// <summary>
        /// Creates the result for an attempt that produced <paramref name="value"/>.
        /// </summary>
        /// <param name="value">The value produced by the selector.</param>
        /// <returns>A result of kind <see cref="AttemptKind.OnNext"/>.</returns>
        public static AttemptResult<TOut> OnNext(TOut value)
        {
            return new AttemptResult<TOut>() {Value = value, Kind = AttemptKind.OnNext};
        }

        /// <summary>
        /// Creates the result for an attempt that completed without a value.
        /// </summary>
        /// <returns>A result of kind <see cref="AttemptKind.OnCompleted"/>.</returns>
        public static AttemptResult<TOut> OnCompleted()
        {
            return new AttemptResult<TOut>() {Kind = AttemptKind.OnCompleted};
        }

        /// <summary>
        /// Creates the result for an attempt that failed with <paramref name="exception"/>.
        /// </summary>
        /// <param name="exception">The exception raised by the attempt.</param>
        /// <returns>A result of kind <see cref="AttemptKind.OnError"/>.</returns>
        public static AttemptResult<TOut> OnError(Exception exception)
        {
            return new AttemptResult<TOut>() {Exception = exception, Kind = AttemptKind.OnError};
        }
    }
}
@mteper
Copy link

mteper commented Jan 12, 2016

In AttemptResult, what is the difference between OnNext and OnCompleted? In other words, when would OnCompleted tick and why is it useful?

@jcmm33
Copy link
Author

jcmm33 commented Jan 26, 2016

You could have a sequence for which no values are emitted but just completes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment