Created
December 27, 2014 16:30
-
-
Save phatboyg/ea00db3cd3a74680e51d to your computer and use it in GitHub Desktop.
Example of the RequestClient usage with MassTransit v3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using MassTransit; | |
namespace SimpleConsoleApp | |
{ | |
/// <summary>
/// Console sample that starts an in-memory MassTransit bus, sends a batch of
/// <see cref="SimpleCommand"/> requests to <see cref="CommandConsumer"/> through a
/// request client, and prints how long the batch took.
/// </summary>
internal class Program
{
    // Number of requests issued inside the timed section of SendManyCommands.
    private const int RequestCount = 100000;

    // Upper bound on how long Main blocks waiting for the whole run.
    private static readonly TimeSpan RunTimeout = TimeSpan.FromMinutes(1);

    /// <summary>
    /// Configures the bus and the request client, runs the benchmark, and reports
    /// when the run does not finish within <see cref="RunTimeout"/>.
    /// </summary>
    private static void Main()
    {
        IBusControl busControl = Bus.Factory.CreateUsingInMemory(x =>
        {
            x.UseBsonSerializer();
            x.ReceiveEndpoint("input_queue", e => e.Consumer<CommandConsumer>());
        });

        // create a command client to simplify the request syntax to the consumer
        IRequestClient<SimpleCommand, SimpleCommandResult> commandClient =
            new MessageRequestClient<SimpleCommand, SimpleCommandResult>(busControl,
                new Uri("loopback://localhost/input_queue"), TimeSpan.FromSeconds(30));

        // Main is synchronous, so the async start and the run itself are blocked on here.
        using (BusHandle handle = busControl.Start().Result)
        {
            Console.WriteLine("Running a bunch of commands...");

            // Wait(TimeSpan) returns false on timeout instead of throwing; surface that
            // rather than exiting silently with no results printed.
            bool completed = SendManyCommands(commandClient).Wait(RunTimeout);
            if (!completed)
            {
                Console.Error.WriteLine("Timed out after {0} waiting for the commands to complete", RunTimeout);
            }
        }
    }

    /// <summary>
    /// Sends one untimed warm-up request, then <see cref="RequestCount"/> concurrent
    /// requests, and writes the elapsed time and throughput to the console.
    /// </summary>
    /// <param name="client">Request client used to send each command and await its result.</param>
    /// <returns>A task that completes once every request has completed and the totals are printed.</returns>
    private static async Task SendManyCommands(IRequestClient<SimpleCommand, SimpleCommandResult> client)
    {
        // warm up the pipeline; done before the stopwatch starts so it is not measured
        await client.Request(NewCommand());

        Stopwatch timer = Stopwatch.StartNew();

        await Task.WhenAll(Enumerable.Range(0, RequestCount).Select(async _ =>
        {
            await client.Request(NewCommand());
        }));

        timer.Stop();

        Console.WriteLine("Time to process {0} messages = {1}", RequestCount, timer.ElapsedMilliseconds + "ms");

        // Clamp to 1 ms so a very fast run cannot divide by zero, and do the math in
        // long so a larger RequestCount cannot overflow int.
        // NOTE(review): the factor of 2 presumably counts each request plus its response - confirm.
        long elapsedMs = Math.Max(1L, timer.ElapsedMilliseconds);
        Console.WriteLine("Messages per second: {0}", RequestCount * 2L * 1000L / elapsedMs);
    }

    // Builds a command with a fresh id and the current UTC time.
    private static SimpleCommand NewCommand()
    {
        return new SimpleCommand
        {
            CommandId = NewId.NextGuid(),
            Timestamp = DateTime.UtcNow
        };
    }
}
/// <summary>
/// Consumer for <see cref="SimpleCommand"/> that immediately answers each command
/// with a <see cref="SimpleCommandResult"/>.
/// </summary>
public class CommandConsumer :
    IConsumer<SimpleCommand>
{
    /// <summary>
    /// Builds a result echoing the incoming command's id and sends it as the response.
    /// </summary>
    /// <param name="context">Consume context carrying the received command.</param>
    /// <returns>The task returned by <c>context.RespondAsync</c>.</returns>
    public Task Consume(ConsumeContext<SimpleCommand> context)
    {
        Stopwatch elapsed = Stopwatch.StartNew();

        // Property initializers run top to bottom, so Duration is read last,
        // after the ids and the timestamp have been produced.
        SimpleCommandResult response = new SimpleCommandResult
        {
            ResultId = NewId.NextGuid(),
            CommandId = context.Message.CommandId,
            Timestamp = DateTime.UtcNow,
            Duration = elapsed.Elapsed,
            ResultCode = 200,
            ResultText = "ok",
        };

        return context.RespondAsync(response);
    }
}
/// <summary>
/// Request message handled by <see cref="CommandConsumer"/>.
/// </summary>
public class SimpleCommand
{
    /// <summary>Identifier of this command; the consumer copies it into the result.</summary>
    public Guid CommandId { get; set; }

    /// <summary>Time assigned by the sender; the sender in this file uses <c>DateTime.UtcNow</c>.</summary>
    public DateTime Timestamp { get; set; }
}
/// <summary>
/// Response message sent by <see cref="CommandConsumer"/> for a <see cref="SimpleCommand"/>.
/// </summary>
public class SimpleCommandResult
{
    /// <summary>Identifier of this result, generated by the consumer.</summary>
    public Guid ResultId { get; set; }

    /// <summary>Identifier of the command this result answers.</summary>
    public Guid CommandId { get; set; }

    /// <summary>Time the result was created; the consumer in this file uses <c>DateTime.UtcNow</c>.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Stopwatch reading taken by the consumer while building the result.</summary>
    public TimeSpan Duration { get; set; }

    /// <summary>Numeric outcome code; the consumer in this file always sets 200.</summary>
    public short ResultCode { get; set; }

    /// <summary>Textual outcome; the consumer in this file always sets "ok".</summary>
    public string ResultText { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just like in this example http://masstransit-project.com/MassTransit/usage/request-response.html
But I have multiple consumers, and each of them can take only a specific number of tasks. If one is busy, I need another consumer to take the task and return an answer like "The task is in process".
So this will work like load balancing.