Created
September 10, 2012 19:37
-
-
Save rpgmaker/3693294 to your computer and use it in GitHub Desktop.
PSBClient
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Windows; | |
using System.Linq; | |
using System.Collections.Generic; | |
using System.ComponentModel; | |
#if !WINDOWS_PHONE | |
using System.Windows.Browser; | |
#endif | |
namespace | |
#if WINDOWS_PHONE | |
PServiceBus.WindowsPhone | |
#else | |
PServiceBus.Silverlight | |
#endif | |
{ | |
public static class PSBClient { | |
// Local-storage key under which the subscriber name is persisted.
private const string USERNAME_KEY = "pservicebus_username_info";
// Token in a streamed message that Subscribe replaces with the header property's name.
private const string ESBTOPIC_HEADERS = "ESBTOPIC_HEADERS";
// Relative URL template for the streaming connection; appended to Endpoint by Subscribe.
private const string STREAM_URL = "StreamSubscriberMessages/?subscriberName={0}&transportName={1}&messageTypeInfo={2}&batchSize={3}&interval={4}&transport=httpstreaming";

// Cached subscriber name; filled in lazily by the UserName property.
private static string _username;

// Streaming handlers that are currently registered, keyed by topic name.
private static readonly Dictionary<string, HttpStreaming> _handlers =
    new Dictionary<string, HttpStreaming>();

// Attribute types looked up through reflection by Register<T> and Subscribe<T>.
private static readonly Type _descriptionAttributeType = typeof(DescriptionAttribute);
private static readonly Type _topicHeaderAttributeType = typeof(TopicHeaderAttribute);
// Type initializer: installs the default connection settings and then hooks
// the application exit events.
static PSBClient() {
    Transport = TransportType.RabbitMQ;
    Address = "localhost:5672;userID=guest;password=guest";
    Endpoint = "http://localhost:8087/ESBRestService/";
    // Kept last on purpose: SetupExitEvent reads Endpoint while building its script.
    SetupExitEvent();
}
// Wires the application's exit notifications to Disconnect/Cleanup.
// On Windows Phone the back key of the root frame also triggers Disconnect.
// In a browser-hosted Silverlight app the scripts from ScriptConstants are
// evaluated in the page and only the local Cleanup runs on Exit.
private static void SetupExitEvent() {
    var app = Application.Current;
#if WINDOWS_PHONE
    app.Exit += (sender, args) => Disconnect();
    var frame = app.RootVisual as Microsoft.Phone.Controls.PhoneApplicationFrame;
    if (frame != null) frame.BackKeyPress += (sender, args) => Disconnect();
#else
    if (app.IsRunningOutOfBrowser) {
        // No hosting page is used on this path: disconnect straight from the Exit event.
        app.Exit += (sender, args) => Disconnect();
        return;
    }

    var durableFlag = Durable.ToString().ToLower();
    HtmlPage.Window.Eval(String.Format(ScriptConstants.InitDurableFlag, durableFlag));
    HtmlPage.Window.Eval(ScriptConstants.InvokeUrl);
    HtmlPage.Window.Eval(String.Format(ScriptConstants.BeforeUnload,
        Endpoint, ThrowException, ApiKey, PassCode, UserName));
    app.Exit += (sender, args) => Cleanup();
#endif
}
// Sends a "RegisterTopic" request for the given topic.
//   name        - topic name; must not be null.
//   description - topic description; the name is used when this is null.
//   contract    - property-name to description map; an empty map is sent when null.
// Throws ArgumentNullException when name is null.
private static void RegisterTopic(string name, string description = null,
    Dictionary<string, string> contract = null) {
    if (name == null) throw new ArgumentNullException("name");

    var topic = new Dictionary<string, object>();
    topic.Add("ContractDict", contract ?? new Dictionary<string, string>());
    topic.Add("TopicName", name);
    topic.Add("TopicDescription", description ?? name);

    var parameters = new Dictionary<string, object>();
    parameters.Add("topicData", topic.ToJson());

    RestHelper.Invoke("RegisterTopic", parameters);
}
// Sends a "DeleteTopic" request for the named topic.
// Throws ArgumentNullException when name is null.
private static void UnRegisterTopic(string name) {
    if (name == null) throw new ArgumentNullException("name");

    var parameters = new Dictionary<string, object>();
    parameters.Add("name", name);
    RestHelper.Invoke("DeleteTopic", parameters);
}
/// <summary>
/// Registers a topic by name with an empty contract.
/// </summary>
/// <param name="name">Topic name; must not be null.</param>
/// <param name="description">Optional description; the topic name is used when omitted.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
public static void Register(string name, string description = null) {
    RegisterTopic(name, description, null);
}
/// <summary>
/// Registers a topic named after <typeparamref name="T"/>. The contract lists
/// every readable and writable property that is not marked with
/// <c>TopicHeaderAttribute</c>, mapped to the text of its
/// <c>DescriptionAttribute</c> (empty string when there is none).
/// </summary>
/// <typeparam name="T">Message type that defines the topic.</typeparam>
public static void Register<T>() where T : class {
    var topicType = typeof(T);

    // Topic description comes from the type's own DescriptionAttribute, if any.
    var typeDescriptions = topicType.GetCustomAttributes(_descriptionAttributeType, true);
    var topicDescription = string.Empty;
    if (typeDescriptions != null && typeDescriptions.Length > 0)
        topicDescription = (typeDescriptions[0] as DescriptionAttribute).Description;

    var contract = new Dictionary<string, string>();
    foreach (var property in topicType.GetProperties()) {
        if (!property.CanWrite || !property.CanRead) continue;

        var descriptions = property.GetCustomAttributes(_descriptionAttributeType, true);
        var headerMarkers = property.GetCustomAttributes(_topicHeaderAttributeType, true);
        // Header properties are left out of the contract.
        if (headerMarkers != null && headerMarkers.Length > 0) continue;

        var propertyDescription = string.Empty;
        if (descriptions != null && descriptions.Length > 0)
            propertyDescription = (descriptions[0] as DescriptionAttribute).Description;
        contract[property.Name] = propertyDescription;
    }

    RegisterTopic(topicType.Name, topicDescription, contract);
}
/// <summary>
/// Deletes the topic with the given name.
/// </summary>
/// <param name="name">Topic name; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
public static void UnRegister(string name)
{
    UnRegisterTopic(name);
}
/// <summary>
/// Deletes the topic named after <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type whose name identifies the topic.</typeparam>
public static void UnRegister<T>() where T : class {
    var topicName = typeof(T).Name;
    UnRegisterTopic(topicName);
}
/// <summary>
/// Subscribes the current subscriber to the topic named after
/// <typeparamref name="T"/> and delivers each streamed message to
/// <paramref name="callback"/>.
/// </summary>
/// <remarks>
/// The request chain is: SelectSubscriber, then CreateSubscriber when the
/// subscriber is not valid, then SubscribeTo, then AddTransport, and finally
/// an HTTP streaming handler is started and stored under the topic name.
/// </remarks>
/// <typeparam name="T">Message type; its name is used as the topic name.</typeparam>
/// <param name="callback">Invoked with every deserialized message; must not be null.</param>
/// <param name="filter">Subscription filter; an empty string is sent when null.</param>
/// <param name="interval">Streaming interval; 5 milliseconds when null.</param>
/// <param name="batchSize">Batch size passed in the streaming URL.</param>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
public static void Subscribe<T>(Action<T> callback, string filter = null, TimeSpan? interval = null, int batchSize = 1) where T : class, new() {
    // Fail fast: without this check a null callback would only surface later,
    // inside the streaming handler, when the first message arrives.
    if (callback == null) throw new ArgumentNullException("callback");

    filter = filter ?? string.Empty;
    var pollInterval = interval ?? TimeSpan.FromMilliseconds(5);
    var type = typeof(T);
    var topicName = type.Name;
    var headerProperty = type.GetProperties()
        .FirstOrDefault(x => x.GetCustomAttributes(_topicHeaderAttributeType, false).Any());
    var needHeader = headerProperty != null;

    // Final step: open the streaming connection and route messages to the callback.
    Action startStreaming = () => {
        // The interval is part of a URL, so it is formatted culture-independently.
        var intervalMs = pollInterval.TotalMilliseconds
            .ToString(System.Globalization.CultureInfo.InvariantCulture);
        // NOTE(review): the JSON sample of T is placed in the query string without
        // URL encoding, as before - confirm whether HttpStreaming or the service expects that.
        var handler = new HttpStreaming(String.Concat(Endpoint,
            String.Format(STREAM_URL, UserName, topicName,
                new T().ToJson().Replace("null", "\"\""),
                batchSize, intervalMs)));
        _handlers[topicName] = handler;
        handler.OnReceived = value => {
            // Rename the header token so it deserializes into the header property.
            if (needHeader) value = value.Replace(ESBTOPIC_HEADERS, headerProperty.Name);
            callback(value.FromJson<T>());
        };
        handler.Start();
    };

    // Second step: attach a transport named after the topic, then start streaming.
    Action addTransport = () => RestHelper.Invoke("AddTransport",
        new Dictionary<string, object> {
            {"subscriber", UserName},
            {"transportName", topicName},
            {"topicName", topicName},
            {"transportType", (int)Transport},
            {"transportData", GetTransportData(topicName).ToJson()}
        },
        __ => startStreaming());

    RestHelper.Invoke("SelectSubscriber",
        new Dictionary<string, object> { { "name", UserName } },
        s => {
            var subscriber = s.FromJson() as Dictionary<object, object>;
            var isValid = (bool)subscriber["IsValid"];
            var topics = subscriber["Topics"] as Dictionary<object, object>;

            // First step: subscribe to the topic unless the subscriber already has it.
            Action subscribeToTopic = () => {
                // A missing or differently shaped topic list is treated as
                // "not subscribed yet" instead of causing a NullReferenceException.
                // NOTE(review): when the topic is already present no streaming handler
                // is started here - confirm that this is intended.
                if (topics != null && topics.ContainsKey(topicName)) return;
                RestHelper.Invoke("SubscribeTo",
                    new Dictionary<string, object> {
                        {"subscriber", UserName},
                        {"topicName", topicName},
                        {"filter", filter},
                        {"needHeader", needHeader}
                    },
                    _ => addTransport());
            };

            if (!isValid)
                RestHelper.Invoke("CreateSubscriber",
                    new Dictionary<string, object> { { "subscriber", UserName } },
                    _ => subscribeToTopic());
            else
                subscribeToTopic();
        });
}
// Builds the transport path for a topic: Address followed by the topic name
// and the subscriber name. Every transport except MSMQ gets ";queue=" inserted
// between the address and the topic name.
private static string ParseAddress(string topicName) {
    var separator = Transport == TransportType.MSMQ ? string.Empty : ";queue=";
    return String.Concat(Address, separator, topicName, UserName);
}
// Builds the transport settings sent with "AddTransport".
// MSMQ, RabbitMQ and Redis transports are described by a "Path"; every other
// transport type is described by "IPAddress", "Port" and "UseSSL", parsed from
// an Address split on ':' (third part "true", case-insensitive, enables SSL).
private static Dictionary<string, object> GetTransportData(string topicName) {
    var usesPath = Transport == TransportType.MSMQ
        || Transport == TransportType.RabbitMQ
        || Transport == TransportType.Redis;

    var data = new Dictionary<string, object>();
    data.Add("Format", 0);

    if (usesPath) {
        data.Add("Path", ParseAddress(topicName));
        return data;
    }

    var parts = Address.Split(':');
    var useSSL = false;
    if (parts.Length > 2)
        useSSL = parts[2].Equals("true", StringComparison.OrdinalIgnoreCase);

    data.Add("IPAddress", parts[0]);
    data.Add("Port", Int32.Parse(parts[1]));
    data.Add("UseSSL", useSSL);
    return data;
}
// Sends "UnSubscribeFrom" and "DeleteTransport" for the topic, then stops and
// forgets the local streaming handler registered under that topic, if any.
// Throws ArgumentNullException when topicName is null.
private static void UnSubscribeFromTopic(string topicName) {
    if (topicName == null) throw new ArgumentNullException("topicName");

    var unsubscribeArgs = new Dictionary<string, object>();
    unsubscribeArgs.Add("subscriber", UserName);
    unsubscribeArgs.Add("topicName", topicName);
    RestHelper.Invoke("UnSubscribeFrom", unsubscribeArgs);

    var transportArgs = new Dictionary<string, object>();
    transportArgs.Add("subscriber", UserName);
    transportArgs.Add("transportName", topicName);
    RestHelper.Invoke("DeleteTransport", transportArgs);

    HttpStreaming stream;
    if (!_handlers.TryGetValue(topicName, out stream)) return;
    _handlers.Remove(topicName);
    stream.Stop();
}
/// <summary>
/// Unsubscribes from the topic named after <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type whose name identifies the topic.</typeparam>
public static void UnSubscribe<T>() {
    var topicName = typeof(T).Name;
    UnSubscribeFromTopic(topicName);
}
/// <summary>
/// Unsubscribes from the topic with the given name.
/// </summary>
/// <param name="topicName">Topic name; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="topicName"/> is null.</exception>
public static void UnSubscribe(string topicName)
{
    UnSubscribeFromTopic(topicName);
}
/// <summary>
/// Publishes a single message to the named topic through a "PublishTopic" request.
/// </summary>
/// <param name="topicName">Topic to publish to; must not be null.</param>
/// <param name="message">Message object; it is serialized as a one-element array.</param>
/// <param name="groupID">Optional group id; only sent together with a positive <paramref name="sequenceID"/>.</param>
/// <param name="sequenceID">Sequence number within the group; ignored unless greater than zero.</param>
/// <param name="expiresIn">Message lifetime; 30 days when null.</param>
/// <param name="headers">Extra headers; a new dictionary is used when null. The group and sequence entries are written into this dictionary.</param>
/// <exception cref="ArgumentNullException"><paramref name="topicName"/> is null.</exception>
public static void Publish(string topicName, object message, string groupID = null, int sequenceID = 0, TimeSpan? expiresIn = null, Dictionary<string, string> headers = null) {
    // Same guard the register/unregister/unsubscribe paths apply to their topic
    // name, so a missing name fails here instead of inside the REST call.
    if (topicName == null) throw new ArgumentNullException("topicName");

    expiresIn = expiresIn ?? TimeSpan.FromDays(30);
    headers = headers ?? new Dictionary<string, string>();

    // Group headers are only meaningful as a pair, so both conditions must hold.
    if (!String.IsNullOrWhiteSpace(groupID) && sequenceID > 0) {
        headers["ESB_GROUP_ID"] = groupID;
        headers["ESB_SEQUENCE_ID"] = sequenceID.ToString();
    }

    RestHelper.Invoke("PublishTopic",
        new Dictionary<string, object> {
            {"topicName", topicName },
            {"topicData", new Dictionary<string,object>{
                {"Headers", headers},
                {"ExpiresIn", expiresIn.Value.TotalMilliseconds}}
                .ToJson()},
            {"publishData", new []{ message }.ToJson()}
        });
}
/// <summary>
/// Publishes <paramref name="message"/> to the topic named after
/// <typeparamref name="T"/>; all other arguments are forwarded unchanged to
/// the name-based overload.
/// </summary>
/// <typeparam name="T">Message type whose name identifies the topic.</typeparam>
public static void Publish<T>(T message, string groupID = null, int sequenceID = 0, TimeSpan? expiresIn = null, Dictionary<string, string> headers = null) where T : class {
    var topicName = typeof(T).Name;
    Publish(topicName, message, groupID, sequenceID, expiresIn, headers);
}
/// <summary>
/// Runs the local cleanup and sends a "Disconnect" request for the current
/// subscriber. When the client is not durable, a "DeleteSubscriber" request
/// is sent first and "Disconnect" follows from its callback.
/// </summary>
public static void Disconnect() {
    // Read the name before Cleanup, which may delete the stored copy.
    var subscriberName = UserName;

    Action sendDisconnect = () => RestHelper.Invoke("Disconnect",
        new Dictionary<string, object> { { "subscriberName", subscriberName } });

    Cleanup();

    if (Durable) {
        sendDisconnect();
        return;
    }

    RestHelper.Invoke("DeleteSubscriber",
        new Dictionary<string, object> { { "name", subscriberName } },
        _ => sendDisconnect());
}
// Local teardown: deletes the stored subscriber name when the client is not
// durable, stops and clears every streaming handler, then invokes
// OnDisconnect when one is set.
private static void Cleanup() {
    if (!Durable) LocalStorage.Delete(USERNAME_KEY);

    foreach (var stream in _handlers.Values)
        stream.Stop();
    _handlers.Clear();

    var onDisconnect = OnDisconnect;
    if (onDisconnect != null) onDisconnect();
}
/// <summary>
/// Sets the internal <c>ThrowException</c> flag.
/// </summary>
/// <param name="value">New value of the flag.</param>
public static void ReThrowException(bool value)
{
    ThrowException = value;
}
// Subscriber name used in every request. Resolved once and cached:
// a name found in local storage is reused (and switches Durable on);
// otherwise a new name is generated from a platform prefix ("WP" or "SL")
// plus a GUID. The name is written to local storage only when Durable is set.
private static string UserName {
    get {
        if (String.IsNullOrWhiteSpace(_username)) {
            _username = LocalStorage.Get<string>(USERNAME_KEY);
            // A stored name means an earlier session chose to be durable.
            if (_username != null) Durable = true;

            if (String.IsNullOrWhiteSpace(_username)) {
#if WINDOWS_PHONE
                const string platformPrefix = "WP";
#else
                const string platformPrefix = "SL";
#endif
                _username = String.Concat(platformPrefix, Guid.NewGuid().ToString());
            }

            if (Durable) LocalStorage.Set(_username, USERNAME_KEY);
        }
        return _username;
    }
}
// Flag set through ReThrowException; it is also passed into the
// before-unload script built by SetupExitEvent.
internal static bool ThrowException {
    get;
    set;
}
// Backing field for Durable.
private static bool _durable;

/// <summary>
/// Whether the subscriber is durable. When false, the stored subscriber name
/// is deleted during cleanup and the subscriber is deleted on disconnect.
/// </summary>
/// <remarks>
/// In a browser-hosted Silverlight app the new value is also pushed to the
/// page through the <c>SetDurableFlag</c> script. That step is skipped when
/// running out of browser, matching the exit-event setup, which only uses the
/// HTML bridge for browser-hosted apps.
/// </remarks>
public static bool Durable {
    internal get { return _durable; }
    set {
        _durable = value;
#if !WINDOWS_PHONE
        // The HTML bridge is not available out of browser; touching
        // HtmlPage.Window there would throw instead of just setting the flag.
        if (Application.Current.IsRunningOutOfBrowser) return;
        HtmlPage.Window.Eval(String.Format(ScriptConstants.SetDurableFlag, value ? "true" : "false"));
#endif
    }
}
// Backing field for Endpoint; stored exactly as assigned.
private static string _endpoint;

/// <summary>
/// Base URL of the REST service. The getter always returns a value that ends
/// with "/", appending one when the stored value lacks it.
/// </summary>
/// <remarks>
/// A null endpoint is treated like an empty one (the getter returns "/")
/// instead of throwing a NullReferenceException.
/// </remarks>
public static string Endpoint {
    get {
        var baseUrl = _endpoint ?? string.Empty;
        // Ordinal comparison: this is a URL check, not linguistic text.
        return baseUrl.EndsWith("/", StringComparison.Ordinal) ? baseUrl : baseUrl + "/";
    }
    set { _endpoint = value; }
}
/// <summary>API key; passed into the before-unload script built at start-up.</summary>
public static string ApiKey {
    internal get;
    set;
}

/// <summary>Pass code; passed into the before-unload script built at start-up.</summary>
public static string PassCode {
    internal get;
    set;
}

/// <summary>Transport address from which the per-topic transport data is built.</summary>
public static string Address {
    internal get;
    set;
}

/// <summary>Transport type sent with "AddTransport"; it also selects how <c>Address</c> is interpreted.</summary>
public static TransportType Transport {
    internal get;
    set;
}

/// <summary>Optional callback invoked at the end of the local cleanup.</summary>
public static Action OnDisconnect {
    get;
    set;
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment