- 僕はおくみん
- @okumin
- Actorの内部状態をストレージへ保存するもの
- 保存方法はJournalとSnapshotの二種類
- Journalは更新イベントを1つずつ発生した順番に保存する
- MySQLのバイナリログやRedisのAppend Only Fileようなイメージ
- 更新イベントを最初から最後まで順番に適用していけば、元の状態が復元できる
- Snapshotはその時点でのActorの状態を直接保存する
- mysqldumpやRedisのRDB snapshotのようなイメージ
- ある時点tまでのデータはSnapshotから、残りのデータはJournalから取り出すことでリカバリーに要する時間を削減することができる
- Journalは更新イベントを1つずつ発生した順番に保存する
// Actor内の情報を更新するイベント。
case class UpdateEvent(key: String, value: String)
// クライアントがActorに投げるメッセージ
case class Get(key: String)
case class Set(key: String, value: String)
case class Add(key: String, value: String)
// 状態
case class State(map: Map[String, String]) {
def get(key: String): Option[String] = map.get(key)
def updated(key: String, value: String): State = State(map.updated(key, value))
}
class KeyValueStoreActor extends PersistentActor {
var state = State(Map.empty)
// 1分に一度スナップショットを取得する。
val interval = 1.minutes
context.system.scheduler.schedule(interval, interval, self, "snapshot")(context.dispatcher)
def updateState(event: UpdateEvent): Unit = state = state.updated(event.key, event.value)
/**
* PersistentActorの識別子。
* JournalやSnapshot storeへ保存されるデータはこの識別子に紐付いている。
*/
override def persistenceId: String = "key-value-store"
/**
* 永続化されたデータを取得すると、このメソッドが呼ばれる。
* 基本的にStart, Restartしたタイミングで発動する。
*/
override def receiveRecover: Receive = {
case SnapshotOffer(_, snapshot: State) => state = snapshot // from Snapshot store
case event: UpdateEvent => updateState(event) // from Journal
}
/**
* このActorに対して送信された通常のメッセージを処理する。
* 普通のActorの receive のようなもの。
*/
override def receiveCommand: Receive = {
case Get(key) => sender() ! state.get(key)
case Set(key, value) => persist(UpdateEvent(key, value))(updateState)
case Add(key, value) => // データがない場合だけ追加
state.get(key) match {
case Some(_) =>
case None => persist(UpdateEvent(key, value))(updateState)
}
case "snapshot" => saveSnapshot(state)
}
}
プラグインを作れば、好きなデータベースへJournalやSnapshotを保存することができる。デフォルトでは、JournalはLevelDBへ、Snapshotはファイルへ書きだされる。
- すでにRDBMS用のプラグインは存在していた
- でもakka-persistenceプラグインの作成APIには同期版と非同期版があるので、せっかくだし非同期版を作ってみた
- ScalikeJDBC-Asyncで
- みんなも作るとよいです
- 未来は明るい
- 簡単に作れる
- テスト書かなくてよい
What is Akka?に非常に重要そうな特徴と並べて紹介されている。
- Actors
- Fault Tolerance
- Location Transparency
- Persistence
ただしexperimentalという位置づけなので、使用する場合はAPIの変更へ追従する覚悟が必要。
以下のメソッドを実装するだけ。
ブロックするクライアントを使用する場合はAsyncWriteJournal
の代わりにSyncWriteJournal
をミックスインする。
class MySQLAsyncWriteJournal extends AsyncWriteJournal {
// PersistentReprを追記する。
override def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ???
// 指定したpersistenceIdとsequenceNumberの組を持つPersistentReprを確認済み状態にする。
@scala.deprecated("writeConfirmations will be removed, since Channels will be removed.")
override def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ???
// 指定したpersistenceIdとsequenceNumberの組を持つPersistentReprを削除する。
@scala.deprecated("asyncDeleteMessages will be removed.")
override def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ???
// 指定したpersistence idを持つPersistentReprのうち、sequence numberがtoSequenceNr以下のものをすべて削除する。
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
// 指定したpersistence idに属するPersistentReprをすべて取得し、replayCallbackを適用する。
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ???
// 最も大きなsequence numberを返す。
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ???
}
class MySQLSnapshotStore extends SnapshotStore {
// スナップショットを取得する。
// criteriaには、例えば「sequence number 1000以下のスナップショット」等の条件が格納されている。
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
// スナップショットを保存する。
// metadataにはpersistence idやsequence number等が格納されている。
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
// saveAsyncが成功した場合に呼ばれる後処理。
override def saved(metadata: SnapshotMetadata): Unit = ???
// 指定したスナップショットを削除する。
override def delete(metadata: SnapshotMetadata): Unit = ???
// 指定した条件に合致するスナップショットを削除する。
// sequence number 1000以下のスナップショットを削除する、など。
override def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ???
}
あとはapplication.conf
に設定を書けばプラグインが利用できる。
akka.persistence.journal.plugin = "mysql-journal"
akka.persistence.snapshot-store.plugin = "mysql-snapshot-store"
mysql-journal {
class = "docs.persistence.MySQLAsyncWriteJournal"
}
mysql-snapshot-store {
class = "com.okumin.MySQLSnapshotStore"
}
データベース接続部分は既存のライブラリがきっと使えるはず。
akka-persistence-tckという、実装が仕様に準拠しているか確かめるためのツールがある。トレイトをミックスインするだけで数多ものテストケースが勝手に追加される。
class MySQLJournalSpec extends JournalSpec {
override lazy val config: Config = ConfigFactory.load("mysql-application.conf")
protected override def beforeAll(): Unit = {
// DELETE FROM journal 的な処理
}
}
class MySQLSnapshotStoreSpec extends SnapshotStoreSpec {
override lazy val config: Config = ConfigFactory.load("mysql-application.conf")
protected override def beforeAll(): Unit = {
// DELETE FROM snapshot 的な処理
}
}
[info] MySQLJournalSpec:
[info] A journal
[info] - must replay all messages
[info] - must replay messages using a lower sequence number bound
[info] - must replay messages using an upper sequence number bound
[info] - must replay messages using a count limit
[info] - must replay messages using a lower and upper sequence number bound
[info] - must replay messages using a lower and upper sequence number bound and a count limit
[info] - must replay a single if lower sequence number bound equals upper sequence number bound
[info] - must replay a single message if count limit equals 1
[info] - must not replay messages if count limit equals 0
[info] - must not replay messages if lower sequence number bound is greater than upper sequence number bound
[info] - must not replay permanently deleted messages (range deletion)
[info] - must replay logically deleted messages with deleted field set to true (range deletion)
[info] - must replay confirmed messages with corresponding channel ids contained in the confirmed field
[info] - must ignore orphan deletion markers
[info] - must ignore orphan confirmation markers
[info] - must return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty
[info] - must return a highest stored sequence number == 0 if the persistent actor has not yet written messages
[info] MySQLSnapshotStoreSpec:
[info] A snapshot store
[info] - must not load a snapshot given an invalid processor id
[info] - must not load a snapshot given non-matching timestamp criteria
[info] - must not load a snapshot given non-matching sequence number criteria
[info] - must load the most recent snapshot
[info] - must load the most recent snapshot matching an upper sequence number bound
[info] - must load the most recent snapshot matching upper sequence number and timestamp bounds
[info] - must delete a single snapshot identified by snapshot metadata
[info] - must delete all snapshots matching upper sequence number and timestamp bounds
さらに、JournalPerfSpec
をミックスインすると簡単なパフォーマンステストまで実行してくれる。
[info] A PersistentActor's performance
[info] - must measure: persistAsync()-ing 10000 events
[info] + PersistAsync()-ing 10000 took 6046 ms
[info] + PersistAsync()-ing 10000 took 3257 ms
[info] + PersistAsync()-ing 10000 took 2164 ms
[info] + PersistAsync()-ing 10000 took 1990 ms
[info] + PersistAsync()-ing 10000 took 1945 ms
[info] + PersistAsync()-ing 10000 took 1773 ms
[info] + PersistAsync()-ing 10000 took 1588 ms
[info] + PersistAsync()-ing 10000 took 1274 ms
[info] + PersistAsync()-ing 10000 took 1085 ms
[info] + PersistAsync()-ing 10000 took 1094 ms
[info] + Average time: 2222 ms
[info] - must measure: persist()-ing 10000 events
[info] + Persist()-ing 10000 took 10563 ms
[info] + Persist()-ing 10000 took 8412 ms
[info] + Persist()-ing 10000 took 8492 ms
[info] + Persist()-ing 10000 took 8301 ms
[info] + Persist()-ing 10000 took 8211 ms
[info] + Persist()-ing 10000 took 8298 ms
[info] + Persist()-ing 10000 took 8223 ms
[info] + Persist()-ing 10000 took 8203 ms
[info] + Persist()-ing 10000 took 8316 ms
[info] + Persist()-ing 10000 took 8230 ms
[info] + Average time: 8525 ms
[info] - must measure: recovering 10000 events
[info] + Recovering 10000 took 1500 ms
[info] + Recovering 10000 took 602 ms
[info] + Recovering 10000 took 383 ms
[info] + Recovering 10000 took 340 ms
[info] + Recovering 10000 took 325 ms
[info] + Recovering 10000 took 346 ms
[info] + Recovering 10000 took 331 ms
[info] + Recovering 10000 took 311 ms
[info] + Recovering 10000 took 317 ms
[info] + Recovering 10000 took 328 ms
[info] + Average time: 478 ms
OSS作って一山当てましょう。
Happy hAkking!