Skip to content

Instantly share code, notes, and snippets.

@XSockets
Last active August 29, 2015 14:19
Show Gist options
  • Save XSockets/f82d838da4d0f6e9cf97 to your computer and use it in GitHub Desktop.
Save XSockets/f82d838da4d0f6e9cf97 to your computer and use it in GitHub Desktop.
Override the default scaling of XSockets and use Azure Service Bus instead.
using System;
using System.Configuration;
using System.Linq;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using XSockets.Core.Common.Enterprise;
using XSockets.Core.Common.Socket;
using XSockets.Core.Common.Socket.Event.Interface;
using XSockets.Core.Common.Utility.Logging;
using XSockets.Core.Common.Utility.Serialization;
using XSockets.Core.XSocket.Model;
using XSockets.Enterprise.Scaling;
using XSockets.Plugin.Framework;
using XSockets.Plugin.Framework.Attributes;
namespace XSocketsScalingOverAzureServiceBus
{
[Export(typeof(IXSocketsScaleOut), Rewritable = Rewritable.No, InstancePolicy = InstancePolicy.Shared)]
public class AzureServiceBusScaling : BaseScaleOut
{
private readonly string SID = Guid.NewGuid().ToString();
private string ConnString;
private NamespaceManager NamespaceManager;
private TopicClient TopicClient;
private SubscriptionClient SubscriptionClient;
private IXSocketJsonSerializer JsonSerializer;
public override void Publish(IMessage message)
{
Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - PUBLISH");
TopicClient.Send(GetBrokerMessage(message));
}
private BrokeredMessage GetBrokerMessage(IMessage message)
{
// Create message, passing a string message for the body
var m = new BrokeredMessage();
m.Properties["JSON"] = this.JsonSerializer.SerializeToString(message);
m.Properties["SID"] = SID;
return m;
}
public override void Subscribe()
{
Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - SUBSCRIBE");
var options = new OnMessageOptions { AutoComplete = false, AutoRenewTimeout = TimeSpan.FromMinutes(30) };
SubscriptionClient.OnMessage(OnBrokerMessage, options);
}
private void OnBrokerMessage(BrokeredMessage message)
{
try
{
if (message.Properties["SID"].ToString() == SID) return;
var m = this.JsonSerializer.DeserializeFromString<Message>(message.Properties["JSON"].ToString());
var pipe = Composable.GetExport<IXSocketPipeline>();
var ctrl = Composable.GetExports<IXSocketController>().First(p => p.Alias == m.Controller);
pipe.OnIncomingMessage(ctrl, m);
message.Complete();
}
catch (Exception)
{
// Indicates a problem, unlock message in subscription
message.DeadLetter();
}
}
public override void Init()
{
this.ConnString = ConfigurationManager.AppSettings.Get("Microsoft.ServiceBus.ConnectionString");
this.JsonSerializer = Composable.GetExport<IXSocketJsonSerializer>();
SetupAzureServiceBus();
}
private void SetupAzureServiceBus()
{
Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - INIT");
NamespaceManager = NamespaceManager.CreateFromConnectionString(ConnString);
if (!NamespaceManager.TopicExists("XDATA"))
{
Composable.GetExport<IXLogger>().Debug("Creating Topic for Azure Service Bus");
NamespaceManager.CreateTopic("XDATA");
}
if (!NamespaceManager.SubscriptionExists("XDATA", SID))
{
NamespaceManager.CreateSubscription("XDATA", SID);
Composable.GetExport<IXLogger>().Debug("Creating Subscription for Azure Service Bus");
}
this.TopicClient = TopicClient.CreateFromConnectionString(ConnString, "XDATA");
this.SubscriptionClient = SubscriptionClient.CreateFromConnectionString(ConnString, "XDATA", SID);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment