-
-
Save thecarlo/ade7cc4e7d0ddc60f855022c6afa739d to your computer and use it in GitHub Desktop.
A wrapper for the SQS client in the AWS SDK for .NET v2, which uses the message-pump pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon; | |
using Amazon.SQS; | |
using Amazon.SQS.Model; | |
using Amazon.SQS.Util; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
namespace Sixeyed.Blogging.Aws
{
    /// <summary>
    /// Thin wrapper around <see cref="AmazonSQSClient"/> for a single named queue.
    /// Creates the queue if it does not exist, sends messages, and can run a
    /// background receive loop (message pump) that hands each received message to a
    /// callback and then deletes it from the queue.
    /// </summary>
    public class QueueClient
    {
        private readonly AmazonSQSClient _sqsClient;

        // Null until Subscribe is called; replaced on every Subscribe call.
        private CancellationTokenSource _cancellationTokenSource;

        /// <summary>Name of the queue this client is bound to.</summary>
        public string QueueName { get; private set; }

        /// <summary>URL of the queue, set by <see cref="Exists"/> or <see cref="Ensure"/>.</summary>
        internal string QueueUrl { get; private set; }

        /// <summary>ARN of the queue. Not assigned anywhere in this class.</summary>
        internal string QueueArn { get; private set; }

        /// <summary>
        /// Creates a client for <paramref name="queueName"/> in the EU West 1 region,
        /// creating the queue if it does not already exist.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is empty or whitespace.</exception>
        public QueueClient(string queueName)
            : this(queueName, RegionEndpoint.EUWest1)
        {
        }

        /// <summary>
        /// Creates a client for <paramref name="queueName"/> in the given region,
        /// creating the queue if it does not already exist.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        /// <param name="region">AWS region that hosts the queue.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="queueName"/> is empty or whitespace.</exception>
        public QueueClient(string queueName, RegionEndpoint region)
        {
            if (queueName == null)
            {
                throw new ArgumentNullException("queueName");
            }
            if (string.IsNullOrWhiteSpace(queueName))
            {
                throw new ArgumentException("Queue name must not be empty.", "queueName");
            }
            if (region == null)
            {
                throw new ArgumentNullException("region");
            }

            _sqsClient = new AmazonSQSClient(region);
            QueueName = queueName;
            Ensure();
        }

        /// <summary>
        /// Creates the queue when <see cref="Exists"/> reports that it is missing and
        /// records the resulting queue URL.
        /// </summary>
        public void Ensure()
        {
            if (Exists())
            {
                return;
            }

            var request = new CreateQueueRequest { QueueName = QueueName };
            var response = _sqsClient.CreateQueue(request);
            QueueUrl = response.QueueUrl;
        }

        /// <summary>
        /// Checks whether a queue named <see cref="QueueName"/> is listed by SQS and,
        /// if so, stores its URL in <see cref="QueueUrl"/>.
        /// </summary>
        /// <returns><c>true</c> when exactly one listed queue URL ends in "/QueueName".</returns>
        public bool Exists()
        {
            // Ask SQS to pre-filter by name so unrelated queues are not returned.
            var request = new ListQueuesRequest { QueueNamePrefix = QueueName };
            var response = _sqsClient.ListQueues(request);

            // Match the final path segment exactly. Matching on the bare name would
            // also accept queues whose name merely ends with it (e.g. "big-orders"
            // when looking for "orders") and bind this client to the wrong queue.
            var urlSuffix = string.Format("/{0}", QueueName);
            var matches = response.QueueUrls
                .Where(url => url.EndsWith(urlSuffix, StringComparison.Ordinal))
                .ToList();

            if (matches.Count != 1)
            {
                return false;
            }

            QueueUrl = matches[0];
            return true;
        }

        /// <summary>
        /// Requests cancellation of the receive loop started by
        /// <see cref="Subscribe(Action{Message})"/>. Does nothing if no subscription
        /// was ever started.
        /// </summary>
        public void Unsubscribe()
        {
            var source = _cancellationTokenSource;
            if (source != null)
            {
                source.Cancel();
            }
        }

        /// <summary>
        /// Starts a background receive loop that passes every received message to
        /// <paramref name="receiveAction"/> and deletes it once the callback returns.
        /// Any loop started by an earlier call is cancelled first.
        /// </summary>
        /// <param name="receiveAction">Callback invoked once per received message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="receiveAction"/> is null.</exception>
        public void Subscribe(Action<Message> receiveAction)
        {
            if (receiveAction == null)
            {
                // Without a handler the loop would receive messages but never
                // process or delete them.
                throw new ArgumentNullException("receiveAction");
            }

            // Stop the previous loop; otherwise two loops would poll the same queue.
            Unsubscribe();

            _cancellationTokenSource = new CancellationTokenSource();
            ReceiveLoop(receiveAction, _cancellationTokenSource.Token);
        }

        /// <summary>
        /// Sends the body of <paramref name="message"/> to the queue. Only
        /// <c>message.Body</c> is copied into the request.
        /// </summary>
        /// <param name="message">Message whose body is sent.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Send(Message message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            var request = new SendMessageRequest
            {
                QueueUrl = QueueUrl,
                MessageBody = message.Body
            };
            _sqsClient.SendMessage(request);
        }

        /// <summary>
        /// Reports whether the queue's approximate message count is greater than zero.
        /// </summary>
        /// <returns><c>true</c> when SQS reports an approximate count above zero.</returns>
        public bool HasMessages()
        {
            var request = new GetQueueAttributesRequest
            {
                QueueUrl = QueueUrl,
                AttributeNames = new List<string> { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES }
            };
            var response = _sqsClient.GetQueueAttributes(request);
            return response.ApproximateNumberOfMessages > 0;
        }

        /// <summary>
        /// Reports whether a subscription has been started and not yet cancelled.
        /// </summary>
        /// <returns>
        /// <c>false</c> before the first <see cref="Subscribe(Action{Message})"/> call
        /// and after <see cref="Unsubscribe"/>; otherwise <c>true</c>.
        /// </returns>
        public bool IsListening()
        {
            var source = _cancellationTokenSource;
            return source != null && !source.IsCancellationRequested;
        }

        /// <summary>
        /// Message pump: repeatedly receives up to 10 messages, invokes the handler
        /// for each one and deletes it, until cancellation is requested.
        /// </summary>
        /// <remarks>
        /// Deliberately <c>async void</c>: the public API starts the loop
        /// fire-and-forget, and a failure in the handler or in SQS should surface
        /// rather than be parked in an unobserved task. The handler and token are
        /// passed in so a loop keeps using the values it was started with even if
        /// <see cref="Subscribe(Action{Message})"/> is called again.
        /// </remarks>
        private async void ReceiveLoop(Action<Message> receiveAction, CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Only the batch size is set; no wait time is specified on the request.
                var request = new ReceiveMessageRequest
                {
                    QueueUrl = QueueUrl,
                    MaxNumberOfMessages = 10
                };

                ReceiveMessageResponse result;
                try
                {
                    result = await _sqsClient.ReceiveMessageAsync(request, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Expected when Unsubscribe cancels an in-flight receive; end quietly.
                    // Anything not caused by our own token is a real failure.
                    if (cancellationToken.IsCancellationRequested)
                    {
                        return;
                    }
                    throw;
                }

                // A batch that was already received is processed in full even if
                // cancellation is requested part-way through.
                foreach (var message in result.Messages)
                {
                    if (message != null)
                    {
                        receiveAction(message);
                        // Delete only after the handler returned without throwing.
                        DeleteMessage(message.ReceiptHandle);
                    }
                }
            }
        }

        /// <summary>Deletes the message identified by <paramref name="receiptHandle"/> from the queue.</summary>
        /// <param name="receiptHandle">Receipt handle of the received message.</param>
        /// <returns>The SQS response to the delete request.</returns>
        private DeleteMessageResponse DeleteMessage(string receiptHandle)
        {
            var request = new DeleteMessageRequest
            {
                QueueUrl = QueueUrl,
                ReceiptHandle = receiptHandle
            };
            return _sqsClient.DeleteMessage(request);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment