Skip to content

Instantly share code, notes, and snippets.

@bforrest
Created October 17, 2018 14:10
Show Gist options
  • Save bforrest/b0b4d60bc5111d51a0dbc8dc150db76a to your computer and use it in GitHub Desktop.
Save bforrest/b0b4d60bc5111d51a0dbc8dc150db76a to your computer and use it in GitHub Desktop.
AWS Lambda + MassTransit
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Turnout;
using Messages;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Consumers
{
/// <summary>
/// MassTransit consumer for raw JSON messages. When the message carries a
/// "Content" property (matched case-insensitively), that content is read with a
/// <see cref="DataContractJsonSerializer"/> configured for <c>KnownCommandType</c>.
/// </summary>
public class OtherIssuerAddConsumer : IConsumer<JObject>
{
    /// <summary>
    /// Writes the consume context to the console and, if present, deserializes the
    /// message's "Content" property.
    /// </summary>
    /// <param name="context">The consume context whose <c>Message</c> is the received JSON object.</param>
    /// <returns>An already-completed task; all work is done synchronously.</returns>
    public Task Consume(ConsumeContext<JObject> context)
    {
        Console.WriteLine(context);

        var serializer = new DataContractJsonSerializer(
            typeof(KnownCommandType),
            new DataContractJsonSerializerSettings
            {
                SerializeReadOnlyTypes = true,
                MaxItemsInObjectGraph = 256,
                DateTimeFormat = new DateTimeFormat("yyyy-MM-dd'T'HH:mm:ss.FFFFFFFZ")
            });

        // The message is round-tripped through its JSON text before being inspected.
        // NOTE(review): kept from the original; re-parsing may change how tokens are
        // typed, so confirm before replacing it with context.Message directly.
        var envelope = (JObject)JsonConvert.DeserializeObject(context.Message.ToString());

        // Property names are identifiers, not linguistic text, so compare ordinally.
        JToken contentToken;
        if (envelope.TryGetValue("Content", StringComparison.OrdinalIgnoreCase, out contentToken))
        {
            // "Content" may be an embedded object or a string holding JSON; parsing its
            // text form and re-serializing handles both.
            var contentJson = JObject.Parse(contentToken.ToString()).ToString();

            using (var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(contentJson)))
            {
                // The deserialized object is not used yet; the call still throws on
                // content the serializer cannot read.
                serializer.ReadObject(contentStream);
            }
        }

        return Task.CompletedTask;
    }
}
}
using System;
using System.Threading.Tasks;
using System.Timers;
using Amazon.Lambda.Core;
using MassTransit;
using Messages;
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace tests
{
/// <summary>
/// Lambda handler that starts a RabbitMQ-backed MassTransit bus with a receive
/// endpoint for <c>OtherCommandConsumer</c>, then stops it again.
/// </summary>
public class MessageProcessor
{
    // Only set by OnTimedEvent; read solely by the disabled wait loop in Process.
    private static bool quit = false;

    /// <summary>
    /// Starts the bus, then stops it and reports completion.
    /// </summary>
    /// <param name="context">The Lambda invocation context (currently unused).</param>
    /// <returns>The fixed string "Messages processed".</returns>
    public async Task<string> Process(ILambdaContext context)
    {
        Console.WriteLine("Receiver");

        // localhost
        string mq_uri = "rabbitmq://localhost";
        string username = "guest";
        string password = "guest";

        var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            var host = sbc.Host(new Uri(mq_uri), h =>
            {
                h.Username(username);
                h.Password(password);
            });

            sbc.ReceiveEndpoint(host, "OtherCommandConsumer",
                ep =>
                {
                    ep.Consumer<OtherCommandConsumer>();
                });
        });

        // Await the asynchronous start/stop instead of blocking inside an async method,
        // and stop the bus even if something throws after it has started.
        await bus.StartAsync();
        try
        {
            Console.WriteLine("bus started (the battery wasn't dead!)");

            // NOTE(review): nothing waits here, so the bus is stopped right after it
            // starts. The disabled sketch below was a 5 second timer with a spin loop;
            // if a receive window is wanted, prefer `await Task.Delay(...)` over spinning.
            //var when = new System.Timers.Timer(5000);
            //when.Elapsed += OnTimedEvent;
            //when.AutoReset = false;
            //when.Enabled = true;
            //when.Start();
            //Console.WriteLine("Timer Started");
            //while (!quit)
            //{
            //}
            //when.Stop();
            //Console.WriteLine($"timer stopped");
        }
        finally
        {
            await bus.StopAsync();
        }

        return "Messages processed";
    }

    /// <summary>
    /// Timer callback that sets the quit flag and logs that the timer elapsed.
    /// Not wired up while the timer code in <see cref="Process"/> is disabled.
    /// </summary>
    /// <param name="source">The timer that raised the event.</param>
    /// <param name="e">The elapsed-event data.</param>
    private static void OnTimedEvent(object source, ElapsedEventArgs e)
    {
        quit = true;
        Console.WriteLine("timer elapsed");
    }
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using MassTransit;
using Messages;
using Newtonsoft.Json.Linq;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace IssuerAdd
{
/// <summary>
/// Lambda handler that publishes the incoming JSON payload to RabbitMQ through MassTransit.
/// </summary>
public class Function
{
    /// <summary>
    /// Reads the request stream as JSON, extracts the API Gateway "body" when present
    /// (otherwise uses the whole document), stamps it with the Lambda request id and
    /// publishes it on the bus.
    /// </summary>
    /// <param name="stream">The raw Lambda input stream; left open for the caller.</param>
    /// <param name="context">The Lambda context, used for logging and for the request id.</param>
    /// <returns>A 200 response whose body names the published request id.</returns>
    public async Task<APIGatewayProxyResponse> Publish(Stream stream, ILambdaContext context)
    {
        var logger = context.Logger;

        // TODO: handle a bad request stream
        JObject proxyEvent;
        // Same encoding, BOM detection and buffer size as `new StreamReader(stream)`, but
        // with leaveOpen so disposing the reader does not close a stream this method
        // does not own.
        using (var reader = new StreamReader(stream, System.Text.Encoding.UTF8, true, 1024, true))
        {
            proxyEvent = JObject.Parse(await reader.ReadToEndAsync());
        }

        // When called from an API Gateway, the "body" property contains the information of interest.
        // Property names are identifiers, so compare ordinally rather than by culture.
        JToken bodyToken;
        var hasBody = proxyEvent.TryGetValue("body", StringComparison.OrdinalIgnoreCase, out bodyToken);
        JObject message = hasBody ? JObject.Parse(bodyToken.ToString()) : proxyEvent;
        message["RequestId"] = context.AwsRequestId;
        logger.Log("Message: " + message);

        // localhost
        string mq_uri = "rabbitmq://localhost";
        string username = "guest";
        string password = "guest";

        var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            sbc.Host(new Uri(mq_uri), h =>
            {
                h.Username(username);
                h.Password(password);
            });
        });

        // The original wrapped the un-awaited StartAsync() task in `using`, so publishing
        // could begin before the bus had started and the bus was never stopped. Await the
        // start, and always stop the bus afterwards.
        await bus.StartAsync();
        try
        {
            logger.Log("bus started (the battery wasn't dead!)");
            //await bus.Publish<IWsoHostedCommand>(message);
            await bus.Publish(message);
            //await bus.Publish<IssuerAddCommand>(message);
            logger.Log("Done publishing, shutting down the bus.");
        }
        finally
        {
            await bus.StopAsync();
        }

        return new APIGatewayProxyResponse
        {
            Body = $"Message Id {context.AwsRequestId} was published",
            StatusCode = 200,
            IsBase64Encoded = false
        };
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment