Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created September 11, 2012 14:49
Show Gist options
  • Save rpgmaker/3699322 to your computer and use it in GitHub Desktop.
Save rpgmaker/3699322 to your computer and use it in GitHub Desktop.
PSBClient.Subscribe
// Body of PSBClient.Subscribe (the method signature is outside this snippet).
//
// Flow:
//   1. Apply defaults to the optional arguments and derive the topic name from typeof(T).
//   2. Queue a background worker that blocks on a gate (`reset`) and, once released,
//      opens an HttpStreaming handler for the topic and forwards every received
//      message to `callback`.
//   3. Through RestHelper, look up the subscriber, create it when it is not valid,
//      subscribe it to the topic and attach a transport when it is not subscribed yet,
//      and finally open the gate so streaming can start.

// Defaults for the optional arguments.
filter = filter ?? string.Empty;
interval = interval ?? TimeSpan.FromMilliseconds(5);

// The topic is named after the message type.
var type = typeof(T);
var topicName = type.Name;

// First property (if any) carrying the topic-header attribute.
var headerProperty = type.GetProperties()
    .FirstOrDefault(x => x.GetCustomAttributes(_topicHeaderAttributeType, false).Any());
var needHeader = headerProperty != null;

// Gate that keeps the streaming worker idle until the server-side setup below is done.
var reset = new ManualResetEvent(false);

var bw = new BackgroundWorker();
bw.DoWork += (s, e) => {
    reset.WaitOne();

    // Format with the invariant culture: TotalMilliseconds is a double and must not
    // pick up a culture-specific decimal separator inside a URL.
    var streamUrl = String.Format(
        System.Globalization.CultureInfo.InvariantCulture,
        STREAM_URL, UserName, topicName,
        new T().ToJson().Replace("null", "\"\""),
        batchSize, interval.Value.TotalMilliseconds);

    var handler = new HttpStreaming(String.Concat(Endpoint, streamUrl));
    _handlers[topicName] = handler;
    handler.OnReceived = value =>
    {
        // Rename the header placeholder to the name of T's header property so that
        // the payload deserializes into that property.
        if (needHeader)
        {
            value = value.Replace(ESBTOPIC_HEADERS, headerProperty.Name);
        }
        callback(value.FromJson<T>());
    };
    handler.Start();
};
bw.RunWorkerAsync();

RestHelper.Invoke("SelectSubscriber",
    new Dictionary<string, object> { { "name", UserName } },
    response => {
        var subscriber = response.FromJson() as Dictionary<object, object>;
        if (subscriber == null)
        {
            // Fail with a descriptive error instead of a NullReferenceException on the next line.
            throw new InvalidOperationException(
                "SelectSubscriber returned an unexpected response for subscriber '" + UserName + "'.");
        }

        var isValid = (bool)subscriber["IsValid"];
        // NOTE(review): a null/absent "Topics" entry is treated as "no subscriptions yet" -
        // confirm that the service reports an unsubscribed user this way.
        var topics = subscriber["Topics"] as Dictionary<object, object>;

        var action = new Action(() =>
        {
            // Already subscribed to this topic: nothing to set up, start streaming.
            if (topics != null && topics.ContainsKey(topicName)) {
                reset.Set();
                return;
            }

            // Subscribe first, then attach the transport, then release the worker.
            RestHelper.Invoke("SubscribeTo",
                new Dictionary<string, object> {
                    {"subscriber", UserName},
                    {"topicName", topicName},
                    {"filter", filter},
                    {"needHeader", needHeader}
                },
                _ =>
                {
                    RestHelper.Invoke("AddTransport",
                        new Dictionary<string, object>
                        {
                            {"subscriber", UserName},
                            {"transportName", topicName},
                            {"topicName", topicName},
                            {"transportType", (int)Transport},
                            {"transportData", GetTransportData(topicName).ToJson()}
                        },
                        __ =>
                        {
                            reset.Set();
                        });
                });
        });

        // A subscriber that is not valid is created before the subscription is set up.
        if (!isValid)
        {
            RestHelper.Invoke("CreateSubscriber",
                new Dictionary<string, object> { { "subscriber", UserName } },
                _ => action());
        }
        else
        {
            action();
        }
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment