Created
April 3, 2015 15:23
-
-
Save Horusiath/bd00d33a130e4add2491 to your computer and use it in GitHub Desktop.
At least once delivery snapshoting desync
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot | |
import akka.persistence.{SnapshotOffer, PersistentActor, AtLeastOnceDelivery} | |
import akka.actor.{Props, ActorSystem, ActorPath, Actor} | |
case class Message(data: String) | |
case class Confirmable(deliveryId: Long, data: String) | |
case class Confirmation(deliveryId: Long) | |
case class Snap(snapshot: AtLeastOnceDeliverySnapshot) | |
class ExampleAtLeastOnceDeliveryActor(val deliveryPath: ActorPath) extends PersistentActor with AtLeastOnceDelivery { | |
override def persistenceId = "guaranteed-1" | |
override def receiveRecover: Receive = { | |
case SnapshotOffer(_, s: Snap) => { | |
setDeliverySnapshot(s.snapshot) | |
println("restored undelivered: " + s.snapshot) | |
} | |
} | |
override def receiveCommand: Receive = { | |
case Message(data) => deliver(deliveryPath, id => { | |
println(s"sending: $data with deliveryId: $id") | |
Confirmable(id, data) | |
}) | |
case Confirmation(deliveryId) => { | |
confirmDelivery(deliveryId) | |
} | |
case "boom" => { | |
throw new Exception() | |
} | |
} | |
override def postStop = { | |
val unconfirmed = getDeliverySnapshot | |
saveSnapshot(Snap(unconfirmed)) | |
println("stored undelivered " + unconfirmed.unconfirmedDeliveries) | |
super.postStop() | |
} | |
} | |
class DeliveryActor extends Actor { | |
var confirming = false | |
override def receive: Actor.Receive = { | |
case "start" => confirming = true | |
case "stop" => confirming = false | |
case Confirmable(deliveryId, data) => { | |
if(confirming){ | |
println(s"Confirming message id: $deliveryId and data: $data") | |
context.sender() ! Confirmation(deliveryId) | |
}else{ | |
println(s"Ignoring message id: $deliveryId and data: $data") | |
} | |
} | |
} | |
} | |
object HelloApp extends App { | |
implicit val system = ActorSystem.create("system") | |
val delivery = system.actorOf(Props[DeliveryActor](), "delivery") | |
val deliverer = system.actorOf(Props(classOf[ExampleAtLeastOnceDeliveryActor], delivery.path)) | |
deliverer ! Message("foo") | |
Thread.sleep(1000) | |
deliverer ! "boom" | |
Thread.sleep(1000) | |
deliverer ! Message("bar") | |
Thread.sleep(1000) | |
println("Enabling confirmations") | |
delivery ! "start" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment