Created
December 25, 2010 04:43
-
-
Save bvanderveen/754691 to your computer and use it in GitHub Desktop.
Examples of implementing an APM operation using the Reactive Framework and the Task Parallel Library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Disposables; | |
using System.Linq; | |
using System.Threading; | |
namespace RxVsTplSamples | |
{ | |
public class RxAsyncOperation<TArg, TResult> | |
{ | |
Func<TArg, IObservable<TResult>> operation; | |
IDisposable disposable; | |
public RxAsyncOperation(Func<TArg, IObservable<TResult>> operation) | |
{ | |
this.operation = operation; | |
} | |
public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state) | |
{ | |
AsyncResult<TResult> asyncResult = new AsyncResult<TResult>(callback, state); | |
disposable = operation(arg).Subscribe( | |
// the boolean arguments to SetAsCompleted indicate whether the operation | |
// completed synchronously. IObservable doesn't relay that information, | |
// so we assume false. | |
r => | |
{ | |
UnsubscribeIfPossible(); | |
asyncResult.SetAsCompleted(r, false); | |
}, | |
e => | |
{ | |
UnsubscribeIfPossible(); | |
asyncResult.SetAsCompleted(e, false); | |
}, | |
() => { }); | |
// if the operation completed during the call to subscribe, we won't have | |
// unsubscribed yet, so do it now. | |
if (asyncResult.IsCompleted) | |
UnsubscribeIfPossible(); | |
return asyncResult; | |
} | |
public TResult EndInvoke(IAsyncResult asyncResult) | |
{ | |
// the AsyncResult<T> implementation takes care of waiting and throwing exceptions for us. | |
return ((AsyncResult<TResult>)asyncResult).EndInvoke(); | |
} | |
void UnsubscribeIfPossible() | |
{ | |
if (disposable != null) | |
{ | |
disposable.Dispose(); | |
disposable = null; | |
} | |
} | |
} | |
#region IAsyncResult implementation | |
// the next two classes lifted from http://msdn.microsoft.com/en-us/magazine/cc163467.aspx | |
class AsyncResult : IAsyncResult | |
{ | |
// Fields set at construction which never change while | |
// operation is pending | |
private readonly AsyncCallback m_AsyncCallback; | |
private readonly Object m_AsyncState; | |
// Fields set at construction which do change after | |
// operation completes | |
private const Int32 c_StatePending = 0; | |
private const Int32 c_StateCompletedSynchronously = 1; | |
private const Int32 c_StateCompletedAsynchronously = 2; | |
private Int32 m_CompletedState = c_StatePending; | |
// Field that may or may not get set depending on usage | |
private ManualResetEvent m_AsyncWaitHandle; | |
// Fields set when operation completes | |
private Exception m_exception; | |
public AsyncResult(AsyncCallback asyncCallback, Object state) | |
{ | |
m_AsyncCallback = asyncCallback; | |
m_AsyncState = state; | |
} | |
public void SetAsCompleted( | |
Exception exception, Boolean completedSynchronously) | |
{ | |
// Passing null for exception means no error occurred. | |
// This is the common case | |
m_exception = exception; | |
// The m_CompletedState field MUST be set prior calling the callback | |
Int32 prevState = Interlocked.Exchange(ref m_CompletedState, | |
completedSynchronously ? c_StateCompletedSynchronously : | |
c_StateCompletedAsynchronously); | |
if (prevState != c_StatePending) | |
throw new InvalidOperationException( | |
"You can set a result only once"); | |
// If the event exists, set it | |
if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set(); | |
// If a callback method was set, call it | |
if (m_AsyncCallback != null) m_AsyncCallback(this); | |
} | |
public void EndInvoke() | |
{ | |
// This method assumes that only 1 thread calls EndInvoke | |
// for this object | |
if (!IsCompleted) | |
{ | |
// If the operation isn't done, wait for it | |
AsyncWaitHandle.WaitOne(); | |
AsyncWaitHandle.Close(); | |
m_AsyncWaitHandle = null; // Allow early GC | |
} | |
// Operation is done: if an exception occured, throw it | |
if (m_exception != null) throw new Exception("Exception during operation.", m_exception); | |
} | |
#region Implementation of IAsyncResult | |
public Object AsyncState { get { return m_AsyncState; } } | |
public Boolean CompletedSynchronously | |
{ | |
get | |
{ | |
return Thread.VolatileRead(ref m_CompletedState) == | |
c_StateCompletedSynchronously; | |
} | |
} | |
public WaitHandle AsyncWaitHandle | |
{ | |
get | |
{ | |
if (m_AsyncWaitHandle == null) | |
{ | |
Boolean done = IsCompleted; | |
ManualResetEvent mre = new ManualResetEvent(done); | |
if (Interlocked.CompareExchange(ref m_AsyncWaitHandle, | |
mre, null) != null) | |
{ | |
// Another thread created this object's event; dispose | |
// the event we just created | |
mre.Close(); | |
} | |
else | |
{ | |
if (!done && IsCompleted) | |
{ | |
// If the operation wasn't done when we created | |
// the event but now it is done, set the event | |
m_AsyncWaitHandle.Set(); | |
} | |
} | |
} | |
return m_AsyncWaitHandle; | |
} | |
} | |
public Boolean IsCompleted | |
{ | |
get | |
{ | |
return Thread.VolatileRead(ref m_CompletedState) != | |
c_StatePending; | |
} | |
} | |
#endregion | |
} | |
class AsyncResult<TResult> : AsyncResult | |
{ | |
// Field set when operation completes | |
private TResult m_result = default(TResult); | |
public AsyncResult(AsyncCallback asyncCallback, Object state) : | |
base(asyncCallback, state) { } | |
public void SetAsCompleted(TResult result, | |
Boolean completedSynchronously) | |
{ | |
// Save the asynchronous operation's result | |
m_result = result; | |
// Tell the base class that the operation completed | |
// sucessfully (no exception) | |
base.SetAsCompleted(null, completedSynchronously); | |
} | |
new public TResult EndInvoke() | |
{ | |
base.EndInvoke(); // Wait until operation has completed | |
return m_result; // Return the result (if above didn't throw) | |
} | |
} | |
#endregion | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
namespace RxVsTplSample | |
{ | |
public class TplAsyncOperation<TArg, TResult> | |
{ | |
Func<TArg, Task<TResult>> operation; | |
public TplAsyncOperation(Func<TArg, Task<TResult>> operation) | |
{ | |
this.operation = operation; | |
} | |
public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state) | |
{ | |
var task = operation(arg); | |
task.ContinueWith(_ => callback(task)); | |
return task; | |
} | |
public TResult EndInvoke(IAsyncResult result) | |
{ | |
return ((Task<TResult>)result).Result; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment