Created
June 4, 2012 18:30
-
-
Save rpgmaker/2870068 to your computer and use it in GitHub Desktop.
Message Resequencer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using PServiceBus.Core.Interface; | |
using PServiceBus.Core.Runtime.Messages; | |
using PServiceBus.Core.Manager; | |
using PServiceBus.Core.Runtime.Configuration; | |
using System.Threading; | |
using PServiceBus.Core.Runtime; | |
using System.Collections.Concurrent; | |
using System.Threading.Tasks; | |
using PServiceBus.Core.Logger; | |
using PServiceBus.Core.Runtime.Transports; | |
using PServiceBus.Core.Runtime.Extensions; | |
using PServiceBus.Core.Runtime.Topics; | |
namespace PServiceBus.Core.Components { | |
public class MessageResequencer : ComponentBase { | |
const string MESSAGE_SEQUENCEID_PREFIX = "MessageResequencer"; | |
const int LOOK_BACK_QUEUE_MAX_SIZE = 5; | |
private List<Task> _tasks = new List<Task>(); | |
private ConcurrentDictionary<string, BlockingCollection<ResequenceMessage>> _collections = | |
new ConcurrentDictionary<string, BlockingCollection<ResequenceMessage>>(); | |
private ConcurrentDictionary<string, Queue<ResequenceMessage>> _queues = | |
new ConcurrentDictionary<string, Queue<ResequenceMessage>>(); | |
private ConcurrentDictionary<string, HashSet<ulong>> _processedSequences = | |
new ConcurrentDictionary<string, HashSet<ulong>>(); | |
private ConcurrentDictionary<string, ManualResetEvent> _resets | |
= new ConcurrentDictionary<string, ManualResetEvent>(); | |
private HashSet<string> _groups = new HashSet<string>(); | |
private volatile bool _completed = false; | |
private IObjectManager<ResequenceMessage> _resequenceMessages; | |
private IObjectManager<ResequenceMessage> ResequenceMessages { | |
get { | |
if (_resequenceMessages == null) | |
_resequenceMessages = ObjectManager<ResequenceMessage>.Create(ConfigurationProvider.ComponentContainerConfig.Address, | |
ConfigurationProvider.ComponentContainerConfig.AddressID); | |
return _resequenceMessages; | |
} | |
} | |
public override void SaveState() { | |
Cleanup(); | |
} | |
public override void Close() { | |
Cleanup(); | |
MethodHelper.TryLog(() => _tasks.ForEach(task => task.Dispose())); | |
} | |
public override void Execute() { | |
var counter = 0; | |
var hasMessages = false; | |
if (_completed) return; | |
foreach (var message in ResequenceMessages) { | |
hasMessages = true;//Flag used to determine when to process backlogs | |
QueueMessageForResequencing(message); | |
if (++counter == BatchSize) break; | |
} | |
//Clean backlog to avoid large backlogs | |
if (!hasMessages) ProcessBackLogs(); | |
} | |
private void Cleanup() { | |
_completed = true; | |
WaitHandle.WaitAll(_resets.Select(x => x.Value).ToArray()); | |
ProcessBackLogs(); | |
} | |
private void QueueMessageForResequencing(ResequenceMessage message) { | |
var groupID = message.GroupID; | |
var collection = default(BlockingCollection<ResequenceMessage>); | |
if (!_collections.TryGetValue(groupID, out collection)) | |
collection = _collections[groupID] | |
= new BlockingCollection<ResequenceMessage>(); | |
collection.Add(message); | |
AddGroup(groupID); | |
} | |
private void Process(ResequenceMessage resequenceMessage) { | |
var subscribers = resequenceMessage.SubscriberID > 0 ? | |
SubscriberManager.Current.Where(x => x.ID == resequenceMessage.SubscriberID).ToList() | |
: SubscriptionCollection.Instance[resequenceMessage.TopicID]; | |
var index = 0; | |
var tasks = new Task[subscribers.Count]; | |
var message = resequenceMessage.Message; | |
var topicID = resequenceMessage.TopicID; | |
var publishedMessage = new PublishedMessage(message, topicID); | |
var failedMessage = new FailedMessage(message, topicID); | |
foreach (var subscriber in subscribers) { | |
tasks[index++] = Task.Factory.StartNew(o => { | |
var sub = o as ISubscriber; | |
ProcessMessage(topicID, sub, message, publishedMessage, failedMessage, resequenceMessage); | |
}, subscriber); | |
} | |
Task.WaitAll(tasks); | |
PublishedMessageManager.Attach(publishedMessage); | |
FailedMessageManager.Attach(failedMessage); | |
} | |
private void ProcessMessage(ulong topicID, ISubscriber subscriber, TransportMessage message, PublishedMessage publishedMessage, | |
FailedMessage failedMessage, ResequenceMessage resequenceMessage) { | |
var transports = subscriber.Transports.Where(t => t.TopicID == topicID || t.TopicID == 0); | |
foreach (var transportInfo in transports) { | |
var transport = transportInfo.Transport; | |
var tranMessage = TransportProvider.FilterMessage(subscriber, topicID, message); | |
if (tranMessage == null) continue; | |
var response = transport.Send(TransportProvider.TransformMessage(transport.Format, tranMessage), tranMessage.Parameters[0]); | |
var messageResponse = new MessageResponse() { SubscriberName = subscriber.Name, SubscriberID = subscriber.ID, | |
TransportResponse = response }; | |
if (!response.Success && !subscriber.Durable) failedMessage.Responses.Add(messageResponse); | |
if (!response.Success && subscriber.Durable && !message.IsExpired) | |
ResequenceMessages.Attach(new ResequenceMessage(message) { | |
ID = resequenceMessage.ID, SubscriberID = subscriber.ID, TopicID = topicID }); | |
publishedMessage.Responses.Add(messageResponse); | |
} | |
} | |
private void ProcessBackLogs() { | |
var messages = new List<ResequenceMessage>(); | |
foreach (var kv in _queues) { | |
var queue = kv.Value; | |
while (queue.Count > 0) | |
messages.Add(queue.Dequeue()); | |
} | |
ResequenceMessages.AttachMany(messages); | |
} | |
private void ExecuteGroup(object state) { | |
var groupID = state as string; | |
var collection = _collections[groupID]; | |
var queue = default(Queue<ResequenceMessage>); | |
var reset = default(ManualResetEvent); | |
var processedSequence = default(HashSet<ulong>); | |
if (!_queues.TryGetValue(groupID, out queue)) | |
queue = _queues[groupID] | |
= new Queue<ResequenceMessage>(LOOK_BACK_QUEUE_MAX_SIZE); | |
if (!_resets.TryGetValue(groupID, out reset)) | |
reset = _resets[groupID] | |
= new ManualResetEvent(false); | |
if (!_processedSequences.TryGetValue(groupID, out processedSequence)) | |
processedSequence = _processedSequences[groupID] = | |
new HashSet<ulong>(); | |
var key = String.Concat(MESSAGE_SEQUENCEID_PREFIX, groupID); | |
foreach (var message in collection.GetConsumingEnumerable()) { | |
reset.Reset(); | |
var currentSequenceID = KeyValueStore.Instance.Get<ulong>(key); | |
if (currentSequenceID == 0) currentSequenceID = KeyValueStore.Instance.Incr(key); | |
// If message is already processed, we can go ahead and send it out of order | |
if (processedSequence.Contains(message.SequenceID)) { | |
Process(message); | |
//Just in case collection is empty after processing duplicates | |
if (collection.Count == 0) reset.Set(); | |
continue; | |
} | |
if (message.SequenceID == currentSequenceID) { | |
Process(message); | |
processedSequence.Add(currentSequenceID); | |
currentSequenceID = KeyValueStore.Instance.Incr(key); | |
while (queue.Count > 0) { | |
var msg = queue.Dequeue(); | |
if (msg.SequenceID == currentSequenceID) { | |
Process(msg); | |
processedSequence.Add(currentSequenceID); | |
currentSequenceID = KeyValueStore.Instance.Incr(key); | |
} else ResequenceMessages.Attach(msg); | |
} | |
} | |
else { | |
if (queue.Count < LOOK_BACK_QUEUE_MAX_SIZE) | |
queue.Enqueue(message); | |
else ResequenceMessages.Attach(message); | |
} | |
if (collection.Count == 0) reset.Set(); | |
} | |
} | |
private void AddGroup(string groupID) { | |
if (_groups.Contains(groupID)) return; | |
_tasks.Add( | |
Task.Factory.StartNew(ExecuteGroup, groupID, TaskCreationOptions.LongRunning) | |
.ContinueWith(task => { | |
if (task.Exception != null) | |
ESBLogger.Log(task.Exception); | |
}, TaskContinuationOptions.OnlyOnFaulted) | |
.ContinueWith(_ => {})); | |
_groups.Add(groupID); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment