Created
March 24, 2011 07:05
-
-
Save kellypleahy/884685 to your computer and use it in GitHub Desktop.
an attempt at a queue watcher that is more correct for our needs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A component that watches a queue once it has been started.
/// </summary>
public interface IQueueWatcher
{
    /// <summary>
    /// Begins watching. Takes no arguments and returns nothing.
    /// </summary>
    void Start();
}
/// <summary>
/// Handles the raw payload of a single queue message.
/// </summary>
public interface IQueueMessageProcessor
{
    /// <summary>
    /// Processes one message.
    /// </summary>
    /// <param name="messageBytes">The message body as raw bytes.</param>
    void ProcessMessage(byte[] messageBytes);
}
/// <summary>
/// Message processor that only logs the size of each message it receives.
/// </summary>
public class JobStatusMessageProcessor : IQueueMessageProcessor
{
    /// <summary>
    /// Logger used for diagnostics. Defaults to a <c>NullLogger</c> and can be replaced by the caller.
    /// </summary>
    public ILogger Log { get; set; }

    public JobStatusMessageProcessor()
    {
        Log = new NullLogger();
    }

    /// <summary>
    /// Logs the size, in bytes, of the supplied message body.
    /// </summary>
    /// <param name="messageBytes">The message body; a null body is reported as size 0.</param>
    public void ProcessMessage(byte[] messageBytes)
    {
        // Log the length rather than the array itself: formatting a byte[] with {0}
        // prints its type name, not its size. A null body is tolerated because
        // FakeQueueService in this file returns messages without a body.
        int size = messageBytes == null ? 0 : messageBytes.Length;
        Log.Debug("Processing message of size: {0}", size);
    }
}
/// <summary>
/// Queue watcher bound to the "w-queue-test" queue that dispatches messages
/// to a <see cref="JobStatusMessageProcessor"/>.
/// </summary>
public class JobStatusQueueWatcher : QueueWatcherBase<JobStatusMessageProcessor>
{
    // Fixed settings for this watcher, hoisted out of the property getters.
    private static readonly TimeSpan HalfSecond = TimeSpan.FromSeconds(0.5);
    private static readonly TimeSpan OneMinute = TimeSpan.FromSeconds(60);

    public JobStatusQueueWatcher(
        JobStatusMessageProcessor processor,
        ITimerFacade timerFacade,
        IQueueService queueService)
        : base(processor, timerFacade, queueService)
    {
    }

    /// <summary>Visibility timeout passed to the queue service with each fetch: one minute.</summary>
    public override TimeSpan VisibilityTimeout
    {
        get { return OneMinute; }
    }

    /// <summary>Delay before the next polling pass once the queue is empty: 500 ms.</summary>
    public override TimeSpan SleepInterval
    {
        get { return HalfSecond; }
    }

    /// <summary>Name of the queue this watcher reads from.</summary>
    public override string QueueName
    {
        get { return "w-queue-test"; }
    }
}
/// <summary>
/// Base class for watchers that drain a named queue and hand each message body to a
/// <typeparamref name="TProcessor"/>. Polling is driven by an <see cref="ITimerFacade"/>:
/// each pass drains the queue until <see cref="IQueueService.GetMessage"/> returns null,
/// then re-arms the timer to fire again after <see cref="SleepInterval"/>.
/// </summary>
/// <typeparam name="TProcessor">Type of the processor that receives each message body.</typeparam>
public abstract class QueueWatcherBase<TProcessor> : IQueueWatcher
    where TProcessor : IQueueMessageProcessor
{
    // Passed as the second argument to every timer call (TimeSpan of -1 ms).
    private readonly TimeSpan _infiniteTimeout;
    private readonly TProcessor _processor;
    private readonly ITimerFacade _timerFacade;
    private readonly IQueueService _queueService;

    /// <summary>Logger used for diagnostics. Defaults to a <c>NullLogger</c>.</summary>
    public ILogger Log { get; set; }

    /// <summary>Name of the queue to read messages from.</summary>
    public abstract string QueueName { get; }

    /// <summary>Delay before the next polling pass after the current one finishes.</summary>
    public abstract TimeSpan SleepInterval { get; }

    /// <summary>Visibility timeout passed to the queue service with each fetch.</summary>
    public abstract TimeSpan VisibilityTimeout { get; }

    /// <summary>
    /// Creates the watcher with its collaborators.
    /// </summary>
    /// <param name="processor">Receives the body of each dequeued message.</param>
    /// <param name="timerFacade">Timer used to schedule polling passes.</param>
    /// <param name="queueService">Queue to fetch messages from and delete them on.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    protected QueueWatcherBase(TProcessor processor, ITimerFacade timerFacade, IQueueService queueService)
    {
        // Fail at construction instead of with a NullReferenceException on a timer thread later.
        if (processor == null)
            throw new ArgumentNullException("processor");
        if (timerFacade == null)
            throw new ArgumentNullException("timerFacade");
        if (queueService == null)
            throw new ArgumentNullException("queueService");

        Log = new NullLogger();
        _infiniteTimeout = TimeSpan.FromMilliseconds(-1);
        _processor = processor;
        _queueService = queueService;
        _timerFacade = timerFacade;
    }

    /// <summary>
    /// Logs the start and registers <c>ProcessMessages</c> with the timer.
    /// </summary>
    void IQueueWatcher.Start()
    {
        Log.Debug("QueueWatcher started for processor type: " + typeof(TProcessor).Name);
        _timerFacade.Start(ProcessMessages, _infiniteTimeout);
    }

    /// <summary>
    /// One polling pass: processes and deletes messages until the queue service returns
    /// null, then re-arms the timer.
    /// </summary>
    private void ProcessMessages()
    {
        try
        {
            var message = _queueService.GetMessage(QueueName, VisibilityTimeout);
            while (message != null)
            {
                try
                {
                    _processor.ProcessMessage(message.MessageBody);
                }
                catch (Exception ex)
                {
                    Log.Debug("Error processing message {0} from queue {1}, moving to dead message queue {1}-errors\n --- exception was: {2}", message.MessageId, QueueName, ex);
                    // TODO: move the message to a dead-letter queue with the name of the original queue + "-errors".
                    // That move is not implemented yet; the failed message is simply deleted below.
                }

                // The message is deleted whether or not the processor succeeded.
                _queueService.DeleteMessage(message.MessageId, message.PopId);
                message = _queueService.GetMessage(QueueName, VisibilityTimeout);
            }

            Log.Debug("Sleeping for {0}.", SleepInterval);
        }
        finally
        {
            // Re-arm even when the queue service throws; otherwise nothing would schedule
            // another pass and the watcher would stop polling. The exception still propagates.
            _timerFacade.Change(SleepInterval, _infiniteTimeout);
        }
    }
}
/// <summary>
/// Tests for <see cref="QueueWatcherBase{TProcessor}"/> using strict mocks for the
/// processor, the timer facade and the queue service. <see cref="SetUp"/> starts the
/// watcher and captures the action it hands to the timer, so each test can run a
/// polling pass by invoking that action directly.
/// </summary>
[TestFixture]
public class QueueWatcherBaseTestContext
{
    private interface IProcessor : IQueueMessageProcessor
    {
    }

    // Minimal concrete watcher with fixed settings for the tests.
    private class QueueWatcherTest : QueueWatcherBase<IProcessor>
    {
        public QueueWatcherTest(IProcessor processor, ITimerFacade timerFacade, IQueueService queueService)
            : base(processor, timerFacade, queueService)
        {
        }

        public override string QueueName
        {
            get { return "queue"; }
        }

        public override TimeSpan SleepInterval
        {
            get { return TimeSpan.FromMinutes(3); }
        }

        public override TimeSpan VisibilityTimeout
        {
            get { return TimeSpan.FromMinutes(15); }
        }
    }

    private MockRepository _mockRepository;
    private IProcessor _processor;
    private ITimerFacade _timerFacade;
    private IQueueService _queueService;
    private QueueWatcherTest _queueWatcherTest;
    private TimeSpan _infiniteTimeout;
    private Action _saveAction;

    [SetUp]
    public void SetUp()
    {
        _infiniteTimeout = TimeSpan.FromMilliseconds(-1);
        _saveAction = null;

        _mockRepository = new MockRepository();
        _processor = _mockRepository.StrictMock<IProcessor>();
        _timerFacade = _mockRepository.StrictMock<ITimerFacade>();
        _queueService = _mockRepository.StrictMock<IQueueService>();
        _queueWatcherTest = new QueueWatcherTest(_processor, _timerFacade, _queueService);

        using (_mockRepository.Record())
        {
            // Capture the callback the watcher registers so tests can invoke it.
            _timerFacade
                .Expect(timer => timer.Start(Arg<Action>.Is.Anything, Arg<TimeSpan>.Is.Equal(_infiniteTimeout)))
                .WhenCalled(invocation => _saveAction = (Action)invocation.Arguments[0]);
        }

        using (_mockRepository.Playback())
        {
            // Start is implemented explicitly, so it must be called through the interface.
            ((IQueueWatcher)_queueWatcherTest).Start();
        }
    }

    [Test]
    public void it_schedules_the_action()
    {
        Assert.That(_saveAction, Is.Not.Null);
    }

    [Test]
    public void the_action_calls_the_processor_for_as_many_queue_messages_are_available_and_then_reschedules_itself()
    {
        var available = new[]
        {
            NewMessage("1", "p1", 0),
            NewMessage("2", "p2", 1),
        };

        _mockRepository.BackToRecordAll();
        using (_mockRepository.Record())
        {
            foreach (var queued in available)
            {
                ExpectDequeueReturning(queued);
                _processor.ProcessMessage(queued.MessageBody);
                _queueService.DeleteMessage(queued.MessageId, queued.PopId);
            }

            ExpectQueueDrainedAndTimerRescheduled();
        }

        using (_mockRepository.Playback())
        {
            _saveAction();
        }
    }

    [Test]
    public void the_action_still_deletes_the_message_and_reschedules_itself_even_if_the_processor_throws_an_exception()
    {
        var poison = NewMessage("1", "p1", 0);

        _mockRepository.BackToRecordAll();
        using (_mockRepository.Record())
        {
            ExpectDequeueReturning(poison);
            _processor
                .Expect(processor => processor.ProcessMessage(poison.MessageBody))
                .WhenCalled(invocation => { throw new InvalidOperationException(); });
            _queueService.DeleteMessage(poison.MessageId, poison.PopId);

            ExpectQueueDrainedAndTimerRescheduled();
        }

        using (_mockRepository.Playback())
        {
            _saveAction();
        }
    }

    // Builds a message whose body is the single byte given.
    private static QueueMessage NewMessage(string messageId, string popId, byte payload)
    {
        var message = new QueueMessage();
        message.MessageBody = new[] { payload };
        message.MessageId = messageId;
        message.PopId = popId;
        return message;
    }

    // Records one expected GetMessage call (with the watcher's queue name and
    // visibility timeout) returning the given message; must be called while recording.
    private void ExpectDequeueReturning(QueueMessage result)
    {
        _queueService
            .Expect(queue => queue.GetMessage(_queueWatcherTest.QueueName, _queueWatcherTest.VisibilityTimeout))
            .Return(result);
    }

    // Records the end of a pass: a GetMessage call that returns null followed by the
    // timer being re-armed with the watcher's sleep interval.
    private void ExpectQueueDrainedAndTimerRescheduled()
    {
        ExpectDequeueReturning(null);
        _timerFacade.Change(_queueWatcherTest.SleepInterval, _infiniteTimeout);
    }
}
/// <summary>
/// A message retrieved from a queue.
/// </summary>
/// <remarks>
/// Members are auto-properties rather than public fields; object initializers and
/// ordinary reads and writes continue to work unchanged.
/// </remarks>
public class QueueMessage
{
    /// <summary>Identifier of the message; passed to <c>DeleteMessage</c> as <c>messageId</c>.</summary>
    public string MessageId { get; set; }

    /// <summary>Value passed to <c>DeleteMessage</c> as <c>popId</c> alongside the message id.</summary>
    public string PopId { get; set; }

    /// <summary>Raw payload of the message; may be null if never assigned.</summary>
    public byte[] MessageBody { get; set; }
}
/// <summary>
/// Minimal queue abstraction: fetch a message and delete a message.
/// </summary>
public interface IQueueService
{
    /// <summary>
    /// Fetches a message from the named queue.
    /// </summary>
    /// <param name="queueName">Name of the queue to read from.</param>
    /// <param name="visibilityTimeout">Visibility timeout to apply to the fetched message.</param>
    /// <returns>A message, or null; callers in this file treat null as "no message available".</returns>
    QueueMessage GetMessage(string queueName, TimeSpan visibilityTimeout);

    /// <summary>
    /// Deletes a previously fetched message.
    /// </summary>
    /// <param name="messageId">The <c>MessageId</c> of the message to delete.</param>
    /// <param name="popId">The <c>PopId</c> of the message to delete.</param>
    void DeleteMessage(string messageId, string popId);
}
/// <summary>
/// Stand-in queue service driven by the wall clock: it hands out one message the first
/// time it is polled during a second divisible by five, and nothing otherwise.
/// </summary>
public class FakeQueueService : IQueueService
{
    // True once a message has been handed out during the current run of
    // "second % 5 == 0" polls; cleared by any poll outside such a second.
    private bool _dequeued;

    /// <summary>
    /// Returns a new message (random GUID id, pop id "blah", no body) on the first poll
    /// seen while the current second is divisible by five; returns null otherwise.
    /// Both arguments are ignored.
    /// </summary>
    public QueueMessage GetMessage(string queueName, TimeSpan visibilityTimeout)
    {
        bool onFiveSecondMark = DateTime.Now.Second % 5 == 0;

        if (!onFiveSecondMark)
        {
            // Re-arm so a later matching poll produces a message again.
            _dequeued = false;
            return null;
        }

        if (_dequeued)
        {
            return null;
        }

        _dequeued = true;

        var message = new QueueMessage();
        message.MessageId = Guid.NewGuid().ToString();
        message.PopId = "blah";
        return message;
    }

    /// <summary>
    /// Does nothing; the fake keeps no stored messages.
    /// </summary>
    public void DeleteMessage(string messageId, string popId)
    {
    }
}
/// <summary>
/// Abstraction over a timer so that scheduling can be mocked in tests.
/// </summary>
public interface ITimerFacade
{
    /// <summary>
    /// Registers the action to run and starts the timer.
    /// </summary>
    /// <param name="action">Callback to invoke when the timer fires.</param>
    /// <param name="timeout">
    /// Second timing value; the TimerFacade implementation in this file forwards it as the
    /// period argument of the underlying timer.
    /// </param>
    void Start(Action action, TimeSpan timeout);

    /// <summary>
    /// Re-schedules the timer.
    /// </summary>
    /// <param name="interval">Delay before the next firing.</param>
    /// <param name="timeout">Second timing value, as for <see cref="Start"/>.</param>
    void Change(TimeSpan interval, TimeSpan timeout);

    /// <summary>
    /// Stops the timer.
    /// </summary>
    void Stop();
}
/// <summary>
/// <see cref="ITimerFacade"/> implementation backed by a <see cref="Timer"/>.
/// The timer is created unscheduled; <see cref="Start"/> supplies the action and arms it.
/// </summary>
public class TimerFacade : ITimerFacade, IDisposable
{
    private readonly Timer _timer;

    // The registered callback; null means "not started" (initially, and again after Stop).
    private Action _action;

    public TimerFacade()
    {
        _timer = new Timer(OnTimerFired);
    }

    /// <summary>
    /// Registers the action and arms the timer to fire immediately.
    /// </summary>
    /// <param name="periodicAction">Callback to run when the timer fires.</param>
    /// <param name="timeout">Forwarded to the underlying timer as its period.</param>
    /// <exception cref="ArgumentNullException"><paramref name="periodicAction"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The timer has already been started.</exception>
    public void Start(Action periodicAction, TimeSpan timeout)
    {
        if (periodicAction == null)
            throw new ArgumentNullException("periodicAction");
        if (_action != null)
            throw new InvalidOperationException("timer already started");

        _action = periodicAction;
        _timer.Change(TimeSpan.Zero, timeout);
    }

    /// <summary>
    /// Re-schedules the timer.
    /// </summary>
    /// <param name="dueTime">Delay before the next firing.</param>
    /// <param name="timeout">Forwarded to the underlying timer as its period.</param>
    /// <exception cref="InvalidOperationException">The timer has not been started.</exception>
    public void Change(TimeSpan dueTime, TimeSpan timeout)
    {
        if (_action == null)
            throw new InvalidOperationException("timer not yet started");

        _timer.Change(dueTime, timeout);
    }

    /// <summary>
    /// Disarms the timer and forgets the registered action.
    /// </summary>
    /// <exception cref="InvalidOperationException">The timer has not been started.</exception>
    public void Stop()
    {
        if (_action == null)
            throw new InvalidOperationException("timer not yet started");

        // Disarm first, then clear the action. A callback that was already queued may
        // still run afterwards; OnTimerFired tolerates the cleared action.
        _timer.Change(-1, -1);
        _action = null;
    }

    /// <summary>
    /// Disposes the underlying timer.
    /// </summary>
    public void Dispose()
    {
        _timer.Dispose();
    }

    // Timer callback. Reads the action once into a local so that a concurrent Stop()
    // cannot null the field between the check and the call.
    private void OnTimerFired(object state)
    {
        Action action = _action;
        if (action != null)
            action();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment