Created
October 1, 2019 19:06
-
-
Save allenmichael/36c77d82399f049cd1f9aead9ad3a52d to your computer and use it in GitHub Desktop.
SQSConsumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Threading.Tasks; | |
| using Amazon.SQS; | |
| using Amazon.SQS.Model; | |
| using Newtonsoft.Json; | |
| using SQSConsumer.Models; | |
| using Amazon.SimpleSystemsManagement; | |
| using Amazon.SimpleSystemsManagement.Model; | |
| using System.Collections.Generic; | |
| using Amazon.DynamoDBv2.DataModel; | |
| using Amazon.DynamoDBv2; | |
| using Amazon.DynamoDBv2.DocumentModel; | |
| namespace SQSPlayground | |
| { | |
| class Program | |
| { | |
| private static Amazon.RegionEndpoint REGION = Amazon.RegionEndpoint.USWest2; | |
| private static Amazon.DynamoDBv2.DocumentModel.Table Table; | |
| static void Main(string[] args) | |
| { | |
| var config = ConfigAsync().Result; | |
| using (var ddbClient = new AmazonDynamoDBClient(REGION)) | |
| { | |
| Table = Table.LoadTable(ddbClient, config[1]); | |
| while (true) | |
| { | |
| MainAsync(config[0]).Wait(); | |
| } | |
| } | |
| } | |
| public static async Task<string[]> ConfigAsync() | |
| { | |
| var request = new GetParameterRequest() | |
| { | |
| Name = "/dotnet/QueueUrl" | |
| }; | |
| var tblRequest = new GetParameterRequest() | |
| { | |
| Name = "/dotnet/TableName" | |
| }; | |
| var region = Amazon.RegionEndpoint.USWest2; | |
| using (var client = new AmazonSimpleSystemsManagementClient(REGION)) | |
| { | |
| try | |
| { | |
| var response = await client.GetParameterAsync(request); | |
| var tblResponse = await client.GetParameterAsync(tblRequest); | |
| Console.WriteLine($"Parameter {request.Name} has value: {response.Parameter.Value}"); | |
| return new string[] { response.Parameter.Value, tblResponse.Parameter.Value }; | |
| } | |
| catch (Exception ex) | |
| { | |
| Console.Error.WriteLine($"Error occurred: {ex.Message}"); | |
| throw ex; | |
| } | |
| } | |
| } | |
| static async Task MainAsync(string url) | |
| { | |
| using (var sqsClient = new AmazonSQSClient(REGION)) | |
| { | |
| Console.WriteLine("Another one"); | |
| ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest() | |
| { | |
| QueueUrl = url, | |
| WaitTimeSeconds = 5, | |
| MaxNumberOfMessages = 2 | |
| }; | |
| var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest); | |
| var tasks = new List<Task>(); | |
| receiveMessageResponse.Messages.ForEach(msg => | |
| { | |
| tasks.Add(ProcessMessageAsync(msg)); | |
| }); | |
| await Task.WhenAll(tasks); | |
| System.Console.WriteLine("All Processed..."); | |
| System.Console.WriteLine("Starting again."); | |
| // await Task.Factory.StartNew(() => | |
| // { | |
| // Parallel.ForEach(receiveMessageResponse.Messages, new ParallelOptions { MaxDegreeOfParallelism = 2 }, ProcessMessageAsync); | |
| // }); | |
| // | |
| //foreach (var message in receiveMessageResponse.Messages) | |
| //{ | |
| // System.Console.WriteLine(message.Body); | |
| // // call a method to process messages and pass in message | |
| //} | |
| } | |
| } | |
| static async Task<string> PersistMessageStateAsync(Amazon.SQS.Model.Message msg) | |
| { | |
| var status = ""; | |
| try | |
| { | |
| var tblMsg = await Table.GetItemAsync(msg.MessageId); | |
| System.Console.WriteLine(tblMsg == null); | |
| if (tblMsg == null) | |
| { | |
| status = "Processing"; | |
| System.Console.WriteLine("Setting message status..."); | |
| var processingMsg = new Document(); | |
| processingMsg["messageId"] = msg.MessageId; | |
| processingMsg["status"] = status; | |
| var result = await Table.PutItemAsync(processingMsg); | |
| System.Console.WriteLine($"Set message status for {msg.MessageId}"); | |
| return status; | |
| } | |
| else | |
| { | |
| System.Console.WriteLine($"Current status for message - {tblMsg["messageId"]}:"); | |
| System.Console.WriteLine($"- {tblMsg["status"]}"); | |
| return tblMsg["status"]; | |
| } | |
| } | |
| catch (Exception e) | |
| { | |
| System.Console.WriteLine(e.Message); | |
| return e.Message; | |
| } | |
| } | |
| static async Task ProcessMessageAsync(Amazon.SQS.Model.Message msg) | |
| { | |
| Console.WriteLine(msg.Body); | |
| try | |
| { | |
| var status = await PersistMessageStateAsync(msg); | |
| System.Console.WriteLine($"Current message {msg.MessageId} status - {status}"); | |
| var deSer = JsonConvert.DeserializeObject<S3Events>(msg.Body); | |
| foreach (var item in deSer.Records) | |
| { | |
| Console.WriteLine(item.EventVersion); | |
| Console.WriteLine(item.UserIdentity.PrincipalId); | |
| Console.WriteLine(item.ResponseElements.AmazonRequestId); | |
| Console.WriteLine(item.S3.S3SchemaVersion); | |
| Console.WriteLine(item.S3.BucketObject.Key); | |
| } | |
| } | |
| catch (Exception e) | |
| { | |
| System.Console.WriteLine(e.Message); | |
| } | |
| Console.WriteLine(msg.ReceiptHandle); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment