Last active
August 1, 2022 16:00
-
-
Save calebickler/899b577088b8e4189dd3a8e3ba07ebaa to your computer and use it in GitHub Desktop.
Subscribing from a dotnet core hosted service to a SQS queue which only supports polling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Utility.Environment; | |
using Utility.Queue.Handler; | |
using Utility.Queue.Subscribers; | |
using Microsoft.Extensions.Hosting; | |
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Utility.Queue | |
{ | |
public class QueueSubscriberService<T, U> : BackgroundService where T : IMessageHandler<U> | |
{ | |
private readonly IServiceProvider ServiceProvider; | |
private readonly IEnvironmentRequester EnvironmentRequester; | |
public QueueSubscriber(IServiceProvider serviceProvider, | |
IEnvironmentRequester environmentRequester) | |
{ | |
ServiceProvider = serviceProvider; | |
EnvironmentRequester = environmentRequester; | |
} | |
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |
{ | |
var subscriber = new SQSSubscriber(EnvironmentRequester, ServiceProvider); | |
var nameResolver = new NameResolver(EnvironmentRequester); | |
await subscriber.SetQueueName(nameResolver.GetQueueName<U>()); | |
await subscriber.PollSQS<T, U>(ServiceProvider, stoppingToken); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon.SQS; | |
using Amazon.SQS.Model; | |
using Utility.Environment; | |
using Utility.Queue.Handler; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Utility.Queue.Subscribers | |
{ | |
public class SQSQueueSubscriber | |
{ | |
private readonly IEnvironmentRequester EnvironmentRequester; | |
private readonly IServiceProvider ServiceProvider; | |
private readonly string QueueBaseUrl; | |
private readonly string AccessKey; | |
private readonly string SecretAccessKey; | |
private readonly AmazonSQSConfig SQSConfig; | |
private string QueueUrl; | |
public SQSSubscriber(IEnvironmentRequester environmentRequester, | |
IServiceProvider serviceProvider) | |
{ | |
AccessKey = environmentRequester.GetVariable("AWSAccessKeyId"); | |
SecretAccessKey = environmentRequester.GetVariable("AWSSecretAccessKey"); | |
QueueBaseUrl = environmentRequester.GetVariable("SQSEndpointBase"); | |
SQSConfig = new AmazonSQSConfig | |
{ | |
// Locally /queue will be added for elasticmq, remove. Live urls will never have /queue. | |
ServiceURL = QueueBaseUrl.Replace("/queue", "") | |
}; | |
EnvironmentRequester = environmentRequester; | |
ServiceProvider = serviceProvider; | |
} | |
public async Task SetQueueName(string queueName) | |
{ | |
QueueUrl = QueueBaseUrl + "/" + queueName; | |
using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig)) | |
{ | |
CreateQueueRequest createQueueRequest = new CreateQueueRequest | |
{ | |
QueueName = queueName | |
}; | |
await client.CreateQueueAsync(createQueueRequest); | |
} | |
} | |
public async Task Poll<T, U>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler<U> | |
{ | |
using (var client = new AmazonSQSClient(AccessKey, SecretAccessKey, SQSConfig)) | |
{ | |
while (!stoppingToken.IsCancellationRequested) | |
{ | |
string receiptHandle = ""; | |
try | |
{ | |
var request = new ReceiveMessageRequest | |
{ | |
QueueUrl = QueueUrl, | |
MaxNumberOfMessages = 1, | |
WaitTimeSeconds = 20 | |
}; | |
var response = await client.ReceiveMessageAsync(request); | |
if (response.Messages.Count > 0) | |
{ | |
var sqsMessage = response.Messages.FirstOrDefault(); | |
receiptHandle = sqsMessage.ReceiptHandle; | |
var caller = new HandlerCaller(EnvironmentRequester, ServiceProvider); | |
Task task = Task.Run(() => caller.CallHandler<T, U>(sqsMessage.MessageId, sqsMessage.Body)); | |
await client.DeleteMessageAsync(QueueUrl, sqsMessage.ReceiptHandle); | |
} | |
} | |
catch (Exception ex) | |
{ | |
Console.WriteLine(ex.ToString()); | |
if (!string.IsNullOrEmpty(receiptHandle)) | |
{ | |
await client.DeleteMessageAsync(QueueUrl, receiptHandle); | |
} | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment