Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Created March 7, 2025 06:28
Show Gist options
  • Save OnurGumus/069aa8e4bed2bd5b061945ee1199f326 to your computer and use it in GitHub Desktop.
Save OnurGumus/069aa8e4bed2bd5b061945ee1199f326 to your computer and use it in GitHub Desktop.
marten snapshot
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
const string connectionString =
"PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; DATABASE = 'marten_cqrs_test'; USER ID = 'test'";
var documentStore = DocumentStore.For(options =>
{
options.Connection(connectionString);
options.Projections.Add<WarehouseProductProjection>(ProjectionLifecycle.Inline);
// same transaction as the events being appended
options.Projections.Snapshot<WarehouseProductWriteModel>(SnapshotLifecycle.Inline);
// Opt into an optimization for the inline aggregates
// used with FetchForWriting()
options.Projections.UseIdentityMapForAggregates = true;
});
var id = Guid.NewGuid();
var warehouseRepository = new WarehouseRepository(documentStore);
var warehouseProductReadModel = warehouseRepository.Get(id);
DemoConsole.WriteWithColour($"{warehouseProductReadModel?.QuantityOnHand ?? 0} items of stock in the warehouse for {id}");
var handler = new WarehouseProductHandler(id, documentStore);
await handler.ReceiveProduct(100);
DemoConsole.WriteWithColour($"Received 100 items of stock into the warehouse for {id}");
await handler.ShipProduct(10);
DemoConsole.WriteWithColour($"Shipped 10 items of stock out of the warehouse for {id}");
await handler.AdjustInventory(5, "Ordered too many");
DemoConsole.WriteWithColour($"Found 5 items of stock hiding in the warehouse for {id} and have adjusted the stock count");
warehouseProductReadModel = warehouseRepository.Get(id);
DemoConsole.WriteWithColour($"{warehouseProductReadModel.QuantityOnHand} items of stock in the warehouse for {warehouseProductReadModel.Id}");
public record ProductShipped(Guid Id, int Quantity, DateTime DateTime);
public record ProductReceived(Guid Id, int Quantity, DateTime DateTime);
public record InventoryAdjusted(Guid Id, int Quantity, string Reason, DateTime DateTime);
public class WarehouseRepository
{
private readonly IDocumentStore documentStore;
public WarehouseRepository(IDocumentStore documentStore)
{
this.documentStore = documentStore;
}
public WarehouseProductReadModel Get(Guid id)
{
using var session = documentStore.QuerySession();
var doc = session.Query<WarehouseProductReadModel>()
.SingleOrDefault(x => x.Id == id);
return doc;
}
}
public class WarehouseProductReadModel
{
public Guid Id { get; set; }
public int QuantityOnHand { get; set; }
}
public class WarehouseProductProjection : SingleStreamProjection<WarehouseProductReadModel>
{
public WarehouseProductProjection()
{
ProjectEvent<ProductShipped>(Apply);
ProjectEvent<ProductReceived>(Apply);
ProjectEvent<InventoryAdjusted>(Apply);
}
public void Apply(WarehouseProductReadModel readModel, ProductShipped evnt)
{
readModel.QuantityOnHand -= evnt.Quantity;
}
public void Apply(WarehouseProductReadModel readModel, ProductReceived evnt)
{
readModel.QuantityOnHand += evnt.Quantity;
}
public void Apply(WarehouseProductReadModel readModel, InventoryAdjusted evnt)
{
readModel.QuantityOnHand += evnt.Quantity;
}
}
public class WarehouseProductWriteModel
{
public Guid Id { get; set; }
public int QuantityOnHand { get; set; }
public void Apply(ProductShipped evnt)
{
Id = evnt.Id;
QuantityOnHand -= evnt.Quantity;
}
public void Apply(ProductReceived evnt)
{
Id = evnt.Id;
QuantityOnHand += evnt.Quantity;
}
public void Apply(InventoryAdjusted evnt)
{
Id = evnt.Id;
QuantityOnHand += evnt.Quantity;
}
}
public class WarehouseProductHandler
{
private readonly Guid id;
private readonly IDocumentStore documentStore;
public WarehouseProductHandler(Guid id, IDocumentStore documentStore)
{
this.id = id;
this.documentStore = documentStore;
}
public async Task ShipProduct(int quantity)
{
await using var session = documentStore.LightweightSession();
var stream = await session.Events.FetchForWriting<WarehouseProductWriteModel>(id);
var warehouseProduct = stream?.Aggregate;
if (quantity > warehouseProduct?.QuantityOnHand)
{
throw new InvalidDomainException("Ah... we don't have enough product to ship?");
}
stream.AppendOne(new ProductShipped(id, quantity, DateTime.UtcNow));
await session.SaveChangesAsync();
}
public async Task ReceiveProduct(int quantity)
{
using var session = documentStore.LightweightSession();
var stream = await session.Events.FetchForWriting<WarehouseProductWriteModel>(id);
stream.AppendOne(new ProductReceived(id, quantity, DateTime.UtcNow));
await session.SaveChangesAsync();
}
public async Task AdjustInventory(int quantity, string reason)
{
using var session = documentStore.LightweightSession();
var stream = await session.Events.FetchForWriting<WarehouseProductWriteModel>(id);
var warehouseProduct = stream?.Aggregate;
if (warehouseProduct?.QuantityOnHand + quantity < 0)
{
throw new InvalidDomainException("Cannot adjust to a negative quantity on hand.");
}
stream.AppendOne(new InventoryAdjusted(id, quantity, reason, DateTime.UtcNow));
await session.SaveChangesAsync();
}
}
public class InvalidDomainException : Exception
{
public InvalidDomainException(string message) : base(message)
{
}
}
public static class DemoConsole
{
public static void WriteWithColour(string value)
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine(value);
Console.ForegroundColor = ConsoleColor.White;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment