Skip to content

Instantly share code, notes, and snippets.

@okumin
Created September 28, 2014 08:55
Show Gist options
  • Save okumin/b591d2bd54470a067349 to your computer and use it in GitHub Desktop.
Save okumin/b591d2bd54470a067349 to your computer and use it in GitHub Desktop.
akka-persistenceのプラグインをつくろう

akka-persistenceのプラグインをつくろう

参考

Akka persistence is 何

  • Actorの内部状態をストレージへ保存するもの
  • 保存方法はJournalとSnapshotの二種類
    • Journalは更新イベントを1つずつ発生した順番に保存する
      • MySQLのバイナリログやRedisのAppend Only Fileようなイメージ
      • 更新イベントを最初から最後まで順番に適用していけば、元の状態が復元できる
    • Snapshotはその時点でのActorの状態を直接保存する
      • mysqldumpやRedisのRDB snapshotのようなイメージ
      • ある時点tまでのデータはSnapshotから、残りのデータはJournalから取り出すことでリカバリーに要する時間を削減することができる

// Actor内の情報を更新するイベント。
case class UpdateEvent(key: String, value: String)

// クライアントがActorに投げるメッセージ
case class Get(key: String)
case class Set(key: String, value: String)
case class Add(key: String, value: String)

// 状態
case class State(map: Map[String, String]) {
  def get(key: String): Option[String] = map.get(key)
  def updated(key: String, value: String): State = State(map.updated(key, value))
}

class KeyValueStoreActor extends PersistentActor {
  var state = State(Map.empty)
  // 1分に一度スナップショットを取得する。
  val interval = 1.minutes
  context.system.scheduler.schedule(interval, interval, self, "snapshot")(context.dispatcher)

  def updateState(event: UpdateEvent): Unit = state = state.updated(event.key, event.value)

  /**
   * PersistentActorの識別子。
   * JournalやSnapshot storeへ保存されるデータはこの識別子に紐付いている。
   */
  override def persistenceId: String = "key-value-store"

  /**
   * 永続化されたデータを取得すると、このメソッドが呼ばれる。
   * 基本的にStart, Restartしたタイミングで発動する。
   */
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: State) => state = snapshot // from Snapshot store
    case event: UpdateEvent => updateState(event) // from Journal
  }

  /**
   * このActorに対して送信された通常のメッセージを処理する。
   * 普通のActorの receive のようなもの。
   */
  override def receiveCommand: Receive = {
    case Get(key) => sender() ! state.get(key)
    case Set(key, value) => persist(UpdateEvent(key, value))(updateState)
    case Add(key, value) => // データがない場合だけ追加
      state.get(key) match {
        case Some(_) =>
        case None => persist(UpdateEvent(key, value))(updateState)
      }
    case "snapshot" => saveSnapshot(state)
  }
}

Akka persistence is pluggable

プラグインを作れば、好きなデータベースへJournalやSnapshotを保存することができる。デフォルトでは、JournalはLevelDBへ、Snapshotはファイルへ書きだされる。

つくった

akka-persistence-sql-async

  • すでにRDBMS用のプラグインは存在していた
  • でもakka-persistenceプラグインの作成APIには同期版と非同期版があるので、せっかくだし非同期版を作ってみた
  • ScalikeJDBC-Async
  • みんなも作るとよいです

オススメ理由

  1. 未来は明るい
  2. 簡単に作れる
  3. テスト書かなくてよい

未来は明るい

What is Akka?に非常に重要そうな特徴と並べて紹介されている。

  • Actors
  • Fault Tolerance
  • Location Transparency
  • Persistence

ただしexperimentalという位置づけなので、使用する場合はAPIの変更へ追従する覚悟が必要。

簡単に作れる

以下のメソッドを実装するだけ。

Journal

ブロックするクライアントを使用する場合はAsyncWriteJournalの代わりにSyncWriteJournalをミックスインする。

class MySQLAsyncWriteJournal extends AsyncWriteJournal {
  // PersistentReprを追記する。
  override def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ???

  // 指定したpersistenceIdとsequenceNumberの組を持つPersistentReprを確認済み状態にする。
  @scala.deprecated("writeConfirmations will be removed, since Channels will be removed.")
  override def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ???

  // 指定したpersistenceIdとsequenceNumberの組を持つPersistentReprを削除する。
  @scala.deprecated("asyncDeleteMessages will be removed.")
  override def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ???

  // 指定したpersistence idを持つPersistentReprのうち、sequence numberがtoSequenceNr以下のものをすべて削除する。
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???

  // 指定したpersistence idに属するPersistentReprをすべて取得し、replayCallbackを適用する。
  override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ???

  // 最も大きなsequence numberを返す。
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ???
}

Snapshot

class MySQLSnapshotStore extends SnapshotStore {
  // スナップショットを取得する。
  // criteriaには、例えば「sequence number 1000以下のスナップショット」等の条件が格納されている。
  override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???

  // スナップショットを保存する。
  // metadataにはpersistence idやsequence number等が格納されている。
  override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???

  // saveAsyncが成功した場合に呼ばれる後処理。
  override def saved(metadata: SnapshotMetadata): Unit = ???

  // 指定したスナップショットを削除する。
  override def delete(metadata: SnapshotMetadata): Unit = ???

  // 指定した条件に合致するスナップショットを削除する。
  // sequence number 1000以下のスナップショットを削除する、など。
  override def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ???
}

あとはapplication.confに設定を書けばプラグインが利用できる。

akka.persistence.journal.plugin = "mysql-journal"
akka.persistence.snapshot-store.plugin = "mysql-snapshot-store"

mysql-journal {
  class = "docs.persistence.MySQLAsyncWriteJournal"
}
mysql-snapshot-store {
  class = "com.okumin.MySQLSnapshotStore"
}

データベース接続部分は既存のライブラリがきっと使えるはず。

テスト書かなくてよい

akka-persistence-tckという、実装が仕様に準拠しているか確かめるためのツールがある。トレイトをミックスインするだけで数多ものテストケースが勝手に追加される。

テストコード

class MySQLJournalSpec extends JournalSpec {
  override lazy val config: Config = ConfigFactory.load("mysql-application.conf")
  
  protected override def beforeAll(): Unit = {
    // DELETE FROM journal 的な処理
  }
}
class MySQLSnapshotStoreSpec extends SnapshotStoreSpec {
  override lazy val config: Config = ConfigFactory.load("mysql-application.conf")
  
  protected override def beforeAll(): Unit = {
    // DELETE FROM snapshot 的な処理
  }
}

テスト結果

[info] MySQLJournalSpec:
[info] A journal
[info] - must replay all messages
[info] - must replay messages using a lower sequence number bound
[info] - must replay messages using an upper sequence number bound
[info] - must replay messages using a count limit
[info] - must replay messages using a lower and upper sequence number bound
[info] - must replay messages using a lower and upper sequence number bound and a count limit
[info] - must replay a single if lower sequence number bound equals upper sequence number bound
[info] - must replay a single message if count limit equals 1
[info] - must not replay messages if count limit equals 0
[info] - must not replay messages if lower  sequence number bound is greater than upper sequence number bound
[info] - must not replay permanently deleted messages (range deletion)
[info] - must replay logically deleted messages with deleted field set to true (range deletion)
[info] - must replay confirmed messages with corresponding channel ids contained in the confirmed field
[info] - must ignore orphan deletion markers
[info] - must ignore orphan confirmation markers
[info] - must return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty
[info] - must return a highest stored sequence number == 0 if the persistent actor has not yet written messages
[info] MySQLSnapshotStoreSpec:
[info] A snapshot store
[info] - must not load a snapshot given an invalid processor id
[info] - must not load a snapshot given non-matching timestamp criteria
[info] - must not load a snapshot given non-matching sequence number criteria
[info] - must load the most recent snapshot
[info] - must load the most recent snapshot matching an upper sequence number bound
[info] - must load the most recent snapshot matching upper sequence number and timestamp bounds
[info] - must delete a single snapshot identified by snapshot metadata
[info] - must delete all snapshots matching upper sequence number and timestamp bounds

さらに、JournalPerfSpecをミックスインすると簡単なパフォーマンステストまで実行してくれる。

[info] A PersistentActor's performance
[info] - must measure: persistAsync()-ing 10000 events
[info]   + PersistAsync()-ing 10000 took 6046 ms
[info]   + PersistAsync()-ing 10000 took 3257 ms
[info]   + PersistAsync()-ing 10000 took 2164 ms
[info]   + PersistAsync()-ing 10000 took 1990 ms
[info]   + PersistAsync()-ing 10000 took 1945 ms
[info]   + PersistAsync()-ing 10000 took 1773 ms
[info]   + PersistAsync()-ing 10000 took 1588 ms
[info]   + PersistAsync()-ing 10000 took 1274 ms
[info]   + PersistAsync()-ing 10000 took 1085 ms
[info]   + PersistAsync()-ing 10000 took 1094 ms
[info]   + Average time: 2222 ms
[info] - must measure: persist()-ing 10000 events
[info]   + Persist()-ing 10000 took 10563 ms
[info]   + Persist()-ing 10000 took 8412 ms
[info]   + Persist()-ing 10000 took 8492 ms
[info]   + Persist()-ing 10000 took 8301 ms
[info]   + Persist()-ing 10000 took 8211 ms
[info]   + Persist()-ing 10000 took 8298 ms
[info]   + Persist()-ing 10000 took 8223 ms
[info]   + Persist()-ing 10000 took 8203 ms
[info]   + Persist()-ing 10000 took 8316 ms
[info]   + Persist()-ing 10000 took 8230 ms
[info]   + Average time: 8525 ms
[info] - must measure: recovering 10000 events
[info]   + Recovering 10000 took 1500 ms
[info]   + Recovering 10000 took 602 ms
[info]   + Recovering 10000 took 383 ms
[info]   + Recovering 10000 took 340 ms
[info]   + Recovering 10000 took 325 ms
[info]   + Recovering 10000 took 346 ms
[info]   + Recovering 10000 took 331 ms
[info]   + Recovering 10000 took 311 ms
[info]   + Recovering 10000 took 317 ms
[info]   + Recovering 10000 took 328 ms
[info]   + Average time: 478 ms

まとめ

OSS作って一山当てましょう。

Happy hAkking!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment