Last active
September 1, 2020 17:37
-
-
Save sixeyed/6941016 to your computer and use it in GitHub Desktop.
Wrappers for the SQS and SNS clients in the AWS SDK for .NET v2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon; | |
using Amazon.SQS; | |
using Amazon.SQS.Model; | |
using Amazon.SQS.Util; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
namespace Sixeyed.Blogging.Aws | |
{ | |
public class QueueClient | |
{ | |
private AmazonSQSClient _sqsClient; | |
public string QueueName { get; private set; } | |
internal string QueueUrl { get; private set; } | |
internal string QueueArn { get; private set; } | |
private Action<Message> _receiveAction; | |
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); | |
public QueueClient(string queueName) | |
{ | |
_sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1); | |
QueueName = queueName; | |
Ensure(); | |
} | |
public void Ensure() | |
{ | |
if (!Exists()) | |
{ | |
var request = new CreateQueueRequest(); | |
request.QueueName = QueueName; | |
var response = _sqsClient.CreateQueue(request); | |
QueueUrl = response.QueueUrl; | |
} | |
} | |
public bool Exists() | |
{ | |
var exists = false; | |
var queues = _sqsClient.ListQueues(); | |
var matchString = string.Format("/{0}", QueueName); | |
var matches = queues.QueueUrls.Where(x => x.EndsWith(QueueName)); | |
if (matches.Count() == 1) | |
{ | |
exists = true; | |
QueueUrl = matches.ElementAt(0); | |
PopulateArn(); | |
} | |
return exists; | |
} | |
private void PopulateArn() | |
{ | |
var attributes = _sqsClient.GetQueueAttributes(new GetQueueAttributesRequest | |
{ | |
AttributeNames = new List<string>(new string[] {SQSConstants.ATTRIBUTE_QUEUE_ARN}), | |
QueueUrl = QueueUrl | |
}); | |
QueueArn = attributes.QueueARN; | |
} | |
public void DeleteQueue() | |
{ | |
var request = new DeleteQueueRequest(); | |
request.QueueUrl = QueueUrl; | |
_sqsClient.DeleteQueue(request); | |
} | |
public void Unsubscribe() | |
{ | |
_cancellationTokenSource.Cancel(); | |
} | |
private async void Subscribe() | |
{ | |
if (!_cancellationTokenSource.IsCancellationRequested) | |
{ | |
var request = new ReceiveMessageRequest { MaxNumberOfMessages = 10 }; | |
request.QueueUrl = QueueUrl; | |
var result = await _sqsClient.ReceiveMessageAsync(request, _cancellationTokenSource.Token); | |
if (result.Messages.Count > 0) | |
{ | |
foreach (var message in result.Messages) | |
{ | |
if (_receiveAction != null && message != null) | |
{ | |
_receiveAction(message); | |
DeleteMessage(message.ReceiptHandle); | |
} | |
} | |
} | |
} | |
if (!_cancellationTokenSource.IsCancellationRequested) | |
{ | |
Subscribe(); | |
} | |
} | |
private DeleteMessageResponse DeleteMessage(string receiptHandle) | |
{ | |
var request = new DeleteMessageRequest(); | |
request.QueueUrl = QueueUrl; | |
request.ReceiptHandle = receiptHandle; | |
return _sqsClient.DeleteMessage(request); | |
} | |
public void Subscribe(Action<Message> receiveAction) | |
{ | |
_receiveAction = receiveAction; | |
_cancellationTokenSource = new CancellationTokenSource(); | |
Subscribe(); | |
} | |
public void Send(Message message) | |
{ | |
var request = new SendMessageRequest(); | |
request.QueueUrl = QueueUrl; | |
request.MessageBody = message.Body; | |
_sqsClient.SendMessage(request); | |
} | |
internal void AllowSnsToSendMessages(TopicClient topicClient) | |
{ | |
var policy = SetQueueAttributeRequest.AllowSendFormat.Replace("%QueueArn%", QueueArn).Replace("%TopicArn%", topicClient.TopicArn); | |
var request = new SetQueueAttributesRequest(); | |
request.Attributes.Add("Policy", policy); | |
request.QueueUrl = QueueUrl; | |
var response = _sqsClient.SetQueueAttributes(request); | |
} | |
public bool HasMessages() | |
{ | |
var request = new GetQueueAttributesRequest | |
{ | |
QueueUrl = QueueUrl, | |
AttributeNames = new List<string>(new string[] { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES}) | |
}; | |
var response = _sqsClient.GetQueueAttributes(request); | |
return response.ApproximateNumberOfMessages > 0; | |
} | |
public bool IsListening() | |
{ | |
return !_cancellationTokenSource.IsCancellationRequested; | |
} | |
private struct SetQueueAttributeRequest | |
{ | |
public const string AllowSendFormat = | |
@"{ | |
""Statement"": [ | |
{ | |
""Sid"": ""MySQSPolicy001"", | |
""Effect"": ""Allow"", | |
""Principal"": { | |
""AWS"": ""*"" | |
}, | |
""Action"": ""sqs:SendMessage"", | |
""Resource"": ""%QueueArn%"", | |
""Condition"": { | |
""ArnEquals"": { | |
""aws:SourceArn"": ""%TopicArn%"" | |
} | |
} | |
} | |
] | |
}"; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon; | |
using Amazon.SimpleNotificationService; | |
using Amazon.SimpleNotificationService.Model; | |
using Amazon.SQS.Model; | |
using System; | |
using System.Linq; | |
namespace Sixeyed.Blogging.Aws | |
{ | |
public class TopicClient | |
{ | |
private AmazonSimpleNotificationServiceClient _snsClient; | |
public string TopicName { get; private set; } | |
public string SubscriptionName { get; private set; } | |
internal string TopicArn { get; private set; } | |
private string _subscriptionArn; | |
private QueueClient _queueClient; | |
public TopicClient(string topicName) : this(topicName, null) { } | |
public TopicClient(string topicName, string subscriptionName) | |
{ | |
_snsClient = new AmazonSimpleNotificationServiceClient(RegionEndpoint.EUWest1); | |
TopicName = topicName; | |
SubscriptionName = subscriptionName; | |
Ensure(); | |
} | |
private void Ensure() | |
{ | |
if (!TopicExists()) | |
{ | |
var request = new CreateTopicRequest(); | |
request.Name = TopicName; | |
var response = _snsClient.CreateTopic(request); | |
TopicArn = response.TopicArn; | |
} | |
if (!string.IsNullOrEmpty(SubscriptionName)) | |
{ | |
_queueClient = new QueueClient(SubscriptionName); | |
if (!SubscriptionExists()) | |
{ | |
var response = _snsClient.Subscribe(new SubscribeRequest | |
{ | |
TopicArn = TopicArn, | |
Protocol = "sqs", | |
Endpoint = _queueClient.QueueArn | |
}); | |
_subscriptionArn = response.SubscriptionArn; | |
var attrRequest = new SetSubscriptionAttributesRequest | |
{ | |
AttributeName = "RawMessageDelivery", | |
AttributeValue = "true", | |
SubscriptionArn = _subscriptionArn | |
}; | |
_snsClient.SetSubscriptionAttributes(attrRequest); | |
_queueClient.AllowSnsToSendMessages(this); | |
} | |
} | |
} | |
private bool TopicExists() | |
{ | |
var exists = false; | |
var response = _snsClient.ListTopics(); | |
var matchString = string.Format(":{0}", TopicName); | |
var matches = response.Topics.Where(x => x.TopicArn.EndsWith(matchString)); | |
if (matches.Count() == 1) | |
{ | |
exists = true; | |
TopicArn = matches.ElementAt(0).TopicArn; | |
} | |
return exists; | |
} | |
private bool SubscriptionExists() | |
{ | |
var exists = false; | |
var request = new ListSubscriptionsByTopicRequest { | |
TopicArn = TopicArn | |
}; | |
var response = _snsClient.ListSubscriptionsByTopic(request); | |
var matchString = string.Format(":{0}", SubscriptionName); | |
var matches = response.Subscriptions.Where(x => x.Endpoint.EndsWith(matchString)); | |
if (matches.Count() == 1) | |
{ | |
exists = true; | |
_subscriptionArn = matches.ElementAt(0).SubscriptionArn; | |
} | |
return exists; | |
} | |
public void Subscribe(Action<Message> receiveAction) | |
{ | |
_queueClient.Subscribe(receiveAction); | |
} | |
public void Unsubscribe() | |
{ | |
_queueClient.Unsubscribe(); | |
} | |
public bool IsListening() | |
{ | |
return _queueClient.IsListening(); | |
} | |
public bool HasMessages() | |
{ | |
return _queueClient.HasMessages(); | |
} | |
public void Publish(Message message) | |
{ | |
var request = new PublishRequest(); | |
request.TopicArn = TopicArn; | |
request.Message = message.Body; | |
var response = _snsClient.Publish(request); | |
} | |
public void DeleteSubscription() | |
{ | |
_queueClient.DeleteQueue(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment