Skip to content

Instantly share code, notes, and snippets.

@caleb-vear
Last active December 22, 2015 06:48
Show Gist options
  • Save caleb-vear/6433026 to your computer and use it in GitHub Desktop.
Save caleb-vear/6433026 to your computer and use it in GitHub Desktop.
Poor man's await
/// <summary>
/// A "poor man's await": drives an iterator of <see cref="Task"/>s one step at a time,
/// resuming the iterator each time the task it yielded has finished. The iterator reports
/// its outcome through the <see cref="TaskCompletionSource{TResult}"/> it is handed.
/// </summary>
/// <typeparam name="TResult">Type of the value produced by the asynchronous process.</typeparam>
public static class AsyncHelper<TResult>
{
    /// <summary>
    /// Runs the process sequence, resuming it on <see cref="TaskScheduler.Current"/>.
    /// </summary>
    /// <param name="createProcessSequence">Factory that receives the completion source and returns the sequence of tasks to wait on.</param>
    /// <returns>The task exposed by the completion source handed to <paramref name="createProcessSequence"/>.</returns>
    public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence)
    {
        return RunAsync(createProcessSequence, TaskScheduler.Current);
    }

    /// <summary>
    /// Runs the process sequence without cancellation support, resuming it on the given scheduler.
    /// </summary>
    /// <param name="createProcessSequence">Factory that receives the completion source and returns the sequence of tasks to wait on.</param>
    /// <param name="continuationScheduler">Scheduler on which the sequence is resumed after each yielded task.</param>
    /// <returns>The task exposed by the completion source handed to <paramref name="createProcessSequence"/>.</returns>
    public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence, TaskScheduler continuationScheduler)
    {
        // No token was supplied, so there is nothing to cancel with; avoid allocating a
        // CancellationTokenSource that nobody can ever cancel or dispose.
        return RunAsync(createProcessSequence, CancellationToken.None, continuationScheduler);
    }

    /// <summary>
    /// Runs the process sequence, resuming it on the given scheduler and checking
    /// <paramref name="cancellationToken"/> before every step.
    /// </summary>
    /// <param name="createProcessSequence">Factory that receives the completion source and returns the sequence of tasks to wait on.</param>
    /// <param name="cancellationToken">Token checked before each step; once it is signalled the returned task is cancelled and the sequence is not advanced any further. A step that is already in flight is not interrupted.</param>
    /// <param name="continuationScheduler">Scheduler on which the sequence is resumed after each yielded task.</param>
    /// <returns>
    /// The task exposed by the completion source. It completes when the sequence completes the
    /// source, faults when a yielded task faults or the sequence throws, is cancelled when a
    /// yielded task is cancelled or the token is signalled, and faults with
    /// <see cref="AsyncProcessException"/> when the sequence ends without completing the source.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="createProcessSequence"/> is null.</exception>
    public static Task<TResult> RunAsync(Func<TaskCompletionSource<TResult>, IEnumerable<Task>> createProcessSequence, CancellationToken cancellationToken, TaskScheduler continuationScheduler)
    {
        if (createProcessSequence == null)
            throw new ArgumentNullException("createProcessSequence");

        var completionSource = new TaskCompletionSource<TResult>();
        var processSequenceEnumerator = createProcessSequence(completionSource).GetEnumerator();

        // The first step is started synchronously on the calling thread.
        ScheduleNext(processSequenceEnumerator, completionSource, cancellationToken, continuationScheduler);

        // We need to dispose of the enumerator once we are done.
        completionSource.Task.ContinueWith(_ => processSequenceEnumerator.Dispose());
        return completionSource.Task;
    }

    /// <summary>
    /// Advances the sequence by one step and arranges to be called again when the yielded task finishes.
    /// </summary>
    static void ScheduleNext(IEnumerator<Task> processSequenceEnumerator, TaskCompletionSource<TResult> completionSource, CancellationToken cancellationToken, TaskScheduler continuationScheduler)
    {
        // IsCompleted is also true for cancelled and faulted tasks, so one check is enough.
        if (completionSource.Task.IsCompleted)
            return;

        if (cancellationToken.IsCancellationRequested)
        {
            completionSource.TrySetCanceled();
            return;
        }

        // The Try* variants are used throughout: the sequence owns the completion source and
        // may complete it at any time, and throwing from a continuation would only produce an
        // unobserved exception.
        try
        {
            if (!processSequenceEnumerator.MoveNext())
            {
                // Only has an effect if the sequence never completed the source itself.
                completionSource.TrySetException(new AsyncProcessException());
                return;
            }

            processSequenceEnumerator.Current.ContinueWith(
                continuationAction: t =>
                {
                    if (t.IsFaulted)
                    {
                        // Forward the inner exceptions so the result is not an
                        // AggregateException nested inside another AggregateException.
                        completionSource.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        completionSource.TrySetCanceled();
                    }
                    else
                    {
                        ScheduleNext(processSequenceEnumerator, completionSource, cancellationToken, continuationScheduler);
                    }
                },
                scheduler: continuationScheduler);
        }
        catch (OperationCanceledException)
        {
            completionSource.TrySetCanceled();
        }
        catch (Exception ex)
        {
            completionSource.TrySetException(ex);
        }
    }

    /// <summary>
    /// Reported when a process sequence runs to its end without completing the completion source.
    /// </summary>
    public class AsyncProcessException : Exception
    {
        public AsyncProcessException() : base("Async process ran to the end, but never provided a result to the completion source.") { }
    }
}
/// <summary>
/// Task-based stream helpers built on <c>BeginRead</c>/<c>EndRead</c> and driven by
/// <c>AsyncHelper&lt;TResult&gt;.RunAsync</c> instead of <c>await</c>.
/// </summary>
public static class StreamExtensions
{
    // Buffer size used by CopyToAsync when the caller does not supply one.
    const int DefaultCopyBufferSize = 81920;

    /// <summary>
    /// Copies <paramref name="source"/> to <paramref name="destination"/> one buffer at a time
    /// until a read from the source returns zero bytes.
    /// </summary>
    /// <param name="source">Stream to read from.</param>
    /// <param name="destination">Stream to write to.</param>
    /// <param name="bufferSize">Size of the copy buffer in bytes; 81920 when null.</param>
    /// <returns>A task that completes once the source has been read to its end.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="destination"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    /// <exception cref="ObjectDisposedException">Either stream can neither be read nor written.</exception>
    /// <exception cref="NotSupportedException">The source is not readable or the destination is not writable.</exception>
    public static Task CopyToAsync(this Stream source, Stream destination, int? bufferSize = null)
    {
        // An extension method can be invoked on a null reference, so check explicitly.
        if (source == null)
            throw new ArgumentNullException("source");
        if (destination == null)
            throw new ArgumentNullException("destination");

        var effectiveBufferSize = bufferSize ?? DefaultCopyBufferSize;
        // A zero-length buffer would make the very first read return 0 and the copy would
        // report success without transferring anything.
        if (effectiveBufferSize <= 0)
            throw new ArgumentOutOfRangeException("bufferSize", "Buffer size must be a positive number.");

        if (!source.CanRead && !source.CanWrite)
            throw new ObjectDisposedException((string)null, "Source stream has been closed.");
        if (!destination.CanRead && !destination.CanWrite)
            throw new ObjectDisposedException("destination", "Destination stream has been closed.");
        if (!source.CanRead)
            throw new NotSupportedException("Can not read from source stream");
        if (!destination.CanWrite)
            throw new NotSupportedException("Can not write to destination stream");

        return AsyncHelper<object>.RunAsync(completionSource => CopyStreamProcess(source, destination, effectiveBufferSize, completionSource));
    }

    /// <summary>
    /// Process sequence for <see cref="CopyToAsync"/>: alternates read and write steps and
    /// completes the source with null once a read returns zero bytes.
    /// </summary>
    static IEnumerable<Task> CopyStreamProcess(Stream source, Stream destination, int bufferSize, TaskCompletionSource<object> completionSource)
    {
        var buffer = new byte[bufferSize];
        while (true)
        {
            var readTask = ReadChunkAsync(source, buffer, 0, bufferSize);
            yield return readTask;

            // The sequence is only resumed after the yielded task succeeded, so reading
            // Result here does not block.
            var bytesRead = readTask.Result;
            if (bytesRead == 0)
                break;

            // WriteBytesAsync is not defined in this class; presumably a Stream extension
            // declared elsewhere in the project that writes the given range asynchronously.
            yield return destination.WriteBytesAsync(buffer, offset: 0, count: bytesRead);
        }
        completionSource.SetResult(null);
    }

    /// <summary>
    /// Reads exactly <paramref name="bytesToRead"/> bytes from <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="bytesToRead">Number of bytes to read; zero yields an empty array.</param>
    /// <returns>
    /// A task producing a buffer of <paramref name="bytesToRead"/> bytes. The task faults with
    /// <see cref="EndOfStreamException"/> if the stream ends before that many bytes were read.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bytesToRead"/> is negative.</exception>
    public static Task<byte[]> ReadBytesAsync(this Stream stream, int bytesToRead)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        if (bytesToRead < 0)
            throw new ArgumentOutOfRangeException("bytesToRead", "Number of bytes to read must not be negative.");

        return AsyncHelper<byte[]>.RunAsync(completionSource => ReadBytesProcess(stream, bytesToRead, completionSource));
    }

    /// <summary>
    /// Process sequence for <see cref="ReadBytesAsync"/>: keeps reading into the remaining
    /// part of the buffer until it is full, then completes the source with the buffer.
    /// </summary>
    static IEnumerable<Task> ReadBytesProcess(Stream stream, int bytesToRead, TaskCompletionSource<byte[]> completionSource)
    {
        var buffer = new byte[bytesToRead];
        var bytesRemaining = bytesToRead;
        var currentPosition = 0;
        while (bytesRemaining > 0)
        {
            var readTask = ReadChunkAsync(stream, buffer, currentPosition, bytesRemaining);
            yield return readTask;

            var bytesRead = readTask.Result;
            // A read of zero bytes means the stream has ended. Without this check the loop
            // would never make progress and would spin forever.
            if (bytesRead == 0)
                throw new EndOfStreamException("Stream ended before the requested number of bytes could be read.");

            currentPosition += bytesRead;
            bytesRemaining -= bytesRead;
        }
        completionSource.SetResult(buffer);
    }

    /// <summary>
    /// Wraps a single <c>BeginRead</c>/<c>EndRead</c> pair in a task producing the number of bytes read.
    /// </summary>
    static Task<int> ReadChunkAsync(Stream stream, byte[] buffer, int offset, int count)
    {
        return Task.Factory.FromAsync(
            beginMethod: (callback, s) => stream.BeginRead(buffer, offset, count, callback, s),
            endMethod: result => stream.EndRead(result),
            state: null);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment