Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created September 11, 2012 14:48
Show Gist options
  • Save rpgmaker/3699308 to your computer and use it in GitHub Desktop.
Save rpgmaker/3699308 to your computer and use it in GitHub Desktop.
PSBClient Updated
using System;
using System.Windows;
using System.Linq;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
#if !WINDOWS_PHONE
using System.Windows.Browser;
#endif
namespace
#if WINDOWS_PHONE
PServiceBus.WindowsPhone
#else
PServiceBus.Silverlight
#endif
{
public static class PSBClient {
// Local-storage key under which the generated subscriber name is kept.
const string USERNAME_KEY = "pservicebus_username_info";
// Placeholder token that Subscribe replaces with the header property's name in received JSON.
const string ESBTOPIC_HEADERS = "ESBTOPIC_HEADERS";
// Relative streaming URL template: {0} subscriber name, {1} transport name,
// {2} message type info (JSON), {3} batch size, {4} interval in milliseconds.
const string STREAM_URL = "StreamSubscriberMessages/?subscriberName={0}&transportName={1}&messageTypeInfo={2}&batchSize={3}&interval={4}&transport=httpstreaming";
// Cached subscriber name; filled in lazily by the UserName property.
static string _username;
// Active streaming handlers, keyed by topic name.
static readonly Dictionary<string, HttpStreaming> _handlers =
    new Dictionary<string, HttpStreaming>();
// Attribute types looked up via reflection when registering/subscribing.
static readonly Type _descriptionAttributeType = typeof(DescriptionAttribute);
static readonly Type _topicHeaderAttributeType = typeof(TopicHeaderAttribute);
/// <summary>
/// Seeds the default connection settings and then registers the exit handling.
/// </summary>
static PSBClient() {
    // Defaults: RabbitMQ on localhost with guest credentials, REST service on port 8087.
    Transport = TransportType.RabbitMQ;
    Address = "localhost:5672;userID=guest;password=guest";
    Endpoint = "http://localhost:8087/ESBRestService/";
    // Runs last: the exit hook reads Endpoint and the other settings while it registers.
    SetupExitEvent();
}
/// <summary>
/// Hooks application shutdown so the client disconnects (or at least cleans up) on exit.
/// Windows Phone: disconnects on exit and on back-key press.
/// Silverlight out-of-browser: disconnects on exit.
/// Silverlight in-browser: evaluates the ScriptConstants page scripts and only runs
/// the local Cleanup on exit.
/// </summary>
private static void SetupExitEvent() {
    var app = Application.Current;
#if WINDOWS_PHONE
    app.Exit += (sender, args) => Disconnect();
    var frame = app.RootVisual as Microsoft.Phone.Controls.PhoneApplicationFrame;
    if (frame != null) frame.BackKeyPress += (sender, args) => Disconnect();
#else
    if (!app.IsRunningOutOfBrowser) {
        var window = HtmlPage.Window;
        // Script order matters: the durable flag is initialised before anything else runs.
        var durableLiteral = Durable.ToString().ToLower();
        window.Eval(String.Format(ScriptConstants.InitDurableFlag, durableLiteral));
        window.Eval(ScriptConstants.InvokeUrl);
        // UserName is read only here, after the first two scripts have been evaluated.
        var unloadScript = String.Format(ScriptConstants.BeforeUnload,
            Endpoint, ThrowException, ApiKey, PassCode, UserName);
        window.Eval(unloadScript);
        app.Exit += (sender, args) => Cleanup();
    }
    else {
        app.Exit += (sender, args) => Disconnect();
    }
#endif
}
/// <summary>
/// Sends a "RegisterTopic" request describing the topic.
/// </summary>
/// <param name="name">Topic name; required.</param>
/// <param name="description">Topic description; the name is used when null.</param>
/// <param name="contract">Property-name to description map; an empty map is sent when null.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
private static void RegisterTopic(string name, string description = null,
    Dictionary<string, string> contract = null) {
    if (name == null) throw new ArgumentNullException("name");

    var topic = new Dictionary<string, object>();
    topic["ContractDict"] = contract ?? new Dictionary<string, string>();
    topic["TopicName"] = name;
    topic["TopicDescription"] = description ?? name;

    // The topic payload travels as a JSON string under the "topicData" key.
    var arguments = new Dictionary<string, object>();
    arguments["topicData"] = topic.ToJson();
    RestHelper.Invoke("RegisterTopic", arguments);
}
/// <summary>
/// Sends a "DeleteTopic" request for the given topic name.
/// </summary>
/// <param name="name">Topic name; required.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
private static void UnRegisterTopic(string name) {
    if (name == null) throw new ArgumentNullException("name");

    var arguments = new Dictionary<string, object>();
    arguments["name"] = name;
    RestHelper.Invoke("DeleteTopic", arguments);
}
/// <summary>
/// Registers a topic by name, with no contract information.
/// </summary>
/// <param name="name">Topic name; required.</param>
/// <param name="description">Optional description; the name is used when null.</param>
public static void Register(string name, string description = null) {
    // No contract is supplied, so RegisterTopic sends an empty one.
    RegisterTopic(name, description, null);
}
/// <summary>
/// Registers a topic named after <typeparamref name="T"/>, building its contract
/// from the type's readable and writable properties.
/// </summary>
/// <typeparam name="T">Message type; its simple type name becomes the topic name.</typeparam>
public static void Register<T>() where T : class {
    var topicType = typeof(T);

    // Topic description comes from a [Description] attribute on the type, if any.
    // NOTE(review): an undecorated type yields string.Empty, so RegisterTopic's
    // "fall back to the name" only applies to null and is not triggered here; confirm intended.
    var typeDescriptions = topicType.GetCustomAttributes(_descriptionAttributeType, true);
    var hasTypeDescription = typeDescriptions != null && typeDescriptions.Length > 0;
    var topicDescription = hasTypeDescription
        ? ((DescriptionAttribute)typeDescriptions[0]).Description
        : string.Empty;

    var contract = new Dictionary<string, string>();
    foreach (var property in topicType.GetProperties()) {
        // Only read/write properties take part in the contract.
        if (!property.CanWrite || !property.CanRead) continue;

        var descriptions = property.GetCustomAttributes(_descriptionAttributeType, true);
        var headerMarkers = property.GetCustomAttributes(_topicHeaderAttributeType, true);
        // Properties flagged with the topic-header attribute are left out of the contract.
        if (headerMarkers != null && headerMarkers.Length > 0) continue;

        var hasDescription = descriptions != null && descriptions.Length > 0;
        contract[property.Name] = hasDescription
            ? ((DescriptionAttribute)descriptions[0]).Description
            : string.Empty;
    }

    RegisterTopic(topicType.Name, topicDescription, contract);
}
/// <summary>
/// Deletes the topic with the given name.
/// </summary>
/// <param name="name">Topic name; required.</param>
public static void UnRegister(string name) {
    // Validation and the REST call live in the shared private helper.
    UnRegisterTopic(name);
}
/// <summary>
/// Deletes the topic named after <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type whose simple type name is the topic name.</typeparam>
public static void UnRegister<T>() where T : class {
    var topicName = typeof(T).Name;
    UnRegisterTopic(topicName);
}
/// <summary>
/// Subscribes to the topic named after <typeparamref name="T"/> and streams received
/// messages to <paramref name="callback"/>.
/// </summary>
/// <remarks>
/// Flow: "SelectSubscriber" looks up the current subscriber; "CreateSubscriber" is issued
/// first when the result is not valid; unless the topic is already listed, "SubscribeTo"
/// and then "AddTransport" are issued. Once that chain completes, a background worker
/// opens the HTTP stream and starts dispatching messages.
/// NOTE(review): if a REST call never invokes its continuation, the worker appears to wait
/// indefinitely on the reset event; confirm how RestHelper reports failures.
/// </remarks>
/// <typeparam name="T">Message type; its simple type name is the topic and transport name.</typeparam>
/// <param name="callback">Invoked with each deserialized message; required.</param>
/// <param name="filter">Subscription filter; an empty string is sent when null.</param>
/// <param name="interval">Streaming interval; 5 milliseconds when null.</param>
/// <param name="batchSize">Batch size passed to the streaming URL.</param>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
public static void Subscribe<T>(Action<T> callback, string filter = null, TimeSpan? interval = null, int batchSize = 1) where T : class, new() {
    // Fail fast: a null callback would otherwise only blow up later on the streaming path.
    if (callback == null) throw new ArgumentNullException("callback");
    filter = filter ?? string.Empty;
    interval = interval ?? TimeSpan.FromMilliseconds(5);
    var type = typeof(T);
    var topicName = type.Name;
    var headerProperty = type.GetProperties().FirstOrDefault(x => x.GetCustomAttributes(_topicHeaderAttributeType, false).Any());
    var needHeader = headerProperty != null;
    // Signalled once the server-side subscription is known to exist.
    var reset = new ManualResetEvent(false);
    var bw = new BackgroundWorker();
    bw.DoWork += (s, e) => {
        reset.WaitOne();
        var handler =
            new HttpStreaming(String.Concat(Endpoint,
                String.Format(STREAM_URL, UserName, topicName,
                    new T().ToJson().Replace("null", "\"\""),
                    batchSize, interval.Value.TotalMilliseconds)));
        // Stop a stream left over from an earlier Subscribe for the same topic; otherwise it
        // keeps running but becomes unreachable once its dictionary entry is overwritten.
        HttpStreaming previous;
        if (_handlers.TryGetValue(topicName, out previous)) previous.Stop();
        _handlers[topicName] = handler;
        handler.OnReceived = value =>
        {
            // Map the generic header placeholder onto the type's header property name.
            if (needHeader) value = value.Replace(ESBTOPIC_HEADERS, headerProperty.Name);
            callback(value.FromJson<T>());
        };
        handler.Start();
    };
    bw.RunWorkerAsync();
    RestHelper.Invoke("SelectSubscriber",
        new Dictionary<string, object> { { "name", UserName } },
        s => {
            var subscriber = s.FromJson() as Dictionary<object, object>;
            var isValid = (bool)subscriber["IsValid"];
            var topics = subscriber["Topics"] as Dictionary<object, object>;
            var action = new Action(() =>
            {
                // topics is null when the value is missing or not a dictionary;
                // treat that as "not yet subscribed" instead of throwing.
                if (topics != null && topics.ContainsKey(topicName)) {
                    reset.Set();
                    return;
                }
                RestHelper.Invoke("SubscribeTo",
                    new Dictionary<string, object> {
                        {"subscriber", UserName},
                        {"topicName", topicName},
                        {"filter", filter},
                        {"needHeader", needHeader}
                    },
                    _ =>
                    {
                        RestHelper.Invoke("AddTransport",
                            new Dictionary<string, object>
                            {
                                {"subscriber", UserName},
                                {"transportName", topicName},
                                {"topicName", topicName},
                                {"transportType", (int)Transport},
                                {"transportData", GetTransportData(topicName).ToJson()}
                            },
                            __ =>
                            {
                                reset.Set();
                            });
                    });
            });
            if (!isValid)
                RestHelper.Invoke("CreateSubscriber",
                    new Dictionary<string, object> { { "subscriber", UserName } },
                    _ => action());
            else
                action();
        });
}
/// <summary>
/// Builds the transport path for a topic: Address, then topic name and subscriber name.
/// For every transport except MSMQ, a ";queue=" segment is inserted after the address.
/// </summary>
/// <param name="topicName">Topic name appended to the address.</param>
/// <returns>The concatenated transport path.</returns>
private static string ParseAddress(string topicName) {
    var queueSegment = Transport == TransportType.MSMQ ? string.Empty : ";queue=";
    return String.Concat(Address, queueSegment, topicName, UserName);
}
/// <summary>
/// Builds the transport description sent with "AddTransport".
/// MSMQ, RabbitMQ and Redis use a "Path" built by ParseAddress; every other transport
/// reads Address as <c>host:port[:useSSL]</c>.
/// </summary>
/// <param name="topicName">Topic name used when building a queue path.</param>
/// <returns>Dictionary with "Format" plus either "Path" or "IPAddress"/"Port"/"UseSSL".</returns>
/// <exception cref="FormatException">
/// Address is not in <c>host:port[:useSSL]</c> form, or the port is not an integer.
/// </exception>
private static Dictionary<string, object> GetTransportData(string topicName) {
    switch (Transport) {
        case TransportType.MSMQ:
        case TransportType.RabbitMQ:
        case TransportType.Redis:
            return new Dictionary<string, object> {
                {"Format", 0},
                {"Path", ParseAddress(topicName)}
            };
        default:
            // Guard against a null or port-less address, which previously surfaced as a
            // NullReferenceException / IndexOutOfRangeException with no useful message.
            var tokens = (Address ?? string.Empty).Split(':');
            if (tokens.Length < 2)
                throw new FormatException("Address must be in the form host:port[:useSSL] for this transport.");
            var useSSL = tokens.Length > 2 &&
                tokens[2].Equals("true", StringComparison.OrdinalIgnoreCase);
            var ipAddress = tokens[0];
            // The port is machine-readable configuration, so parse it culture-independently.
            var port = Int32.Parse(tokens[1], System.Globalization.CultureInfo.InvariantCulture);
            return new Dictionary<string, object> {
                {"Format", 0},
                {"IPAddress", ipAddress},
                {"Port", port},
                {"UseSSL", useSSL}
            };
    }
}
/// <summary>
/// Removes the subscription and its transport on the server ("UnSubscribeFrom", then
/// "DeleteTransport") and stops the local streaming handler for the topic, if any.
/// </summary>
/// <param name="topicName">Topic name; required.</param>
/// <exception cref="ArgumentNullException"><paramref name="topicName"/> is null.</exception>
private static void UnSubscribeFromTopic(string topicName) {
    if (topicName == null) throw new ArgumentNullException("topicName");

    var subscriptionArgs = new Dictionary<string, object>();
    subscriptionArgs["subscriber"] = UserName;
    subscriptionArgs["topicName"] = topicName;
    RestHelper.Invoke("UnSubscribeFrom", subscriptionArgs);

    // The transport was registered under the topic's name, so it is removed by that name.
    var transportArgs = new Dictionary<string, object>();
    transportArgs["subscriber"] = UserName;
    transportArgs["transportName"] = topicName;
    RestHelper.Invoke("DeleteTransport", transportArgs);

    HttpStreaming stream;
    if (!_handlers.TryGetValue(topicName, out stream)) return;
    _handlers.Remove(topicName);
    stream.Stop();
}
/// <summary>
/// Unsubscribes from the topic named after <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type whose simple type name is the topic name.</typeparam>
public static void UnSubscribe<T>() {
    var topicName = typeof(T).Name;
    UnSubscribeFromTopic(topicName);
}
/// <summary>
/// Unsubscribes from the topic with the given name.
/// </summary>
/// <param name="topicName">Topic name; required.</param>
public static void UnSubscribe(string topicName) {
    // Validation, REST calls and handler shutdown live in the shared private helper.
    UnSubscribeFromTopic(topicName);
}
/// <summary>
/// Publishes a single message to the named topic via "PublishTopic".
/// </summary>
/// <param name="topicName">Topic to publish to; required.</param>
/// <param name="message">Message object; sent as a one-element JSON array.</param>
/// <param name="groupID">Optional group id; only sent together with a positive sequence id.</param>
/// <param name="sequenceID">Optional sequence id; only sent when greater than zero and a group id is given.</param>
/// <param name="expiresIn">Message lifetime; 30 days when null.</param>
/// <param name="headers">Optional extra headers; the supplied dictionary is not modified.</param>
/// <exception cref="ArgumentNullException"><paramref name="topicName"/> is null.</exception>
public static void Publish(string topicName, object message, string groupID = null, int sequenceID = 0, TimeSpan? expiresIn = null, Dictionary<string, string> headers = null) {
    // Same null contract as the other topic-name entry points in this class.
    if (topicName == null) throw new ArgumentNullException("topicName");
    expiresIn = expiresIn ?? TimeSpan.FromDays(30);
    // Work on a copy so the group/sequence entries added below never leak into the
    // caller's dictionary (which may be reused across Publish calls).
    headers = headers == null
        ? new Dictionary<string, string>()
        : new Dictionary<string, string>(headers, headers.Comparer);
    if (!String.IsNullOrWhiteSpace(groupID) && sequenceID > 0) {
        headers["ESB_GROUP_ID"] = groupID;
        headers["ESB_SEQUENCE_ID"] = sequenceID.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }
    RestHelper.Invoke("PublishTopic",
        new Dictionary<string, object> {
            {"topicName", topicName },
            {"topicData", new Dictionary<string,object>{
                {"Headers", headers},
                {"ExpiresIn", expiresIn.Value.TotalMilliseconds}}
                .ToJson()},
            {"publishData", new []{ message }.ToJson()}
        });
}
/// <summary>
/// Publishes a message to the topic named after <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type whose simple type name is the topic name.</typeparam>
/// <param name="message">Message to publish.</param>
/// <param name="groupID">Optional group id, forwarded unchanged.</param>
/// <param name="sequenceID">Optional sequence id, forwarded unchanged.</param>
/// <param name="expiresIn">Optional message lifetime, forwarded unchanged.</param>
/// <param name="headers">Optional extra headers, forwarded unchanged.</param>
public static void Publish<T>(T message, string groupID = null, int sequenceID = 0, TimeSpan? expiresIn = null, Dictionary<string, string> headers = null) where T : class {
    var topicName = typeof(T).Name;
    Publish(topicName, message, groupID, sequenceID, expiresIn, headers);
}
/// <summary>
/// Stops all local streaming and tells the server this subscriber is disconnecting.
/// A non-durable subscriber is deleted on the server first ("DeleteSubscriber"), and the
/// "Disconnect" call follows from that request's continuation.
/// </summary>
public static void Disconnect() {
    // Capture the name before Cleanup, which may delete it from local storage.
    var subscriberName = UserName;
    Action notifyServer = () => {
        var disconnectArgs = new Dictionary<string, object>();
        disconnectArgs["subscriberName"] = subscriberName;
        RestHelper.Invoke("Disconnect", disconnectArgs);
    };

    Cleanup();

    if (Durable) {
        notifyServer();
        return;
    }

    var deleteArgs = new Dictionary<string, object>();
    deleteArgs["name"] = subscriberName;
    RestHelper.Invoke("DeleteSubscriber", deleteArgs, _ => notifyServer());
}
/// <summary>
/// Local teardown: removes the stored subscriber name when not durable, stops and forgets
/// every streaming handler, then raises OnDisconnect if one is set.
/// </summary>
private static void Cleanup() {
    if (!Durable) LocalStorage.Delete(USERNAME_KEY);

    foreach (var stream in _handlers.Values) {
        stream.Stop();
    }
    _handlers.Clear();

    var onDisconnect = OnDisconnect;
    if (onDisconnect != null) onDisconnect();
}
/// <summary>
/// Sets the internal ThrowException flag.
/// </summary>
/// <param name="value">New value for the flag.</param>
public static void ReThrowException(bool value) {
    // Public entry point for a flag whose property is internal.
    ThrowException = value;
}
/// <summary>
/// Subscriber name for this client. Resolved once and cached: a name found in local
/// storage is reused (and switches Durable on); otherwise a new name is generated from
/// a platform prefix ("WP" or "SL") and a fresh GUID.
/// </summary>
private static string UserName {
    get {
        if (!String.IsNullOrWhiteSpace(_username)) return _username;

        _username = LocalStorage.Get<string>(USERNAME_KEY);
        // Any stored value, even a blank one, marks the client as durable.
        if (_username != null) Durable = true;

        if (String.IsNullOrWhiteSpace(_username)) {
#if WINDOWS_PHONE
            const string platformPrefix = "WP";
#else
            const string platformPrefix = "SL";
#endif
            _username = String.Concat(platformPrefix, Guid.NewGuid().ToString());
        }

        // NOTE(review): Set is called as (value, key) while Get/Delete take the key;
        // confirm this matches LocalStorage.Set's parameter order.
        if (Durable) LocalStorage.Set(_username, USERNAME_KEY);
        return _username;
    }
}
// Flag set through ReThrowException; also formatted into the BeforeUnload script in SetupExitEvent.
internal static bool ThrowException { get; set; }
private static bool _durable;
/// <summary>
/// Whether the subscriber outlives the session. When false, Cleanup deletes the stored
/// subscriber name and Disconnect deletes the subscriber on the server.
/// In the in-browser Silverlight build, the setter also pushes the new value to the page
/// through the SetDurableFlag script.
/// </summary>
public static bool Durable {
    internal get { return _durable; }
    set {
        _durable = value;
#if !WINDOWS_PHONE
        // Only touch the HTML bridge when hosted in a browser, matching SetupExitEvent,
        // which avoids HtmlPage (and never evaluates InitDurableFlag) when running
        // out-of-browser. Unguarded, setting Durable out-of-browser would hit HtmlPage.Window.
        if (!Application.Current.IsRunningOutOfBrowser)
            HtmlPage.Window.Eval(String.Format(ScriptConstants.SetDurableFlag, value.ToString().ToLower()));
#endif
    }
}
private static string _endpoint;
/// <summary>
/// Base URL of the REST service. The getter always returns a value ending in "/";
/// a null or empty endpoint reads back as "/".
/// </summary>
public static string Endpoint {
    get {
        // Treat null like empty so reading the property never throws, and compare ordinally:
        // this is a URL, not linguistic text.
        var baseUrl = _endpoint ?? string.Empty;
        return baseUrl.EndsWith("/", StringComparison.Ordinal) ? baseUrl : baseUrl + "/";
    }
    set { _endpoint = value; }
}
// Publicly settable configuration; the getters are internal to the assembly.
// ApiKey and PassCode are formatted into the BeforeUnload script by SetupExitEvent.
public static string ApiKey { internal get; set; }
public static string PassCode { internal get; set; }
// Transport address; read by ParseAddress and GetTransportData.
public static string Address { internal get; set; }
// Transport kind; selects the shape of the data built by GetTransportData.
public static TransportType Transport { internal get; set; }
// Optional callback invoked at the end of Cleanup.
public static Action OnDisconnect { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment