Created
October 16, 2023 23:26
-
-
Save djeikyb/32d7882bca19334fb047d1993d514b25 to your computer and use it in GitHub Desktop.
RabbitMQ, MassTransit, and the .NET generic host in a CLI program.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.ComponentModel.DataAnnotations; | |
using System.Net.Mime; | |
using System.Text; | |
using MassTransit; | |
using Microsoft.Extensions.Configuration; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
using Microsoft.Extensions.Options; | |
namespace Poc.MassTransit; | |
/// <summary>
/// Console entry point: builds a generic host that wires MassTransit to RabbitMQ,
/// registers a raw-JSON receive endpoint on "my_queue", and runs the <see cref="Worker"/>
/// hosted service until shutdown.
/// </summary>
internal class Program
{
    /// <summary>Text that precedes the quoted error message inside a validation failure string.</summary>
    private const string FailureMarker = "with the error: '";

    /// <summary>Closing quote and period expected at the end of a validation failure string.</summary>
    private const string FailureTerminator = "'.";

    /// <summary>
    /// Builds and runs the host.
    /// </summary>
    /// <param name="args">Command-line arguments (currently unused).</param>
    /// <returns>
    /// 0 when the host shuts down normally; -1 when an <see cref="OptionsValidationException"/>
    /// surfaces, after a summary of the invalid settings has been written to standard error.
    /// </returns>
    public static async Task<int> Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder()
            .ConfigureHostConfiguration(b => b.AddJsonFile("appsettings.json"))
            .ConfigureServices((hbc, services) =>
            {
                services.AddOptions<RabbitOptions>()
                    .ValidateDataAnnotations()
                    .BindConfiguration("app:rabbit");

                services.AddTransient<SomeMessageConsumer>();

                services.AddMassTransit(bus =>
                {
                    bus.UsingRabbitMq((context, cfg) =>
                    {
                        var rabbitOptions = context.GetRequiredService<IOptions<RabbitOptions>>().Value;

                        cfg.Host(rabbitOptions.Host, rabbitOptions.Port, rabbitOptions.VirtualHost, h =>
                        {
                            h.Username(rabbitOptions.Username);
                            h.Password(rabbitOptions.Password);
                        });

                        cfg.ReceiveEndpoint(
                            "my_queue", e =>
                            {
                                e.Consumer<SomeMessageConsumer>(context);
                                e.DefaultContentType = new ContentType("application/json");
                                e.UseRawJsonSerializer();
                            });

                        cfg.ConfigureEndpoints(context);
                    });
                });

                services.AddHostedService<Worker>();
            })
            .Build();

        try
        {
            await host.RunAsync();
        }
        catch (OptionsValidationException e)
        {
            var sb = new StringBuilder();
            sb.Append("Bad config for ").Append(e.OptionsType.Name).AppendLine(":");
            foreach (var failure in e.Failures)
            {
                sb.Append('\t').AppendLine(ExtractFailureMessage(failure));
            }

            Console.Error.WriteLine(sb);
            return -1;
        }

        return 0;
    }

    /// <summary>
    /// Pulls the quoted error text out of a validation failure string.
    /// </summary>
    /// <param name="failure">One entry from <see cref="OptionsValidationException.Failures"/>.</param>
    /// <returns>
    /// The text between <c>with the error: '</c> and a trailing <c>'.</c>. When the marker is
    /// absent the whole string is returned unchanged, so an unexpected format can never make
    /// the error reporting itself throw.
    /// </returns>
    private static string ExtractFailureMessage(string failure)
    {
        // Ordinal: the marker is a fixed ASCII token, not linguistic text.
        var markerAt = failure.IndexOf(FailureMarker, StringComparison.Ordinal);
        if (markerAt < 0)
        {
            return failure;
        }

        var start = markerAt + FailureMarker.Length;
        var end = failure.EndsWith(FailureTerminator, StringComparison.Ordinal)
            ? failure.Length - FailureTerminator.Length
            : failure.Length;

        // Guard against a degenerate string where the terminator overlaps the marker.
        return end >= start ? failure[start..end] : failure[start..];
    }
}
/// <summary>
/// RabbitMQ connection settings. Properties carry data-annotation attributes so the
/// options pipeline can reject missing or out-of-range values.
/// </summary>
public class RabbitOptions
{
    /// <summary>Broker host name or address.</summary>
    [Required] public string Host { get; set; } = null!;

    /// <summary>Broker port; must be at least 1.</summary>
    [Required, Range(1, int.MaxValue)] public ushort Port { get; set; }

    /// <summary>User name used to authenticate with the broker.</summary>
    [Required] public string Username { get; set; } = null!;

    /// <summary>Password used to authenticate with the broker.</summary>
    [Required] public string Password { get; set; } = null!;

    /// <summary>Virtual host; "/" denotes the root virtual host.</summary>
    [Required] public string VirtualHost { get; set; } = null!;

    /// <summary>
    /// Builds the <c>rabbitmq://</c> address of a queue on the configured broker.
    /// </summary>
    /// <param name="queue">Queue name appended as the last path segment.</param>
    /// <returns>
    /// <c>rabbitmq://{Host}:{Port}/{queue}</c> when <see cref="VirtualHost"/> is "/",
    /// otherwise <c>rabbitmq://{Host}:{Port}/{VirtualHost}/{queue}</c>.
    /// </returns>
    public Uri Uri(string queue) =>
        new UriBuilder
        {
            Scheme = "rabbitmq",
            // Without this, UriBuilder falls back to "localhost" and the configured host is ignored.
            Host = Host,
            Port = Port,
            Path = "/".Equals(VirtualHost) ? $"/{queue}" : $"/{VirtualHost}/{queue}",
        }.Uri;
}
/// <summary>
/// Background publisher: sends a <see cref="SomeMessage"/> carrying the current time to
/// "my_queue" roughly once per second until the host requests shutdown.
/// </summary>
public class Worker(IOptions<RabbitOptions> rabbitOptions, IBus bus, ILogger<Worker> logger)
    : BackgroundService
{
    // Reading .Value here means the options are resolved when the worker is constructed.
    private readonly RabbitOptions _rabbitOptions = rabbitOptions.Value;

    /// <summary>
    /// Publishing loop. Any failure other than shutdown cancellation is logged and ends the loop.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var endpoint = await bus.GetSendEndpoint(_rabbitOptions.Uri("my_queue"));
            while (!stoppingToken.IsCancellationRequested)
            {
                await endpoint.Send(new SomeMessage { Value = $"The time is {DateTimeOffset.Now}" }, stoppingToken);
                await Task.Delay(1000, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested while sending or delaying: a normal exit, not a failure.
        }
        catch (Exception e)
        {
            logger.LogError(e, "Publisher worker failed.");
        }
    }
}
/// <summary>
/// Message contract exchanged over "my_queue" by the worker and the consumer.
/// </summary>
public class SomeMessage
{
    /// <summary>Text payload; must be supplied when the message is created.</summary>
    public required string Value
    {
        get;
        set;
    }
}
/// <summary>
/// Consumes <see cref="SomeMessage"/> instances and logs their text.
/// </summary>
public class SomeMessageConsumer(ILogger<SomeMessageConsumer> logger) : IConsumer<SomeMessage>
{
    /// <summary>
    /// Logs the received message's <see cref="SomeMessage.Value"/>.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task Consume(ConsumeContext<SomeMessage> context)
    {
        logger.LogInformation("🐙 Received Text: {Text}", context.Message.Value);

        // Nothing to await, so skip the async state machine (and the CS1998 warning).
        return Task.CompletedTask;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Console application hosting the MassTransit/RabbitMQ proof of concept. -->
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <EnableConfigurationBindingGenerator>true</EnableConfigurationBindingGenerator>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="MassTransit.RabbitMQ" Version="8.1.1"/>
    <!-- Stable 8.0.0 releases instead of the 8.0.0-rc.2 pre-release builds. -->
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0"/>
    <PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0"/>
  </ItemGroup>
  <ItemGroup>
    <!-- The program loads appsettings.json at startup, so it must sit next to the binary.
         PreserveNewest copies only when the file changed, keeping incremental builds fast. -->
    <None Update="appsettings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>
</Project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment