Skip to content

Instantly share code, notes, and snippets.

@whiter4bbit
Last active August 29, 2015 14:03
Show Gist options
  • Save whiter4bbit/fb0776ed562f019f649d to your computer and use it in GitHub Desktop.
Save whiter4bbit/fb0776ed562f019f649d to your computer and use it in GitHub Desktop.
package persistence
import akka.actor._
import akka.event.Logging
import akka.persistence._
import AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning}
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import java.util.UUID
/**
 * Message protocol and journal event types used by the `JournalActor` class.
 */
object JournalActor {

  /** Message handed to the executor: the journaled payload together with its journal id. */
  final case class Command(entry: Any, id: String)

  /** Acknowledgement for the [[Command]] that carried the same `id`. */
  final case class Confirm(id: String)

  /** Events written to the journal; sealed so matches over them are checked for exhaustiveness. */
  sealed trait JournalEntry

  /** Records that `entry` was accepted under the journal id `id`. */
  final case class Append(entry: Any, id: String) extends JournalEntry

  /** Records that the entry stored under `id` is no longer pending. */
  final case class Remove(id: String) extends JournalEntry
}
/**
 * Persistent actor that journals every incoming message as an `Append` event
 * and hands it to `executorRef` through `AtLeastOnceDelivery.deliver`, wrapped
 * in a `Command` carrying a freshly generated id.
 *
 * The receiver acknowledges with `Confirm(id)`; the actor then journals a
 * `Remove(id)` and confirms the delivery. On recovery, every `Append` without
 * a matching `Remove` is delivered again.
 *
 * @param executorRef actor whose path is used as the delivery destination
 */
class JournalActor(executorRef: ActorRef)
  extends PersistentActor with AtLeastOnceDelivery {

  import JournalActor._

  private val log = Logging(context.system, this)

  /** Journal id -> delivery id handed out by `deliver`, for deliveries awaiting a `Confirm`. */
  private var deliveryIds = Map.empty[String, Long]

  /**
   * Entries collected while the journal is replayed, in insertion order.
   * Only populated during recovery and released once they have been re-delivered.
   */
  private var journalEntries = ListMap.empty[String, Append]

  /** Delivers `append` to the executor and remembers the delivery id under the entry's journal id. */
  private def execute(append: Append): Unit =
    deliver(executorRef.path, { deliveryId =>
      deliveryIds += (append.id -> deliveryId)
      Command(append.entry, append.id)
    })

  /** Generates a new random journal id. */
  private def newId(): String = UUID.randomUUID().toString

  def receiveCommand: Receive = {
    case unconfirmed @ UnconfirmedDelivery(_, _, _) =>
      log.warning("Unconfirmed delivery: {}", unconfirmed)

    case unconfirmed @ UnconfirmedWarning(_) =>
      log.warning("Unconfirmed warning: {}", unconfirmed)

    case Confirm(id) =>
      deliveryIds.get(id) match {
        case Some(deliveryId) =>
          // Apply the confirmation only after Remove has been journaled, so the
          // in-memory delivery state never runs ahead of the journal.
          persist(Remove(id)) { _ =>
            deliveryIds -= id
            confirmDelivery(deliveryId)
          }
        case None =>
          log.warning("Confirmation for unexisting delivery id: {}", id)
      }

    // Any other message is treated as a command: journal it, then deliver it.
    // NOTE(review): this catch-all also matches replies the persistence layer may
    // send to this actor (e.g. failure notifications) -- confirm that is intended.
    case cmd => persist(Append(cmd, newId()))(execute)
  }

  /** Re-delivers every entry still pending after recovery, then drops the recovery buffer. */
  private def replayJournal(): Unit = {
    val pending = journalEntries.values
    val count = pending.size
    log.info("Journal contains {} entries", count)
    log.info("Replaying {} entries: {}", count, pending)
    pending.foreach(execute)
    // The buffer is only read here; release it so replayed entries are not
    // retained for the rest of the actor's lifetime.
    journalEntries = ListMap.empty
  }

  def receiveRecover: Receive = {
    case entry: JournalEntry =>
      entry match {
        case a @ Append(_, id) => journalEntries += (id -> a)
        case Remove(id)        => journalEntries -= id
      }
    case RecoveryCompleted => replayJournal()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment