Last active
September 10, 2015 10:11
-
-
Save jpallari/953d7b96826121223dbd to your computer and use it in GitHub Desktop.
At least once delivery with async persistence
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ainakin.kerran | |
import akka.actor._ | |
import akka.persistence.{AtLeastOnceDelivery, PersistentActor} | |
import com.typesafe.config.ConfigFactory | |
import scala.concurrent.duration._ | |
object AinakinKerran { | |
def start(): Unit = { | |
val as = ActorSystem("AinakinKerran", ConfigFactory.load("common")) | |
as.actorOf(Props[AinakinKerran], "root") | |
} | |
sealed trait Command | |
case object StopCmd extends Command | |
case object StartCmd extends Command | |
case class MsgCmd(contents: String) extends Command | |
case class ConfirmableCmd(deliveryId: Long, contents: String) extends Command | |
case class ConfirmCmd(deliveryId: Long) extends Command | |
case class StatusChangedCmd(isStarted: Boolean) extends Command | |
case object TransitionDoneCmd extends Command | |
sealed trait Event | |
case object StopEvt extends Event | |
case object StartEvt extends Event | |
case class DeliveredEvt(contents: String) extends Event | |
case class ConfirmedEvt(deliveryId: Long) extends Event | |
} | |
class AinakinKerran extends Actor with ActorLogging { | |
import AinakinKerran._ | |
val sink = context.actorOf(Props[Sink], "sink") | |
val processor = context.actorOf(Props(new Processor(sink)), "processor") | |
override def receive: Receive = { | |
case StatusChangedCmd(true) => | |
// These will be delivered to the sink. | |
1 to 5 foreach { send } | |
// Stop processor. | |
processor ! StopCmd | |
// These wont be delivered to the sink because of the ongoing transition. | |
6 to 10 foreach { send } | |
case StatusChangedCmd(false) => | |
// These wont be delivered to the sink because we waited for the processor to stop. | |
11 to 13 foreach { send } | |
} | |
private def send(a: Any): Unit = { | |
processor ! MsgCmd(a.toString) | |
} | |
// This wont be delivered to the sink | |
send("ohai") | |
// Start processor | |
processor ! StartCmd | |
} | |
class Processor(sink: ActorRef) extends PersistentActor with AtLeastOnceDelivery with ActorLogging { | |
import AinakinKerran._ | |
override def receiveRecover: Receive = { | |
case StopEvt => context.become(stopped) | |
case StartEvt => context.become(running) | |
case DeliveredEvt(contents) => deliverMsg(contents) | |
case ConfirmedEvt(deliveryId) => confirmDelivery(deliveryId) | |
} | |
override def receiveCommand: Receive = stopped | |
private def stopped: Receive = { | |
case StartCmd => | |
log.info("Starting!") | |
transitionEvent(StartEvt) { | |
sender() ! StatusChangedCmd(isStarted = true) | |
running | |
} | |
case ConfirmCmd(id) => confirmMsg(id) | |
case MsgCmd(contents) => | |
log.info("Received message! Not sending it to the sink! {}", contents) | |
case m => | |
log.warning("Processor got unknown message in stopped state: {}", m) | |
} | |
private def running: Receive = { | |
case StopCmd => | |
log.info("Stopping!") | |
transitionEvent(StopEvt) { | |
sender() ! StatusChangedCmd(isStarted = false) | |
stopped | |
} | |
case ConfirmCmd(id) => confirmMsg(id) | |
case MsgCmd(contents) => | |
persistAsync(DeliveredEvt(contents)) { _ => } | |
deliverMsg(contents) | |
case m => | |
log.warning("Processor got unknown message in running state: {}", m) | |
} | |
private def transitionEvent(evt: Event)(afterTransition: => Receive): Unit = { | |
persistAsync(evt) { _ => self forward TransitionDoneCmd } // transition is finished after persist succeeds | |
context.become(transition(afterTransition)) | |
} | |
private def transition(afterTransition: => Receive): Receive = { | |
case TransitionDoneCmd => // transition done => continue with message processing | |
unstashAll() | |
context.become(afterTransition) | |
case _ => stash() // stash until transition is done | |
} | |
private def confirmMsg(id: Long): Unit = { | |
persistAsync(ConfirmedEvt(id)) { evt => | |
confirmDelivery(evt.deliveryId) | |
log.info("Confirmed message: {}", evt.deliveryId) | |
} | |
} | |
private def deliverMsg(contents: String): Unit = { | |
deliver(sink.path, deliveryId => ConfirmableCmd(deliveryId, contents)) | |
} | |
} | |
class Sink extends Actor with ActorLogging { | |
import AinakinKerran._ | |
import context.dispatcher | |
override def receive: Receive = { | |
case ConfirmableCmd(id, contents) => | |
log.info("Sink got message {}: {}", id, contents) | |
val s = sender() | |
context.system.scheduler.scheduleOnce(100.millis) { | |
// Slow message handling ;^) | |
s ! ConfirmCmd(id) | |
} | |
case m => | |
log.warning("Sink got unknown message: {}", m) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment