-
-
Save jrgcubano/da67fe5cbe970e454742660aa12baded to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the MIT license with <3 by GitHub | |
/// <summary> | |
/// An exponential back off strategy which starts with 1 second and then 4, 9, 16... | |
/// </summary> | |
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")] | |
public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); | |
/// <summary> | |
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the | |
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy. | |
/// </summary> | |
/// <param name="source">The source observable.</param> | |
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param> | |
/// <param name="strategy">The strategy to use in backing off, exponential by default.</param> | |
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param> | |
/// <param name="scheduler">The scheduler.</param> | |
/// <returns> | |
/// A cold observable which retries (re-subscribes to) the source observable on error up to the | |
/// specified number of times or until it successfully terminates. | |
/// </returns> | |
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] | |
public static IObservable<T> RetryWithBackoffStrategy<T>( | |
this IObservable<T> source, | |
int retryCount = 3, | |
Func<int, TimeSpan> strategy = null, | |
Func<Exception, bool> retryOnError = null, | |
IScheduler scheduler = null) | |
{ | |
strategy = strategy ?? ExponentialBackoff; | |
scheduler = scheduler ?? RxApp.TaskpoolScheduler; | |
if (retryOnError == null) | |
retryOnError = e => true; | |
int attempt = 0; | |
return Observable.Defer(() => | |
{ | |
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler)) | |
.Select(Notification.CreateOnNext) | |
.Catch((Exception e) => retryOnError(e) | |
? Observable.Throw<Notification<T>>(e) | |
: Observable.Return(Notification.CreateOnError<T>(e))); | |
}) | |
.Retry(retryCount) | |
.Dematerialize(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the MIT license with <3 by GitHub | |
static readonly Func<int, TimeSpan> LinearMsStrategy = n => TimeSpan.FromMilliseconds(1 * n); | |
[Fact] | |
public void DoesNotRetryInCaseOfSuccess() | |
{ | |
new TestScheduler().With(sched => | |
{ | |
int tryCount = 0; | |
var source = Observable.Defer(() => | |
{ | |
tryCount++; | |
return Observable.Return("yolo"); | |
}); | |
source.RetryWithBackoffStrategy( | |
retryCount: 3, | |
strategy: LinearMsStrategy, | |
scheduler: sched | |
); | |
source.Subscribe(); | |
Assert.Equal(1, tryCount); | |
sched.AdvanceByMs(1); | |
Assert.Equal(1, tryCount); | |
}); | |
} | |
[Fact] | |
public void PropagatesLastObservedExceptionIfAllTriesFail() | |
{ | |
new TestScheduler().With(sched => | |
{ | |
int tryCount = 0; | |
var source = Observable.Defer(() => | |
{ | |
tryCount++; | |
return Observable.Throw<string>(new InvalidOperationException(tryCount.ToString())); | |
}); | |
var observable = source.RetryWithBackoffStrategy( | |
retryCount: 3, | |
strategy: LinearMsStrategy, | |
scheduler: sched | |
); | |
Exception lastError = null; | |
observable.Subscribe(_ => { }, e => { lastError = e; }); | |
Assert.Equal(1, tryCount); | |
sched.AdvanceByMs(1); | |
Assert.Equal(2, tryCount); | |
sched.AdvanceByMs(2); | |
Assert.Equal(3, tryCount); | |
Assert.NotNull(lastError); | |
Assert.Equal("3", lastError.Message); | |
}); | |
} | |
[Fact] | |
public void RetriesOnceIfSuccessBeforeRetriesRunOut() | |
{ | |
new TestScheduler().With(sched => | |
{ | |
int tryCount = 0; | |
var source = Observable.Defer(() => | |
{ | |
if (tryCount++ < 1) return Observable.Throw<string>(new InvalidOperationException()); | |
return Observable.Return("yolo " + tryCount); | |
}); | |
var observable = source.RetryWithBackoffStrategy( | |
retryCount: 5, | |
strategy: LinearMsStrategy, | |
scheduler: sched | |
); | |
string lastValue = null; | |
observable.Subscribe(n => { lastValue = n; }); | |
Assert.Equal(1, tryCount); | |
Assert.Null(lastValue); | |
sched.AdvanceByMs(1); | |
Assert.Equal(2, tryCount); | |
Assert.Equal("yolo 2", lastValue); | |
}); | |
} | |
[Fact] | |
public void UnsubscribingDisposesSource() | |
{ | |
new TestScheduler().With(sched => | |
{ | |
int c = -1; | |
var neverEndingSource = Observable.Defer(() => | |
{ | |
return Observable.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1), sched) | |
.Do(_ => c++) | |
.Select(_ => Unit.Default); | |
}); | |
var observable = neverEndingSource.RetryWithBackoffStrategy(scheduler: sched); | |
// Cold | |
Assert.Equal(-1, c); | |
var disp = observable | |
.Take(2) | |
.Subscribe(); | |
sched.AdvanceByMs(1); | |
Assert.Equal(0, c); | |
sched.AdvanceByMs(1); | |
Assert.Equal(1, c); | |
sched.AdvanceByMs(10); | |
Assert.Equal(1, c); | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public IObservable<IWebResponse> MakeWebRequest() { ... } | |
// Retry 4 times with default back off strategy for all exceptions | |
return Observable.Defer(() => MakeWebRequest()) | |
.RetryWithBackoffStrategy(retryCount: 4) | |
// Retry 4 times with default back off strategy for all web exceptions | |
return Observable.Defer(() => MakeWebRequest()) | |
.RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException) | |
// Retry 4 times with default back off strategy on 202s | |
return Observable.Defer(() => MakeWebRequest()) | |
.RetryWithBackoffStrategy(retryCount: 4, retryOnError: e => e is WebException && ((WebException)e).StatusCode == 202) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment