Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created June 4, 2012 18:30
Show Gist options
  • Save rpgmaker/2870068 to your computer and use it in GitHub Desktop.
Save rpgmaker/2870068 to your computer and use it in GitHub Desktop.
Message Resequencer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using PServiceBus.Core.Interface;
using PServiceBus.Core.Runtime.Messages;
using PServiceBus.Core.Manager;
using PServiceBus.Core.Runtime.Configuration;
using System.Threading;
using PServiceBus.Core.Runtime;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using PServiceBus.Core.Logger;
using PServiceBus.Core.Runtime.Transports;
using PServiceBus.Core.Runtime.Extensions;
using PServiceBus.Core.Runtime.Topics;
namespace PServiceBus.Core.Components {
/// <summary>
/// Component that pulls <c>ResequenceMessage</c> items from the component's object store,
/// buckets them by <c>GroupID</c>, and delivers each group's messages to subscribers in
/// <c>SequenceID</c> order. Every group is served by its own long-running task.
/// </summary>
public class MessageResequencer : ComponentBase {
    // Prefix of the KeyValueStore key that holds a group's currently expected sequence number.
    const string MESSAGE_SEQUENCEID_PREFIX = "MessageResequencer";
    // Maximum number of out-of-order messages parked in memory per group; extras go back to the store.
    const int LOOK_BACK_QUEUE_MAX_SIZE = 5;

    // Tail of the task chain started for each group in AddGroup.
    private readonly List<Task> _tasks = new List<Task>();
    // Per-group inbox: Execute produces into it, the group task consumes from it.
    private readonly ConcurrentDictionary<string, BlockingCollection<ResequenceMessage>> _collections =
        new ConcurrentDictionary<string, BlockingCollection<ResequenceMessage>>();
    // Per-group look-back queue of messages that arrived ahead of their turn.
    // Touched by the group task and by ProcessBackLogs, so every access is made under lock(queue).
    private readonly ConcurrentDictionary<string, Queue<ResequenceMessage>> _queues =
        new ConcurrentDictionary<string, Queue<ResequenceMessage>>();
    // Per-group set of sequence numbers that have already been delivered in order.
    // NOTE(review): entries are never removed, so this grows for the lifetime of the component.
    private readonly ConcurrentDictionary<string, HashSet<ulong>> _processedSequences =
        new ConcurrentDictionary<string, HashSet<ulong>>();
    // Per-group event: signaled while the group's inbox is empty and its consumer is idle.
    private readonly ConcurrentDictionary<string, ManualResetEvent> _resets
        = new ConcurrentDictionary<string, ManualResetEvent>();
    // Groups that already have a consumer task.
    private readonly HashSet<string> _groups = new HashSet<string>();
    // Serializes appends to the response collections shared by the subscriber tasks of one Process call.
    private readonly object _responseLock = new object();
    // Set by Cleanup; once true, Execute stops queueing new work.
    private volatile bool _completed = false;
    private IObjectManager<ResequenceMessage> _resequenceMessages;

    /// <summary>
    /// Store of pending resequence messages, created on first use from the component container configuration.
    /// </summary>
    private IObjectManager<ResequenceMessage> ResequenceMessages {
        get {
            if (_resequenceMessages == null)
                _resequenceMessages = ObjectManager<ResequenceMessage>.Create(ConfigurationProvider.ComponentContainerConfig.Address,
                    ConfigurationProvider.ComponentContainerConfig.AddressID);
            return _resequenceMessages;
        }
    }

    /// <summary>
    /// Stops accepting new work, waits for the group consumers to go idle and writes parked messages back to the store.
    /// </summary>
    public override void SaveState() {
        Cleanup();
    }

    /// <summary>
    /// Shuts the component down: drains as <see cref="SaveState"/> does, then ends every group consumer and disposes its task.
    /// </summary>
    public override void Close() {
        Cleanup();
        // Without CompleteAdding the consumers block in GetConsumingEnumerable forever, their tasks never
        // reach a final state, and Task.Dispose throws InvalidOperationException.
        foreach (var collection in _collections.Values)
            collection.CompleteAdding();
        MethodHelper.TryLog(() => Task.WaitAll(_tasks.ToArray()));
        // A consumer may have parked messages while it was winding down; return those to the store as well.
        ProcessBackLogs();
        MethodHelper.TryLog(() => _tasks.ForEach(task => task.Dispose()));
    }

    /// <summary>
    /// Reads up to <c>BatchSize</c> messages from the store and hands each one to its group's consumer.
    /// When the store yields nothing, parked out-of-order messages are flushed back to it instead.
    /// </summary>
    public override void Execute() {
        if (_completed) return;
        var counter = 0;
        var hasMessages = false;
        foreach (var message in ResequenceMessages) {
            hasMessages = true;//Flag used to determine when to process backlogs
            QueueMessageForResequencing(message);
            // Also stop pulling from the store as soon as shutdown has been requested.
            if (++counter == BatchSize || _completed) break;
        }
        //Clean backlog to avoid large backlogs
        if (!hasMessages) ProcessBackLogs();
    }

    /// <summary>
    /// Marks the component as completed, waits until every group consumer has signaled that it is idle,
    /// then returns parked messages to the store.
    /// </summary>
    private void Cleanup() {
        _completed = true;
        // Wait on each handle in turn rather than WaitHandle.WaitAll: WaitAll throws for an empty array
        // (no group started yet), for more than 64 handles, and on STA threads with several handles.
        foreach (var reset in _resets.Values)
            reset.WaitOne();
        ProcessBackLogs();
    }

    /// <summary>
    /// Adds the message to its group's inbox and makes sure a consumer task exists for that group.
    /// </summary>
    /// <param name="message">Message read from the store; its <c>GroupID</c> selects the inbox.</param>
    private void QueueMessageForResequencing(ResequenceMessage message) {
        var groupID = message.GroupID;
        var collection = _collections.GetOrAdd(groupID, _ => new BlockingCollection<ResequenceMessage>());
        try {
            collection.Add(message);
        } catch (InvalidOperationException) {
            // Close() completed this inbox while Execute was still running: hand the message back
            // to the store instead of dropping it.
            ResequenceMessages.Attach(message);
            return;
        }
        AddGroup(groupID);
    }

    /// <summary>
    /// Sends one message to its subscribers in parallel (one task per subscriber), waits for all sends,
    /// then records the published and failed results.
    /// </summary>
    /// <param name="resequenceMessage">
    /// Message to deliver. A positive <c>SubscriberID</c> restricts delivery to that subscriber;
    /// otherwise the subscribers registered for <c>TopicID</c> are used.
    /// </param>
    private void Process(ResequenceMessage resequenceMessage) {
        var subscribers = resequenceMessage.SubscriberID > 0 ?
            SubscriberManager.Current.Where(x => x.ID == resequenceMessage.SubscriberID).ToList()
            : SubscriptionCollection.Instance[resequenceMessage.TopicID];
        var message = resequenceMessage.Message;
        var topicID = resequenceMessage.TopicID;
        var publishedMessage = new PublishedMessage(message, topicID);
        var failedMessage = new FailedMessage(message, topicID);
        var tasks = new Task[subscribers.Count];
        var index = 0;
        foreach (var subscriber in subscribers) {
            tasks[index++] = Task.Factory.StartNew(o => {
                // Direct cast: a wrong state object fails here instead of as a null dereference later.
                var sub = (ISubscriber)o;
                ProcessMessage(topicID, sub, message, publishedMessage, failedMessage, resequenceMessage);
            }, subscriber);
        }
        Task.WaitAll(tasks);
        PublishedMessageManager.Attach(publishedMessage);
        FailedMessageManager.Attach(failedMessage);
    }

    /// <summary>
    /// Sends the message over each of the subscriber's transports that is bound to the topic (or to topic 0)
    /// and records the outcome of every send.
    /// </summary>
    /// <param name="topicID">Topic the message was published on.</param>
    /// <param name="subscriber">Subscriber to deliver to.</param>
    /// <param name="message">Original transport message.</param>
    /// <param name="publishedMessage">Receives a response entry for every send.</param>
    /// <param name="failedMessage">Receives a response entry for failed sends to non-durable subscribers.</param>
    /// <param name="resequenceMessage">Source message; its ID is reused when a failed durable send is queued again.</param>
    private void ProcessMessage(ulong topicID, ISubscriber subscriber, TransportMessage message, PublishedMessage publishedMessage,
        FailedMessage failedMessage, ResequenceMessage resequenceMessage) {
        var transports = subscriber.Transports.Where(t => t.TopicID == topicID || t.TopicID == 0);
        foreach (var transportInfo in transports) {
            var transport = transportInfo.Transport;
            var tranMessage = TransportProvider.FilterMessage(subscriber, topicID, message);
            if (tranMessage == null) continue;
            var response = transport.Send(TransportProvider.TransformMessage(transport.Format, tranMessage), tranMessage.Parameters[0]);
            var messageResponse = new MessageResponse() { SubscriberName = subscriber.Name, SubscriberID = subscriber.ID,
                TransportResponse = response };
            if (!response.Success) {
                if (!subscriber.Durable) {
                    // Process runs this method on several tasks against the same failedMessage;
                    // serialize the append in case Responses is not a thread-safe collection.
                    lock (_responseLock) failedMessage.Responses.Add(messageResponse);
                } else if (!message.IsExpired) {
                    // Durable subscriber and the message is still valid: queue it again for this subscriber only.
                    ResequenceMessages.Attach(new ResequenceMessage(message) {
                        ID = resequenceMessage.ID, SubscriberID = subscriber.ID, TopicID = topicID });
                }
            }
            lock (_responseLock) publishedMessage.Responses.Add(messageResponse);
        }
    }

    /// <summary>
    /// Empties every group's look-back queue and returns the parked messages to the store in one call.
    /// </summary>
    private void ProcessBackLogs() {
        var messages = new List<ResequenceMessage>();
        foreach (var kv in _queues) {
            var queue = kv.Value;
            // The owning group task may be enqueueing or dequeuing right now; Queue<T> is not thread-safe.
            lock (queue) {
                while (queue.Count > 0)
                    messages.Add(queue.Dequeue());
            }
        }
        // Execute calls this on every idle pass; skip the store call when nothing was parked.
        if (messages.Count == 0) return;
        ResequenceMessages.AttachMany(messages);
    }

    /// <summary>
    /// Body of a group's consumer task. Takes messages from the group's inbox and delivers the one whose
    /// <c>SequenceID</c> equals the group's counter in the KeyValueStore; messages that arrive early are
    /// parked in the look-back queue or, when that is full, returned to the store.
    /// </summary>
    /// <param name="state">The group ID (a string) passed by AddGroup.</param>
    private void ExecuteGroup(object state) {
        var groupID = (string)state;
        var collection = _collections[groupID];
        var queue = _queues.GetOrAdd(groupID, _ => new Queue<ResequenceMessage>(LOOK_BACK_QUEUE_MAX_SIZE));
        var reset = _resets.GetOrAdd(groupID, _ => new ManualResetEvent(false));
        var processedSequence = _processedSequences.GetOrAdd(groupID, _ => new HashSet<ulong>());
        var key = String.Concat(MESSAGE_SEQUENCEID_PREFIX, groupID);
        try {
            foreach (var message in collection.GetConsumingEnumerable()) {
                reset.Reset();
                var currentSequenceID = KeyValueStore.Instance.Get<ulong>(key);
                if (currentSequenceID == 0) currentSequenceID = KeyValueStore.Instance.Incr(key);
                if (processedSequence.Contains(message.SequenceID)) {
                    // If message is already processed, we can go ahead and send it out of order
                    Process(message);
                } else if (message.SequenceID == currentSequenceID) {
                    Process(message);
                    processedSequence.Add(currentSequenceID);
                    currentSequenceID = KeyValueStore.Instance.Incr(key);
                    // Replay parked messages. The lock is held only while dequeuing so that
                    // ProcessBackLogs is never blocked behind a send.
                    while (true) {
                        ResequenceMessage parked;
                        lock (queue) {
                            if (queue.Count == 0) break;
                            parked = queue.Dequeue();
                        }
                        if (parked.SequenceID == currentSequenceID) {
                            Process(parked);
                            processedSequence.Add(currentSequenceID);
                            currentSequenceID = KeyValueStore.Instance.Incr(key);
                        } else ResequenceMessages.Attach(parked);
                    }
                } else {
                    // Arrived ahead of its turn: park it if there is room, otherwise send it back to the store.
                    bool parkedInMemory;
                    lock (queue) {
                        parkedInMemory = queue.Count < LOOK_BACK_QUEUE_MAX_SIZE;
                        if (parkedInMemory) queue.Enqueue(message);
                    }
                    if (!parkedInMemory) ResequenceMessages.Attach(message);
                }
                // Inbox drained: let Cleanup proceed.
                if (collection.Count == 0) reset.Set();
            }
        } finally {
            // Reached when the inbox is completed or when a send throws. Either way this consumer is done,
            // so never leave Cleanup waiting on it. An exception still faults the task and is logged by
            // the continuation attached in AddGroup.
            // NOTE(review): after a fault the group stays in _groups, so no new consumer is started for it.
            reset.Set();
        }
    }

    /// <summary>
    /// Starts the long-running consumer task for a group the first time the group is seen.
    /// </summary>
    /// <param name="groupID">Group whose inbox already exists in <c>_collections</c>.</param>
    private void AddGroup(string groupID) {
        if (_groups.Contains(groupID)) return;
        _tasks.Add(
            Task.Factory.StartNew(ExecuteGroup, groupID, TaskCreationOptions.LongRunning)
            .ContinueWith(task => {
                if (task.Exception != null)
                    ESBLogger.Log(task.Exception);
            }, TaskContinuationOptions.OnlyOnFaulted)
            // The trailing no-op continuation always runs to completion, so the tracked task never
            // surfaces the consumer's fault (or the cancelled logging continuation) to Task.WaitAll.
            .ContinueWith(_ => {}));
        // Registered only after the task was started, so a failed start is retried on the next message.
        _groups.Add(groupID);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment