Created
November 30, 2011 17:05
-
-
Save drstevens/1409829 to your computer and use it in GitHub Desktop.
Globally handle fatal exceptions in Rx.Net subscribers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* This is how I globally handle fatal exceptions thrown from Rx subscribers.
 * It is what I came up with in response to my Stack Overflow question here:
 * http://stackoverflow.com/questions/7210051/catching-exceptions-which-may-be-thrown-from-a-subscription-onnext-action
 * This is far from ideal. From what I understand, exception handling has been greatly improved in Rx for .NET 4.5.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NLog;
using NUnit.Framework;
namespace RxExceptionHandling | |
{ | |
/// <summary>
/// Contract for funnelling fatal exceptions raised by callbacks into a single handler.
/// </summary>
/// <remarks>
/// I toyed with making this take a type param TException
/// but decided against it because there are many places this is used.
/// Each would have to duplicate this type parameter of the instance.
/// I instead used Exception and require that this be used for fatal events only.
/// Non fatal events must be caught in the action parameter.
/// </remarks>
public interface IExceptionCatcher
{
    /// <summary>
    /// Wraps <paramref name="action"/> in a delegate of the same shape. Implementations are
    /// expected to route any exception thrown by the wrapped action to
    /// <see cref="HandleException"/> instead of letting it propagate to the caller.
    /// </summary>
    /// <typeparam name="T">Type of the single argument accepted by the action.</typeparam>
    /// <param name="action">The callback to protect.</param>
    /// <returns>A delegate that invokes <paramref name="action"/> under the catcher's protection.</returns>
    Action<T> Catch<T>(Action<T> action);

    /// <summary>
    /// Reports a fatal exception to the catcher.
    /// </summary>
    /// <param name="e">The exception to report.</param>
    void HandleException(Exception e);
}
/// <summary>
/// This provides a way for exceptions to be handled from asynchronously executed code.
/// </summary>
/// <remarks>
/// A background task blocks until the first exception is reported (or the catcher is
/// disposed) and then invokes the error action once with the exceptions aggregated so far.
/// Exceptions reported after that point are still aggregated but the error action is not
/// invoked again.
/// </remarks>
public class ExceptionCatcher : IExceptionCatcher, IDisposable
{
    private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

    // Guards _exceptions and _isDisposed.
    private readonly object _lockObject = new object();
    private readonly ManualResetEvent _resetEvent;
    private AggregateException _exceptions;
    private readonly Task _task;
    private volatile bool _isExceptionCaught;
    private readonly bool _continueOnError;
    private bool _isDisposed;

    /// <summary>
    /// Creates the catcher and starts the background task that waits for the first exception.
    /// </summary>
    /// <param name="errorAction">
    /// Invoked on the background task with the aggregated exceptions once an exception has
    /// been reported. Not invoked if the catcher is disposed without any exception reported.
    /// </param>
    /// <param name="continueOnError">
    /// When <c>false</c>, delegates produced by <see cref="Catch{T}"/> stop invoking their
    /// wrapped action after the first reported exception; when <c>true</c> they keep running.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="errorAction"/> is null.</exception>
    public ExceptionCatcher(Action<AggregateException> errorAction, bool continueOnError)
    {
        if (errorAction == null)
        {
            // Fail fast; otherwise the null would only surface later as a logged
            // NullReferenceException inside the background task.
            throw new ArgumentNullException("errorAction");
        }

        _continueOnError = continueOnError;
        _resetEvent = new ManualResetEvent(false);

        // LongRunning: the task spends its life blocked on the wait handle, so it should
        // not occupy a thread-pool worker.
        _task = Task.Factory.StartNew(() =>
        {
            try
            {
                _resetEvent.WaitOne();
                AggregateException exceptions;
                lock (_lockObject)
                {
                    exceptions = _exceptions;
                }
                if (exceptions != null)
                {
                    errorAction(exceptions);
                }
            }
            catch (Exception ex)
            {
                Logger.ErrorException("Error In Exception Action:" + ex.Message,
                                      ex);
                throw;
            }
            finally
            {
                Logger.Trace("Exiting...");
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Wraps <paramref name="action"/> so that any exception it throws is passed to
    /// <see cref="HandleException"/> instead of propagating to the caller.
    /// </summary>
    /// <typeparam name="T">Type of the argument accepted by the action.</typeparam>
    /// <param name="action">The callback to protect.</param>
    /// <returns>
    /// A delegate that invokes <paramref name="action"/> unless an exception has already been
    /// caught and the catcher was created with <c>continueOnError == false</c>.
    /// </returns>
    public Action<T> Catch<T>(Action<T> action)
    {
        return arg =>
        {
            try
            {
                if (!IsExecutionPrevented())
                {
                    action(arg);
                }
            }
            catch (Exception e)
            {
                HandleException(e);
            }
        };
    }

    /// <summary>
    /// Records <paramref name="e"/> in the aggregate and wakes the background task.
    /// </summary>
    /// <param name="e">The exception to record.</param>
    public void HandleException(Exception e)
    {
        _isExceptionCaught = true;
        lock (_lockObject)
        {
            // Newest exception first, followed by everything recorded so far.
            var exceptions = new List<Exception> {e};
            if (_exceptions != null)
            {
                exceptions.AddRange(_exceptions.InnerExceptions);
            }
            _exceptions = new AggregateException(String.Format("Exceptions handled by {0}", GetType().FullName),
                                                 exceptions);
            if (!_isDisposed)
            {
                // Signal task to wake up. Skipped after Dispose because the task has already
                // been released and the handle may have been closed.
                _resetEvent.Set();
            }
        }
    }

    /// <summary>
    /// Releases the background task, waits up to ten seconds for it to finish and closes the
    /// wait handle. Exceptions from the task are logged, never rethrown. Safe to call more
    /// than once.
    /// </summary>
    public void Dispose()
    {
        lock (_lockObject)
        {
            if (_isDisposed)
            {
                return;
            }
            _isDisposed = true;
            //wake up error handling task in case it hasn't been yet
            _resetEvent.Set();
        }

        try
        {
            _task.Wait(TimeSpan.FromSeconds(10));
        }
        catch (Exception e)
        {
            //eat exception in Dispose
            Logger.ErrorException("Exception handling task completed exceptionally: " + e.Message, e);
        }

        // Only close the handle once the task can no longer be waiting on it. If the wait
        // timed out, the handle is left for the finalizer rather than risking an
        // ObjectDisposedException inside the task.
        if (_task.IsCompleted)
        {
            _resetEvent.Dispose();
        }
    }

    /// <summary>
    /// True when a previous exception should stop further wrapped actions from running.
    /// </summary>
    private bool IsExecutionPrevented()
    {
        return _isExceptionCaught && !_continueOnError;
    }
}
/// <summary>
/// Subscribe helpers that stop exceptions thrown by an <c>onNext</c> callback from
/// propagating out of the subscription.
/// </summary>
public static class Observables
{
    /// <summary>
    /// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
    /// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
    /// Handle these exceptions using <paramref name="onError"/>.
    /// </summary>
    /// <remarks>
    /// The subscription is not torn down when <paramref name="onNext"/> throws, so
    /// <paramref name="onError"/> may be invoked once per failing item.
    /// </remarks>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element.</param>
    /// <param name="onError">Receives both sequence errors and exceptions thrown by <paramref name="onNext"/>.</param>
    /// <param name="onCompleted">Callback passed through as the completion handler.</param>
    /// <returns>The subscription handle returned by the underlying Subscribe call.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      Action<Exception> onError,
                                                                      Action onCompleted)
    {
        if (source == null) throw new ArgumentNullException("source");
        // onNext is wrapped in a lambda below, so the underlying Subscribe call never sees
        // a null here; without this check a null would surface as a per-item
        // NullReferenceException routed to onError.
        if (onNext == null) throw new ArgumentNullException("onNext");
        if (onError == null) throw new ArgumentNullException("onError");
        if (onCompleted == null) throw new ArgumentNullException("onCompleted");

        return source.Subscribe(item =>
        {
            try
            {
                onNext(item);
            }
            catch (Exception e)
            {
                onError(e);
            }
        }, onError, onCompleted);
    }

    /// <summary>
    /// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
    /// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
    /// Handle these exceptions using <paramref name="exceptionCatcher"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element, wrapped via <see cref="IExceptionCatcher.Catch{T}"/>.</param>
    /// <param name="exceptionCatcher">This is used to catch the exceptions; it also receives sequence errors.</param>
    /// <param name="onCompleted">Callback passed through as the completion handler.</param>
    /// <returns>The subscription handle returned by the underlying Subscribe call.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      IExceptionCatcher exceptionCatcher,
                                                                      Action onCompleted)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (onNext == null) throw new ArgumentNullException("onNext");
        if (exceptionCatcher == null) throw new ArgumentNullException("exceptionCatcher");
        if (onCompleted == null) throw new ArgumentNullException("onCompleted");

        return source.Subscribe(exceptionCatcher.Catch(onNext), exceptionCatcher.HandleException, onCompleted);
    }

    /// <summary>
    /// Delegate to <see cref="System.ObservableExtensions.Subscribe{TSource}"/>
    /// but catch any exceptions resulting from calls to <paramref name="onNext"/>.
    /// Handle these exceptions using <paramref name="exceptionCatcher"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to subscribe to.</param>
    /// <param name="onNext">Callback invoked for each element, wrapped via <see cref="IExceptionCatcher.Catch{T}"/>.</param>
    /// <param name="exceptionCatcher">This is used to catch the exceptions; it also receives sequence errors.</param>
    /// <returns>The subscription handle returned by the underlying Subscribe call.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static IDisposable SubscribeWithExceptionCatching<TSource>(this IObservable<TSource> source,
                                                                      Action<TSource> onNext,
                                                                      IExceptionCatcher exceptionCatcher)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (onNext == null) throw new ArgumentNullException("onNext");
        if (exceptionCatcher == null) throw new ArgumentNullException("exceptionCatcher");

        return source.Subscribe(exceptionCatcher.Catch(onNext), exceptionCatcher.HandleException);
    }
}
/// <summary>
/// Unit tests for <see cref="ExceptionCatcher"/>.
/// </summary>
[TestFixture]
public class ExceptionCatcherFixture
{
    private static readonly IEnumerable<int> OneToTen = Enumerable.Range(1, 10);
    private static readonly Exception ExpectedException = new Exception("BobSagat!");

    [Test]
    public void TestHandleException()
    {
        //Arrange
        AggregateException actualException = null;

        //Act
        using (var handler = new ExceptionCatcher(e => actualException = e, false))
        {
            var startNew = Task.Factory.StartNew(() => handler.HandleException(ExpectedException));
            //need to wait for above task to complete; fail loudly instead of racing on a timeout
            Assert.That(startNew.Wait(TimeSpan.FromSeconds(5)), Is.True,
                        "HandleException did not complete in time");
        }

        //Assert
        Assert.That(actualException, Is.Not.Null);
        Assert.That(actualException.InnerExceptions, Contains.Item(ExpectedException));
    }

    // NOTE: despite the name, this verifies that an exception thrown by a wrapped action
    // is caught and handed to the error action. The name is kept so existing test
    // filters and reports keep working.
    [Test]
    public void TestExecuteWithNoException()
    {
        //Arrange
        AggregateException actualException = null;

        //Act
        using (var handler = new ExceptionCatcher(e => actualException = e, false))
        {
            var action = handler.Catch<Exception>(Throw);
            action(ExpectedException);
        }

        //Assert
        Assert.That(actualException, Is.Not.Null);
        Assert.That(actualException.InnerExceptions, Contains.Item(ExpectedException));
    }

    [Test(Description = "Verify that the subsequent action calls are prevented after exception handled")]
    public void TestThatActionIsPrevented()
    {
        //Arrange
        var lastActionValue = 0;

        //Act
        using (var handler = new ExceptionCatcher(DoNothing, false))
        {
            var tryAction = handler.Catch<int>(i =>
            {
                lastActionValue = i;
                throw ExpectedException;
            });
            foreach (var value in OneToTen)
            {
                tryAction(value);
            }
        }

        //Assert
        Assert.That(lastActionValue, Is.EqualTo(1));
    }

    [Test(Description = "Verify that the subsequent action calls are NOT prevented after exception handled")]
    public void TestThatActionIsNotPrevented()
    {
        //Arrange
        var lastActionValue = 0;

        //Act
        using (var handler = new ExceptionCatcher(DoNothing, true))
        {
            var tryAction = handler.Catch<int>(i =>
            {
                lastActionValue = i;
                throw ExpectedException;
            });
            foreach (var value in OneToTen)
            {
                tryAction(value);
            }
        }

        //Assert
        Assert.That(lastActionValue, Is.EqualTo(OneToTen.Last()));
    }

    [Test(Description = "Verify that exceptions thrown from the error action are handled")]
    public void TestThatExceptionsFromErrorActionHandled()
    {
        //Act + Assert: neither reporting the exception nor disposing the catcher may throw,
        //even though the error action itself throws on the background task.
        Assert.DoesNotThrow(() =>
        {
            using (var handler = new ExceptionCatcher(_ => Throw(ExpectedException), true))
            {
                handler.HandleException(new Exception("Non Expected Exception"));
            }
        });
    }

    /// <summary>
    /// No-op callback usable wherever an <c>Action&lt;T&gt;</c> is required.
    /// </summary>
    /// <param name="_">Ignored.</param>
    public static void DoNothing<T>(T _)
    {
    }

    /// <summary>
    /// convert a throw into an expression for use in lambdas
    /// </summary>
    /// <param name="e">The exception to throw.</param>
    public static void Throw(Exception e)
    {
        throw e;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment