Skip to content

Instantly share code, notes, and snippets.

@whiter4bbit
Last active August 29, 2015 14:04
Show Gist options
  • Save whiter4bbit/22cd3b0909bb390e80db to your computer and use it in GitHub Desktop.
Save whiter4bbit/22cd3b0909bb390e80db to your computer and use it in GitHub Desktop.
package persistence
import akka.actor._
import akka.persistence._
import akka.event.Logging
import com.typesafe.config.ConfigFactory
import AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning, AtLeastOnceDeliverySnapshot}
/** Message protocol and journal event types used by the `SimpleJournalActor` class. */
object SimpleJournalActor {

  /** Base type of everything the journal actor persists. */
  sealed trait Event

  /** Persisted when a command `cmd` has been accepted for delivery. */
  case class Append(cmd: Any) extends Event

  /** Persisted when the delivery with id `id` has been confirmed. */
  case class Remove(id: Long) extends Event

  /** Envelope sent to the destination: the payload `cmd` plus its delivery id `id`. */
  case class Command(cmd: Any, id: Long)

  /** Acknowledgement sent back to the journal actor for the delivery with id `id`. */
  case class Confirm(id: Long)
}
/**
 * Persistent actor that journals every incoming command and forwards it to
 * `destination` using at-least-once delivery.
 *
 * Every command is persisted as an `Append` event and delivered wrapped in a
 * `Command` that carries the delivery id. A `Confirm` is persisted as a
 * `Remove` event, which confirms the corresponding delivery. Whenever a
 * confirmation leaves no unconfirmed deliveries, the delivery state is
 * snapshotted and, once the snapshot has been stored, the journal entries the
 * snapshot covers are deleted.
 *
 * @param destination path of the actor the journaled commands are delivered to
 */
class SimpleJournalActor(destination: ActorPath)
  extends PersistentActor with AtLeastOnceDelivery {
  import SimpleJournalActor._

  val log = Logging(context.system, this)

  /** Wall-clock time (ms) at construction; only used to report how long recovery took. */
  val startedAt = System.currentTimeMillis

  /** Sequence number of the latest requested compaction whose snapshot has not been resolved yet. */
  var compactToSeqNr: Option[Long] = None

  /** Handles confirmations, snapshot results and, as a fallback, arbitrary commands to journal. */
  def receiveCommand: Receive = {
    case u: UnconfirmedWarning => log.warning("Unconfirmed warning: {}", u)
    case Confirm(id) => persist(Remove(id))(updateState)
    case SaveSnapshotSuccess(meta) =>
      log.info("Snapshot stored: {}", meta)
      // Delete only what the snapshot that was actually stored covers. The
      // pending marker may already belong to a newer compaction request whose
      // snapshot is still in flight (and may yet fail), so it must not be
      // used as the deletion bound.
      deleteMessages(meta.sequenceNr, true)
      clearPendingCompaction(meta.sequenceNr)
    case SaveSnapshotFailure(meta, cause) =>
      log.error(cause, "Error while saving snapshot: {}", meta)
      // Nothing is deleted on failure; just drop the now-stale marker.
      clearPendingCompaction(meta.sequenceNr)
    // Anything else is treated as a command to journal and deliver.
    // NOTE(review): this catch-all also matches any other message that reaches
    // receiveCommand (e.g. persistence notifications not listed above) -
    // confirm that persisting those is intended.
    case cmd => persist(Append(cmd))(updateState)
  }

  /** Counters of the events replayed during recovery. */
  case class Recovery(appends: Int, removes: Int)

  var recovery = Recovery(0, 0)

  /** Restores the delivery state from a snapshot and replays journaled events. */
  def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: AtLeastOnceDeliverySnapshot) =>
      setDeliverySnapshot(snapshot)
      log.info("Snapshot offered.")
    case event: Event =>
      recovery = event match {
        case _: Append => recovery.copy(appends = recovery.appends + 1)
        case _: Remove => recovery.copy(removes = recovery.removes + 1)
      }
      updateState(event)
    case RecoveryCompleted =>
      log.info("Recovery completed in {}: {}", System.currentTimeMillis() - startedAt, recovery)
  }

  /**
   * Requests a snapshot of the delivery state if no deliveries are
   * unconfirmed; the journal is trimmed once the snapshot has been stored.
   */
  def compact(): Unit = if (numberOfUnconfirmed == 0) {
    val nr = lastSequenceNr
    log.info("No unconfirmed messages. Compacting journal. lastSequenceNr: {}.", nr)
    compactToSeqNr = Some(nr)
    saveSnapshot(getDeliverySnapshot)
  }

  /**
   * Applies a journal event, both for freshly persisted events and during replay.
   *
   * @param event the event to apply
   */
  def updateState(event: Event): Unit = event match {
    case Append(s) => deliver(destination, deliveryId => Command(s, deliveryId))
    case Remove(id) =>
      confirmDelivery(id)
      // Compaction saves a snapshot, so it is skipped while events are being replayed.
      if (!recoveryRunning) compact()
  }

  /** Clears the pending-compaction marker if it is resolved by a snapshot result at `seqNr`. */
  private def clearPendingCompaction(seqNr: Long): Unit =
    if (compactToSeqNr.exists(_ <= seqNr)) compactToSeqNr = None
}
/** Messages understood by the `WorkerActor` class. */
object WorkerActor {

  /** Switches the worker to only confirming work whose number is below `n`. */
  case class RejectFrom(n: Long)

  /** A unit of work identified by its number `n`. */
  case class Work(n: Long)
}
/**
 * Destination actor for the journal demo.
 *
 * Starts out confirming every `Command` it receives. After a `RejectFrom(n)`
 * it only confirms commands whose payload is a `Work` with a number below `n`.
 */
class WorkerActor extends Actor {
  import SimpleJournalActor.{Command, Confirm}
  import WorkerActor.{RejectFrom, Work}

  val log = Logging(context.system, this)

  /** Initial behaviour: confirm everything. */
  def receive = confirmAll

  /** Logs the payload and replies to the sender with a `Confirm` for the command's delivery id. */
  def confirm(command: Command): Unit = {
    val Command(payload, deliveryId) = command
    log.info("Confirmation for: {}", payload)
    sender() ! Confirm(deliveryId)
  }

  /** Behaviour that confirms every command until a `RejectFrom` arrives. */
  def confirmAll: Receive = {
    case RejectFrom(threshold) => context.become(rejectFrom(threshold))
    case incoming: Command => confirm(incoming)
  }

  /**
   * Behaviour that confirms `Work` commands numbered below `n`; work at or
   * above `n` is received but deliberately left unconfirmed.
   */
  def rejectFrom(n: Long): Receive = {
    case incoming @ Command(Work(workId), _) =>
      val accepted = workId < n
      if (accepted) confirm(incoming)
  }
}
/** Command-line driver for the journal demo. */
object SimpleJournalActorTest {
  import WorkerActor._

  /**
   * Starts an actor system with one worker and one journal actor.
   *
   * With `play` as the first argument, work items 0-9 are sent to the journal,
   * then (after a pause) the worker is told to reject work numbered 15 and up,
   * and work items 10-19 are sent. Any other first argument only prints
   * "recovery" and leaves the freshly started actors running. Without
   * arguments a usage line is printed and nothing is started.
   *
   * @param args command-line arguments; only the first one is inspected
   */
  def main(args: Array[String]): Unit = args.headOption match {
    case None =>
      // Usage text names this entry point.
      System.err.println("SimpleJournalActorTest play|recover")
    case Some(mode) =>
      val system = ActorSystem()
      // Both actors are created without explicit names.
      // NOTE(review): recovering an earlier run presumably relies on the
      // creation order (and thus the generated names) staying the same - confirm.
      val worker = system.actorOf(Props[WorkerActor])
      val journal = system.actorOf(Props(classOf[SimpleJournalActor], worker.path))
      mode match {
        case "play" =>
          (0 until 10).foreach(i => journal ! Work(i.toLong))
          // Pause before switching the worker into rejecting mode.
          Thread.sleep(5000)
          worker ! RejectFrom(15)
          (10 until 20).foreach(i => journal ! Work(i.toLong))
        case _ => println("recovery")
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment