Skip to content

Instantly share code, notes, and snippets.

@kg
Created May 4, 2011 05:52
Show Gist options
  • Save kg/954812 to your computer and use it in GitHub Desktop.
Save kg/954812 to your computer and use it in GitHub Desktop.
Solving Problems With Asynchrony #3
/// <summary>
/// Non-generic view of a future: a placeholder that is completed later with
/// either a result object or an error, and that can notify handlers when
/// that happens.
/// </summary>
public interface IFuture {
    /// <summary>Whether the future has been given a result or an error.</summary>
    bool Completed { get; }

    /// <summary>Whether the future was completed with an error.</summary>
    bool Failed { get; }

    /// <summary>The error the future was completed with, if any.</summary>
    Exception Error { get; }

    /// <summary>Registers a callback that receives this future once it is complete.</summary>
    /// <param name="handler">The callback to invoke with the completed future.</param>
    void RegisterOnComplete (Action<IFuture> handler);

    /// <summary>Completes the future with a result/error pair.</summary>
    /// <param name="result">The untyped result value.</param>
    /// <param name="error">The error, or null when the future succeeded.</param>
    void SetResult (object result, Exception error);
}
/// <summary>
/// Holds a result of type <typeparamref name="T"/>, or an error, that is supplied
/// exactly once, and notifies registered handlers when that happens.
/// Every read and write of the future's state happens under a private lock;
/// handlers are always invoked outside of that lock.
/// </summary>
/// <typeparam name="T">The type of the result value.</typeparam>
public class Future<T> : IFuture {
    private readonly object _Lock = new object();
    private bool _Completed = false;
    private Action<IFuture> _OnComplete = null;
    private Exception _Error = null;
    private T _Result = default(T);

    /// <summary>Creates a future that has not been completed yet.</summary>
    public Future () {
    }

    /// <summary>Creates a future that is already completed with <paramref name="value"/>.</summary>
    /// <param name="value">The result to store.</param>
    public Future (T value) {
        SetResult(value, null);
    }

    /// <summary>
    /// Registers a handler to be invoked with this future once it is complete.
    /// If the future is already complete, the handler is invoked immediately on
    /// the calling thread; otherwise it is invoked by whichever thread completes
    /// the future, after all previously registered handlers.
    /// </summary>
    /// <param name="handler">The callback to invoke.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterOnComplete (Action<IFuture> handler) {
        // Reject null up front: a null handler would otherwise either be silently
        // dropped or blow up in the middle of the handler chain at completion time.
        if (handler == null) {
            throw new ArgumentNullException("handler");
        }

        bool completed;

        // Capture the completion state at the time we entered the lock.
        lock (_Lock) {
            completed = _Completed;

            // Not complete yet: append to the pending handlers. Delegate combination
            // preserves registration order (old handlers first, then the new one).
            if (!completed) {
                _OnComplete += handler;
            }
        }

        // Already complete: the pending handler list has been consumed, so just
        // run the handler right away (outside the lock).
        if (completed) {
            handler(this);
        }
    }

    /// <summary>True once a result or an error has been stored.</summary>
    public bool Completed {
        get {
            lock (_Lock) {
                return _Completed;
            }
        }
    }

    /// <summary>True when the future has been completed with a non-null error.</summary>
    public bool Failed {
        get {
            lock (_Lock) {
                return _Completed && (_Error != null);
            }
        }
    }

    /// <summary>The error the future was completed with, or null if it succeeded.</summary>
    /// <exception cref="FutureHasNoResultException">The future is not complete yet.</exception>
    public Exception Error {
        get {
            lock (_Lock) {
                if (!_Completed) {
                    throw new FutureHasNoResultException(this);
                }

                return _Error;
            }
        }
    }

    /// <summary>The value the future was completed with.</summary>
    /// <exception cref="FutureHasNoResultException">The future is not complete yet.</exception>
    /// <exception cref="FutureException">
    /// The future was completed with an error; the original error is the inner exception.
    /// </exception>
    public T Result {
        get {
            lock (_Lock) {
                if (!_Completed) {
                    throw new FutureHasNoResultException(this);
                }

                if (_Error != null) {
                    throw new FutureException("The future's result was an error.", _Error);
                }

                return _Result;
            }
        }
    }

    /// <summary>
    /// Untyped completion entry point. A null <paramref name="result"/> is stored as
    /// <c>default(T)</c>; any other value is cast to <typeparamref name="T"/>.
    /// </summary>
    /// <param name="result">The result value, or null.</param>
    /// <param name="error">The error, or null.</param>
    /// <exception cref="FutureException">Both a result and an error were supplied.</exception>
    void IFuture.SetResult (object result, Exception error) {
        if ((error != null) && (result != null)) {
            throw new FutureException("Cannot complete a future with both a result and an error.", error);
        }

        if (result == null) {
            SetResult(default(T), error);
        } else {
            SetResult((T)result, error);
        }
    }

    /// <summary>
    /// Completes the future with the given result/error pair and then invokes every
    /// handler that was registered beforehand, in registration order.
    /// </summary>
    /// <param name="result">The result value.</param>
    /// <param name="error">The error, or null on success.</param>
    /// <exception cref="FutureAlreadyHasResultException">The future was already completed.</exception>
    public void SetResult (T result, Exception error) {
        Action<IFuture> onComplete;

        lock (_Lock) {
            // A future should only be completed once. Throwing here matters because a
            // second completion usually indicates a bug in other code.
            if (_Completed) {
                throw new FutureAlreadyHasResultException(this);
            }

            // Detach the pending handlers and store the new result/error pair.
            onComplete = _OnComplete;
            _OnComplete = null;
            _Result = result;
            _Error = error;
            _Completed = true;
        }

        // Invoke the detached handlers outside the lock.
        if (onComplete != null) {
            onComplete(this);
        }
    }

    /// <summary>
    /// Reads the current result and error in one step without throwing.
    /// </summary>
    /// <param name="result">Receives the stored result (<c>default(T)</c> if none was stored).</param>
    /// <param name="error">Receives the stored error (null if none was stored).</param>
    /// <returns>True if the future is complete; otherwise false.</returns>
    public bool GetResult (out T result, out Exception error) {
        lock (_Lock) {
            result = _Result;
            error = _Error;
            return _Completed;
        }
    }
}
/// <summary>
/// General error raised by futures; always wraps the exception that caused it.
/// </summary>
public class FutureException : Exception {
    /// <summary>Creates the exception with a message and the underlying cause.</summary>
    /// <param name="message">Description of what went wrong.</param>
    /// <param name="innerException">The exception being wrapped.</param>
    public FutureException (string message, Exception innerException) : base(message, innerException) {
    }
}
/// <summary>
/// Raised when something attempts to complete a future a second time.
/// </summary>
public class FutureAlreadyHasResultException : InvalidOperationException {
    /// <summary>The future that was passed to the constructor.</summary>
    public readonly IFuture Future;

    /// <summary>Creates the exception for the given future.</summary>
    /// <param name="future">The future the exception refers to.</param>
    public FutureAlreadyHasResultException (IFuture future) : base("Future already has a result") {
        this.Future = future;
    }
}
/// <summary>
/// Raised when a future's result or error is requested before it has been completed.
/// </summary>
public class FutureHasNoResultException : InvalidOperationException {
    /// <summary>The future that was passed to the constructor.</summary>
    public readonly IFuture Future;

    /// <summary>Creates the exception for the given future.</summary>
    /// <param name="future">The future the exception refers to.</param>
    public FutureHasNoResultException (IFuture future) : base("Future does not yet have a result") {
        this.Future = future;
    }
}
/// <summary>
/// Convenience extension methods for completing futures.
/// </summary>
public static partial class Future {
    /// <summary>Completes <paramref name="future"/> with neither a result nor an error.</summary>
    /// <param name="future">The future to complete.</param>
    public static void Complete (this IFuture future) {
        object noResult = null;
        Exception noError = null;
        future.SetResult(noResult, noError);
    }

    /// <summary>Completes <paramref name="future"/> with <paramref name="result"/> and no error.</summary>
    /// <param name="future">The future to complete.</param>
    /// <param name="result">The value to store in the future.</param>
    public static void Complete<T> (this Future<T> future, T result) {
        Exception noError = null;
        future.SetResult(result, noError);
    }

    /// <summary>Completes <paramref name="future"/> with <paramref name="error"/> and no result.</summary>
    /// <param name="future">The future to complete.</param>
    /// <param name="error">The error to store in the future.</param>
    public static void Fail (this IFuture future, Exception error) {
        object noResult = null;
        future.SetResult(noResult, error);
    }
}
using System;
using System.Threading;
public static partial class Future {
    /// <summary>
    /// Queues <paramref name="workItem"/> on the thread pool and returns a future that is
    /// completed with its return value, or failed with the exception it throws.
    /// </summary>
    /// <typeparam name="T">The type returned by the work item.</typeparam>
    /// <param name="workItem">The function to run on a thread pool thread.</param>
    /// <returns>A future representing the eventual outcome of the work item.</returns>
    public static Future<T> RunInThreadPool<T> (Func<T> workItem) {
        var future = new Future<T>();

        ThreadPool.QueueUserWorkItem((state) => {
            T result;

            // Only the work item itself is guarded. Completion happens outside the
            // try block so that an exception thrown by a completion handler is not
            // mistaken for a work item failure: that would call Fail on an
            // already-completed future and replace the real exception with a
            // FutureAlreadyHasResultException.
            try {
                result = workItem();
            } catch (Exception ex) {
                future.Fail(ex);
                return;
            }

            future.Complete(result);
        });

        return future;
    }
}
public static partial class Future {
    /// <summary>
    /// Returns a composite future that completes as soon as any one of
    /// <paramref name="futures"/> completes. Its result is the future that completed first.
    /// </summary>
    /// <param name="futures">The futures to wait on; must contain at least one.</param>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static Future<IFuture> WaitForFirst (params IFuture[] futures) {
        return WaitForX(futures, 1);
    }

    /// <summary>
    /// Returns a composite future that completes once every one of
    /// <paramref name="futures"/> has completed. Its result is the future that completed last.
    /// </summary>
    /// <param name="futures">The futures to wait on; must contain at least one.</param>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    public static IFuture WaitForAll (params IFuture[] futures) {
        // Do not dereference a null array here; WaitForX reports it as an ArgumentException.
        int count = (futures == null) ? 0 : futures.Length;
        return WaitForX(futures, count);
    }

    /// <summary>
    /// Counts completions and completes <see cref="Composite"/> exactly once, when the
    /// number of completions reaches <see cref="NumberToWaitFor"/>.
    /// </summary>
    private class WaitHandler {
        public readonly Future<IFuture> Composite = new Future<IFuture>();
        public readonly int NumberToWaitFor;

        private readonly object _Lock = new object();
        private int _NumberCompleted = 0;

        public WaitHandler (int numberToWaitFor) {
            NumberToWaitFor = numberToWaitFor;
        }

        /// <summary>Completion callback registered on every future being waited on.</summary>
        /// <param name="f">The future that just completed.</param>
        public void OnComplete (IFuture f) {
            bool completed;

            lock (_Lock) {
                _NumberCompleted += 1;
                // Equality (not >=) so that completions arriving after the threshold
                // was reached never try to complete the composite a second time.
                completed = (_NumberCompleted == NumberToWaitFor);
            }

            // Complete the composite outside the lock, since that runs its handlers.
            if (completed) {
                Composite.Complete(f);
            }
        }
    }

    /// <summary>
    /// Registers a shared completion counter on each future and returns the composite
    /// future that fires once <paramref name="numberToWaitFor"/> of them have completed.
    /// </summary>
    /// <param name="futures">The futures to wait on.</param>
    /// <param name="numberToWaitFor">How many completions are required.</param>
    /// <exception cref="ArgumentException"><paramref name="futures"/> is null or empty.</exception>
    private static Future<IFuture> WaitForX (IFuture[] futures, int numberToWaitFor) {
        if ((futures == null) || (futures.Length == 0)) {
            throw new ArgumentException("Must specify at least one future to wait on", "futures");
        }

        var waiter = new WaitHandler(numberToWaitFor);
        Action<IFuture> onComplete = waiter.OnComplete;

        // Futures that are already complete invoke the callback immediately during registration.
        foreach (IFuture f in futures) {
            f.RegisterOnComplete(onComplete);
        }

        return waiter.Composite;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment