Last active
December 12, 2015 00:18
-
-
Save krasserm/4682655 to your computer and use it in GitHub Desktop.
Event sourcing and external servce integration (blog post snippets)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case class Order(id: Int = -1, details: String, | |
creditCardNumber: String, | |
creditCardValidation: Validation = Validation.Pending) | |
sealed trait Validation | |
object Validation { | |
case object Pending extends Validation | |
case object Success extends Validation | |
case object Failure extends Validation | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case class OrderSubmitted(order: Order) | |
case class CreditCardValidationRequested(orderId: Int, creditCardNumber: String) | |
case class CreditCardValidated(orderId: Int) | |
case class CreditCardValidationFailed(orderId: Int) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
new OrderProcessor(id) with Receiver with Eventsourced |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderProcessor(val id: Int) extends Actor { this: Receiver => | |
var validationRequestChannel: ActorRef = … | |
var orders = Map.empty[Int, Order] // state | |
def receive = { | |
case OrderSubmitted(order) => { | |
val id = orders.size | |
val upd = order.copy(id = id) | |
orders = orders + (id -> upd) | |
sender ! OrderStored(upd) | |
validationRequestChannel ! message.copy(CreditCardValidationRequested(id, order.creditCardNumber)) | |
} | |
… | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case class SetCreditCardValidator(validator: ActorRef) | |
class OrderProcessor(val id: Int) extends Actor { this: Receiver => | |
val ext = EventsourcingExtension(context.system) | |
var validationRequestChannel: ActorRef = … | |
var orders = Map.empty[Int, Order] | |
def receive = { | |
… | |
case SetCreditCardValidator(validator) => { | |
validationRequestChannel = ext.channelOf(ReliableRequestReplyChannelProps(…, validator) | |
.withRedeliveryMax(10) | |
.withRedeliveryDelay(3 seconds) | |
… | |
.withReplyTimeout(5 second)) | |
} | |
… | |
} | |
} | |
val extension: EventsourcingExtension = … | |
val processor: ActorRef = extension.processorOf(ProcessorProps(1, id => new OrderProcessor(id) with Receiver with Eventsourced)) | |
val validator: ActorRef = … // remote lookup | |
processor ! SetCreditCardValidator(validator) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderProcessor(val id: Int) extends Actor with ActorLogging { this: Receiver => | |
import Validation._ | |
var orders = Map.empty[Int, Order] | |
def receive = { | |
… | |
case CreditCardValidated(orderId) => { | |
onValidationSuccess(orderId) | |
confirm(true) | |
} | |
case CreditCardValidationFailed(orderId) => { | |
onValidationFailure(orderId) | |
confirm(true) | |
} | |
case DestinationNotResponding(channelId, failureCount, request) => { | |
log.warning("Destination of channel {} does not respond (failure count = {}). Negatively confirm message receipt.", channelId, failureCount) | |
confirm(false) // retry (or escalate) | |
} | |
case DestinationFailure(channelId, failureCount, CreditCardValidationRequested(orderId, _), throwable) => { | |
if (failureCount > 2) { | |
onValidationFailure(orderId) | |
confirm(true) | |
} else { | |
log.warning("Destination of channel {} returned a failure (failure count = {}). Negatively confirm message receipt.", channelId, failureCount) | |
confirm(false) // retry | |
} | |
} | |
… | |
} | |
def onValidationSuccess(orderId: Int) { | |
orders.get(orderId).filter(_.creditCardValidation == Pending).foreach { order => | |
val upd = order.copy(creditCardValidation = Success) | |
orders = orders + (orderId -> upd) | |
// notify others about accepted order | |
// … | |
} | |
} | |
def onValidationFailure(orderId: Int) { | |
orders.get(orderId).filter(_.creditCardValidation == Pending).foreach { order => | |
val upd = order.copy(creditCardValidation = Failure) | |
orders = orders + (orderId -> upd) | |
// notify others about rejected order | |
// … | |
} | |
} | |
… | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderProcessor(val id: Int) extends Actor with ActorLogging { this: Receiver => | |
… | |
def receive = { | |
… | |
case DeliveryStopped(channelId) => { | |
val delay = FiniteDuration(5, "seconds") | |
log.warning("Channel {} stopped delivery. Reactivation in {}.", channelId, delay) | |
context.system.scheduler.scheduleOnce(delay, validationRequestChannel, Deliver)(context.dispatcher) | |
} | |
… | |
} | |
override def preStart() { | |
context.system.eventStream.subscribe(self, classOf[DeliveryStopped]) | |
} | |
override def postStop() { | |
context.system.eventStream.unsubscribe(self, classOf[DeliveryStopped]) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment