Created
May 4, 2011 05:52
-
-
Save kg/954812 to your computer and use it in GitHub Desktop.
Solving Problems With Asynchrony #3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Untyped view of a future: a placeholder that is completed at some later point
/// with either a result or an error, and that can notify handlers when that happens.
/// </summary>
public interface IFuture {
    /// <summary>The error the future was completed with, if any.</summary>
    Exception Error { get; }

    /// <summary>Whether the future has been completed.</summary>
    bool Completed { get; }

    /// <summary>Whether the future has been completed with an error.</summary>
    bool Failed { get; }

    /// <summary>Completes the future with the given result/error pair.</summary>
    /// <param name="result">The (untyped) result value.</param>
    /// <param name="error">The error to complete with, or null.</param>
    void SetResult (object result, Exception error);

    /// <summary>Registers a handler to be notified about the future's completion.</summary>
    /// <param name="handler">Callback that receives this future.</param>
    void RegisterOnComplete (Action<IFuture> handler);
}
/// <summary>
/// A placeholder for a value of type <typeparamref name="T"/> (or an error) that is supplied later.
/// All state is guarded by a private lock. Completion handlers are always invoked outside that lock:
/// handlers registered before completion run, in registration order, on the thread that calls SetResult;
/// handlers registered after completion run immediately on the registering thread.
/// </summary>
public class Future<T> : IFuture {
    private readonly object _Lock = new object();
    private bool _Completed = false;
    private Action<IFuture> _OnComplete = null;
    private Exception _Error = null;
    private T _Result = default(T);

    /// <summary>Creates a future that has not been completed yet.</summary>
    public Future () {
    }

    /// <summary>Creates a future that is already completed with <paramref name="value"/>.</summary>
    public Future (T value) {
        SetResult(value, null);
    }

    /// <summary>
    /// Registers a handler to be invoked with this future once it is complete.
    /// If the future is already complete, the handler is invoked before this method returns.
    /// </summary>
    /// <param name="handler">The callback to invoke; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterOnComplete (Action<IFuture> handler) {
        // Reject null up front. Otherwise a null handler is either silently dropped or
        // blows up with a NullReferenceException much later, on the completing thread.
        if (handler == null)
            throw new ArgumentNullException("handler");

        bool completed;

        // Acquire the lock, and store our completion state at the time we entered the lock.
        lock (_Lock) {
            completed = _Completed;

            // If we're not complete when we enter the lock, we want to add this handler to the list of handlers.
            if (!completed) {
                var oldOnComplete = _OnComplete;

                if (oldOnComplete != null) {
                    // Chain: invoke the previously registered handlers first, then the new one.
                    _OnComplete = (f) => {
                        oldOnComplete(f);
                        handler(f);
                    };
                } else {
                    _OnComplete = handler;
                }
            }
        }

        // We were complete when we entered the lock, so the list of handlers is already empty.
        // Just call the handler that we were given immediately (outside the lock).
        if (completed)
            handler(this);
    }

    /// <summary>Whether the future has been completed (with a result or an error).</summary>
    public bool Completed {
        get {
            lock (_Lock)
                return _Completed;
        }
    }

    /// <summary>Whether the future has been completed with a non-null error.</summary>
    public bool Failed {
        get {
            lock (_Lock)
                return _Completed && (_Error != null);
        }
    }

    /// <summary>The error the future was completed with, or null if it completed without one.</summary>
    /// <exception cref="FutureHasNoResultException">The future has not been completed yet.</exception>
    public Exception Error {
        get {
            lock (_Lock) {
                if (_Completed)
                    return _Error;
                else
                    throw new FutureHasNoResultException(this);
            }
        }
    }

    /// <summary>The value the future was completed with.</summary>
    /// <exception cref="FutureException">The future was completed with an error (available as InnerException).</exception>
    /// <exception cref="FutureHasNoResultException">The future has not been completed yet.</exception>
    public T Result {
        get {
            lock (_Lock) {
                if (_Completed) {
                    if (_Error != null)
                        throw new FutureException("The future's result was an error.", _Error);
                    else
                        return _Result;
                } else
                    throw new FutureHasNoResultException(this);
            }
        }
    }

    /// <summary>
    /// Untyped completion entry point. A null result is mapped to default(T); any other
    /// result is cast to T.
    /// </summary>
    /// <exception cref="FutureException">Both a result and an error were supplied.</exception>
    void IFuture.SetResult (object result, Exception error) {
        if ((error != null) && (result != null))
            throw new FutureException("Cannot complete a future with both a result and an error.", error);

        if (result == null)
            SetResult(default(T), error);
        else
            SetResult((T)result, error);
    }

    /// <summary>
    /// Completes the future with the given result/error pair and then invokes any handlers
    /// that were registered before completion.
    /// </summary>
    /// <exception cref="FutureAlreadyHasResultException">The future was already completed.</exception>
    public void SetResult (T result, Exception error) {
        Action<IFuture> onComplete;

        lock (_Lock) {
            // A future should only be completed once. It's important for us to throw an exception in the event
            // that it is completed twice, since this usually indicates a bug in other code.
            if (_Completed)
                throw new FutureAlreadyHasResultException(this);

            // Store away the list of handlers, and store our new result/error pair.
            onComplete = _OnComplete;
            _OnComplete = null;
            _Result = result;
            _Error = error;
            _Completed = true;
        }

        // If we had any handlers attached before we were completed, invoke them now (outside the lock).
        if (onComplete != null)
            onComplete(this);
    }

    /// <summary>
    /// Non-throwing snapshot of the future's state.
    /// </summary>
    /// <param name="result">Receives the stored result (default(T) if none has been stored).</param>
    /// <param name="error">Receives the stored error (null if none has been stored).</param>
    /// <returns>True if the future has been completed; otherwise false.</returns>
    public bool GetResult (out T result, out Exception error) {
        lock (_Lock) {
            result = _Result;
            error = _Error;
            return _Completed;
        }
    }
}
/// <summary>
/// General-purpose exception for future-related failures; wraps the underlying
/// error as its inner exception.
/// </summary>
public class FutureException : Exception {
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The underlying error being wrapped.</param>
    public FutureException (string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Raised when something attempts to complete a future that already holds a result.
/// </summary>
public class FutureAlreadyHasResultException : InvalidOperationException {
    /// <summary>The future passed to the constructor.</summary>
    public readonly IFuture Future;

    /// <param name="future">The future that already had a result.</param>
    public FutureAlreadyHasResultException (IFuture future)
        : base("Future already has a result")
    {
        this.Future = future;
    }
}
/// <summary>
/// Raised when a future's result or error is requested before the future has a result.
/// </summary>
public class FutureHasNoResultException : InvalidOperationException {
    /// <summary>The future passed to the constructor.</summary>
    public readonly IFuture Future;

    /// <param name="future">The future that did not yet have a result.</param>
    public FutureHasNoResultException (IFuture future)
        : base("Future does not yet have a result")
    {
        this.Future = future;
    }
}
/// <summary>
/// Convenience extension methods for completing futures.
/// </summary>
public static partial class Future {
    /// <summary>Completes <paramref name="future"/> with neither a result nor an error.</summary>
    public static void Complete (this IFuture future)
    {
        future.SetResult(null, null);
    }

    /// <summary>Completes <paramref name="future"/> with <paramref name="result"/> and no error.</summary>
    public static void Complete<T> (this Future<T> future, T result)
    {
        future.SetResult(result, null);
    }

    /// <summary>Completes <paramref name="future"/> with <paramref name="error"/> and no result.</summary>
    public static void Fail (this IFuture future, Exception error)
    {
        future.SetResult(null, error);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
public static partial class Future {
    /// <summary>
    /// Queues <paramref name="workItem"/> on the thread pool and returns a future for its outcome.
    /// The future is completed with the value returned by <paramref name="workItem"/>, or failed
    /// with the exception that it throws.
    /// </summary>
    /// <param name="workItem">The function to run on a thread pool thread.</param>
    /// <returns>A future that is completed from the thread pool thread once the work item finishes.</returns>
    public static Future<T> RunInThreadPool<T> (Func<T> workItem) {
        var future = new Future<T>();

        ThreadPool.QueueUserWorkItem((_) => {
            T result;

            // Only the work item itself is guarded. Completing the future runs its completion
            // handlers; if Complete were inside the try, an exception thrown by a handler would be
            // caught here and Fail would then be called on an already-completed future, replacing
            // the real error with a FutureAlreadyHasResultException.
            try {
                result = workItem();
            } catch (Exception ex) {
                future.Fail(ex);
                return;
            }

            future.Complete(result);
        });

        return future;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static partial class Future {
    /// <summary>
    /// Returns a future that completes as soon as any one of <paramref name="futures"/> completes.
    /// Its result is the future that completed first.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static Future<IFuture> WaitForFirst (params IFuture[] futures) {
        return WaitForX(futures, 1);
    }

    /// <summary>
    /// Returns a future that completes once every one of <paramref name="futures"/> has completed.
    /// Its result is the future whose completion finished the set.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static IFuture WaitForAll (params IFuture[] futures) {
        // Check before dereferencing so a null array yields the same ArgumentException as an empty
        // one instead of a NullReferenceException on futures.Length.
        if (futures == null)
            throw new ArgumentException("Must specify at least one future to wait on", "futures");

        return WaitForX(futures, futures.Length);
    }

    // Shared completion handler for a set of futures: completes Composite once
    // NumberToWaitFor of the watched futures have completed.
    private class WaitHandler {
        public readonly Future<IFuture> Composite = new Future<IFuture>();
        // Futures that have not completed yet; also serves as the lock object.
        public readonly List<IFuture> State = new List<IFuture>();
        public readonly int NumberToWaitFor;
        private int _CompletedCount = 0;

        public WaitHandler (IFuture[] futures, int numberToWaitFor) {
            State.AddRange(futures);
            NumberToWaitFor = numberToWaitFor;
        }

        public void OnComplete (IFuture f) {
            bool completed = false;

            lock (State) {
                // Count completions rather than comparing the number of futures still pending
                // against NumberToWaitFor: the latter fires after (total - NumberToWaitFor + 1)
                // completions, which inverts WaitForFirst and WaitForAll.
                // Remove returns false once State has been cleared, so the composite is
                // completed exactly once.
                if (State.Remove(f)) {
                    _CompletedCount += 1;

                    if (_CompletedCount == NumberToWaitFor) {
                        completed = true;
                        State.Clear();
                    }
                }
            }

            // Complete the composite outside the lock, since this runs its completion handlers.
            if (completed)
                Composite.Complete(f);
        }
    }

    // Registers a shared WaitHandler on every future and returns the composite future that
    // completes after numberToWaitFor of them have completed.
    private static Future<IFuture> WaitForX (IFuture[] futures, int numberToWaitFor) {
        if ((futures == null) || (futures.Length == 0))
            throw new ArgumentException("Must specify at least one future to wait on", "futures");

        var h = new WaitHandler(futures, numberToWaitFor);
        Action<IFuture> oc = h.OnComplete;

        foreach (IFuture f in futures)
            f.RegisterOnComplete(oc);

        return h.Composite;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment