-
-
Save calvinlfer/6347896c16aab31c1a65dd5b8b6c3204 to your computer and use it in GitHub Desktop.
Demo of an Event-driven Architecture in Akka and Scala. Show-casing Events-first DDD, Event Sourced Aggregates, Process Manager, etc.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.eventdriven.scala | |
import akka.actor.{Actor, ActorRef, ActorSystem, Inbox, Props} | |
import akka.persistence.PersistentActor | |
import scala.concurrent.ExecutionContext.Implicits._ | |
import scala.concurrent.duration._ | |
// =============================================================== | |
// Demo of an Event-driven Architecture in Akka and Scala. | |
// | |
// Show-casing: | |
// 1. Events-first Domain Driven Design using Commands and Events | |
// 2. Asynchronous communication through an Event Stream | |
// 3. Asynchronous Process Manager driving the workflow | |
// 4. Event-sourced Aggregates | |
// | |
// Used in my talk on 'How Events Are Reshaping Modern Systems'. | |
// | |
// NOTE: This is a very much simplified and dumbed down sample | |
// that is by no means a template for production use. | |
// E.g. in a real-world app I would not use Serializable
// but JSON, Protobuf, Avro, or some other good library.
// I would not use Akka's built-in EventStream, but
// probably Kafka or Kinesis, etc.
// =============================================================== | |
object OrderManagement extends App { | |
// ========================================================= | |
// Commands | |
// ========================================================= | |
/**
 * Commands: requests for an action, each addressed to one specific service.
 *
 * The hierarchy is sealed and every case class is final, so the full set of
 * commands is fixed in this file.
 */
sealed trait Command

/** Sent by the client to `Orders` to start the workflow for one product. */
final case class CreateOrder(userId: Int, productId: Int) extends Command

/** Sent by `Orders` to `Inventory` to reserve the product for the user. */
final case class ReserveProduct(userId: Int, productId: Int) extends Command

/**
 * Sent by `Orders` to `Payment` once a product has been reserved.
 *
 * NOTE(review): `Orders` fills `productId` with the `txId` of the
 * `ProductReserved` event it received, so despite its name this field carries
 * whatever the reservation put into `txId`. The field name is kept because
 * `Payment` reads it as `productId`.
 */
final case class SubmitPayment(userId: Int, productId: Int) extends Command

/** Sent by `Orders` to `Inventory` once the payment has been authorized. */
final case class ShipProduct(userId: Int, txId: Int) extends Command
// ========================================================= | |
// Events | |
// ========================================================= | |
/**
 * Events: facts about something that has already happened.
 *
 * The hierarchy is sealed and every case class is final, so the full set of
 * events is fixed in this file.
 */
sealed trait Event

/**
 * Persisted and published by `Inventory` after a `ReserveProduct` command.
 *
 * NOTE(review): `Inventory` currently stores the product id in `txId`.
 */
final case class ProductReserved(userId: Int, txId: Int) extends Event

/** Declared for the out-of-stock case; nothing in this file produces it yet. */
final case class ProductOutOfStock(userId: Int, productId: Int) extends Event

/** Persisted and published by `Payment`; `txId` is its running transaction number. */
final case class PaymentAuthorized(userId: Int, txId: Int) extends Event

/** Declared for the declined case; `Payment` only handles it during recovery. */
final case class PaymentDeclined(userId: Int, txId: Int) extends Event

/** Persisted and published by `Inventory` after a `ShipProduct` command. */
final case class ProductShipped(userId: Int, txId: Int) extends Event

/** Sent by `Orders` directly to the client when the workflow has finished. */
final case class OrderCompleted(userId: Int, txId: Int) extends Event
// ========================================================= | |
// Top-level service functioning as a Process Manager | |
// Coordinating the workflow on behalf of the Client | |
// ========================================================= | |
/**
 * Process manager that drives the order workflow on behalf of the client.
 *
 * It reacts to one command (`CreateOrder`) and to the events it subscribes to
 * on the actor system's event stream, translating each into the next command
 * of the workflow.
 *
 * @param client    receives the final `OrderCompleted` event
 * @param inventory receives `ReserveProduct` and `ShipProduct` commands
 * @param payment   receives `SubmitPayment` commands
 */
class Orders(client: ActorRef, inventory: ActorRef, payment: ActorRef) extends Actor {

  override def preStart(): Unit = {
    // Use this actor's own system (as Inventory and Payment do) instead of the
    // `system` val of the enclosing object, so the actor does not depend on the
    // initialization order of the surrounding App.
    val events = context.system.eventStream
    events.subscribe(self, classOf[ProductReserved])   // Subscribe to ProductReserved Events
    events.subscribe(self, classOf[ProductOutOfStock]) // Subscribe to ProductOutOfStock Events
    events.subscribe(self, classOf[ProductShipped])    // Subscribe to ProductShipped Events
    events.subscribe(self, classOf[PaymentAuthorized]) // Subscribe to PaymentAuthorized Events
    events.subscribe(self, classOf[PaymentDeclined])   // Subscribe to PaymentDeclined Events
  }

  def receive: Receive = {
    case cmd: CreateOrder =>                                          // 1. Receive CreateOrder Command
      inventory.tell(ReserveProduct(cmd.userId, cmd.productId), self) // 2. Send ReserveProduct Command to Inventory
      println(s"COMMAND:\t\t$cmd => ${self.path.name}")
    case evt: ProductReserved =>                                      // 3. Receive ProductReserved Event
      payment.tell(SubmitPayment(evt.userId, evt.txId), self)         // 4. Send SubmitPayment Command to Payment
      println(s"EVENT:\t\t\t$evt => ${self.path.name}")
    case evt: PaymentAuthorized =>                                    // 5. Receive PaymentAuthorized Event
      inventory.tell(ShipProduct(evt.userId, evt.txId), self)         // 6. Send ShipProduct Command to Inventory
      println(s"EVENT:\t\t\t$evt => ${self.path.name}")
    case evt: ProductShipped =>                                       // 7. Receive ProductShipped Event
      client.tell(OrderCompleted(evt.userId, evt.txId), self)         // 8. Send OrderCompleted Event back to Client
      println(s"EVENT:\t\t\t$evt => ${self.path.name}")
    // The actor subscribes to these failure events, so handle them explicitly
    // instead of letting them fall through to `unhandled`. The workflow simply
    // ends here: there is no failure event defined for the client.
    case evt @ (_: ProductOutOfStock | _: PaymentDeclined) =>
      println(s"EVENT:\t\t\t$evt => ${self.path.name}")
  }
}
// ========================================================= | |
// Event Sourced Aggregate | |
// ========================================================= | |
/**
 * Event-sourced aggregate for product reservation and shipping.
 *
 * Each command is turned into an event, the event is persisted, and only then
 * is it published on the actor system's event stream.
 */
class Inventory extends PersistentActor {
  val persistenceId: String = "Inventory"

  // Number of shipped products. Held in memory only and rebuilt during
  // recovery by counting the replayed ProductShipped events.
  var nrOfProductsShipped = 0

  /** Reserves the product; always succeeds and stores `productId` in the event's `txId` slot. */
  def reserveProduct(userId: Int, productId: Int): Event = {
    println(s"SIDE-EFFECT:\tReserving Product => ${self.path.name}")
    ProductReserved(userId, productId)
  }

  /** Ships the product: bumps the in-memory counter, then returns the resulting event. */
  def shipProduct(userId: Int, txId: Int): Event = {
    nrOfProductsShipped = nrOfProductsShipped + 1
    val trace = s"SIDE-EFFECT:\tShipping Product => ${self.path.name}" + " - ProductsShipped: " + nrOfProductsShipped
    println(trace)
    ProductShipped(userId, txId)
  }

  // Persists the event and, from the persist callback, publishes it on the event stream.
  private def publishOncePersisted(event: Event): Unit =
    persist(event) { stored =>
      context.system.eventStream.publish(stored)
    }

  def receiveCommand: Receive = {
    case reserve: ReserveProduct =>
      publishOncePersisted(reserveProduct(reserve.userId, reserve.productId))
      println(s"COMMAND:\t\t$reserve => ${self.path.name}")

    case ship: ShipProduct =>
      publishOncePersisted(shipProduct(ship.userId, ship.txId))
      println(s"COMMAND:\t\t$ship => ${self.path.name}")
  }

  def receiveRecover: Receive = {
    case replayed: ProductReserved =>
      // Reservations carry no state in this aggregate; only trace the replay.
      println(s"EVENT (REPLAY):\t$replayed => ${self.path.name}")

    case replayed: ProductShipped =>
      // Rebuild the shipped counter from the journal.
      nrOfProductsShipped = nrOfProductsShipped + 1
      println(s"EVENT (REPLAY):\t$replayed => ${self.path.name} - ProductsShipped: $nrOfProductsShipped")
  }
}
// ========================================================= | |
// Event Sourced Aggregate | |
// ========================================================= | |
/**
 * Event-sourced aggregate for payments.
 *
 * A `SubmitPayment` command is turned into an event, the event is persisted,
 * and only then is it published on the actor system's event stream.
 */
class Payment extends PersistentActor {
  val persistenceId: String = "Payment"

  // Running transaction number. Held in memory only and rebuilt during
  // recovery by counting the replayed PaymentAuthorized events.
  var uniqueTransactionNr = 0

  /**
   * Processes a payment: always authorizes, using the next transaction number
   * as the event's `txId`. The `txId` argument itself is not used.
   */
  def processPayment(userId: Int, txId: Int): Event = {
    uniqueTransactionNr = uniqueTransactionNr + 1
    println(s"SIDE-EFFECT:\tProcessing Payment => ${self.path.name} - TxNumber: $uniqueTransactionNr")
    PaymentAuthorized(userId, uniqueTransactionNr)
  }

  // Persists the event and, from the persist callback, publishes it on the event stream.
  private def publishOncePersisted(event: Event): Unit =
    persist(event) { stored =>
      context.system.eventStream.publish(stored)
    }

  def receiveCommand: Receive = {
    case submit: SubmitPayment =>
      publishOncePersisted(processPayment(submit.userId, submit.productId))
      println(s"COMMAND:\t\t$submit => ${self.path.name}")
  }

  def receiveRecover: Receive = {
    case replayed: PaymentAuthorized =>
      // Rebuild the transaction counter from the journal.
      uniqueTransactionNr = uniqueTransactionNr + 1
      println(s"EVENT (REPLAY):\t$replayed => ${self.path.name} - TxNumber: $uniqueTransactionNr")

    case replayed: PaymentDeclined =>
      // Declined payments carry no state in this aggregate; only trace the replay.
      println(s"EVENT (REPLAY):\t$replayed => ${self.path.name}")
  }
}
// ========================================================= | |
// Running the Order Management simulation | |
// ========================================================= | |
// Actor system hosting every service of the simulation.
val system = ActorSystem("OrderManagement")

// Plumbing for "client": the Inbox lets this non-actor code send a message to
// an actor and wait for the reply.
val clientInbox = Inbox.create(system)
val client = clientInbox.getRef()

// Create the services (cheating with "DI" by exploiting enclosing object scope)
val inventory = system.actorOf(Props(classOf[Inventory]), "Inventory")
val payment = system.actorOf(Props(classOf[Payment]), "Payment")
val orders = system.actorOf(Props(classOf[Orders], client, inventory, payment), "Orders")

try {
  // Submit an order
  clientInbox.send(orders, CreateOrder(9, 1337)) // Send a CreateOrder Command to the Orders service

  // Wait for the OrderCompleted Event. `receive` throws a TimeoutException if
  // nothing arrives within the deadline; that exception is left to propagate.
  clientInbox.receive(5.seconds) match {
    case confirmation: OrderCompleted =>
      println(s"EVENT:\t\t\t$confirmation => Client")
    case unexpected =>
      // The inbox yields an untyped message, so report anything else
      // instead of failing with a MatchError.
      println(s"UNEXPECTED:\t\t$unexpected => Client")
  }
} finally {
  // Always shut the actor system down, even when the wait above fails;
  // otherwise its threads keep the JVM alive after the main thread dies.
  system.terminate().foreach(_ => println("System has terminated"))
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment