Skip to content

Instantly share code, notes, and snippets.

@phatboyg
Created December 27, 2014 16:30
Show Gist options
  • Save phatboyg/ea00db3cd3a74680e51d to your computer and use it in GitHub Desktop.
Save phatboyg/ea00db3cd3a74680e51d to your computer and use it in GitHub Desktop.
Example of the RequestClient usage with MassTransit v3
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
namespace SimpleConsoleApp
{
/// <summary>
/// Console entry point: starts an in-memory bus with a <see cref="CommandConsumer"/> endpoint,
/// sends a batch of <see cref="SimpleCommand"/> requests through a request client and prints
/// how long the batch took.
/// </summary>
internal class Program
{
    /// <summary>
    /// Configures the bus, starts it, runs the command batch and stops the bus again.
    /// </summary>
    private static void Main()
    {
        IBusControl busControl = Bus.Factory.CreateUsingInMemory(x =>
        {
            x.UseBsonSerializer();
            x.ReceiveEndpoint("input_queue", e => e.Consumer<CommandConsumer>());
        });

        // create a command client to simplify the request syntax to the consumer
        IRequestClient<SimpleCommand, SimpleCommandResult> commandClient =
            new MessageRequestClient<SimpleCommand, SimpleCommandResult>(busControl,
                new Uri("loopback://localhost/input_queue"), TimeSpan.FromSeconds(30));

        // Main is synchronous, so the start task is blocked on here.
        using (BusHandle handle = busControl.Start().Result)
        {
            Console.WriteLine("Running a bunch of commands...");

            // Task.Wait(TimeSpan) returns false when the timeout elapses first; report that
            // instead of exiting silently without any measurement being printed.
            bool completed = SendManyCommands(commandClient)
                .Wait(TimeSpan.FromMinutes(1));
            if (!completed)
            {
                Console.WriteLine("Timed out waiting for the commands to complete.");
            }
        }
    }

    /// <summary>
    /// Sends one warm-up request followed by a fixed-size batch of concurrent requests and
    /// writes the elapsed time and the message rate for the batch to the console.
    /// </summary>
    /// <param name="client">Request client used to send every <see cref="SimpleCommand"/>.</param>
    /// <returns>A task that completes once every request in the batch has been answered.</returns>
    private static async Task SendManyCommands(IRequestClient<SimpleCommand, SimpleCommandResult> client)
    {
        // warm up the pipeline
        await client.Request(new SimpleCommand
        {
            CommandId = NewId.NextGuid(),
            Timestamp = DateTime.UtcNow
        });

        // Start timing only after the warm-up so its latency does not skew the figures
        // reported for the batch of `count` messages.
        Stopwatch timer = Stopwatch.StartNew();

        const int count = 100000;
        await Task.WhenAll(Enumerable.Range(0, count).Select(async x =>
        {
            await client.Request(new SimpleCommand
            {
                CommandId = NewId.NextGuid(),
                Timestamp = DateTime.UtcNow
            });
        }));

        timer.Stop();

        // Clamp to 1 ms so the rate calculation can never divide by zero.
        long elapsedMilliseconds = Math.Max(1L, timer.ElapsedMilliseconds);

        Console.WriteLine("Time to process {0} messages = {1}", count, timer.ElapsedMilliseconds + "ms");

        // NOTE(review): the factor 2 presumably counts the request and its response as
        // separate messages - confirm. Long arithmetic avoids int overflow for larger counts.
        Console.WriteLine("Messages per second: {0}", count * 2L * 1000L / elapsedMilliseconds);
    }
}
/// <summary>
/// Consumer for <see cref="SimpleCommand"/> that replies to every command with a
/// <see cref="SimpleCommandResult"/>.
/// </summary>
public class CommandConsumer : IConsumer<SimpleCommand>
{
    /// <summary>
    /// Builds a result that echoes the incoming command id and sends it back as the response.
    /// </summary>
    /// <param name="context">Consume context carrying the received command.</param>
    /// <returns>The task returned by <c>RespondAsync</c>.</returns>
    public Task Consume(ConsumeContext<SimpleCommand> context)
    {
        Stopwatch elapsedTimer = Stopwatch.StartNew();

        // Populate the reply field by field, in the same order as the message declares them.
        SimpleCommandResult reply = new SimpleCommandResult();
        reply.ResultId = NewId.NextGuid();
        reply.CommandId = context.Message.CommandId;
        reply.Timestamp = DateTime.UtcNow;
        reply.Duration = elapsedTimer.Elapsed;
        reply.ResultCode = 200;
        reply.ResultText = "ok";

        return context.RespondAsync(reply);
    }
}
/// <summary>
/// Request message sent through the request client and handled by <see cref="CommandConsumer"/>.
/// </summary>
public class SimpleCommand
{
/// <summary>Identifier of this command; the consumer copies it into the result it sends back.</summary>
public Guid CommandId { get; set; }
/// <summary>Time stamp set by the sender; the senders in this file use <c>DateTime.UtcNow</c>.</summary>
public DateTime Timestamp { get; set; }
}
/// <summary>
/// Response message that <see cref="CommandConsumer"/> sends back for a <see cref="SimpleCommand"/>.
/// </summary>
public class SimpleCommandResult
{
/// <summary>Identifier of this result; the consumer generates a new one for every response.</summary>
public Guid ResultId { get; set; }
/// <summary>Identifier of the command this result answers, copied from the request.</summary>
public Guid CommandId { get; set; }
/// <summary>Time stamp set by the consumer (<c>DateTime.UtcNow</c>) when it builds the response.</summary>
public DateTime Timestamp { get; set; }
/// <summary>Elapsed time read from a stopwatch the consumer starts when it begins handling the command.</summary>
public TimeSpan Duration { get; set; }
/// <summary>Numeric result code; the consumer in this file always sets 200.</summary>
public short ResultCode { get; set; }
/// <summary>Textual result; the consumer in this file always sets "ok".</summary>
public string ResultText { get; set; }
}
}
@EAbychkov
Copy link

EAbychkov commented Mar 29, 2017

Is it possible to work with multiple consumers?
For instance: each consumer can handle only 6 tasks simultaneously, and there are 5 consumers. If 4 of them are fully loaded, I want to send a SimpleCommand to the free one and get the response from it.

@EAbychkov
Copy link

EAbychkov commented Mar 29, 2017

Just like in this example: http://masstransit-project.com/MassTransit/usage/request-response.html
But I have multiple consumers, and each of them can take a specific number of tasks. If one is busy, I need another consumer to take the task and return an answer like "The task is in process".
So this would work like load balancing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment