Created
March 15, 2014 16:58
-
-
Save hyrmn/9570454 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using Microsoft.ServiceBus; | |
| using Microsoft.ServiceBus.Messaging; | |
namespace Infrastructure.Azure
{
    /// <summary>
    /// Small publish/subscribe facade over Azure Service Bus topics.
    /// Each message type <c>T</c> is mapped to a topic named <c>Topic_{TypeName}</c>;
    /// each subscriber gets a subscription named <c>Subscription_{subscriberName}_{TypeName}</c>.
    /// Instances are created through <see cref="Setup"/>; the connection objects are
    /// created lazily on the first <see cref="Subscribe{T}"/> or <see cref="Publish{T}"/> call.
    /// </summary>
    public class ServiceBus
    {
        // Value passed to OnMessageOptions.MaxConcurrentCalls for every subscription.
        private const int MaxConcurrentHandlerCalls = 7;

        private readonly string _serviceUri;

        // One entry per Subscribe call: (topic name, client receiving from that topic).
        // The same topic name appears more than once when several subscribers share a message type.
        private readonly List<Tuple<string, SubscriptionClient>> _subscribers;

        private NamespaceManager _namespaceManager;
        private MessagingFactory _messagingFactory;

        /// <summary>Handler signature for errors reported by the subscription message pumps.</summary>
        /// <param name="exception">The event data supplied by the Service Bus client.</param>
        public delegate void ServiceBusExceptionHandler(ExceptionReceivedEventArgs exception);

        /// <summary>
        /// Raised when a subscription's message pump reports an exception
        /// (events whose <c>Exception</c> is null are ignored).
        /// </summary>
        public event ServiceBusExceptionHandler OnError;

        private ServiceBus(string serviceUri)
        {
            _serviceUri = serviceUri;
            _subscribers = new List<Tuple<string, SubscriptionClient>>();
        }

        /// <summary>
        /// Creates a wrapper for the given connection string. No connection is opened here.
        /// </summary>
        /// <param name="serviceUri">Service Bus connection string, later passed to
        /// <c>CreateFromConnectionString</c>.</param>
        /// <returns>A new, not yet connected <see cref="ServiceBus"/>.</returns>
        public static ServiceBus Setup(string serviceUri)
        {
            return new ServiceBus(serviceUri);
        }

        /// <summary>
        /// Lazily creates the namespace manager and the messaging factory.
        /// Each object is checked on its own so that a failure while creating the factory
        /// (or a factory released by <see cref="Close"/>) is repaired on the next call
        /// instead of leaving a null field behind.
        /// </summary>
        private void EnsureEnvironment()
        {
            if (_namespaceManager == null)
            {
                _namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceUri);
            }

            if (_messagingFactory == null)
            {
                _messagingFactory = MessagingFactory.CreateFromConnectionString(_serviceUri);
            }
        }

        /// <summary>
        /// Ensures the topic and subscription for <typeparamref name="T"/> exist and starts a
        /// message pump that deserializes each message body to <typeparamref name="T"/> and
        /// passes it to <paramref name="receiveHandler"/>.
        /// The pump is configured with <c>AutoComplete = true</c> and
        /// <c>MaxConcurrentCalls = 7</c>, so the handler should be safe to run concurrently.
        /// </summary>
        /// <typeparam name="T">Message type; its simple type name selects the topic.</typeparam>
        /// <param name="receiveHandler">Callback invoked with each deserialized message.</param>
        /// <param name="subscriberName">Logical subscriber name used in the subscription name.</param>
        /// <param name="receiveMode">Receive mode for the subscription client.</param>
        /// <exception cref="ArgumentNullException"><paramref name="receiveHandler"/> is null.</exception>
        public void Subscribe<T>(Action<T> receiveHandler, string subscriberName, ReceiveMode receiveMode = ReceiveMode.PeekLock)
        {
            if (receiveHandler == null)
            {
                // Fail fast: a null handler would otherwise throw on every delivered message.
                throw new ArgumentNullException("receiveHandler");
            }

            EnsureEnvironment();

            var topicName = string.Format("Topic_{0}", typeof(T).Name);
            var subscriptionName = string.Format("Subscription_{0}_{1}", subscriberName, typeof(T).Name);

            var topic = EnsureTopic(topicName);
            EnsureSubscription(topic.Path, subscriptionName);

            var subscriptionClient = _messagingFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
            _subscribers.Add(new Tuple<string, SubscriptionClient>(topicName, subscriptionClient));

            var options = new OnMessageOptions { AutoComplete = true, MaxConcurrentCalls = MaxConcurrentHandlerCalls };
            options.ExceptionReceived += RaiseError;

            subscriptionClient.OnMessage(
                message =>
                {
                    var messageData = message.GetBody<T>();
                    receiveHandler(messageData);
                },
                options);
        }

        /// <summary>Returns the description of the named topic, creating the topic first if needed.</summary>
        private TopicDescription EnsureTopic(string topicName)
        {
            if (!_namespaceManager.TopicExists(topicName))
            {
                try
                {
                    _namespaceManager.CreateTopic(topicName);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    // Another process created the topic between the check and the create; that is fine.
                }
            }

            return _namespaceManager.GetTopic(topicName);
        }

        /// <summary>Creates the subscription on the given topic unless it already exists.</summary>
        private void EnsureSubscription(string topicPath, string subscriptionName)
        {
            if (_namespaceManager.SubscriptionExists(topicPath, subscriptionName))
            {
                return;
            }

            try
            {
                _namespaceManager.CreateSubscription(topicPath, subscriptionName);
            }
            catch (MessagingEntityAlreadyExistsException)
            {
                // Lost the creation race to another process; the subscription exists, which is all we need.
            }
        }

        /// <summary>
        /// Forwards message pump exceptions to <see cref="OnError"/> subscribers.
        /// </summary>
        private void RaiseError(object sender, ExceptionReceivedEventArgs ex)
        {
            if (ex.Exception == null)
            {
                return;
            }

            // Copy to a local so a handler unsubscribing on another thread cannot null the
            // delegate between the check and the call.
            var handler = OnError;
            if (handler != null)
            {
                handler(ex);
            }
        }

        /// <summary>
        /// Sends <paramref name="message"/> to the topic named after its runtime type
        /// (<c>Topic_{message.GetType().Name}</c>). The topic is not created here.
        /// </summary>
        /// <typeparam name="T">Static type of the message.</typeparam>
        /// <param name="message">Serializable payload wrapped in a <c>BrokeredMessage</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Publish<T>(T message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            EnsureEnvironment();

            var topicName = string.Format("Topic_{0}", message.GetType().Name);
            var topicClient = _messagingFactory.CreateTopicClient(topicName);
            try
            {
                // BrokeredMessage is disposable; release it once the send has finished.
                using (var brokeredMessage = new BrokeredMessage(message))
                {
                    topicClient.Send(brokeredMessage);
                }
            }
            finally
            {
                topicClient.Close();
            }
        }

        /// <summary>
        /// Closes every subscription client created by <see cref="Subscribe{T}"/> and the
        /// underlying messaging factory. The recorded topic names are kept so that
        /// <see cref="ClearTopics"/> can still be called afterwards; a later
        /// <see cref="Publish{T}"/> or <see cref="Subscribe{T}"/> recreates the factory.
        /// </summary>
        public void Close()
        {
            foreach (var subscriber in _subscribers)
            {
                subscriber.Item2.Close();
            }

            if (_messagingFactory != null)
            {
                _messagingFactory.Close();
                _messagingFactory = null;
            }
        }

        /// <summary>
        /// Deletes every topic this instance has subscribed to. Each topic is deleted at most
        /// once, and topics that no longer exist are skipped.
        /// </summary>
        public void ClearTopics()
        {
            // Several subscribers may share one topic; track names already handled.
            var handledTopics = new HashSet<string>();

            foreach (var subscriber in _subscribers)
            {
                var topicName = subscriber.Item1;
                if (!handledTopics.Add(topicName))
                {
                    continue;
                }

                if (_namespaceManager.TopicExists(topicName))
                {
                    _namespaceManager.DeleteTopic(topicName);
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment