Created
December 12, 2012 16:29
-
-
Save LeeCampbell/4269282 to your computer and use it in GitHub Desktop.
Thoughts on a simple Messaging API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Specialized;
using System.IO;
using System.Linq;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace MyLib.Messaging | |
{ | |
/// <summary>Pull-style access to messages of type <typeparamref name="T"/> held on a queue.</summary>
/// <typeparam name="T">The type of message handed back to the caller.</typeparam>
public interface IConsumer<out T>
{
    /// <summary>Allows non-destructive read access to the next message on the queue.</summary>
    /// <returns>The next message on the queue.</returns>
    T Peek();

    /// <summary>Transactional consumer. Requires a transaction scope to be accessed.</summary>
    /// <param name="timeout">
    /// Time limit supplied by the caller; what happens when it elapses is defined by the implementation.
    /// </param>
    /// <returns>The received message.</returns>
    /// <example><code><![CDATA[
    /// using (TransactionScope tran = new TransactionScope())
    /// {
    ///     var bookingRequest = _bookingRequestConsumer.Receive(TimeSpan.FromMinutes(1));
    ///     var booking = _bookingFactory.Create(bookingRequest);
    ///     var receipt = booking.Save();
    ///     _bookingReceiptPublisher.Publish(receipt);
    ///     tran.Complete();
    /// }
    /// ]]></code></example>
    T Receive(TimeSpan timeout);
}
/// <summary>Push-style access to messages of type <typeparamref name="T"/>.</summary>
/// <typeparam name="T">The type of message delivered to subscribers.</typeparam>
public interface IListener<out T>
{
    /// <summary>Exposes the received messages as an observable sequence.</summary>
    /// <returns>An <see cref="IObservable{T}"/> that callers subscribe to in order to get messages.</returns>
    IObservable<T> ReceivedMessages();
}
/// <summary>
/// Copies data from a source object onto an existing target message.
/// The target is supplied by the caller and is modified in place; nothing is returned.
/// </summary>
/// <typeparam name="T">The type of the object being read.</typeparam>
/// <typeparam name="TMessage">The type of the message being populated.</typeparam>
public interface IMapper<in T, in TMessage>
{
    /// <summary>Maps <paramref name="source"/> onto <paramref name="target"/>.</summary>
    /// <param name="source">The object to read values from.</param>
    /// <param name="target">The already-constructed message to populate.</param>
    void Map(T source, TMessage target);
}
/// <summary>Outbound side of the messaging API: accepts messages of type <typeparamref name="T"/>.</summary>
/// <typeparam name="T">The type of message that can be published.</typeparam>
public interface IPublisher<in T>
{
    /// <summary>Publishes a single message.</summary>
    /// <param name="message">The message to publish.</param>
    void Publish(T message);
}
/// <summary>Request/response messaging: sends a request and exposes the response(s) as an observable sequence.</summary>
/// <typeparam name="TRequest">The type of the request being sent.</typeparam>
/// <typeparam name="TResponse">The type of the response produced.</typeparam>
public interface IRequestor<in TRequest, out TResponse>
{
    /// <summary>Issues <paramref name="request"/>.</summary>
    /// <param name="request">The request to send.</param>
    /// <returns>An <see cref="IObservable{T}"/> through which the response is delivered.</returns>
    IObservable<TResponse> Request(TRequest request);
}
/// <summary>
/// Two-way conversion between an object of type <typeparamref name="T"/> and its
/// serialized form <typeparamref name="TSerialized"/>.
/// </summary>
/// <remarks>
/// Examples of how implementations could be shaped:
/// <list type="bullet">
/// <item><description><c>ProtocolBufferSerializer&lt;T&gt; : ISerializer&lt;T, byte[]&gt;</c></description></item>
/// <item><description><c>XmlSerializer&lt;T&gt; : ISerializer&lt;T, XElement&gt;</c></description></item>
/// <item><description><c>JSONSerializer&lt;T&gt; : ISerializer&lt;T, string&gt;</c></description></item>
/// </list>
/// </remarks>
/// <typeparam name="T">The in-memory object type.</typeparam>
/// <typeparam name="TSerialized">The serialized representation (e.g. bytes, XML element, string).</typeparam>
public interface ISerializer<T, TSerialized>
{
    /// <summary>Converts <paramref name="input"/> to its serialized representation.</summary>
    /// <param name="input">The object to serialize.</param>
    /// <returns>The serialized form of <paramref name="input"/>.</returns>
    TSerialized Serialize(T input);

    /// <summary>Converts a serialized representation back into an object.</summary>
    /// <param name="input">The serialized data to read.</param>
    /// <returns>The deserialized object.</returns>
    T Deserialize(TSerialized input);
}
/// <summary>One-way conversion from <typeparamref name="TInput"/> to a newly returned <typeparamref name="TOutput"/>.</summary>
/// <typeparam name="TInput">The type being read.</typeparam>
/// <typeparam name="TOutput">The type being produced.</typeparam>
public interface ITranslator<in TInput, out TOutput>
{
    /// <summary>Translates <paramref name="source"/> into the output type.</summary>
    /// <param name="source">The value to translate.</param>
    /// <returns>The translated value.</returns>
    TOutput Translate(TInput source);
}
} | |
namespace MyLib.Messaging.Http | |
{ | |
/// <summary>
/// Collects the pieces of a single HTTP request: the root URL plus mutable collections of
/// query-string parameters, POST parameters and headers.
/// </summary>
public sealed class HttpRequestParameters
{
    private readonly Uri _rootUrl;
    private readonly NameValueCollection _queryStringParameters = new NameValueCollection();
    private readonly NameValueCollection _postParameters = new NameValueCollection();
    private readonly NameValueCollection _headers = new NameValueCollection();

    /// <summary>Creates an empty parameter set for the given root URL.</summary>
    /// <param name="rootUrl">The URL the request is addressed to, before query-string parameters are applied.</param>
    /// <exception cref="ArgumentNullException"><paramref name="rootUrl"/> is null.</exception>
    public HttpRequestParameters(Uri rootUrl)
    {
        // Fail here rather than later inside ConstructUri()/ToString(), where the cause is harder to trace.
        if (rootUrl == null)
        {
            throw new ArgumentNullException("rootUrl");
        }

        _rootUrl = rootUrl;
    }

    /// <summary>The URL supplied at construction.</summary>
    public Uri RootUrl
    {
        get { return _rootUrl; }
    }

    /// <summary>Parameters that <see cref="ConstructUri"/> places in the query string.</summary>
    public NameValueCollection QueryStringParameters
    {
        get { return _queryStringParameters; }
    }

    /// <summary>Parameters intended for the request body.</summary>
    public NameValueCollection PostParameters
    {
        get { return _postParameters; }
    }

    /// <summary>Headers intended for the request.</summary>
    public NameValueCollection Headers
    {
        get { return _headers; }
    }

    /// <summary>Builds the request URI from <see cref="RootUrl"/> and <see cref="QueryStringParameters"/>.</summary>
    /// <remarks>
    /// When at least one query-string parameter is present, the query of <see cref="RootUrl"/> is
    /// replaced by the encoded parameters; otherwise <see cref="RootUrl"/> is used unchanged.
    /// </remarks>
    /// <returns>The URI to send the request to.</returns>
    public Uri ConstructUri()
    {
        var uriBuilder = new UriBuilder(RootUrl);
        if (QueryStringParameters.Count > 0)
        {
            // ParseQueryString hands back a collection whose ToString() emits an URL-encoded "k1=v1&k2=v2".
            var queryString = System.Web.HttpUtility.ParseQueryString(string.Empty);
            foreach (var key in QueryStringParameters.AllKeys)
            {
                var values = QueryStringParameters.GetValues(key);
                if (values == null)
                {
                    // Key registered without a value: keep the key present in the output.
                    queryString[key] = null;
                    continue;
                }

                // Copy value by value. The indexer would return "a,b" for a key added twice,
                // which would be encoded as one value ("a%2cb") instead of two parameters.
                foreach (var value in values)
                {
                    queryString.Add(key, value);
                }
            }

            uriBuilder.Query = queryString.ToString();
        }

        return uriBuilder.Uri;
    }

    /// <summary>Returns a diagnostic description: the constructed URI followed by headers and POST parameters.</summary>
    public override string ToString()
    {
        var headers = ToString(Headers);
        var post = ToString(PostParameters);
        return string.Format("{0} HEADERS:{1} POST:{2}", ConstructUri(), headers, post);
    }

    // Renders a collection as "[key:value], [key:value]"; multi-valued keys appear comma-joined.
    private static string ToString(NameValueCollection nvc)
    {
        return string.Join(", ", nvc.AllKeys.Select(k => string.Format("[{0}:{1}]", k, nvc[k])));
    }
}
//TODO: deal with potential Struct/Value return types
//TODO: provide cancellation feature
//TODO: Test POST, GET, Adding Headers, sending binary etc..
/// <summary>
/// Base class for an <see cref="IRequestor{TRequest,TResponse}"/> that is served by a single HTTP call.
/// The request is mapped onto <see cref="HttpRequestParameters"/>, sent, and the
/// <see cref="HttpWebResponse"/> is translated into <typeparamref name="TResponse"/>.
/// </summary>
/// <typeparam name="TRequest">The request type accepted by <see cref="Request"/>.</typeparam>
/// <typeparam name="TResponse">The response type produced from the HTTP response.</typeparam>
public abstract class HttpRequestor<TRequest, TResponse> : IRequestor<TRequest, TResponse>
{
    private const string FormUrlEncodedContentType = "application/x-www-form-urlencoded";

    private readonly IMapper<TRequest, HttpRequestParameters> _requestMapper;
    private readonly ITranslator<HttpWebResponse, TResponse> _responseTranslator;

    /// <summary>Creates the requestor with its two collaborators.</summary>
    /// <param name="requestMapper">Populates the HTTP parameters from the request object.</param>
    /// <param name="responseTranslator">Converts the HTTP response into <typeparamref name="TResponse"/>.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    protected HttpRequestor(IMapper<TRequest, HttpRequestParameters> requestMapper, ITranslator<HttpWebResponse, TResponse> responseTranslator)
    {
        if (requestMapper == null)
        {
            throw new ArgumentNullException("requestMapper");
        }

        if (responseTranslator == null)
        {
            throw new ArgumentNullException("responseTranslator");
        }

        _requestMapper = requestMapper;
        _responseTranslator = responseTranslator;
    }

    /// <summary>
    /// Returns a sequence that, on each subscription, performs the HTTP call and yields the translated
    /// response (if it is not null) followed by completion. Any exception is delivered through OnError.
    /// </summary>
    /// <remarks>
    /// The call is made synchronously inside the subscribe callback and the returned subscription is
    /// <c>Disposable.Empty</c>, so disposing it does not cancel the request.
    /// </remarks>
    /// <param name="request">The request to map onto the HTTP parameters.</param>
    /// <returns>An observable sequence of at most one response.</returns>
    public IObservable<TResponse> Request(TRequest request)
    {
        return Observable.Create<TResponse>(
            observer =>
            {
                try
                {
                    var httpRequestParams = new HttpRequestParameters(RequestUri);
                    _requestMapper.Map(request, httpRequestParams);
                    var webRequest = CreateRequest(httpRequestParams);

                    // Dispose the response so the underlying connection is released. It stays open
                    // while the value is pushed, in case the translated value still reads from it.
                    using (var webResponse = (HttpWebResponse)webRequest.GetResponse())
                    {
                        var response = _responseTranslator.Translate(webResponse);
                        if (response != null)
                        {
                            observer.OnNext(response);
                        }
                    }

                    observer.OnCompleted();
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }

                return Disposable.Empty;
            });
    }

    /// <summary>
    /// Builds the <see cref="HttpWebRequest"/>: URI with query string, method, content type, headers and,
    /// when POST parameters are present, an URL-encoded form body.
    /// </summary>
    /// <param name="requestParameters">The mapped parameters for this request.</param>
    /// <returns>The request, with its body already written if there is one.</returns>
    protected virtual HttpWebRequest CreateRequest(HttpRequestParameters requestParameters)
    {
        var queryUri = requestParameters.ConstructUri();
        var httpWebRequest = (HttpWebRequest)WebRequest.Create(queryUri);
        httpWebRequest.Method = RequestMethod;
        httpWebRequest.ContentType = RequestContentType;

        //webRequest.Proxy = new System.Net.WebProxy(ProxyString, true); //true means no proxy
        //Proxy can be added in overridden implementation?
        foreach (var key in requestParameters.Headers.AllKeys)
        {
            httpWebRequest.Headers.Add(key, requestParameters.Headers[key]);
        }

        if (requestParameters.PostParameters.Count > 0)
        {
            // The body written below is form-encoded; say so unless the subclass chose a content type.
            if (string.IsNullOrEmpty(httpWebRequest.ContentType))
            {
                httpWebRequest.ContentType = FormUrlEncodedContentType;
            }

            var postArguments = System.Web.HttpUtility.ParseQueryString(string.Empty);
            foreach (var key in requestParameters.PostParameters.AllKeys)
            {
                var values = requestParameters.PostParameters.GetValues(key);
                if (values == null)
                {
                    // Key registered without a value: keep the key present in the body.
                    postArguments[key] = null;
                    continue;
                }

                // Copy value by value so a key added twice is sent as two fields rather than one "a,b" value.
                foreach (var value in values)
                {
                    postArguments.Add(key, value);
                }
            }

            using (var requestStream = httpWebRequest.GetRequestStream())
            using (var writer = new StreamWriter(requestStream))
            {
                writer.Write(postArguments.ToString());
            }
        }

        return httpWebRequest;
    }

    /// <summary>The root URL every request from this requestor is addressed to.</summary>
    protected abstract Uri RequestUri { get; }

    /// <summary>The HTTP method to use; "GET" unless overridden.</summary>
    protected virtual string RequestMethod
    {
        get { return "GET"; }
    }

    /// <summary>The Content-Type to send; null (none) unless overridden.</summary>
    protected virtual string RequestContentType
    {
        get { return null; }
    }
}
} | |
/* | |
using System; | |
using System.IO; | |
using System.Net; | |
using System.Reactive; | |
using System.Reactive.Linq; | |
using NUnit.Framework; | |
namespace MyLib.Messaging.Http.IntegrationTests | |
{ | |
public sealed class GoogleHttpRequestor : HttpRequestor<string, string> | |
{ | |
public GoogleHttpRequestor(IMapper<string, HttpRequestParameters> requestMapper, ITranslator<WebResponse, string> responseTranslator) | |
: base(requestMapper, responseTranslator) | |
{ | |
} | |
protected override Uri RequestUri | |
{ | |
get { return new Uri("http://www.google.com"); } | |
} | |
} | |
public class GoogleRequestMapper : IMapper<string, HttpRequestParameters> | |
{ | |
public void Map(string searchText, HttpRequestParameters target) | |
{ | |
target.QueryStringParameters.Add("q", searchText); | |
} | |
} | |
public sealed class IwsHttpRequestor : HttpRequestor<Unit, byte[]> | |
{ | |
public IwsHttpRequestor(IMapper<Unit, HttpRequestParameters> requestMapper, ITranslator<HttpWebResponse, byte[]> responseTranslator) | |
: base(requestMapper, responseTranslator) | |
{ | |
} | |
protected override Uri RequestUri | |
{ | |
get { return new Uri("http://lndvs415:8080/fixing/version"); } //Dev | |
//get { return new Uri("http://lndvs415.ln.rbccm.com:8090/fixing/version"); } //SIT (IWS QA) | |
//get { return new Uri("http://lnpss191.ln.rbccm.com:8080/fixing/version"); } //QA (IWS UAT) | |
} | |
protected override string RequestContentType | |
{ | |
get { return "application/text"; } | |
} | |
} | |
public sealed class NullRequestMapper : IMapper<Unit, HttpRequestParameters> | |
{ | |
public static readonly NullRequestMapper Instance = new NullRequestMapper(); | |
public void Map(Unit _, HttpRequestParameters target) | |
{ | |
} | |
} | |
public class WebResponseTranslator : ITranslator<WebResponse, string> | |
{ | |
public string Translate(WebResponse source) | |
{ | |
var responseStream = source.GetResponseStream(); | |
if (responseStream == null) return null; | |
using (var reader = new StreamReader(responseStream)) | |
{ | |
var payload = reader.ReadToEnd(); | |
return payload; | |
} | |
} | |
} | |
public class BinaryWebResponseTranslator : ITranslator<HttpWebResponse, byte[]> | |
{ | |
public byte[] Translate(HttpWebResponse wr) | |
{ | |
if (wr.StatusCode != HttpStatusCode.OK || wr.ContentLength <= 0) | |
return new byte[0]; | |
using (var s = wr.GetResponseStream()) | |
{ | |
if (s == null) | |
throw new InvalidOperationException("WebResponse contained a null response stream"); | |
//s.ReadTimeout = TimeoutMs; | |
long bufferPos = 0; | |
var buffer = new byte[wr.ContentLength]; | |
int b; | |
while (bufferPos < buffer.Length && (b = s.ReadByte()) != -1) | |
buffer[bufferPos++] = (byte)b; | |
if (bufferPos != wr.ContentLength) | |
throw new InvalidOperationException("WebResponse failed to receive correct payload"); | |
return buffer; | |
} | |
} | |
} | |
[TestFixture] | |
public class HttpGetRequestorFixture | |
{ | |
//Need to test : | |
// query string e.g. http://www.google.com?q=James%20Dean
// REST GET e.g. http://contacts.google.com/contact/12345/?json | |
// REST PUT e.g. http://contacts.google.com/contact/12345/?json | |
// REST POST? e.g. http://contacts.google.com/contact/12345/?json | |
// Post e.g. Submitting a form | |
// Post e.g. pushing binary data | |
[Test] | |
public void Should_hit_google_machine() | |
{ | |
var requestor = new GoogleHttpRequestor(new GoogleRequestMapper(), new WebResponseTranslator()); | |
var result = requestor.Request("James Dean").First(); | |
Assert.AreEqual("blah", result); | |
} | |
[Test] | |
public void Should_hit_IWS_version_service() | |
{ | |
var requestor = new IwsHttpRequestor(NullRequestMapper.Instance, new BinaryWebResponseTranslator()); | |
var result = requestor.Request(Unit.Default).First(); | |
foreach (var b in result) | |
{ | |
Console.Write("[{0:x2}]", b); | |
} | |
Console.WriteLine(); | |
Assert.AreEqual(8, result.Length); | |
} | |
} | |
} | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment