Created
February 23, 2024 13:27
-
-
Save flew2bits/128d637f822de1835b58d0bbe4abc15e to your computer and use it in GitHub Desktop.
Repro demonstrating possible inline fanout bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Marten; | |
using Marten.Events.Daemon.Resiliency; | |
using Marten.Events.Projections; | |
using Marten.Schema; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
// Connection string for the local PostgreSQL instance used by this repro.
const string connectionString =
    "Host=localhost;Port=5432;User Id=postgres;Password=pgsql;Persist Security Info=true";

var builder = Host.CreateApplicationBuilder();

// Register Marten with the fan-out projection running asynchronously,
// lightweight sessions, and the async daemon in Solo mode.
builder.Services
    .AddMarten(options =>
    {
        options.Connection(connectionString);
        options.Projections.Add<FanoutProjection>(ProjectionLifecycle.Async);
    })
    .UseLightweightSessions()
    .AddAsyncDaemon(DaemonMode.Solo);

// The hosted service below appends the sample event and then stops the host.
builder.Services.AddHostedService<App>();

var host = builder.Build();
host.Run();
/// <summary>Parent event that carries an array of <see cref="B"/> children.</summary>
/// <param name="Id">Identifier of the parent.</param>
/// <param name="Whatever">Free-form string value copied onto each projected <see cref="C"/>.</param>
/// <param name="Children">Children that the projection fans out into one <see cref="SplitEvent"/> each.</param>
public record A(
    Guid Id,
    string Whatever,
    B[] Children);

/// <summary>Child element nested inside an <see cref="A"/>.</summary>
/// <param name="ChildId">Integer id of the child; used as the identity of the projected <see cref="C"/>.</param>
/// <param name="Something">Free-form string value copied onto the projected <see cref="C"/>.</param>
public record B(
    int ChildId,
    string Something);

/// <summary>Document built by <see cref="FanoutProjection"/> from one parent/child pair.</summary>
/// <param name="ChildId">Document identity (the generated property is marked with <c>[Identity]</c>).</param>
/// <param name="Whatever">Value taken from the parent <see cref="A"/>.</param>
/// <param name="Something">Value taken from the child <see cref="B"/>.</param>
public record C(
    [property: Identity] int ChildId,
    string Whatever,
    string Something);

/// <summary>Pairs a parent <see cref="A"/> with exactly one of its <see cref="B"/> children.</summary>
/// <param name="Parent">The original parent event.</param>
/// <param name="Child">One element of <c>Parent.Children</c>.</param>
public record SplitEvent(
    A Parent,
    B Child);
/// <summary>
/// Multi-stream projection producing <see cref="C"/> documents keyed by <c>int</c>.
/// Each <see cref="A"/> event is fanned out into one <see cref="SplitEvent"/> per child,
/// and each <see cref="SplitEvent"/> is routed by its child's <c>ChildId</c>.
/// </summary>
public class FanoutProjection : MultiStreamProjection<C, int>
{
    public FanoutProjection()
    {
        // Expand every A into one SplitEvent per child; the fan-out is registered
        // with FanoutMode.BeforeGrouping.
        FanOut<A, SplitEvent>(
            parent =>
                from child in parent.Children
                select new SplitEvent(parent, child),
            FanoutMode.BeforeGrouping);

        // The document identity for a SplitEvent is the child's id.
        Identity<SplitEvent>(split => split.Child.ChildId);
    }

    /// <summary>Builds the <see cref="C"/> document for a single parent/child pair.</summary>
    /// <param name="evt">The fanned-out event holding the parent and one child.</param>
    /// <returns>A <see cref="C"/> combining the child's id and value with the parent's value.</returns>
    public static C Create(SplitEvent evt)
    {
        return new C(
            ChildId: evt.Child.ChildId,
            Whatever: evt.Parent.Whatever,
            Something: evt.Child.Something);
    }
}
/// <summary>
/// Hosted service that drives the repro: it starts a new stream with one <see cref="A"/>
/// event holding three children, saves it, waits briefly, and then asks the host to stop.
/// </summary>
public class App(IHostApplicationLifetime hostApplicationLifetime, IDocumentStore store)
    : BackgroundService
{
    // Pause (in milliseconds) after saving, to give the async daemon time to finish.
    private const int DaemonGraceMilliseconds = 2000;

    /// <summary>Queues the repro scenario with <c>Task.Run</c> and returns the resulting task.</summary>
    /// <param name="stoppingToken">Token passed to <c>Task.Run</c>, the save, and the delay.</param>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(() => RunScenarioAsync(stoppingToken), stoppingToken);
    }

    // Appends the sample event, waits for the grace period, then stops the application.
    private async Task RunScenarioAsync(CancellationToken stoppingToken)
    {
        await using var session = store.LightweightSession();

        B[] children =
        [
            new B(1, "Something 1"),
            new B(2, "Something 2"),
            new B(3, "Something 3"),
        ];
        var parent = new A(Guid.NewGuid(), "Whatever", children);

        session.Events.StartStream(parent);
        await session.SaveChangesAsync(stoppingToken);

        // give the async daemon time to finish
        await Task.Delay(DaemonGraceMilliseconds, stoppingToken);

        hostApplicationLifetime.StopApplication();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment