Created
March 27, 2020 11:20
-
-
Save tnayanam/f0e29ad5b35b8f0ab8c84a49b745ce02 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Text; | |
using Newtonsoft.Json; | |
using Amazon.Lambda.Core; | |
using Amazon.Lambda.DynamoDBEvents; | |
using Amazon.DynamoDBv2.Model; | |
using Microsoft.Azure.ServiceBus; | |
using System.Threading.Tasks; | |
using Amazon.DynamoDBv2; | |
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. | |
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))] | |
namespace corona | |
{ | |
public class Function | |
{ | |
private static readonly JsonSerializer _jsonSerializer = new JsonSerializer(); | |
public async Task FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) | |
{ | |
context.Logger.LogLine($"Beginning to process {dynamoEvent.Records.Count} records..."); | |
try | |
{ | |
foreach (var record in dynamoEvent.Records) | |
{ | |
try | |
{ | |
if (record.EventName == OperationType.REMOVE) | |
{ | |
string streamRecordJson = SerializeStreamRecord(record.Dynamodb); | |
await Send(streamRecordJson, context); | |
} | |
} | |
catch (Exception ex) | |
{ | |
throw ex; | |
} | |
} | |
} | |
catch (Exception ex) | |
{ | |
context.Logger.LogLine("Exception Occurred" + ex.Message); | |
context.Logger.LogLine("Inner Exception Occurred" + ex.InnerException); | |
} | |
context.Logger.LogLine("Stream processing complete."); | |
} | |
private static async Task Send(string stream, ILambdaContext context) | |
{ | |
try | |
{ | |
const string connectionString = "QUEUE_END_POINT"; | |
string queueName = "QUEUE_NAME"; | |
ServiceBusConnectionStringBuilder svc = new ServiceBusConnectionStringBuilder(connectionString); | |
ServiceBusConnection svc1 = new ServiceBusConnection(svc); | |
var client = new QueueClient(svc1, queueName, ReceiveMode.PeekLock, RetryPolicy.Default); | |
var message = new Message(Encoding.UTF8.GetBytes(stream)); | |
await client.SendAsync(message); | |
} | |
catch (Exception ex) | |
{ | |
throw ex; | |
} | |
} | |
private string SerializeStreamRecord(StreamRecord streamRecord) | |
{ | |
try | |
{ | |
using (var writer = new StringWriter()) | |
{ | |
_jsonSerializer.Serialize(writer, streamRecord); | |
return writer.ToString(); | |
} | |
} | |
catch (Exception ex) | |
{ | |
throw ex; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment