Skip to content

Instantly share code, notes, and snippets.

@hyrmn
Created March 15, 2014 16:58
Show Gist options
  • Select an option

  • Save hyrmn/9570454 to your computer and use it in GitHub Desktop.

Select an option

Save hyrmn/9570454 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
namespace Infrastructure.Azure
{
/// <summary>
/// Small publish/subscribe helper over Azure Service Bus topics.
/// Each message type <c>T</c> is mapped to a topic named <c>Topic_{TypeName}</c>; each
/// subscriber gets a subscription named <c>Subscription_{subscriberName}_{TypeName}</c>.
/// Instances are created through <see cref="Setup"/>; the underlying Service Bus objects
/// are created lazily on first use.
/// </summary>
public class ServiceBus
{
    // Value passed as OnMessageOptions.MaxConcurrentCalls for every subscription.
    private const int MaxConcurrentHandlerCalls = 7;

    private readonly string _serviceUri;

    // (topic name, client) pairs for every Subscribe call; used by Close and ClearTopics.
    private readonly List<Tuple<string, SubscriptionClient>> _subscribers;

    private NamespaceManager _namespaceManager;
    private MessagingFactory _messagingFactory;

    /// <summary>Signature of handlers attached to <see cref="OnError"/>.</summary>
    public delegate void ServiceBusExceptionHandler(ExceptionReceivedEventArgs exception);

    /// <summary>
    /// Raised when a subscription's message pump reports an exception
    /// (events that carry no exception are ignored).
    /// </summary>
    public event ServiceBusExceptionHandler OnError;

    private ServiceBus(string serviceUri)
    {
        _serviceUri = serviceUri;
        _subscribers = new List<Tuple<string, SubscriptionClient>>();
    }

    /// <summary>
    /// Creates a new instance bound to the given connection string. No connection is
    /// made until the first <see cref="Subscribe{T}"/> or <see cref="Publish{T}"/> call.
    /// </summary>
    /// <param name="serviceUri">Service Bus connection string.</param>
    /// <returns>A new, not yet connected, <see cref="ServiceBus"/>.</returns>
    public static ServiceBus Setup(string serviceUri)
    {
        return new ServiceBus(serviceUri);
    }

    /// <summary>
    /// Lazily creates the namespace manager and the messaging factory.
    /// The two are checked independently so that the factory can be re-created
    /// after <see cref="Close"/> has released it.
    /// </summary>
    private void EnsureEnvironment()
    {
        if (_namespaceManager == null)
        {
            _namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceUri);
        }

        if (_messagingFactory == null)
        {
            _messagingFactory = MessagingFactory.CreateFromConnectionString(_serviceUri);
        }
    }

    /// <summary>
    /// Subscribes <paramref name="receiveHandler"/> to messages of type <typeparamref name="T"/>,
    /// creating the topic and the subscription if they do not exist yet.
    /// </summary>
    /// <typeparam name="T">Message type; its name determines the topic.</typeparam>
    /// <param name="receiveHandler">Callback invoked with the deserialized body of each message.</param>
    /// <param name="subscriberName">Name used to build the subscription name.</param>
    /// <param name="receiveMode">Receive mode for the subscription client.</param>
    /// <exception cref="ArgumentNullException"><paramref name="receiveHandler"/> is null.</exception>
    public void Subscribe<T>(Action<T> receiveHandler, string subscriberName, ReceiveMode receiveMode = ReceiveMode.PeekLock)
    {
        // Fail here rather than on every received message inside the pump.
        if (receiveHandler == null)
        {
            throw new ArgumentNullException("receiveHandler");
        }

        EnsureEnvironment();

        var topicName = string.Format("Topic_{0}", typeof(T).Name);
        var subscriptionName = string.Format("Subscription_{0}_{1}", subscriberName, typeof(T).Name);

        if (!_namespaceManager.TopicExists(topicName))
        {
            _namespaceManager.CreateTopic(topicName);
        }

        var topic = _namespaceManager.GetTopic(topicName);

        if (!_namespaceManager.SubscriptionExists(topic.Path, subscriptionName))
        {
            _namespaceManager.CreateSubscription(topic.Path, subscriptionName);
        }

        var subscriptionClient = _messagingFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
        _subscribers.Add(new Tuple<string, SubscriptionClient>(topicName, subscriptionClient));

        var opt = new OnMessageOptions { AutoComplete = true, MaxConcurrentCalls = MaxConcurrentHandlerCalls };
        opt.ExceptionReceived += RaiseError;

        subscriptionClient.OnMessage(
            message =>
            {
                var messageData = message.GetBody<T>();
                receiveHandler(messageData);
            },
            opt);
    }

    /// <summary>
    /// Forwards message pump exceptions to <see cref="OnError"/> subscribers.
    /// </summary>
    private void RaiseError(object sender, ExceptionReceivedEventArgs ex)
    {
        if (ex.Exception == null)
        {
            return;
        }

        // Copy to a local so a concurrent unsubscribe cannot null the delegate
        // between the check and the invocation.
        var handler = OnError;
        if (handler != null)
        {
            handler(ex);
        }
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the topic named after its runtime type.
    /// </summary>
    /// <typeparam name="T">Compile-time type of the message.</typeparam>
    /// <param name="message">Message body to send.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Publish<T>(T message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        EnsureEnvironment();

        var topicName = string.Format("Topic_{0}", message.GetType().Name);
        var topicClient = _messagingFactory.CreateTopicClient(topicName);
        try
        {
            // BrokeredMessage is IDisposable; release it once the send has completed.
            using (var brokeredMessage = new BrokeredMessage(message))
            {
                topicClient.Send(brokeredMessage);
            }
        }
        finally
        {
            topicClient.Close();
        }
    }

    /// <summary>
    /// Closes every subscription client created by <see cref="Subscribe{T}"/> and the
    /// messaging factory. The factory is re-created on demand if the instance is used again.
    /// </summary>
    public void Close()
    {
        try
        {
            _subscribers.ForEach(subscriber => subscriber.Item2.Close());
        }
        finally
        {
            if (_messagingFactory != null)
            {
                _messagingFactory.Close();
                _messagingFactory = null;
            }
        }
    }

    /// <summary>
    /// Deletes every topic this instance has subscribed to. Each topic is deleted once,
    /// even when several subscribers share it.
    /// </summary>
    public void ClearTopics()
    {
        var deletedTopics = new HashSet<string>();
        foreach (var subscriber in _subscribers)
        {
            // Add returns false for a topic name already handled in this pass.
            if (deletedTopics.Add(subscriber.Item1))
            {
                _namespaceManager.DeleteTopic(subscriber.Item1);
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment