Last active
December 22, 2015 06:48
-
-
Save caleb-vear/6433026 to your computer and use it in GitHub Desktop.
Poor mans await
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class AsyncHelper<TResult> | |
{ | |
public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence) | |
{ | |
return RunAsync(createProcessSequence, TaskScheduler.Current); | |
} | |
public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence, TaskScheduler continuationScheduler) | |
{ | |
var cancellationTokenSource = new CancellationTokenSource(); | |
return RunAsync(createProcessSequence, cancellationTokenSource.Token, continuationScheduler); | |
} | |
public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence, CancellationToken cancellationToken, TaskScheduler continuationScheduler) | |
{ | |
var completionSource = new TaskCompletionSource<TResult>(); | |
var processSequenceEnumerator = createProcessSequence(completionSource).GetEnumerator(); | |
ScheduleNext(processSequenceEnumerator, completionSource, continuationScheduler); | |
// We need to dispose of the enumerator once we are done. | |
completionSource.Task.ContinueWith(_ => processSequenceEnumerator.Dispose()); | |
return completionSource.Task; | |
} | |
static void ScheduleNext(IEnumerator<Task> processSequenceEnumerator, TaskCompletionSource<TResult> completionSource, TaskScheduler continuationScheduler) | |
{ | |
if (completionSource.Task.IsCompleted || completionSource.Task.IsCanceled || completionSource.Task.IsFaulted) | |
return; | |
try | |
{ | |
if (processSequenceEnumerator.MoveNext()) | |
{ | |
processSequenceEnumerator.Current.ContinueWith( | |
continuationAction: t => | |
{ | |
if (t.IsFaulted) | |
completionSource.SetException(t.Exception); | |
else if (t.IsCanceled) | |
completionSource.SetCanceled(); | |
else | |
ScheduleNext(processSequenceEnumerator,completionSource, continuationScheduler); | |
}, | |
scheduler: continuationScheduler); | |
} | |
else | |
{ | |
if (!completionSource.Task.IsCompleted) | |
completionSource.SetException(new AsyncProcessException()); | |
} | |
} | |
catch (OperationCanceledException) | |
{ | |
completionSource.SetCanceled(); | |
} | |
catch (Exception ex) | |
{ | |
completionSource.SetException(ex); | |
} | |
} | |
public class AsyncProcessException : Exception | |
{ | |
public AsyncProcessException() : base("Async process ran to the end, but never provided a result to the completion source.") { } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class StreamExtensions | |
{ | |
public static Task CopyToAsync(this Stream source, Stream destination, int? bufferSize = null) | |
{ | |
if (destination == null) | |
throw new ArgumentNullException("destination"); | |
if (!source.CanRead && !source.CanWrite) | |
throw new ObjectDisposedException((string)null, "Source stream has been closed."); | |
if (!destination.CanRead && !destination.CanWrite) | |
throw new ObjectDisposedException("destination", "Destination stream has been closed."); | |
if (!source.CanRead) | |
throw new NotSupportedException("Can not read from source stream"); | |
if (!destination.CanWrite) | |
throw new NotSupportedException("Can not write to destination stream"); | |
return AsyncHelper<object>.RunAsync(completionSource => CopyStreamProcess(source, destination, bufferSize ?? 81920, completionSource)); | |
} | |
static IEnumerable<Task> CopyStreamProcess(Stream source, Stream destination, int bufferSize, TaskCompletionSource<object> completionSource) | |
{ | |
var buffer = new byte[bufferSize]; | |
int count; | |
while (true) | |
{ | |
var readTask = Task.Factory.FromAsync( | |
beginMethod: (callback, s) => source.BeginRead(buffer, 0, bufferSize, callback, s), | |
endMethod: result => source.EndRead(result), | |
state: null); | |
yield return readTask; | |
if (readTask.Result == 0) | |
break; | |
yield return destination.WriteBytesAsync(buffer, offset: 0, count: readTask.Result); | |
} | |
completionSource.SetResult(null); | |
} | |
public static Task<byte[]> ReadBytesAsync(this Stream stream, int bytesToRead) | |
{ | |
return AsyncHelper<byte[]>.RunAsync(completionSource => ReadBytesProcess(stream, bytesToRead, completionSource)); | |
} | |
static IEnumerable<Task> ReadBytesProcess(Stream stream, int bytesToRead, TaskCompletionSource<byte[]> completionSource) | |
{ | |
var buffer = new byte[bytesToRead]; | |
var bytesRemaining = bytesToRead; | |
var currentPosition = 0; | |
while (bytesRemaining > 0) | |
{ | |
var readTask = Task.Factory.FromAsync( | |
beginMethod: (callback, s) => stream.BeginRead(buffer, currentPosition, bytesRemaining, callback, s), | |
endMethod: result => stream.EndRead(result), | |
state: null); | |
yield return readTask; | |
currentPosition += readTask.Result; | |
bytesRemaining -= readTask.Result; | |
} | |
completionSource.SetResult(buffer); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment