Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Created February 21, 2011 21:21
Show Gist options
  • Save prabirshrestha/837728 to your computer and use it in GitHub Desktop.
Save prabirshrestha/837728 to your computer and use it in GitHub Desktop.
EeekSoft.Asynchronous
namespace HttpWebHelperSample
{
// http://stackoverflow.com/questions/3963483/stitching-together-multiple-streams-in-one-stream-class
// todo: implement BeginRead and EndRead.
/// <summary>
/// A read-only, forward-only stream that presents a list of streams as one
/// continuous stream by reading them one after another in list order.
/// </summary>
/// <remarks>
/// The underlying streams are neither closed nor disposed by this class.
/// </remarks>
internal class CombinationStream : System.IO.Stream
{
    private readonly System.Collections.Generic.IList<System.IO.Stream> streams;

    // Index of the NEXT stream to switch to (the current one is already taken).
    private int currentStreamIndex;

    // Stream currently being read; null only when the list was empty.
    private System.IO.Stream currentStream;

    // Cached total length; -1 means "not computed yet".
    private long length = -1;

    // Total number of bytes handed out by Read so far.
    private long postion;

    /// <summary>
    /// Creates a stream that reads the given streams back to back.
    /// </summary>
    /// <param name="streams">Streams to combine, in reading order.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="streams"/> is null.</exception>
    public CombinationStream(System.Collections.Generic.IList<System.IO.Stream> streams)
    {
        if (streams == null)
        {
            throw new System.ArgumentNullException("streams");
        }

        this.streams = streams;
        if (streams.Count > 0)
        {
            this.currentStream = streams[currentStreamIndex++];
        }
    }

    /// <summary>
    /// Flushes the stream that is currently being read, if there is one.
    /// </summary>
    public override void Flush()
    {
        if (currentStream != null)
        {
            currentStream.Flush();
        }
    }

    /// <summary>
    /// Not supported; always throws.
    /// </summary>
    /// <exception cref="System.InvalidOperationException">Always.</exception>
    public override long Seek(long offset, System.IO.SeekOrigin origin)
    {
        throw new System.InvalidOperationException("Stream is not seekable.");
    }

    /// <summary>
    /// Overrides the value reported by <see cref="Length"/>.
    /// The underlying streams are not modified.
    /// </summary>
    public override void SetLength(long value)
    {
        this.length = value;
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes, moving on to the next underlying
    /// stream only once the current one reports end of stream.
    /// </summary>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>
    /// The number of bytes read; less than <paramref name="count"/> only when every
    /// underlying stream has been exhausted, and 0 when nothing is left (or the list is empty).
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalRead = 0;

        // currentStream is null when the list was empty: there is nothing to read.
        while (count > 0 && this.currentStream != null)
        {
            int bytesRead = this.currentStream.Read(buffer, offset, count);
            if (bytesRead > 0)
            {
                totalRead += bytesRead;
                offset += bytesRead;
                count -= bytesRead;
                this.postion += bytesRead;

                // A short read does not mean end of stream, so keep reading the
                // same stream instead of skipping whatever it still holds.
                continue;
            }

            // The current stream is exhausted; switch to the next one, if any.
            if (this.currentStreamIndex >= this.streams.Count)
            {
                break;
            }

            this.currentStream = this.streams[this.currentStreamIndex++];
        }

        return totalRead;
    }

    /// <summary>
    /// Not supported; always throws.
    /// </summary>
    /// <exception cref="System.InvalidOperationException">Always.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new System.InvalidOperationException("Stream is not writable");
    }

    /// <summary>Always true.</summary>
    public override bool CanRead
    {
        get { return true; }
    }

    /// <summary>Always false.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always false.</summary>
    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>
    /// Gets the sum of the lengths of all underlying streams, or the value passed to
    /// <see cref="SetLength"/> if it was called. The sum is computed once and cached.
    /// </summary>
    public override long Length
    {
        get
        {
            if (this.length == -1)
            {
                // Sum into a local starting at zero; adding onto the -1 sentinel
                // would make the reported length one byte too short.
                long total = 0;
                foreach (var stream in this.streams)
                {
                    total += stream.Length;
                }

                this.length = total;
            }

            return this.length;
        }
    }

    /// <summary>
    /// Gets the number of bytes read so far. Setting the position is not implemented.
    /// </summary>
    /// <exception cref="System.NotImplementedException">When set.</exception>
    public override long Position
    {
        get { return this.postion; }
        set { throw new System.NotImplementedException(); }
    }
}
}
// original code from http://tomasp.net/blog/csharp-async.aspx
// converted to C# 2.0 compatible code
namespace EeekSoft.Asynchronous
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
/// <summary>
/// Parameterless continuation; passed to <c>ExecuteStep</c> and invoked when a step has finished.
/// </summary>
public delegate void Action();
/// <summary>
/// Shape of a "Begin" method taking only the callback and state arguments
/// (for example <c>WebRequest.BeginGetResponse</c>).
/// </summary>
public delegate IAsyncResult BeginDelegate(AsyncCallback callback, object state);
/// <summary>
/// Shape of the matching "End" method that turns the <see cref="IAsyncResult"/> into a value.
/// </summary>
public delegate T EndDelegate<T>(IAsyncResult asyncResult);
/// <summary>
/// Completion callback carrying both a value and an exception; callers in this file pass
/// a null exception on success and a default value together with the exception on failure.
/// </summary>
public delegate void ErrorFunc<T>(T obj, Exception ex);
/// <summary>
/// A type with exactly one value, used where a type argument is required
/// but there is nothing to return (C# <c>void</c> cannot be a type argument).
/// </summary>
public class Unit
{
    // The only instance ever created; the private constructor prevents any other.
    private static readonly Unit instance;

    static Unit()
    {
        instance = new Unit();
    }

    private Unit()
    {
    }

    /// <summary>Gets the single <see cref="Unit"/> instance.</summary>
    public static Unit Value
    {
        get { return instance; }
    }
}
/// <summary>
/// Use this class to return a value from an asynchronous method: yield an
/// instance of it as the last element of the method.
/// </summary>
/// <example><code>
/// // Returns hello world
/// IEnumerable&lt;IAsync&gt; Hello() {
/// yield return new Result&lt;String&gt;("Hello world");
/// }
/// </code></example>
/// <typeparam name="T">Type of the returned value.</typeparam>
public class Result<T> : IAsync
{
    private readonly T returnValue;

    /// <summary>Wraps the value to be returned.</summary>
    /// <param name="value">The value the asynchronous method returns.</param>
    public Result(T value)
    {
        this.returnValue = value;
    }

    /// <summary>Gets the wrapped value.</summary>
    public T ReturnValue
    {
        get { return this.returnValue; }
    }

    /// <summary>
    /// A result is a value, not an operation, so it cannot be executed; always throws.
    /// </summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public void ExecuteStep(Action cont)
    {
        const string message = "Cannot call ExecuteStep on IAsync created as a 'Result'.";
        throw new InvalidOperationException(message);
    }

    /// <summary>Not implemented for results; the getter always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public Exception Exception
    {
        get { throw new NotImplementedException(); }
    }
}
/// <summary>
/// Wraps Begin/End style operations of standard System classes as <see cref="Async{T}"/>
/// values, and provides the entry points that drive asynchronous methods written as
/// iterators of <see cref="IAsync"/> steps.
/// </summary>
public static class AsyncExtensions
{
    #region System Extensions

    /// <summary>
    /// Asynchronously gets the request stream using BeginGetRequestStream.
    /// </summary>
    /// <param name="req">The request whose body stream is wanted.</param>
    public static Async<Stream> GetWebRequestStreamAsync(WebRequest req)
    {
        return new AsyncPrimitive<Stream>(req.BeginGetRequestStream, req.EndGetRequestStream);
    }

    /// <summary>
    /// Asynchronously gets response from the internet using BeginGetResponse method.
    /// </summary>
    /// <param name="req">The request to send.</param>
    public static Async<WebResponse> GetResponseAsync(WebRequest req)
    {
        return new AsyncPrimitive<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
    }

    /// <summary>
    /// Asynchronously reads data from a stream using BeginRead.
    /// </summary>
    /// <param name="stream">The stream on which the method is called</param>
    /// <param name="buffer">The buffer to read the data into</param>
    /// <param name="offset">Byte offset in the buffer</param>
    /// <param name="count">Maximum number of bytes to read</param>
    /// <returns>An operation whose result is the value returned by EndRead (the number of bytes read).</returns>
    public static Async<int> ReadAsync(Stream stream, byte[] buffer, int offset, int count)
    {
        return new AsyncPrimitive<int>(
            (callback, st) => stream.BeginRead(buffer, offset, count, callback, st),
            stream.EndRead);
    }

    /// <summary>
    /// Asynchronously writes data to a stream using BeginWrite.
    /// </summary>
    /// <param name="stream">The stream on which method is called.</param>
    /// <param name="buffer">The buffer data to be written.</param>
    /// <param name="offset">Byte offset in the buffer.</param>
    /// <param name="count">Maximum number of bytes to write.</param>
    /// <returns>An operation that completes when EndWrite has been called.</returns>
    public static Async<Unit> WriteAsync(Stream stream, byte[] buffer, int offset, int count)
    {
        return new AsyncPrimitive(
            (callback, st) => stream.BeginWrite(buffer, offset, count, callback, st),
            stream.EndWrite);
    }

    /// <summary>
    /// Asynchronous method that copies <paramref name="input"/> to <paramref name="output"/>
    /// in chunks of at most <paramref name="bufferSize"/> bytes until a read yields no data.
    /// </summary>
    /// <param name="input">Stream to read from.</param>
    /// <param name="output">Stream to write to.</param>
    /// <param name="bufferSize">Size of the copy buffer in bytes.</param>
    public static IEnumerable<IAsync> ReadWriteAsync(Stream input, Stream output, int bufferSize)
    {
        // buffer space for the data to be read and written.
        byte[] buffer = new byte[bufferSize];

        while (true)
        {
            // read data asynchronously; a result of 0 means there is nothing more to copy.
            Async<int> count = ReadAsync(input, buffer, 0, bufferSize);
            yield return count;
            if (count.Result == 0)
            {
                break;
            }

            // write exactly the bytes that were just read.
            yield return WriteAsync(output, buffer, 0, count.Result);
        }
    }

    /// <summary>
    /// Reads asynchronously the entire content of the stream and returns it
    /// as a string using StreamReader.
    /// </summary>
    /// <param name="stream">Stream to read to its end.</param>
    /// <returns>Returns string using the 'Result' class.</returns>
    public static IEnumerable<IAsync> ReadToEndAsync(Stream stream)
    {
        MemoryStream ms = new MemoryStream();

        // One buffer is enough: its content is copied into ms before the next read starts.
        byte[] buffer = new byte[1024];
        int read = -1;
        while (read != 0)
        {
            Async<int> count = AsyncExtensions.ReadAsync(stream, buffer, 0, buffer.Length);
            yield return count;

            // Diagnostic output kept from the original sample.
            Console.WriteLine("[{0}] got data: {1}", "url", count.Result);
            ms.Write(buffer, 0, count.Result);
            read = count.Result;
        }

        ms.Seek(0, SeekOrigin.Begin);
        string s = new StreamReader(ms).ReadToEnd();
        yield return new Result<string>(s);
    }

    #endregion

    #region Async Extensions

    /// <summary>
    /// Executes asynchronous method and blocks the calling thread until the operation completes.
    /// </summary>
    /// <param name="async">The asynchronous method to run.</param>
    public static void ExecuteAndWait(IEnumerable<IAsync> async)
    {
        // The wait handle is disposed as soon as the workflow has signalled completion.
        using (ManualResetEvent finished = new ManualResetEvent(false))
        {
            AsyncExtensions.Run(async.GetEnumerator(), () => finished.Set());
            finished.WaitOne();
        }
    }

    /// <summary>
    /// Spawns the asynchronous method without waiting for the result.
    /// </summary>
    /// <param name="async">The asynchronous method to run.</param>
    public static void Execute(IEnumerable<IAsync> async)
    {
        AsyncExtensions.Run(async.GetEnumerator());
    }

    /// <summary>
    /// Executes the asynchronous method in another asynchronous method,
    /// and assumes that the method returns result of type T.
    /// </summary>
    public static Async<T> ExecuteAsync<T>(IEnumerable<IAsync> async)
    {
        return new AsyncWithResult<T>(async);
    }

    /// <summary>
    /// Executes the asynchronous method in another asynchronous method,
    /// and assumes that the method doesn't return any result.
    /// </summary>
    public static Async<Unit> ExecuteAsync(IEnumerable<IAsync> async)
    {
        return new AsyncWithUnitResult(async);
    }

    #endregion

    #region Implementation

    /// <summary>
    /// Drives the workflow one step at a time until it yields a <see cref="Result{T}"/>,
    /// then passes the returned value to <paramref name="cont"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The workflow ended without yielding a <see cref="Result{T}"/>.
    /// </exception>
    internal static void Run<T>(IEnumerator<IAsync> en, Action<T> cont)
    {
        if (!en.MoveNext())
        {
            en.Dispose();
            throw new InvalidOperationException("Asynchronous workflow executed using "
                + "'AsyncWithResult' didn't return result using 'Result'!");
        }

        var res = (en.Current as Result<T>);
        if (res != null)
        {
            // The workflow is finished; disposing runs any pending finally/using blocks.
            T value = res.ReturnValue;
            en.Dispose();
            cont(value);
            return;
        }

        en.Current.ExecuteStep(() => { AsyncExtensions.Run<T>(en, cont); });
    }

    /// <summary>
    /// Drives the workflow one step at a time and calls <paramref name="cont"/> once it has ended.
    /// </summary>
    internal static void Run(IEnumerator<IAsync> en, Action cont)
    {
        if (!en.MoveNext())
        {
            en.Dispose();
            cont();
            return;
        }

        en.Current.ExecuteStep(() => { AsyncExtensions.Run(en, cont); });
    }

    /// <summary>
    /// Drives the workflow one step at a time without notifying anyone when it ends.
    /// </summary>
    internal static void Run(IEnumerator<IAsync> en)
    {
        if (!en.MoveNext())
        {
            en.Dispose();
            return;
        }

        en.Current.ExecuteStep(() => { AsyncExtensions.Run(en); });
    }

    #endregion
}
/// <summary>
/// Provides several helper methods for working with asynchronous computations.
/// </summary>
public static class Async
{
    /// <summary>
    /// Combines the given asynchronous methods and returns an asynchronous method that,
    /// when executed - executes the methods in parallel.
    /// </summary>
    /// <param name="operations">The asynchronous methods to start together.</param>
    /// <returns>An operation that completes once every given method has finished.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="operations"/> is null.</exception>
    public static Async<Unit> Parallel(params IEnumerable<IAsync>[] operations)
    {
        if (operations == null)
        {
            throw new System.ArgumentNullException("operations");
        }

        return new AsyncPrimitive<Unit>((cont) =>
        {
            // With nothing to run no operation would ever report completion,
            // so the continuation has to be invoked right here.
            if (operations.Length == 0)
            {
                cont(Unit.Value, null);
                return;
            }

            bool[] completed = new bool[operations.Length];
            for (int i = 0; i < operations.Length; i++)
            {
                AsyncExtensions.Execute(ExecuteAndSet(operations[i], completed, i, cont));
            }
        });
    }

    #region Implementation

    /// <summary>
    /// Runs <paramref name="op"/> to its end, marks its slot in <paramref name="flags"/>,
    /// and invokes <paramref name="cont"/> if that was the last slot still unset.
    /// </summary>
    private static IEnumerable<IAsync> ExecuteAndSet(IEnumerable<IAsync> op, bool[] flags, int index, ErrorFunc<Unit> cont)
    {
        foreach (IAsync async in op)
        {
            yield return async;
        }

        bool allSet = true;

        // Setting the flag and scanning happen under one lock so that exactly one
        // of the operations observes "all done".
        lock (flags)
        {
            flags[index] = true;
            foreach (bool b in flags)
            {
                if (!b)
                {
                    allSet = false;
                    break;
                }
            }
        }

        if (allSet)
        {
            cont(Unit.Value, null);
        }
    }

    #endregion
}
/// <summary>
/// Represents a primitive untyped asynchronous operation.
/// This interface should be used only in asynchronous method declaration.
/// </summary>
public interface IAsync
{
/// <summary>
/// Runs this step and invokes <paramref name="cont"/> when it has finished.
/// </summary>
/// <param name="cont">Continuation to call once the step is done.</param>
void ExecuteStep(Action cont);
/// <summary>
/// Gets the exception associated with the step, if any.
/// </summary>
Exception Exception { get; }
}
/// <summary>
/// Represents an asynchronous computation that yields a result of type T.
/// </summary>
/// <typeparam name="T">Type of the value the computation produces.</typeparam>
public abstract class Async<T> : IAsync
{
    /// <summary>Value produced by the computation; meaningful once <see cref="completed"/> is true.</summary>
    protected T result;

    /// <summary>Set to true by subclasses when the computation has finished.</summary>
    protected bool completed = false;

    /// <summary>Exception recorded by the subclass, or null.</summary>
    protected Exception exception = null;

    /// <summary>
    /// Gets the value produced by the computation.
    /// </summary>
    /// <exception cref="InvalidOperationException">The computation has not completed yet.</exception>
    public T Result
    {
        get
        {
            EnsureCompleted();
            return result;
        }
    }

    /// <summary>
    /// Runs the computation and invokes <paramref name="cont"/> when it has finished.
    /// </summary>
    public abstract void ExecuteStep(Action cont);

    /// <summary>
    /// Gets the exception recorded for the computation, or null if none was recorded.
    /// </summary>
    /// <exception cref="InvalidOperationException">The computation has not completed yet.</exception>
    public Exception Exception
    {
        get
        {
            EnsureCompleted();
            return exception;
        }
    }

    // Reading the outcome before completion almost always means the operation was
    // created but never yielded to the runner, hence the hint in the message.
    private void EnsureCompleted()
    {
        if (!completed)
        {
            throw new InvalidOperationException("Operation not completed, did you forgot 'yield return'?");
        }
    }
}
/// <summary>
/// An asynchronous operation built either from a function that accepts a completion
/// callback, or from a Begin/End method pair.
/// </summary>
/// <typeparam name="T">Type of the value the operation produces.</typeparam>
public class AsyncPrimitive<T> : Async<T>
{
    // Starts the operation; it is handed the callback to invoke on completion.
    Action<ErrorFunc<T>> func;

    /// <summary>
    /// Creates the operation from a function that starts the work and later calls
    /// the supplied callback with the value and/or the exception.
    /// </summary>
    public AsyncPrimitive(Action<ErrorFunc<T>> function)
    {
        this.func = function;
    }

    /// <summary>
    /// Creates the operation from a Begin/End method pair.
    /// </summary>
    /// <param name="begin">Starts the operation.</param>
    /// <param name="end">Completes the operation and returns its value.</param>
    public AsyncPrimitive(BeginDelegate begin, EndDelegate<T> end)
    {
        this.func = (cont) => begin(delegate(IAsyncResult res)
        {
            T value = default(T);
            Exception error = null;

            // Only the End call is guarded. The continuation runs the rest of the
            // workflow; if it were inside the try, an exception thrown by later code
            // would be caught here and the continuation would be invoked a second time.
            try
            {
                value = end(res);
            }
            catch (Exception ex)
            {
                error = ex;
            }

            cont(value, error);
        }, null);
    }

    /// <summary>
    /// Starts the operation; when it finishes, records the value and the exception,
    /// marks the operation completed and invokes <paramref name="cont"/>.
    /// </summary>
    public override void ExecuteStep(Action cont)
    {
        func((res, ex) =>
        {
            result = res;
            exception = ex;
            completed = true;
            cont();
        });
    }
}
/// <summary>
/// An asynchronous operation built from a Begin/End method pair whose End method
/// returns nothing (for example BeginWrite/EndWrite).
/// </summary>
public class AsyncPrimitive : Async<Unit>
{
    // Starts the operation; it is handed the callback to invoke on completion.
    Action<ErrorFunc<Unit>> func;

    /// <summary>
    /// Creates the operation from a Begin/End method pair.
    /// </summary>
    /// <param name="begin">Starts the operation.</param>
    /// <param name="end">Completes the operation.</param>
    public AsyncPrimitive(BeginDelegate begin, Action<IAsyncResult> end)
    {
        this.func = (cont) => begin(delegate(IAsyncResult res)
        {
            Unit value = default(Unit);
            Exception error = null;

            // Only the End call is guarded. The continuation runs the rest of the
            // workflow; if it were inside the try, an exception thrown by later code
            // would be caught here and the continuation would be invoked a second time.
            try
            {
                end(res);

                // Report the Unit value on success rather than null.
                value = Unit.Value;
            }
            catch (Exception ex)
            {
                error = ex;
            }

            cont(value, error);
        }, null);
    }

    /// <summary>
    /// Starts the operation; when it finishes, records the value and the exception,
    /// marks the operation completed and invokes <paramref name="cont"/>.
    /// </summary>
    public override void ExecuteStep(Action cont)
    {
        func((res, ex) =>
        {
            result = res;
            exception = ex;
            completed = true;
            cont();
        });
    }
}
/// <summary>
/// Runs an asynchronous method (an iterator of <see cref="IAsync"/> steps) as a single
/// step of another one, exposing the value it returns through <c>Result</c>.
/// </summary>
/// <typeparam name="T">Type of the value the wrapped method returns.</typeparam>
public class AsyncWithResult<T> : Async<T>
{
    // The wrapped asynchronous method; enumerated anew on each ExecuteStep call.
    IEnumerable<IAsync> workflow;

    /// <summary>Wraps the given asynchronous method.</summary>
    public AsyncWithResult(IEnumerable<IAsync> async)
    {
        workflow = async;
    }

    /// <summary>
    /// Runs the wrapped method; once it has produced its value, marks this operation
    /// completed, stores the value and invokes <paramref name="cont"/>.
    /// </summary>
    public override void ExecuteStep(Action cont)
    {
        IEnumerator<IAsync> steps = workflow.GetEnumerator();
        Action<T> onResult = delegate(T value)
        {
            completed = true;
            result = value;
            cont();
        };
        AsyncExtensions.Run<T>(steps, onResult);
    }
}
/// <summary>
/// Runs an asynchronous method (an iterator of <see cref="IAsync"/> steps) that returns
/// no value as a single step of another one.
/// </summary>
public class AsyncWithUnitResult : Async<Unit>
{
    // The wrapped asynchronous method; enumerated anew on each ExecuteStep call.
    IEnumerable<IAsync> workflow;

    /// <summary>Wraps the given asynchronous method; the result is always the Unit value.</summary>
    public AsyncWithUnitResult(IEnumerable<IAsync> async)
    {
        workflow = async;
        result = Unit.Value;
    }

    /// <summary>
    /// Runs the wrapped method; once it has ended, marks this operation completed
    /// and invokes <paramref name="cont"/>.
    /// </summary>
    public override void ExecuteStep(Action cont)
    {
        IEnumerator<IAsync> steps = workflow.GetEnumerator();
        Action onFinished = delegate
        {
            completed = true;
            cont();
        };
        AsyncExtensions.Run(steps, onFinished);
    }
}
}
/// <summary>
/// Sample: performs a GET request, waits for it to finish and prints the response body.
/// </summary>
private static void Get()
{
    var http = new HttpWebHelper();
    var request = http.CreateHttpWebRequest("http://graph.facebook.com", "/prabirshrestha", "GET", null, null, null);

    // The response body is collected in memory while the request runs.
    var responseBody = new MemoryStream();
    AsyncExtensions.ExecuteAndWait(http.Execute(request, null, responseBody));

    // Rewind before reading back what was written.
    responseBody.Position = 0;
    var reader = new StreamReader(responseBody);
    Console.WriteLine(reader.ReadToEnd());
}
/// <summary>
/// Sample: posts a form-encoded message, waits for the request to finish and prints
/// the response body.
/// </summary>
private static void Post()
{
    var postBody = new MemoryStream(Encoding.UTF8.GetBytes("message=hello"));
    var http = new HttpWebHelper();

    var queryStrings = new System.Collections.Generic.Dictionary<string, object>();
    queryStrings.Add("oauth_token", AccessToken);

    var headers = new System.Collections.Generic.Dictionary<string, object>();
    headers.Add("content-type", "application/x-www-form-urlencoded");
    headers.Add("content-length", postBody.Length);

    var request = http.CreateHttpWebRequest("https://graph.facebook.com", "/me/feed", "POST", queryStrings, headers, null);

    // The response body is collected in memory while the request runs.
    var responseBody = new MemoryStream();
    AsyncExtensions.ExecuteAndWait(http.Execute(request, postBody, responseBody));

    // Rewind before reading back what was written.
    responseBody.Position = 0;
    var reader = new StreamReader(responseBody);
    Console.WriteLine(reader.ReadToEnd());
}
namespace HttpWebHelperSample
{
using EeekSoft.Asynchronous;
/// <summary>
/// Helper for building <see cref="System.Net.HttpWebRequest"/> objects and executing them
/// as an asynchronous method made of <see cref="IAsync"/> steps.
/// </summary>
internal class HttpWebHelper
{
    public enum ResponseStatus
    {
        Non,
        Completed,
        Error
    }

    /// <summary>
    /// Builds the request url from the base url, the resource path and the query strings.
    /// </summary>
    /// <param name="baseUrl">Scheme and host, for example "http://example.com".</param>
    /// <param name="resourcePath">Path of the resource; a leading slash is added when missing.</param>
    /// <param name="queryStrings">Query string pairs, assumed to be url encoded already; may be null.</param>
    /// <returns>The complete url.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="baseUrl"/> is null or empty.</exception>
    public virtual string BuildRequestUrl(string baseUrl, string resourcePath, System.Collections.Generic.IDictionary<string, object> queryStrings)
    {
        if (string.IsNullOrEmpty(baseUrl))
        {
            throw new System.ArgumentNullException("baseUrl");
        }

        var sb = new System.Text.StringBuilder();
        sb.Append(baseUrl);
        sb.Append(AddStartingSlashIfNotPresent(resourcePath));
        sb.Append("?");
        if (queryStrings != null)
        {
            foreach (var queryString in queryStrings)
            {
                // note: assume queryString is already url encoded.
                sb.AppendFormat("{0}={1}&", queryString.Key, queryString.Value);
            }
        }

        // remove the last & or ?
        --sb.Length;
        return sb.ToString();
    }

    /// <summary>
    /// Copies the given headers onto the request. content-type, content-length and
    /// user-agent are set through the dedicated properties; everything else is added
    /// to the header collection.
    /// </summary>
    /// <param name="httpWebRequest">The request to modify.</param>
    /// <param name="requestHeaders">Headers to apply; null means none.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="httpWebRequest"/> is null.</exception>
    internal protected virtual void SetHttpWebRequestHeaders(System.Net.HttpWebRequest httpWebRequest, System.Collections.Generic.IDictionary<string, object> requestHeaders)
    {
        if (httpWebRequest == null)
        {
            throw new System.ArgumentNullException("httpWebRequest");
        }

        if (requestHeaders == null)
        {
            return;
        }

        foreach (var requestHeader in requestHeaders)
        {
            var key = requestHeader.Key;
            var value = requestHeader.Value;

            // todo: add more special headers
            if (key.Equals("content-type", System.StringComparison.OrdinalIgnoreCase))
            {
                httpWebRequest.ContentType = value.ToString();
            }
            else if (key.Equals("content-length", System.StringComparison.OrdinalIgnoreCase))
            {
                httpWebRequest.ContentLength = System.Convert.ToInt64(value);
            }
            else if (key.Equals("user-agent", System.StringComparison.OrdinalIgnoreCase))
            {
                httpWebRequest.UserAgent = value.ToString();
            }
            else
            {
                httpWebRequest.Headers.Add(key, value.ToString());
            }
        }
    }

    /// <summary>
    /// Copies the response headers into a dictionary keyed by header name.
    /// </summary>
    /// <param name="response">The response whose headers are read.</param>
    internal protected virtual System.Collections.Generic.IDictionary<string, object> ExtractResponseHeaders(System.Net.HttpWebResponse response)
    {
        var responseHeaders = new System.Collections.Generic.Dictionary<string, object>();
        for (int i = 0; i < response.Headers.Count; i++)
        {
            responseHeaders.Add(response.Headers.GetKey(i), response.Headers[i]);
        }

        return responseHeaders;
    }

    /// <summary>
    /// Creates a request for the given url parts, method, headers and cookies.
    /// </summary>
    /// <param name="baseUrl">Scheme and host.</param>
    /// <param name="resourcePath">Path of the resource.</param>
    /// <param name="httpMethod">Http method; "GET" when null.</param>
    /// <param name="queryStrings">Query string pairs; may be null.</param>
    /// <param name="requestHeaders">Request headers; may be null.</param>
    /// <param name="requestCookies">Cookies to send, scoped to the request host; may be null.</param>
    public virtual System.Net.HttpWebRequest CreateHttpWebRequest(string baseUrl, string resourcePath, string httpMethod, System.Collections.Generic.IDictionary<string, object> queryStrings, System.Collections.Generic.IDictionary<string, object> requestHeaders, System.Collections.Generic.IDictionary<string, object> requestCookies)
    {
        var httpWebRequest = (System.Net.HttpWebRequest)System.Net.WebRequest.Create(BuildRequestUrl(baseUrl, resourcePath, queryStrings));
        httpWebRequest.Method = httpMethod ?? "GET";
#if !SILVERLIGHT
        if (requestCookies != null)
        {
            foreach (var requestCookie in requestCookies)
            {
                // A fresh request has no cookie container, so one is created
                // the first time a cookie actually has to be stored.
                if (httpWebRequest.CookieContainer == null)
                {
                    httpWebRequest.CookieContainer = new System.Net.CookieContainer();
                }

                httpWebRequest.CookieContainer.Add(new System.Net.Cookie(requestCookie.Key, requestCookie.Value.ToString()) { Domain = httpWebRequest.RequestUri.Host });
            }
        }
#endif
#if !WINDOWS_PHONE
        if (!httpWebRequest.Method.Equals("GET", System.StringComparison.OrdinalIgnoreCase))
        {
            // set default content-length to 0 if it is not GET.
            httpWebRequest.ContentLength = 0;
        }
#endif
        SetHttpWebRequestHeaders(httpWebRequest, requestHeaders);
        return httpWebRequest;
    }

    /// <summary>
    /// Asynchronous method that sends the request (with an optional body) and reads the
    /// response, copying the response body to <paramref name="responseSaveStream"/> when given.
    /// </summary>
    /// <param name="httpWebRequest">The request to execute.</param>
    /// <param name="requestBody">Request body; null or empty means no body is sent.</param>
    /// <param name="responseSaveStream">Destination for the response body; when null the body is read and discarded.</param>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="httpWebRequest"/> is null (raised when enumeration starts).
    /// </exception>
    public virtual System.Collections.Generic.IEnumerable<IAsync> Execute(
        System.Net.HttpWebRequest httpWebRequest,
        System.IO.Stream requestBody,
        System.IO.Stream responseSaveStream)
    {
        if (httpWebRequest == null)
        {
            throw new System.ArgumentNullException("httpWebRequest");
        }

        System.Console.WriteLine("[{0}] starting", httpWebRequest.RequestUri);
        if (requestBody != null && requestBody.Length != 0)
        {
            // we have a request body.
            // assume that content-type and content-length is already set.
            var requestStream = AsyncExtensions.GetWebRequestStreamAsync(httpWebRequest);
            yield return requestStream;

            // If the request stream could not be obtained there is nothing to write to;
            // the failure is then expected to surface again from the response step below.
            if (requestStream.Result != null)
            {
                // The request stream is closed once the body has been written.
                using (System.IO.Stream target = requestStream.Result)
                {
                    foreach (var readWriteAsync in AsyncExtensions.ReadWriteAsync(requestBody, target, 1024))
                    {
                        yield return readWriteAsync;
                    }
                }
            }
        }

        // asynchronously get the response from the http server.
        var response = AsyncExtensions.GetResponseAsync(httpWebRequest);
        yield return response;

        var httpWebResponse = (System.Net.HttpWebResponse)response.Result;
        System.Exception exception = response.Exception;
        if (exception is System.Net.WebException)
        {
            // an http level error still carries a response (status code, body).
            httpWebResponse = (System.Net.HttpWebResponse)((System.Net.WebException)exception).Response;
        }

        if (exception != null && !(exception is System.Net.WebException || exception is System.Security.SecurityException))
        {
            // critical error occurred.
            // set response status as error
            // and return the exception.
        }
        else if (httpWebResponse == null)
        {
            // most likely no internet connection or silverlight cross domain policy exception.
            // set response status as error
            // and return the exception.
        }
        else
        {
            // The response and its stream are released when the body has been consumed.
            using (httpWebResponse)
            {
                // we have the response headers.
                System.Console.WriteLine("[{0}] got response", httpWebResponse.ResponseUri);

                // extract response headers (not used yet; the call is kept because the method is overridable).
                this.ExtractResponseHeaders(httpWebResponse);

                // extract cookies
                // extract http status code
                // read response stream
                // todo: gzip/deflate support
                using (var responseStream = httpWebResponse.GetResponseStream())
                {
                    if (responseSaveStream == null)
                    {
                        // read the response stream asynchronously but don't write.
                        // ReadToEndAsync ends by yielding a Result<string>, which must not be
                        // handed to the outer runner as a step; running it as a nested
                        // operation lets the runner consume that result itself.
                        yield return AsyncExtensions.ExecuteAsync<string>(AsyncExtensions.ReadToEndAsync(responseStream));
                    }
                    else
                    {
                        // read the response buffer asynchronously and then write the buffer asynchronously.
                        foreach (var readWriteAsync in AsyncExtensions.ReadWriteAsync(responseStream, responseSaveStream, 1024))
                        {
                            yield return readWriteAsync;
                        }
                    }
                }
            }
        }
    }

    /// <summary>
    /// Returns <paramref name="input"/> with a leading slash, adding one when missing;
    /// null or empty input yields "/".
    /// </summary>
    internal static string AddStartingSlashIfNotPresent(string input)
    {
        if (string.IsNullOrEmpty(input))
        {
            return "/";
        }

        // prepend a slash only when the input does not already start with one.
        return input[0] != '/' ? "/" + input : input;
    }
}
}
/// <summary>
/// Sample that downloads several web pages in parallel and prints their titles.
/// </summary>
public class async_sample
{
    /// <summary>
    /// Runs the sample and then waits for a line of console input.
    /// </summary>
    public static void Run()
    {
        // Download the URLs and wait until all of them complete
        AsyncExtensions.ExecuteAndWait(DownloadAll());
        Console.ReadLine();
    }

    /// <summary>
    /// Asynchronous method that downloads the specified url and prints the HTML title
    /// </summary>
    private static IEnumerable<IAsync> AsyncMethod(string url)
    {
        WebRequest req = HttpWebRequest.Create(url);
        Console.WriteLine("[{0}] starting", url);

        // asynchronously get the response from http server
        Async<WebResponse> response = AsyncExtensions.GetResponseAsync(req);
        yield return response;

        // A failed request has no response to read; report it and end this download
        // so that the other downloads can still complete.
        if (response.Exception != null || response.Result == null)
        {
            Console.WriteLine("[{0}] failed: {1}", url,
                response.Exception != null ? response.Exception.Message : "no response");
            yield break;
        }

        Console.WriteLine("[{0}] got response", url);
        WebResponse webResponse = response.Result;
        try
        {
            Stream resp = webResponse.GetResponseStream();

            // download HTML using the asynchronous extension method
            // instead of using synchronous StreamReader
            Async<string> html = AsyncExtensions.ExecuteAsync<string>(AsyncExtensions.ReadToEndAsync(resp));
            yield return html;

            // extract and print the HTML title
            Regex reg = new Regex(@"<title[^>]*>(.*)</title[^>]*>");
            string title = reg.Match(html.Result).Groups[1].Value;

            // Centre the title in 78 columns; a longer title gets no padding
            // (a negative width would make PadLeft/PadRight throw).
            int padding = Math.Max(0, (78 - title.Length) / 2);
            title = "".PadLeft(padding) + title + "".PadRight(padding);
            Console.WriteLine("[{0}] completed\n{2}\n{1}\n{2}",
                url, title, "".PadRight(79, '*'));
        }
        finally
        {
            // Release the connection held by the response.
            webResponse.Close();
        }
    }

    /// <summary>
    /// Method which performs several HTTP requests asynchronously in parallel
    /// </summary>
    private static IEnumerable<IAsync> DownloadAll()
    {
        var methods = Async.Parallel(
            AsyncMethod("http://www.microsoft.com"),
            AsyncMethod("http://www.google.com"),
            AsyncMethod("http://www.apple.com"),
            AsyncMethod("http://www.novell.com"));
        yield return methods;
        Console.WriteLine("Completed all!");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment