Skip to content

Instantly share code, notes, and snippets.

@j5ik2o
Last active June 24, 2023 14:00
Show Gist options
  • Save j5ik2o/f6ab15320b6f54fed90f9afdef1474c4 to your computer and use it in GitHub Desktop.
Save j5ik2o/f6ab15320b6f54fed90f9afdef1474c4 to your computer and use it in GitHub Desktop.
RustでのステートレスなEvent Sourcingの実装
// "Stateless" here means that the state lives in the DB rather than in the application.
// In contrast, in an Event Sourcing system implemented with Akka Cluster, the application itself
// holds the application state, and the DB plays the role of storage that keeps a backup log.
/// Gateway that reads and writes journal events and aggregate snapshots through a borrowed client.
///
/// The gateway holds no aggregate state of its own; it only knows which tables to talk to.
pub struct EventPersistenceGateway<'a> {
/// Name of the table that journal (event) items are written to and queried from.
journal_table_name: String,
/// Name of the table that snapshot items are read from, created in, and updated in.
snapshot_table_name: String,
/// Borrowed client used to issue the `get_item`, `query` and `transact_write_items` requests.
// NOTE(review): the API shape looks like `aws_sdk_dynamodb::Client` — confirm against the imports.
client: &'a Client,
}
impl<'a> EventPersistenceGateway<'a> {
pub fn new(journal_table_name: String, snapshot_table_name: String, client: &'a Client) -> Self {
Self {
journal_table_name,
snapshot_table_name,
client,
}
}
pub async fn get_snapshot_by_id<T>(&self, aid: String) -> Result<(T, usize)>
where
T: for<'de> de::Deserialize<'de>, {
let response = self
.client
.get_item()
.table_name(self.snapshot_table_name.clone())
.key("pkey", AttributeValue::S(aid))
.send()
.await?;
let item = response.item().unwrap();
let payload = item.get("payload").unwrap().as_s().unwrap();
let aggregate = serde_json::from_str::<T>(payload).unwrap();
let seq_nr = item.get("seq_nr").unwrap().as_n().unwrap();
Ok((aggregate, seq_nr.parse::<usize>().unwrap()))
}
pub async fn get_events_by_id_and_seq_nr<T>(&self, aid: String, seq_nr: usize) -> Result<Vec<T>>
where
T: for<'de> de::Deserialize<'de>, {
let response = self
.client
.query()
.table_name(self.journal_table_name.clone())
.index_name("aid_index")
.key_condition_expression("#aid = :aid AND #skey >= :skey")
.expression_attribute_names("#aid", "aid")
.expression_attribute_values(":aid", AttributeValue::S(aid))
.expression_attribute_names("#skey", "skey")
.expression_attribute_values(":skey", AttributeValue::N(seq_nr.to_string()))
.send()
.await?;
let mut events = Vec::new();
if let Some(items) = response.items {
for item in items {
let payload = item.get("payload").unwrap();
let str = payload.as_s().unwrap();
let event = serde_json::from_str::<T>(str).unwrap();
events.push(event);
}
}
Ok(events)
}
pub async fn store_event_with_snapshot_opt<A, E>(
&mut self,
event: &E,
version: usize,
aggregate: Option<&A>,
) -> Result<()>
where
A: ?Sized + Serialize + Aggregate,
E: ?Sized + Serialize + Event, {
match (event.is_created(), aggregate) {
(true, Some(ar)) => {
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string());
let put_snapshot = Put::builder()
.table_name(self.snapshot_table_name.clone())
.item("pkey", aggregate_id)
.item("payload", AttributeValue::S(serde_json::to_string(ar)?))
.item("seq_nr", AttributeValue::N(event.seq_nr().to_string()))
.item("version", AttributeValue::N("1".to_string()))
.condition_expression("attribute_not_exists(pkey)")
.build();
let put_journal = Put::builder()
.table_name(self.journal_table_name.clone())
.item("pkey", AttributeValue::S(event.id().to_string()))
.item("skey", AttributeValue::N(event.seq_nr().to_string()))
.item("aid", AttributeValue::S(event.aggregate_id().to_string()))
.item("payload", AttributeValue::S(serde_json::to_string(event)?))
.item(
"occurred_at",
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()),
)
.build();
let twi1 = TransactWriteItem::builder().put(put_snapshot).build();
let twi2 = TransactWriteItem::builder().put(put_journal).build();
let _ = self
.client
.transact_write_items()
.set_transact_items(Some(vec![twi1, twi2]))
.send()
.await?;
}
(true, None) => {
panic!("Aggregate is not found");
}
(false, Some(ar)) => {
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string());
let update_snapshot = Update::builder()
.table_name(self.snapshot_table_name.clone())
.update_expression("SET #payload=:payload, #seq_nr=:seq_nr, #version=:after_version")
.key("pkey", aggregate_id)
.expression_attribute_names("#payload", "payload")
.expression_attribute_names("#seq_nr", "seq_nr")
.expression_attribute_names("#version", "version")
.expression_attribute_values(":payload", AttributeValue::S(serde_json::to_string(ar)?))
.expression_attribute_values(":seq_nr", AttributeValue::N(event.seq_nr().to_string()))
.expression_attribute_values(":before_version", AttributeValue::N(version.to_string()))
.expression_attribute_values(":after_version", AttributeValue::N((version + 1).to_string()))
.condition_expression("#version=:before_version")
.build();
let put_journal = Put::builder()
.table_name(self.journal_table_name.clone())
.item("pkey", AttributeValue::S(event.id().to_string()))
.item("skey", AttributeValue::N(event.seq_nr().to_string()))
.item("aid", AttributeValue::S(event.aggregate_id().to_string()))
.item("payload", AttributeValue::S(serde_json::to_string(event)?))
.item(
"occurred_at",
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()),
)
.build();
let twi1 = TransactWriteItem::builder().update(update_snapshot).build();
let twi2 = TransactWriteItem::builder().put(put_journal).build();
let _ = self
.client
.transact_write_items()
.set_transact_items(Some(vec![twi1, twi2]))
.send()
.await?;
}
(false, None) => {
let aggregate_id = AttributeValue::S(event.aggregate_id().to_string());
let update_snapshot = Update::builder()
.table_name(self.snapshot_table_name.clone())
.update_expression("SET #seq_nr=:seq_nr, #version=:after_version")
.key("pkey", aggregate_id)
.expression_attribute_names("#seq_nr", "seq_nr")
.expression_attribute_names("#version", "version")
.expression_attribute_values(":seq_nr", AttributeValue::N(event.seq_nr().to_string()))
.expression_attribute_values(":before_version", AttributeValue::N(version.to_string()))
.expression_attribute_values(":after_version", AttributeValue::N((version + 1).to_string()))
.condition_expression("#version=:before_version")
.build();
let put_journal = Put::builder()
.table_name(self.journal_table_name.clone())
.item("pkey", AttributeValue::S(event.id().to_string()))
.item("skey", AttributeValue::N(event.seq_nr().to_string()))
.item("aid", AttributeValue::S(event.aggregate_id().to_string()))
.item("payload", AttributeValue::S(serde_json::to_string(event)?))
.item(
"occurred_at",
AttributeValue::N(event.occurred_at().timestamp_millis().to_string()),
)
.build();
let twi1 = TransactWriteItem::builder().update(update_snapshot).build();
let twi2 = TransactWriteItem::builder().put(put_journal).build();
let _ = self
.client
.transact_write_items()
.set_transact_items(Some(vec![twi1, twi2]))
.send()
.await?;
}
}
Ok(())
}
}
// The repository can store events and snapshots.
/// Repository for `Thread` aggregates that delegates all persistence to an
/// `EventPersistenceGateway`.
pub struct ThreadRepository<'a> {
/// Gateway used to store events/snapshots and to load them back.
event_persistence_gateway: EventPersistenceGateway<'a>,
}
impl<'a> ThreadRepository<'a> {
  /// Wraps `gateway` in a repository dedicated to `Thread` aggregates.
  pub fn new(gateway: EventPersistenceGateway<'a>) -> Self {
    Self {
      event_persistence_gateway: gateway,
    }
  }

  /// Persists `event` and, when given, the `snapshot` of the thread it belongs to.
  ///
  /// `version` is forwarded unchanged to the gateway, which uses it as the expected snapshot
  /// version when updating an existing snapshot item.
  pub async fn store(&mut self, event: &ThreadEvent, version: usize, snapshot: Option<&Thread>) -> Result<()> {
    let gateway = &mut self.event_persistence_gateway;
    gateway.store_event_with_snapshot_opt(event, version, snapshot).await
  }

  /// Rebuilds the thread identified by `id` from its stored snapshot and journal events.
  ///
  /// The snapshot is loaded first; its `seq_nr` is then used as the lower bound when fetching
  /// events, and both are handed to `Thread::replay`.
  // NOTE(review): the gateway query uses `skey >= seq_nr`, so the event at the snapshot's own
  // `seq_nr` is fetched as well — confirm that `Thread::replay` tolerates it.
  pub async fn find_by_id(&self, id: &ThreadID) -> Result<Thread> {
    let gateway = &self.event_persistence_gateway;

    let (snapshot, seq_nr) = gateway.get_snapshot_by_id::<Thread>(id.to_string()).await?;
    let events = gateway
      .get_events_by_id_and_seq_nr::<ThreadEvent>(id.to_string(), seq_nr)
      .await?;

    Ok(Thread::replay(events, Some(snapshot)))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment