Last active
June 24, 2023 14:00
-
-
Save j5ik2o/f6ab15320b6f54fed90f9afdef1474c4 to your computer and use it in GitHub Desktop.
RustでのステートレスなEvent Sourcingの実装
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ここでいうステートレスというのはアプリケーションに状態があるのではなく、DBに状態があることを意味しています。 | |
// 逆にAkka Clusterで実装されるEvent Sourcingシステムでは、アプリケーションの状態はアプリケーションが保持しており、 | |
// DBはバックアップログを持つストレージの役割になります。 | |
pub struct EventPersistenceGateway<'a> { | |
journal_table_name: String, | |
snapshot_table_name: String, | |
client: &'a Client, | |
} | |
impl<'a> EventPersistenceGateway<'a> { | |
pub fn new(journal_table_name: String, snapshot_table_name: String, client: &'a Client) -> Self { | |
Self { | |
journal_table_name, | |
snapshot_table_name, | |
client, | |
} | |
} | |
pub async fn get_snapshot_by_id<T>(&self, aid: String) -> Result<(T, usize)> | |
where | |
T: for<'de> de::Deserialize<'de>, { | |
let response = self | |
.client | |
.get_item() | |
.table_name(self.snapshot_table_name.clone()) | |
.key("pkey", AttributeValue::S(aid)) | |
.send() | |
.await?; | |
let item = response.item().unwrap(); | |
let payload = item.get("payload").unwrap().as_s().unwrap(); | |
let aggregate = serde_json::from_str::<T>(payload).unwrap(); | |
let seq_nr = item.get("seq_nr").unwrap().as_n().unwrap(); | |
Ok((aggregate, seq_nr.parse::<usize>().unwrap())) | |
} | |
pub async fn get_events_by_id_and_seq_nr<T>(&self, aid: String, seq_nr: usize) -> Result<Vec<T>> | |
where | |
T: for<'de> de::Deserialize<'de>, { | |
let response = self | |
.client | |
.query() | |
.table_name(self.journal_table_name.clone()) | |
.index_name("aid_index") | |
.key_condition_expression("#aid = :aid AND #skey >= :skey") | |
.expression_attribute_names("#aid", "aid") | |
.expression_attribute_values(":aid", AttributeValue::S(aid)) | |
.expression_attribute_names("#skey", "skey") | |
.expression_attribute_values(":skey", AttributeValue::N(seq_nr.to_string())) | |
.send() | |
.await?; | |
let mut events = Vec::new(); | |
if let Some(items) = response.items { | |
for item in items { | |
let payload = item.get("payload").unwrap(); | |
let str = payload.as_s().unwrap(); | |
let event = serde_json::from_str::<T>(str).unwrap(); | |
events.push(event); | |
} | |
} | |
Ok(events) | |
} | |
pub async fn store_event_with_snapshot_opt<A, E>( | |
&mut self, | |
event: &E, | |
version: usize, | |
aggregate: Option<&A>, | |
) -> Result<()> | |
where | |
A: ?Sized + Serialize + Aggregate, | |
E: ?Sized + Serialize + Event, { | |
match (event.is_created(), aggregate) { | |
(true, Some(ar)) => { | |
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string()); | |
let put_snapshot = Put::builder() | |
.table_name(self.snapshot_table_name.clone()) | |
.item("pkey", aggregate_id) | |
.item("payload", AttributeValue::S(serde_json::to_string(ar)?)) | |
.item("seq_nr", AttributeValue::N(event.seq_nr().to_string())) | |
.item("version", AttributeValue::N("1".to_string())) | |
.condition_expression("attribute_not_exists(pkey)") | |
.build(); | |
let put_journal = Put::builder() | |
.table_name(self.journal_table_name.clone()) | |
.item("pkey", AttributeValue::S(event.id().to_string())) | |
.item("skey", AttributeValue::N(event.seq_nr().to_string())) | |
.item("aid", AttributeValue::S(event.aggregate_id().to_string())) | |
.item("payload", AttributeValue::S(serde_json::to_string(event)?)) | |
.item( | |
"occurred_at", | |
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()), | |
) | |
.build(); | |
let twi1 = TransactWriteItem::builder().put(put_snapshot).build(); | |
let twi2 = TransactWriteItem::builder().put(put_journal).build(); | |
let _ = self | |
.client | |
.transact_write_items() | |
.set_transact_items(Some(vec![twi1, twi2])) | |
.send() | |
.await?; | |
} | |
(true, None) => { | |
panic!("Aggregate is not found"); | |
} | |
(false, Some(ar)) => { | |
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string()); | |
let update_snapshot = Update::builder() | |
.table_name(self.snapshot_table_name.clone()) | |
.update_expression("SET #payload=:payload, #seq_nr=:seq_nr, #version=:after_version") | |
.key("pkey", aggregate_id) | |
.expression_attribute_names("#payload", "payload") | |
.expression_attribute_names("#seq_nr", "seq_nr") | |
.expression_attribute_names("#version", "version") | |
.expression_attribute_values(":payload", AttributeValue::S(serde_json::to_string(ar)?)) | |
.expression_attribute_values(":seq_nr", AttributeValue::N(event.seq_nr().to_string())) | |
.expression_attribute_values(":before_version", AttributeValue::N(version.to_string())) | |
.expression_attribute_values(":after_version", AttributeValue::N((version + 1).to_string())) | |
.condition_expression("#version=:before_version") | |
.build(); | |
let put_journal = Put::builder() | |
.table_name(self.journal_table_name.clone()) | |
.item("pkey", AttributeValue::S(event.id().to_string())) | |
.item("skey", AttributeValue::N(event.seq_nr().to_string())) | |
.item("aid", AttributeValue::S(event.aggregate_id().to_string())) | |
.item("payload", AttributeValue::S(serde_json::to_string(event)?)) | |
.item( | |
"occurred_at", | |
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()), | |
) | |
.build(); | |
let twi1 = TransactWriteItem::builder().update(update_snapshot).build(); | |
let twi2 = TransactWriteItem::builder().put(put_journal).build(); | |
let _ = self | |
.client | |
.transact_write_items() | |
.set_transact_items(Some(vec![twi1, twi2])) | |
.send() | |
.await?; | |
} | |
(false, None) => { | |
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string()); | |
let update_snapshot = Update::builder() | |
.table_name(self.snapshot_table_name.clone()) | |
.update_expression("SET #seq_nr=:seq_nr, #version=:after_version") | |
.key("pkey", aggregate_id) | |
.expression_attribute_names("#seq_nr", "seq_nr") | |
.expression_attribute_names("#version", "version") | |
.expression_attribute_values(":seq_nr", AttributeValue::N(event.seq_nr().to_string())) | |
.expression_attribute_values(":before_version", AttributeValue::N(version.to_string())) | |
.expression_attribute_values(":after_version", AttributeValue::N((version + 1).to_string())) | |
.condition_expression("#version=:before_version") | |
.build(); | |
let put_journal = Put::builder() | |
.table_name(self.journal_table_name.clone()) | |
.item("pkey", AttributeValue::S(event.id().to_string())) | |
.item("skey", AttributeValue::N(event.seq_nr().to_string())) | |
.item("aid", AttributeValue::S(event.aggregate_id().to_string())) | |
.item("payload", AttributeValue::S(serde_json::to_string(event)?)) | |
.item( | |
"occurred_at", | |
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()), | |
) | |
.build(); | |
let twi1 = TransactWriteItem::builder().update(update_snapshot).build(); | |
let twi2 = TransactWriteItem::builder().put(put_journal).build(); | |
let _ = self | |
.client | |
.transact_write_items() | |
.set_transact_items(Some(vec![twi1, twi2])) | |
.send() | |
.await?; | |
} | |
} | |
Ok(()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// リポジトリではイベントとスナップショットを保存できる。 | |
pub struct ThreadRepository<'a> { | |
event_persistence_gateway: EventPersistenceGateway<'a>, | |
} | |
impl<'a> ThreadRepository<'a> { | |
pub fn new(gateway: EventPersistenceGateway<'a>) -> Self { | |
Self { | |
event_persistence_gateway: gateway, | |
} | |
} | |
pub async fn store(&mut self, event: &ThreadEvent, version: usize, snapshot: Option<&Thread>) -> Result<()> { | |
self | |
.event_persistence_gateway | |
.store_event_with_snapshot_opt(event, version, snapshot) | |
.await | |
} | |
pub async fn find_by_id(&self, id: &ThreadID) -> Result<Thread> { | |
let (mut snapshot, seq_nr) = self | |
.event_persistence_gateway | |
.get_snapshot_by_id::<Thread>(id.to_string()) | |
.await?; | |
let events = self | |
.event_persistence_gateway | |
.get_events_by_id_and_seq_nr::<ThreadEvent>(id.to_string(), seq_nr) | |
.await?; | |
let result = Thread::replay(events, Some(snapshot)); | |
Ok(result) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment