Last active
August 29, 2015 14:19
-
-
Save XSockets/f82d838da4d0f6e9cf97 to your computer and use it in GitHub Desktop.
Override the default scaling of XSockets and use Azure Service Bus instead.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Configuration; | |
using System.Linq; | |
using Microsoft.ServiceBus; | |
using Microsoft.ServiceBus.Messaging; | |
using XSockets.Core.Common.Enterprise; | |
using XSockets.Core.Common.Socket; | |
using XSockets.Core.Common.Socket.Event.Interface; | |
using XSockets.Core.Common.Utility.Logging; | |
using XSockets.Core.Common.Utility.Serialization; | |
using XSockets.Core.XSocket.Model; | |
using XSockets.Enterprise.Scaling; | |
using XSockets.Plugin.Framework; | |
using XSockets.Plugin.Framework.Attributes; | |
namespace XSocketsScalingOverAzureServiceBus | |
{ | |
/// <summary>
/// XSockets scale-out implementation that relays messages between server nodes
/// through an Azure Service Bus topic.
/// </summary>
/// <remarks>
/// Every node publishes to the shared topic and listens on its own subscription,
/// named after a per-process identifier (<see cref="SID"/>). Published messages carry
/// that identifier so a node can recognise and discard its own messages.
/// The serialized payload travels in a message property, so it is subject to the
/// Service Bus limit on property/header size.
/// </remarks>
[Export(typeof(IXSocketsScaleOut), Rewritable = Rewritable.No, InstancePolicy = InstancePolicy.Shared)]
public class AzureServiceBusScaling : BaseScaleOut
{
    /// <summary>Topic shared by all nodes.</summary>
    private const string TopicPath = "XDATA";

    /// <summary>Message property holding the serialized XSockets message.</summary>
    private const string JsonProperty = "JSON";

    /// <summary>Message property holding the identifier of the publishing node.</summary>
    private const string SidProperty = "SID";

    /// <summary>App setting key for the Service Bus connection string.</summary>
    private const string ConnectionStringKey = "Microsoft.ServiceBus.ConnectionString";

    /// <summary>
    /// Idle time after which Service Bus removes this node's subscription. The subscription
    /// name is a fresh GUID per process, so without this every restart would leave an
    /// orphaned subscription behind.
    /// </summary>
    private static readonly TimeSpan SubscriptionIdleTimeout = TimeSpan.FromHours(1);

    /// <summary>Identifier of this node; also used as its subscription name.</summary>
    private readonly string SID = Guid.NewGuid().ToString();

    private string ConnString;
    private NamespaceManager NamespaceManager;
    private TopicClient TopicClient;
    private SubscriptionClient SubscriptionClient;
    private IXSocketJsonSerializer JsonSerializer;

    /// <summary>
    /// Sends <paramref name="message"/> to the shared topic so the other nodes receive it.
    /// </summary>
    /// <param name="message">The XSockets message to distribute.</param>
    public override void Publish(IMessage message)
    {
        Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - PUBLISH");
        TopicClient.Send(GetBrokerMessage(message));
    }

    /// <summary>
    /// Wraps an XSockets message in a <see cref="BrokeredMessage"/>, storing the serialized
    /// message and this node's identifier as message properties (the body is left empty).
    /// </summary>
    private BrokeredMessage GetBrokerMessage(IMessage message)
    {
        var m = new BrokeredMessage();
        m.Properties[JsonProperty] = this.JsonSerializer.SerializeToString(message);
        m.Properties[SidProperty] = SID;
        return m;
    }

    /// <summary>
    /// Starts the message pump on this node's subscription. Messages are settled explicitly
    /// by <see cref="OnBrokerMessage"/> because auto-completion is disabled.
    /// </summary>
    public override void Subscribe()
    {
        Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - SUBSCRIBE");
        var options = new OnMessageOptions { AutoComplete = false, AutoRenewTimeout = TimeSpan.FromMinutes(30) };
        SubscriptionClient.OnMessage(OnBrokerMessage, options);
    }

    /// <summary>
    /// Handles one message from the subscription: discards messages published by this node,
    /// otherwise deserializes the payload and feeds it into the XSockets pipeline for the
    /// controller whose alias matches. Any failure dead-letters the message.
    /// </summary>
    private void OnBrokerMessage(BrokeredMessage message)
    {
        try
        {
            if (message.Properties[SidProperty].ToString() == SID)
            {
                // Our own message: nothing to dispatch, but it must still be settled.
                // AutoComplete is off, so returning without Complete() would leave the
                // message locked and have it redelivered until it is dead-lettered.
                message.Complete();
                return;
            }

            var m = this.JsonSerializer.DeserializeFromString<Message>(message.Properties[JsonProperty].ToString());
            var pipe = Composable.GetExport<IXSocketPipeline>();
            var ctrl = Composable.GetExports<IXSocketController>().First(p => p.Alias == m.Controller);
            pipe.OnIncomingMessage(ctrl, m);
            message.Complete();
        }
        catch (Exception ex)
        {
            // The message could not be processed; move it to the dead-letter queue
            // (rather than retrying) and record why, so the failure can be diagnosed.
            message.DeadLetter(ex.GetType().Name, ex.Message);
            Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - message dead-lettered: " + ex.GetType().FullName);
        }
    }

    /// <summary>
    /// Reads the connection string from app settings, resolves the JSON serializer and
    /// prepares the topic and subscription clients.
    /// </summary>
    /// <exception cref="ConfigurationErrorsException">
    /// The connection string app setting is missing or empty.
    /// </exception>
    public override void Init()
    {
        this.ConnString = ConfigurationManager.AppSettings.Get(ConnectionStringKey);
        if (string.IsNullOrWhiteSpace(this.ConnString))
        {
            // Fail fast with a message that names the missing key instead of letting
            // the Service Bus SDK fail on a null connection string.
            throw new ConfigurationErrorsException(
                "Missing app setting '" + ConnectionStringKey + "' required for Azure Service Bus scaling.");
        }

        this.JsonSerializer = Composable.GetExport<IXSocketJsonSerializer>();
        SetupAzureServiceBus();
    }

    /// <summary>
    /// Ensures the shared topic and this node's subscription exist, then creates the
    /// clients used for publishing and receiving.
    /// </summary>
    private void SetupAzureServiceBus()
    {
        var logger = Composable.GetExport<IXLogger>();
        logger.Debug("Azure ServiceBus Scaling - INIT");

        NamespaceManager = NamespaceManager.CreateFromConnectionString(ConnString);

        if (!NamespaceManager.TopicExists(TopicPath))
        {
            logger.Debug("Creating Topic for Azure Service Bus");
            NamespaceManager.CreateTopic(TopicPath);
        }

        if (!NamespaceManager.SubscriptionExists(TopicPath, SID))
        {
            logger.Debug("Creating Subscription for Azure Service Bus");
            var subscription = new SubscriptionDescription(TopicPath, SID)
            {
                // Let Service Bus clean up the subscription once this node is gone.
                AutoDeleteOnIdle = SubscriptionIdleTimeout
            };
            NamespaceManager.CreateSubscription(subscription);
        }

        this.TopicClient = TopicClient.CreateFromConnectionString(ConnString, TopicPath);
        this.SubscriptionClient = SubscriptionClient.CreateFromConnectionString(ConnString, TopicPath, SID);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment