Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save agehrke/9046525 to your computer and use it in GitHub Desktop.
Save agehrke/9046525 to your computer and use it in GitHub Desktop.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Concorde.AzurePack.Domain.Websites.Preallocation.Settings;
using Concorde.Contracts.Processes;
using Concorde.Infrastructure.Messaging;
using Microsoft.ServiceBus.Messaging;
namespace Allocation
{
/// <summary>
/// Receives messages from a Service Bus queue in a polling loop and passes each one to the
/// processing step. The loop is started with <see cref="Start"/> and cancelled by
/// <see cref="Stop"/> or <see cref="Dispose()"/>.
/// </summary>
public class Processor : IProcessor, IDisposable
{
    // Maximum time a single receive call waits for a message before returning nothing.
    private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromMinutes(1);

    // Pause between two loop iterations.
    private static readonly TimeSpan PollingInterval = TimeSpan.FromMinutes(1);

    private bool _disposed;
    private CancellationTokenSource _cancellationSource;
    private readonly object _lockObject = new object();
    private readonly AllocationSettings _allocationSettings;
    private readonly ProjectSettings _projectSettings;
    private readonly QueueClient _queueClient;

    /// <summary>
    /// Creates a processor for the given settings and queue.
    /// </summary>
    /// <param name="allocationSettings">Allocation settings stored for the processing step.</param>
    /// <param name="projectSettings">Project settings; its <c>Name</c> is written to the trace output.</param>
    /// <param name="queueClient">Queue the messages are received from. It is not closed by this class.</param>
    public Processor(AllocationSettings allocationSettings,
        ProjectSettings projectSettings,
        QueueClient queueClient)
    {
        _allocationSettings = allocationSettings;
        _projectSettings = projectSettings;
        _queueClient = queueClient;
    }

    /// <summary>
    /// Starts the receive loop.
    /// </summary>
    /// <returns>
    /// The task representing the receive loop. It ends when the loop is cancelled; if the
    /// cancellation arrives while the loop is pausing between iterations the task ends in the
    /// canceled state.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The processor has been disposed.</exception>
    public Task Start()
    {
        CancellationToken token;
        lock (_lockObject)
        {
            ThrowIfDisposed();

            // A loop left over from an earlier Start() would otherwise become unreachable:
            // its token source would be overwritten, never cancelled and never disposed.
            CancelCurrentLoop();

            Trace.WriteLine(string.Format("Started '{0}' allocation.", _projectSettings.Name));
            _cancellationSource = new CancellationTokenSource();
            token = _cancellationSource.Token;
        }

        // Started outside the lock so the synchronous part of the first receive call
        // does not run while the lock is held.
        return ReceiveAndProcessMessagesAsync(token);
    }

    /// <summary>
    /// Requests cancellation of the receive loop, if one was started, and writes a trace entry.
    /// </summary>
    public void Stop()
    {
        lock (_lockObject)
        {
            CancelCurrentLoop();
            Trace.WriteLine(string.Format("Stopped '{0}' allocation.", _projectSettings.Name));
        }
    }

    /// <summary>
    /// Cancels a running receive loop and marks the processor as disposed.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the cancellation token source owned by this instance.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        lock (_lockObject)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            CancelCurrentLoop();
        }
    }

    // Throws when the processor has already been disposed.
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    // Cancels and disposes the current token source, if any. Callers must hold _lockObject.
    private void CancelCurrentLoop()
    {
        var source = _cancellationSource;
        if (source == null)
        {
            return;
        }

        _cancellationSource = null;
        try
        {
            source.Cancel();
        }
        finally
        {
            // Dispose even when a cancellation callback throws.
            source.Dispose();
        }
    }

    // Receive loop: wait for a message, process it, pause, repeat until cancellation is requested.
    private async Task ReceiveAndProcessMessagesAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var msg = await _queueClient.ReceiveAsync(ReceiveTimeout).ConfigureAwait(false);

            // The receive call yields null when no message arrived within the timeout.
            if (msg != null)
            {
                // The message is disposable; release it once processing and settlement are done.
                using (msg)
                {
                    await ProcessMessageAsync(msg).ConfigureAwait(false);
                }
            }

            // Throws OperationCanceledException when cancellation is requested during the pause.
            await Task.Delay(PollingInterval, cancellationToken).ConfigureAwait(false);
        }
    }

    // Runs the processing step for one message and always releases the message afterwards.
    // The release action starts out as "abandon" and is what gets applied unless the
    // processing step replaces it.
    private async Task ProcessMessageAsync(BrokeredMessage result)
    {
        var releaseAction = MessageReleaseAction.AbandonMessage;
        try
        {
            // Process
        }
        finally
        {
            //ReleaseMessage
            await ReleaseMessageAsync(result, releaseAction).ConfigureAwait(false);
        }
    }

    // Settles the message according to the given release action.
    // NOTE(review): the body is elided in this listing and does not return a Task yet;
    // it has to be filled in before the class compiles.
    private Task ReleaseMessageAsync(BrokeredMessage msg, MessageReleaseAction releaseAction)
    {
        //Release message according to release actions:
        //MessageReleaseActionKind.Complete
        //MessageReleaseActionKind.Abandon
        //MessageReleaseActionKind.DeadLetter
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment