Last active
August 29, 2015 14:04
-
-
Save whiter4bbit/22cd3b0909bb390e80db to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package persistence | |
import akka.actor._ | |
import akka.persistence._ | |
import akka.event.Logging | |
import com.typesafe.config.ConfigFactory | |
import AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning, AtLeastOnceDeliverySnapshot} | |
/**
 * Message protocol and persisted event types used by the `SimpleJournalActor` class.
 */
object SimpleJournalActor {

  /** Marker for everything the journal actor writes to its journal. */
  sealed trait Event

  /** Persisted for every incoming command; `cmd` is the original payload. */
  case class Append(cmd: Any) extends Event

  /** Persisted when the delivery identified by `id` has been confirmed. */
  case class Remove(id: Long) extends Event

  /** Envelope sent to the destination: the payload plus the delivery id to confirm. */
  case class Command(cmd: Any, id: Long)

  /** Reply from the destination acknowledging the delivery identified by `id`. */
  case class Confirm(id: Long)
}
/**
 * Persistent actor that journals every incoming message as an `Append` event and
 * forwards it to `destination` wrapped in a `Command`, using at-least-once delivery.
 *
 * A `Confirm(id)` reply is journaled as a `Remove` event and confirms the matching
 * delivery. Whenever a confirmation leaves no unconfirmed deliveries, the actor
 * snapshots its delivery state and, once the snapshot is stored, deletes the journal
 * entries that snapshot covers.
 *
 * NOTE(review): no explicit `persistenceId` is defined here, so the actor presumably
 * relies on the library default — confirm against the Akka version in use.
 *
 * @param destination path of the actor that receives the `Command` envelopes
 */
class SimpleJournalActor(destination: ActorPath)
  extends PersistentActor with AtLeastOnceDelivery {
  import SimpleJournalActor._

  val log = Logging(context.system, this)

  /** Wall-clock time at construction; used only to report how long recovery took. */
  val startedAt = System.currentTimeMillis

  /** Sequence number recorded by the most recent `compact()` call, if one is pending. */
  var compactToSeqNr: Option[Long] = None

  def receiveCommand: Receive = {
    case u: UnconfirmedWarning =>
      log.warning("Unconfirmed warning: {}", u)

    case Confirm(id) =>
      persist(Remove(id))(updateState)

    case SaveSnapshotSuccess(meta) =>
      log.info("Snapshot stored: {}", meta)
      if (compactToSeqNr.isDefined) {
        // Delete only what THIS snapshot covers. `compactToSeqNr` may already have been
        // overwritten by a later compact() whose snapshot is still in flight (or fails),
        // so deleting up to it could drop events that no stored snapshot contains.
        // The second argument requests permanent deletion.
        deleteMessages(meta.sequenceNr, true)
        // Keep the marker while a newer compaction is still waiting for its snapshot.
        if (compactToSeqNr.exists(_ <= meta.sequenceNr)) compactToSeqNr = None
      }

    case SaveSnapshotFailure(meta, cause) =>
      log.error(cause, "Error while saving snapshot: {}", meta)

    // Anything else is treated as a command to be journaled and delivered.
    case cmd =>
      persist(Append(cmd))(updateState)
  }

  /** Counters of events replayed during recovery, kept for the completion log line. */
  case class Recovery(appends: Int, removes: Int)

  var recovery = Recovery(0, 0)

  def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: AtLeastOnceDeliverySnapshot) =>
      setDeliverySnapshot(snapshot)
      log.info("Snapshot offered.")

    case event: Event =>
      recovery = event match {
        case _: Append => recovery.copy(appends = recovery.appends + 1)
        case _: Remove => recovery.copy(removes = recovery.removes + 1)
      }
      updateState(event)

    case RecoveryCompleted =>
      log.info("Recovery completed in {}: {}", System.currentTimeMillis() - startedAt, recovery)
  }

  /**
   * Starts a compaction if there are no unconfirmed deliveries: remembers the current
   * sequence number and saves a snapshot of the delivery state. The journal itself is
   * trimmed later, when `SaveSnapshotSuccess` arrives.
   */
  def compact(): Unit = if (numberOfUnconfirmed == 0) {
    val nr = lastSequenceNr
    log.info("No unconfirmed messages. Compacting journal. lastSequenceNr: {}.", nr)
    compactToSeqNr = Some(nr)
    saveSnapshot(getDeliverySnapshot)
  }

  /**
   * Applies a journaled event, both when it is freshly persisted and when it is replayed.
   *
   * @param event `Append` triggers a delivery to `destination`; `Remove` confirms one
   */
  def updateState(event: Event): Unit = event match {
    case Append(s) =>
      deliver(destination, deliveryId => Command(s, deliveryId))

    case Remove(id) =>
      confirmDelivery(id)
      // Compaction saves a snapshot, so it is skipped while events are being replayed.
      if (!recoveryRunning) compact()
  }
}
/**
 * Messages understood by the `WorkerActor` class.
 */
object WorkerActor {

  /** A unit of work identified by `n`. */
  case class Work(n: Long)

  /** Switches the worker to confirming only `Work` whose number is below `n`. */
  case class RejectFrom(n: Long)
}
/**
 * Destination actor for the journal demo. It starts out confirming every `Command`
 * and, after receiving `RejectFrom(n)`, confirms only commands carrying `Work` with a
 * number below `n`.
 */
class WorkerActor extends Actor {
  import SimpleJournalActor._
  import WorkerActor._

  val log = Logging(context.system, this)

  /** Logs the payload and replies to the sender with a `Confirm` for the delivery id. */
  def confirm(command: Command): Unit = {
    log.info("Confirmation for: {}", command.cmd)
    sender() ! Confirm(command.id)
  }

  /** Initial behavior: confirm everything until told to start rejecting. */
  def confirmAll: Receive = {
    case RejectFrom(threshold) => context.become(rejectFrom(threshold))
    case command: Command      => confirm(command)
  }

  /**
   * Behavior after `RejectFrom(n)`: only `Command`s wrapping `Work` are matched, and of
   * those only the ones numbered below `n` are confirmed; the rest are silently dropped.
   */
  def rejectFrom(n: Long): Receive = {
    case command @ Command(Work(workId), _) =>
      val accepted = workId < n
      if (accepted) confirm(command)
  }

  def receive: Receive = confirmAll
}
/**
 * Command-line driver for the journal demo.
 *
 * With `play` as the first argument it sends ten `Work` items, waits, tells the worker
 * to reject from 15, and sends ten more. With any other first argument it only starts
 * the actors and prints "recovery". With no arguments it prints a usage line to stderr.
 */
object SimpleJournalActorTest {
  import WorkerActor._

  def main(args: Array[String]): Unit = args.headOption match {
    case None =>
      // Usage line names this object (it previously referred to a non-existent class).
      System.err.println("SimpleJournalActorTest play|recover")

    case Some(mode) =>
      val system = ActorSystem()
      val worker = system.actorOf(Props[WorkerActor])
      val journal = system.actorOf(Props(classOf[SimpleJournalActor], worker.path))

      mode match {
        case "play" =>
          (0 until 10).foreach(i => journal ! Work(i))
          // Pause before switching the worker into rejecting mode.
          Thread.sleep(5000)
          worker ! RejectFrom(15)
          (10 until 20).foreach(i => journal ! Work(i))

        // Any other mode just leaves the freshly started actors running.
        case _ => println("recovery")
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment