Last active
August 29, 2015 14:03
-
-
Save whiter4bbit/fb0776ed562f019f649d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package persistence | |
import akka.actor._ | |
import akka.event.Logging | |
import akka.persistence._ | |
import AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning} | |
import scala.annotation.tailrec | |
import scala.collection.immutable.ListMap | |
import java.util.UUID | |
/**
 * Message protocol and journal event types used by the `JournalActor` class.
 */
object JournalActor {

  /** Message delivered to the executor: the journaled payload plus the id of its journal entry. */
  final case class Command(entry: Any, id: String)

  /** Reply expected from the executor once the command with the given journal entry id is handled. */
  final case class Confirm(id: String)

  /** Events written to the journal. Sealed so that matches over it are checked for exhaustiveness. */
  sealed trait JournalEntry

  /** Records that `entry` was accepted under the journal entry id `id`. */
  final case class Append(entry: Any, id: String) extends JournalEntry

  /** Records that the entry with the given id was confirmed and no longer needs to be replayed. */
  final case class Remove(id: String) extends JournalEntry
}
/**
 * Persistent actor that journals incoming messages and hands them to
 * `executorRef` through `AtLeastOnceDelivery`.
 *
 * Every message that is not a `Confirm` or an unconfirmed-delivery notification is
 * persisted as an `Append` under a random UUID and then delivered as a `Command`.
 * A `Confirm(id)` for a known id confirms the delivery and persists a `Remove(id)`.
 * During recovery the `Append` events without a matching `Remove` are collected and
 * delivered again once recovery has completed.
 *
 * @param executorRef actor whose path is used as the delivery destination
 */
class JournalActor(executorRef: ActorRef)
  extends PersistentActor with AtLeastOnceDelivery {

  import JournalActor._

  private val log = Logging(context.system, this)

  /** Journal entry id -> delivery id of the delivery currently awaiting confirmation. */
  private var deliveryIds = Map.empty[String, Long]

  /**
   * Recovery-time buffer: appended entries that have not been removed yet, keyed by
   * journal entry id. Only populated by `receiveRecover`; emptied after replay.
   */
  private var journalEntries = ListMap.empty[String, Append]

  /** Delivers the appended entry to the executor and remembers the delivery id under the entry id. */
  private def execute(append: Append): Unit =
    deliver(executorRef.path, { deliveryId =>
      deliveryIds += (append.id -> deliveryId)
      Command(append.entry, append.id)
    })

  /** Generates a fresh random journal entry id. */
  private def uuid(): String = UUID.randomUUID().toString

  def receiveCommand: Receive = {
    case unconfirmed @ UnconfirmedDelivery(_, _, _) =>
      log.warning("Unconfirmed delivery: {}", unconfirmed)

    case unconfirmed @ UnconfirmedWarning(_) =>
      log.warning("Unconfirmed warning: {}", unconfirmed)

    case Confirm(id) =>
      deliveryIds.get(id) match {
        case Some(deliveryId) =>
          deliveryIds -= id
          confirmDelivery(deliveryId)
          // Persist the removal so the entry is not replayed after the next recovery.
          persist(Remove(id))(_ => ())
        case None =>
          log.warning("Confirmation for unexisting delivery id: {}", id)
      }

    // Anything else is treated as a payload to journal and forward.
    case cmd => persist(Append(cmd, uuid()))(execute)
  }

  /** Re-delivers every entry still buffered from recovery, then releases the buffer. */
  private def replayJournal(): Unit = {
    val appends = journalEntries.values
    log.info("Journal contains {} entries", appends.size)
    log.info("Replaying {} entries: {}", appends.size, appends)
    appends.foreach(execute)
    // The buffer is only read here; drop it so replayed payloads are not
    // kept reachable for the rest of the actor's lifetime.
    journalEntries = ListMap.empty
  }

  def receiveRecover: Receive = {
    case entry: JournalEntry =>
      entry match {
        case a @ Append(_, id) => journalEntries += (id -> a)
        case Remove(id)        => journalEntries -= id
      }

    case RecoveryCompleted => replayJournal()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment