Last active
August 14, 2025 09:03
-
-
Save egorsmkv/19ba7c52980bb801da5a832092397367 to your computer and use it in GitHub Desktop.
Rustcamp 1_9_phantom, simplified code: https://github.com/rust-bootcamp/rustcamp_summer_2025_Yehor_Smoliakov/tree/master/1_concepts/1_9_phantom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
use std::marker::PhantomData; | |
use std::sync::Arc; | |
use std::sync::Mutex; | |
// ======== TYPES ========

/// Identifier of a user aggregate; a plain alias for `String`.
type UserId = String;
// ======== STRUCTS, ENUMS ========

/// Failure reported while handling a command through the snapshotter.
///
/// `RepoError` is the error type of the repository the snapshotter talks to;
/// three of the variants wrap it to record which repository call failed.
#[derive(Debug)]
enum SnapshotterError<RepoError> {
    /// The repository failed while loading the aggregate snapshot.
    AggregateLoading(RepoError),
    /// The repository failed while opening or iterating the event stream.
    EventsReading(RepoError),
    /// The repository failed while storing the updated aggregate.
    AggregateStoring(RepoError),
    /// The command did not provide an aggregate id.
    MissingAggregateId,
}

impl<RepoError: std::fmt::Display> std::fmt::Display for SnapshotterError<RepoError> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AggregateLoading(err) => write!(f, "failed to load aggregate: {err}"),
            Self::EventsReading(err) => write!(f, "failed to read events: {err}"),
            Self::AggregateStoring(err) => write!(f, "failed to store aggregate: {err}"),
            Self::MissingAggregateId => f.write_str("command does not reference an aggregate id"),
        }
    }
}

impl<RepoError> std::error::Error for SnapshotterError<RepoError> where
    RepoError: std::fmt::Debug + std::fmt::Display
{
}
/// Zero-sized marker that carries an aggregate id type at the type level only.
///
/// No value of type `Id` is ever stored. The trait impls below are written by
/// hand instead of derived so that they hold for every `Id`: `#[derive]` would
/// add `Id: Debug` / `Id: Clone` bounds that a `PhantomData` field does not need.
struct EventMeta<Id> {
    _id: PhantomData<Id>,
}

impl<Id> Default for EventMeta<Id> {
    fn default() -> Self {
        Self { _id: PhantomData }
    }
}

impl<Id> Clone for EventMeta<Id> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Id> Copy for EventMeta<Id> {}

impl<Id> std::fmt::Debug for EventMeta<Id> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same layout the derived impl produced: `EventMeta { _id: PhantomData<..> }`.
        f.debug_struct("EventMeta").field("_id", &self._id).finish()
    }
}
#[derive(Debug, Clone, Default)] | |
struct UserAggregate { | |
id: UserId, | |
version: i64, | |
name: String, | |
} | |
/// Events that can be applied to a user aggregate.
#[derive(Debug, Clone, PartialEq, Eq)]
enum UserEvent {
    /// The user's name was replaced by the contained value.
    NameUpdated(String),
}
#[derive(Debug, Clone)] | |
struct MockRepo { | |
snapshots: Arc<Mutex<HashMap<UserId, UserAggregate>>>, | |
events: Arc<Mutex<HashMap<UserId, Vec<UserEvent>>>>, | |
} | |
/// Command gateway that works on top of a repository of type `Repo`.
#[derive(Debug, Clone)]
struct Snapshotter<Repo> {
    /// Repository used to load snapshots, read events and store new snapshots.
    repo: Repo,
}
struct UpdateNameCommand { | |
id: UserId, | |
} | |
// ======== TRAITS ========

/// A cloneable domain object that can report its own identifier.
trait Aggregate: Clone {
    /// Identifier type; it must be cloneable, hashable, comparable and
    /// shareable across threads.
    type Id: Clone + Send + Sync + Eq + std::hash::Hash;

    /// Returns a reference to this aggregate's identifier.
    fn id(&self) -> &Self::Id;
}
trait Command { | |
type Aggregate: Aggregate; | |
fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id>; | |
} | |
trait VersionedAggregate: Aggregate { | |
fn version(&self) -> i64; | |
} | |
/// State that can be advanced by applying event messages of type `E`.
///
/// `M` is a type-level tag only: no method of this trait mentions it.
trait EventMessageSourced<E, M> {
    /// Applies `msg` to `self`, mutating the state in place.
    fn apply_event_message(&mut self, msg: &E);
}
/// Marker trait for event types; it only requires the event to be `Clone`.
trait AggregateEvent: Clone {}
trait Repository<A: Aggregate>: Clone + Send + Sync + 'static { | |
type Error: std::fmt::Debug + Send; | |
type Event: AggregateEvent + Send + Sync + 'static; | |
fn load(&self, id: &A::Id) -> Result<Option<A>, Self::Error>; | |
fn store(&self, agg: &A) -> Result<(), Self::Error>; | |
fn read_events( | |
&self, | |
id: &A::Id, | |
since_version: Option<i64>, | |
) -> Result<impl Iterator<Item = Result<Self::Event, Self::Error>>, Self::Error>; | |
} | |
trait CommandGateway<C: Command> { | |
type Error; | |
fn command(&self, cmd: C) -> Result<Option<C::Aggregate>, Self::Error>; | |
} | |
impl<Repo> Snapshotter<Repo> { | |
fn new(repo: Repo) -> Self { | |
Self { repo } | |
} | |
} | |
impl<Cmd, Repo> CommandGateway<Cmd> for Snapshotter<Repo> | |
where | |
Cmd: Command + Send + Sync + 'static, | |
Cmd::Aggregate: VersionedAggregate | |
+ EventMessageSourced<Repo::Event, EventMeta<<Cmd::Aggregate as Aggregate>::Id>> | |
+ Default | |
+ Send, | |
Repo: Repository<Cmd::Aggregate>, | |
{ | |
type Error = SnapshotterError<Repo::Error>; | |
fn command(&self, cmd: Cmd) -> Result<Option<Cmd::Aggregate>, Self::Error> { | |
let repo = self.repo.clone(); | |
let aggregate_id = cmd | |
.aggregate_id() | |
.ok_or(SnapshotterError::MissingAggregateId)?; | |
let mut agg = repo | |
.load(aggregate_id) | |
.map_err(SnapshotterError::AggregateLoading)? | |
.unwrap_or_else(Cmd::Aggregate::default); | |
let initial_version = agg.version(); | |
// Re-hydrate the aggregate state by applying new events from the event store. | |
let events = repo | |
.read_events(aggregate_id, Some(initial_version)) | |
.map_err(SnapshotterError::EventsReading)?; | |
for event_result in events { | |
let event_message = event_result.map_err(SnapshotterError::EventsReading)?; | |
agg.apply_event_message(&event_message); | |
} | |
if agg.version() > initial_version { | |
repo.store(&agg) | |
.map_err(SnapshotterError::AggregateStoring)?; | |
Ok(Some(agg)) | |
} else { | |
Ok(None) | |
} | |
} | |
} | |
impl Aggregate for UserAggregate { | |
type Id = UserId; | |
fn id(&self) -> &Self::Id { | |
&self.id | |
} | |
} | |
impl VersionedAggregate for UserAggregate { | |
fn version(&self) -> i64 { | |
self.version | |
} | |
} | |
impl AggregateEvent for UserEvent {} | |
impl EventMessageSourced<UserEvent, EventMeta<UserId>> for UserAggregate { | |
fn apply_event_message(&mut self, msg: &UserEvent) { | |
match msg { | |
UserEvent::NameUpdated(name) => { | |
self.name.clone_from(name); | |
} | |
} | |
self.version += 1; | |
} | |
} | |
impl Command for UpdateNameCommand { | |
type Aggregate = UserAggregate; | |
fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id> { | |
Some(&self.id) | |
} | |
} | |
impl MockRepo { | |
fn new() -> Self { | |
MockRepo { | |
snapshots: Arc::new(Mutex::new(HashMap::new())), | |
events: Arc::new(Mutex::new(HashMap::new())), | |
} | |
} | |
} | |
impl Repository<UserAggregate> for MockRepo { | |
type Error = String; | |
type Event = UserEvent; | |
fn load( | |
&self, | |
id: &<UserAggregate as Aggregate>::Id, | |
) -> Result<Option<UserAggregate>, Self::Error> { | |
let snapshots = self.snapshots.lock().unwrap(); | |
Ok(snapshots.get(id).cloned()) | |
} | |
fn store(&self, agg: &UserAggregate) -> Result<(), Self::Error> { | |
let mut snapshots = self.snapshots.lock().unwrap(); | |
snapshots.insert(agg.id().clone(), agg.clone()); | |
println!( | |
"--- Stored snapshot for user '{}', version: {}", | |
agg.id(), | |
agg.version() | |
); | |
Ok(()) | |
} | |
fn read_events( | |
&self, | |
id: &<UserAggregate as Aggregate>::Id, | |
since_version: Option<i64>, | |
) -> Result<impl Iterator<Item = Result<Self::Event, Self::Error>>, Self::Error> { | |
let events_db = self.events.lock().unwrap(); | |
let since_version = since_version.unwrap_or(0); | |
if let Some(all_events) = events_db.get(id) { | |
// This logic assumes `all_events` is the complete history. | |
// It correctly skips events already accounted for in the snapshot. | |
let new_events: Vec<Result<UserEvent, String>> = all_events | |
.iter() | |
.skip(usize::try_from(since_version).expect("cannot convert version to usize")) | |
.cloned() | |
.map(Ok) | |
.collect(); | |
println!( | |
"--- Reading {} new events for user '{}'", | |
new_events.len(), | |
id | |
); | |
Ok(new_events.into_iter()) | |
} else { | |
Ok(vec![].into_iter()) | |
} | |
} | |
} | |
fn main() { | |
let repo = MockRepo::new(); | |
let user_id = "user-123".to_string(); | |
{ | |
// A snapshot of the user at version 1. This state was reached after one event. | |
let initial_snapshot = UserAggregate { | |
id: user_id.clone(), | |
version: 1, | |
name: "Initial Name".to_string(), | |
}; | |
repo.snapshots | |
.lock() | |
.unwrap() | |
.insert(user_id.clone(), initial_snapshot); | |
// The event store should contain the FULL history of events. | |
// The snapshot at v1 corresponds to the state after the first event. | |
let all_events = vec![ | |
// Event that led to the v1 snapshot | |
UserEvent::NameUpdated("Initial Name".to_string()), | |
// Events that occurred after the snapshot | |
UserEvent::NameUpdated("Second Name".to_string()), // This will bring version to 2 | |
UserEvent::NameUpdated("Final Name".to_string()), // This will bring version to 3 | |
]; | |
repo.events | |
.lock() | |
.unwrap() | |
.insert(user_id.clone(), all_events); | |
} | |
let snapshotter = Snapshotter::<MockRepo>::new(repo.clone()); | |
let command = UpdateNameCommand { | |
id: user_id.clone(), | |
}; | |
println!("Dispatching command for user '{user_id}'..."); | |
let result = snapshotter.command(command); | |
match result { | |
Ok(Some(final_aggregate)) => { | |
println!("\n✅ Command successful!"); | |
println!("Final Aggregate State: {final_aggregate:?}"); | |
assert_eq!(final_aggregate.version(), 3); | |
assert_eq!(final_aggregate.name, "Final Name"); | |
} | |
Ok(None) => { | |
println!("\nℹ️ Command resulted in no changes."); | |
} | |
Err(e) => { | |
println!("\n❌ Command failed: {e:?}"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment