Skip to content

Instantly share code, notes, and snippets.

@egorsmkv
Last active August 14, 2025 09:03
Show Gist options
  • Save egorsmkv/19ba7c52980bb801da5a832092397367 to your computer and use it in GitHub Desktop.
Save egorsmkv/19ba7c52980bb801da5a832092397367 to your computer and use it in GitHub Desktop.
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;
/// ======== TYPES ========
type UserId = String;
/// ======== STRUCTS, ENUMS ========
#[derive(Debug)]
enum SnapshotterError<RepoError> {
AggregateLoading(RepoError),
EventsReading(RepoError),
AggregateStoring(RepoError),
MissingAggregateId,
}
#[derive(Debug, Clone)]
struct EventMeta<Id> {
_id: PhantomData<Id>,
}
#[derive(Debug, Clone, Default)]
struct UserAggregate {
id: UserId,
version: i64,
name: String,
}
#[derive(Debug, Clone)]
enum UserEvent {
NameUpdated(String),
}
#[derive(Debug, Clone)]
struct MockRepo {
snapshots: Arc<Mutex<HashMap<UserId, UserAggregate>>>,
events: Arc<Mutex<HashMap<UserId, Vec<UserEvent>>>>,
}
struct Snapshotter<Repo> {
repo: Repo,
}
struct UpdateNameCommand {
id: UserId,
}
/// ======== TRAITS ========
trait Aggregate: Clone {
type Id: Clone + Send + Sync + Eq + std::hash::Hash;
fn id(&self) -> &Self::Id;
}
trait Command {
type Aggregate: Aggregate;
fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id>;
}
trait VersionedAggregate: Aggregate {
fn version(&self) -> i64;
}
trait EventMessageSourced<E, M> {
fn apply_event_message(&mut self, msg: &E);
}
trait AggregateEvent: Clone {}
trait Repository<A: Aggregate>: Clone + Send + Sync + 'static {
type Error: std::fmt::Debug + Send;
type Event: AggregateEvent + Send + Sync + 'static;
fn load(&self, id: &A::Id) -> Result<Option<A>, Self::Error>;
fn store(&self, agg: &A) -> Result<(), Self::Error>;
fn read_events(
&self,
id: &A::Id,
since_version: Option<i64>,
) -> Result<impl Iterator<Item = Result<Self::Event, Self::Error>>, Self::Error>;
}
trait CommandGateway<C: Command> {
type Error;
fn command(&self, cmd: C) -> Result<Option<C::Aggregate>, Self::Error>;
}
impl<Repo> Snapshotter<Repo> {
fn new(repo: Repo) -> Self {
Self { repo }
}
}
impl<Cmd, Repo> CommandGateway<Cmd> for Snapshotter<Repo>
where
Cmd: Command + Send + Sync + 'static,
Cmd::Aggregate: VersionedAggregate
+ EventMessageSourced<Repo::Event, EventMeta<<Cmd::Aggregate as Aggregate>::Id>>
+ Default
+ Send,
Repo: Repository<Cmd::Aggregate>,
{
type Error = SnapshotterError<Repo::Error>;
fn command(&self, cmd: Cmd) -> Result<Option<Cmd::Aggregate>, Self::Error> {
let repo = self.repo.clone();
let aggregate_id = cmd
.aggregate_id()
.ok_or(SnapshotterError::MissingAggregateId)?;
let mut agg = repo
.load(aggregate_id)
.map_err(SnapshotterError::AggregateLoading)?
.unwrap_or_else(Cmd::Aggregate::default);
let initial_version = agg.version();
// Re-hydrate the aggregate state by applying new events from the event store.
let events = repo
.read_events(aggregate_id, Some(initial_version))
.map_err(SnapshotterError::EventsReading)?;
for event_result in events {
let event_message = event_result.map_err(SnapshotterError::EventsReading)?;
agg.apply_event_message(&event_message);
}
if agg.version() > initial_version {
repo.store(&agg)
.map_err(SnapshotterError::AggregateStoring)?;
Ok(Some(agg))
} else {
Ok(None)
}
}
}
impl Aggregate for UserAggregate {
type Id = UserId;
fn id(&self) -> &Self::Id {
&self.id
}
}
impl VersionedAggregate for UserAggregate {
fn version(&self) -> i64 {
self.version
}
}
impl AggregateEvent for UserEvent {}
impl EventMessageSourced<UserEvent, EventMeta<UserId>> for UserAggregate {
fn apply_event_message(&mut self, msg: &UserEvent) {
match msg {
UserEvent::NameUpdated(name) => {
self.name.clone_from(name);
}
}
self.version += 1;
}
}
impl Command for UpdateNameCommand {
type Aggregate = UserAggregate;
fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id> {
Some(&self.id)
}
}
impl MockRepo {
fn new() -> Self {
MockRepo {
snapshots: Arc::new(Mutex::new(HashMap::new())),
events: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl Repository<UserAggregate> for MockRepo {
type Error = String;
type Event = UserEvent;
fn load(
&self,
id: &<UserAggregate as Aggregate>::Id,
) -> Result<Option<UserAggregate>, Self::Error> {
let snapshots = self.snapshots.lock().unwrap();
Ok(snapshots.get(id).cloned())
}
fn store(&self, agg: &UserAggregate) -> Result<(), Self::Error> {
let mut snapshots = self.snapshots.lock().unwrap();
snapshots.insert(agg.id().clone(), agg.clone());
println!(
"--- Stored snapshot for user '{}', version: {}",
agg.id(),
agg.version()
);
Ok(())
}
fn read_events(
&self,
id: &<UserAggregate as Aggregate>::Id,
since_version: Option<i64>,
) -> Result<impl Iterator<Item = Result<Self::Event, Self::Error>>, Self::Error> {
let events_db = self.events.lock().unwrap();
let since_version = since_version.unwrap_or(0);
if let Some(all_events) = events_db.get(id) {
// This logic assumes `all_events` is the complete history.
// It correctly skips events already accounted for in the snapshot.
let new_events: Vec<Result<UserEvent, String>> = all_events
.iter()
.skip(usize::try_from(since_version).expect("cannot convert version to usize"))
.cloned()
.map(Ok)
.collect();
println!(
"--- Reading {} new events for user '{}'",
new_events.len(),
id
);
Ok(new_events.into_iter())
} else {
Ok(vec![].into_iter())
}
}
}
fn main() {
let repo = MockRepo::new();
let user_id = "user-123".to_string();
{
// A snapshot of the user at version 1. This state was reached after one event.
let initial_snapshot = UserAggregate {
id: user_id.clone(),
version: 1,
name: "Initial Name".to_string(),
};
repo.snapshots
.lock()
.unwrap()
.insert(user_id.clone(), initial_snapshot);
// The event store should contain the FULL history of events.
// The snapshot at v1 corresponds to the state after the first event.
let all_events = vec![
// Event that led to the v1 snapshot
UserEvent::NameUpdated("Initial Name".to_string()),
// Events that occurred after the snapshot
UserEvent::NameUpdated("Second Name".to_string()), // This will bring version to 2
UserEvent::NameUpdated("Final Name".to_string()), // This will bring version to 3
];
repo.events
.lock()
.unwrap()
.insert(user_id.clone(), all_events);
}
let snapshotter = Snapshotter::<MockRepo>::new(repo.clone());
let command = UpdateNameCommand {
id: user_id.clone(),
};
println!("Dispatching command for user '{user_id}'...");
let result = snapshotter.command(command);
match result {
Ok(Some(final_aggregate)) => {
println!("\n✅ Command successful!");
println!("Final Aggregate State: {final_aggregate:?}");
assert_eq!(final_aggregate.version(), 3);
assert_eq!(final_aggregate.name, "Final Name");
}
Ok(None) => {
println!("\nℹ️ Command resulted in no changes.");
}
Err(e) => {
println!("\n❌ Command failed: {e:?}");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment