-
-
Save dwijnand/6d05633e08b563f59710bfa79c8f7dbf to your computer and use it in GitHub Desktop.
Demo of an Event-driven Architecture in Akka and Scala. Show-casing Events-first DDD, Event Sourced Aggregates, Process Manager, etc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
project/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt.version=1.2.6

// Single source of truth for the Akka module version used below.
val akkaVersion = "2.5.17"

// Root project, rooted at the directory containing this build file.
val ordermgtm = project.in(file("."))

// Build-wide coordinates and compiler version.
ThisBuild / organization := "com.dwijnand"
ThisBuild / version      := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.7"

// Classic + typed actors, classic + typed persistence, and the LevelDB journal backend.
libraryDependencies ++= Seq(
  "com.typesafe.akka"         %% "akka-actor"             % akkaVersion,
  "com.typesafe.akka"         %% "akka-persistence"       % akkaVersion,
  "com.typesafe.akka"         %% "akka-actor-typed"       % akkaVersion,
  "com.typesafe.akka"         %% "akka-persistence-typed" % akkaVersion,
  "org.fusesource.leveldbjni" %  "leveldbjni-all"         % "1.8",
)

run / fork := true // for leveldb

// workaround to avoid defining in src/main/resources/application.conf
// because GitHub Gist doesn't support directories
javaOptions ++= Seq(
  //"-Dakka.persistence.journal.plugin=akka.persistence.journal.inmem",
  "-Dakka.persistence.journal.plugin=akka.persistence.journal.leveldb",
  "-Dakka.persistence.journal.leveldb.dir=target/journal",
)

// Allow the running task to be cancelled from the sbt shell.
Global / cancelable := true
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.eventdriven.scala | |
import akka.actor.{Actor, ActorRef, ActorSystem, Inbox, Props} | |
import akka.actor.typed.Behavior | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.scaladsl.adapter._ | |
import akka.persistence.PersistentActor | |
import akka.persistence.typed.scaladsl.PersistentBehaviors | |
import akka.persistence.typed.scaladsl.Effect | |
import scala.concurrent.ExecutionContext.Implicits._ | |
import scala.concurrent.duration._ | |
// =============================================================== | |
// Demo of an Event-driven Architecture in Akka and Scala. | |
// | |
// Show-casing: | |
// 1. Events-first Domain Driven Design using Commands and Events | |
// 2. Asynchronous communication through an Event Stream | |
// 3. Asynchronous Process Manager driving the workflow | |
// 4. Event-sourced Aggregates | |
// | |
// Used in my talk on 'How Events Are Reshaping Modern Systems'. | |
// | |
// NOTE: This is a deliberately simplified, dumbed-down sample
// that is by no means a template for production use.
// For example, in a real-world app I would not use Serializable
// but JSON, Protobuf, Avro, or some other good serialization library.
// I would also not use Akka's built-in EventStream, but
// probably Kafka or Kinesis, etc.
// =============================================================== | |
/** Runnable demo wiring three services together over the actor system's event stream:
  *
  *  - `Orders`    : typed behavior acting as the process manager for one order,
  *  - `Inventory` : typed persistent behavior (event-sourced aggregate),
  *  - `Payment`   : classic `PersistentActor` (event-sourced aggregate).
  *
  * The body of the object is the simulation itself: it spawns the services, submits a
  * single `CreateOrder`, prints the outcome and terminates the actor system.
  */
object OrderManagement extends App {

  // Marker traits: which messages the Orders and Inventory services accept / emit.
  sealed trait OrderMessage
  sealed trait OrderCommand extends OrderMessage
  sealed trait OrderEvent   extends OrderMessage
  sealed trait InventoryCommand
  sealed trait InventoryEvent

  /** State of the Inventory aggregate: a running count of shipped products. */
  final case class InventoryState(nrOfProductsShipped: Int = 0)

  // =========================================================
  // Commands
  // =========================================================
  sealed trait Command
  final case class CreateOrder(userId: Int, productId: Int)    extends Command with OrderCommand
  final case class ReserveProduct(userId: Int, productId: Int) extends Command with InventoryCommand
  final case class SubmitPayment(userId: Int, productId: Int)  extends Command
  final case class ShipProduct(userId: Int, txId: Int)         extends Command with InventoryCommand

  // =========================================================
  // Events
  // =========================================================
  sealed trait Event
  final case class ProductReserved(userId: Int, txId: Int)        extends Event with OrderEvent with InventoryEvent
  final case class ProductOutOfStock(userId: Int, productId: Int) extends Event with OrderEvent
  final case class PaymentAuthorized(userId: Int, txId: Int)      extends Event with OrderEvent
  final case class PaymentDeclined(userId: Int, txId: Int)        extends Event with OrderEvent
  final case class ProductShipped(userId: Int, txId: Int)         extends Event with OrderEvent with InventoryEvent
  final case class OrderFailed(userId: Int, txId: Int, reason: String) extends Event
  final case class OrderCompleted(userId: Int, txId: Int)         extends Event

  // =========================================================
  // Top-level service functioning as a Process Manager
  // Coordinating the workflow on behalf of the Client
  // =========================================================

  /** Builds the Orders process manager.
    *
    * @param client    receives the final `OrderCompleted` / `OrderFailed` event
    * @param inventory receives `ReserveProduct` and `ShipProduct` commands
    * @param payment   receives `SubmitPayment` commands
    * @return a behavior that accepts `OrderCommand`s from the outside; internally it also
    *         handles the `OrderEvent`s it subscribes to on the event stream (hence the
    *         wider `OrderMessage` handler that is narrowed at the end)
    */
  def mkOrders(client: ActorRef, inventory: ActorRef, payment: ActorRef): Behavior[OrderCommand] =
    Behaviors.setup[OrderCommand] { context =>
      val system = context.system.toUntyped
      val self   = context.self.toUntyped

      // Subscribe to every event type that drives the workflow forward.
      // (Plain statements on purpose: a bare `{ ... }` block starting on the line right
      // after a `val` definition can be parsed as an argument to the preceding expression.)
      system.eventStream.subscribe(self, classOf[ProductReserved])   // Subscribe to ProductReserved Events
      system.eventStream.subscribe(self, classOf[ProductOutOfStock]) // Subscribe to ProductOutOfStock Events
      system.eventStream.subscribe(self, classOf[ProductShipped])    // Subscribe to ProductShipped Events
      system.eventStream.subscribe(self, classOf[PaymentAuthorized]) // Subscribe to PaymentAuthorized Events
      system.eventStream.subscribe(self, classOf[PaymentDeclined])   // Subscribe to PaymentDeclined Events

      Behaviors.receiveMessage[OrderMessage] {
        case cmd: CreateOrder =>                                            // 1. Receive CreateOrder Command
          inventory.tell(ReserveProduct(cmd.userId, cmd.productId), self)   // 2. Send ReserveProduct Command to Inventory
          println(s"COMMAND:\t\t$cmd => ${self.path.name}")
          Behaviors.same

        case evt: ProductReserved =>                                        // 3. Receive ProductReserved Event
          payment.tell(SubmitPayment(evt.userId, evt.txId), self)           // 4. Send SubmitPayment Command to Payment
          println(s"EVENT:\t\t\t$evt => ${self.path.name}")
          Behaviors.same

        case evt: ProductOutOfStock =>                                      // ALT 3. Receive ProductOutOfStock Event
          // ProductOutOfStock carries a productId (no txId exists yet at this stage),
          // so that is the identifier forwarded in the failure.
          client.tell(OrderFailed(evt.userId, evt.productId, "out of stock"), self) // ALT 4. Send OrderFailed Event back to Client
          println(s"EVENT:\t\t\t$evt => ${self.path.name}")
          Behaviors.same

        case evt: PaymentAuthorized =>                                      // 5. Receive PaymentAuthorized Event
          inventory.tell(ShipProduct(evt.userId, evt.txId), self)           // 6. Send ShipProduct Command to Inventory
          println(s"EVENT:\t\t\t$evt => ${self.path.name}")
          Behaviors.same

        case evt: PaymentDeclined =>                                        // ALT 5. Receive PaymentDeclined Event
          client.tell(OrderFailed(evt.userId, evt.txId, "payment declined"), self) // ALT 6. Send OrderFailed Event back to Client
          println(s"EVENT:\t\t\t$evt => ${self.path.name}")
          Behaviors.same

        case evt: ProductShipped =>                                         // 7. Receive ProductShipped Event
          client.tell(OrderCompleted(evt.userId, evt.txId), self)           // 8. Send OrderCompleted Event back to Client
          println(s"EVENT:\t\t\t$evt => ${self.path.name}")
          Behaviors.same
      }.narrow
    }

  // =========================================================
  // Event Sourced Aggregate
  // =========================================================

  /** Builds the Inventory aggregate as a typed persistent behavior.
    *
    * Each command is turned into an event, the event is persisted, and only after that
    * is it published on the actor system's event stream.
    *
    * @return a behavior accepting `InventoryCommand`s
    */
  def mkInventory: Behavior[InventoryCommand] = Behaviors.setup[InventoryCommand] { context =>
    val persistenceId = "Inventory"
    val system        = context.system.toUntyped
    val self          = context.self.toUntyped

    // Simulated side effect of reserving a product; always reports success.
    def reserveProduct(userId: Int, productId: Int): InventoryEvent = {
      println(s"SIDE-EFFECT:\tReserving Product => ${self.path.name}")
      ProductReserved(userId, productId)
    }

    // Simulated side effect of shipping a product; always reports success.
    def shipProduct(userId: Int, txId: Int): InventoryEvent = {
      println(s"SIDE-EFFECT:\tShipping Product => ${self.path.name}")
      ProductShipped(userId, txId)
    }

    // Persist `event`, then publish that very event to the event stream.
    // The argument handed to the `thenRun` callback is the aggregate's state, not the
    // persisted event, so it is ignored and the captured `event` is published instead;
    // publishing the callback argument would put an InventoryState on the stream and
    // none of the Orders subscriptions would ever match.
    def persistAndPublish(event: InventoryEvent): Effect[InventoryEvent, InventoryState] =
      Effect.persist(event).thenRun { _ =>
        system.eventStream.publish(event)
      }

    def receiveCommand: (InventoryState, InventoryCommand) => Effect[InventoryEvent, InventoryState] = {
      case (_, cmd: ReserveProduct) =>                                  // Receive ReserveProduct Command
        val productStatus = reserveProduct(cmd.userId, cmd.productId)   // Try to reserve the product
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")
        persistAndPublish(productStatus)                                // Persist, then publish to Event Stream

      case (_, cmd: ShipProduct) =>                                     // Receive ShipProduct Command
        val shippingStatus = shipProduct(cmd.userId, cmd.txId)          // Try to ship the product
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")
        persistAndPublish(shippingStatus)                               // Persist, then publish to Event Stream
    }

    // Event handler: folds an event into the state.
    // NOTE(review): it is wired in as `eventHandler`, which Akka applies to newly
    // persisted events as well as replayed ones, so the "(REPLAY)" label is also
    // printed for live events.
    def receiveRecover: (InventoryState, InventoryEvent) => InventoryState = {
      case (state, event: ProductReserved) =>                           // Replay the ProductReserved events
        println(s"EVENT (REPLAY):\t$event => ${self.path.name}")
        state

      case (state, event: ProductShipped) =>                            // Replay the ProductShipped events
        val newState = state.copy(nrOfProductsShipped = state.nrOfProductsShipped + 1)
        println(s"EVENT (REPLAY):\t$event => ${self.path.name} - ProductsShipped: ${newState.nrOfProductsShipped}")
        newState
    }

    PersistentBehaviors.receive[InventoryCommand, InventoryEvent, InventoryState](
      persistenceId  = persistenceId,
      emptyState     = InventoryState(),
      commandHandler = receiveCommand,
      eventHandler   = receiveRecover,
    )
  }

  // =========================================================
  // Event Sourced Aggregate
  // =========================================================

  /** Payment aggregate implemented as a classic `PersistentActor`.
    *
    * Keeps a running transaction counter that is rebuilt from the journal on recovery.
    */
  class Payment extends PersistentActor {
    val persistenceId = "Payment"

    var uniqueTransactionNr = 0 // Mutable state, persisted in memory (AKA Memory Image)

    // Simulated payment processing; always authorizes. The incoming `txId` is not used:
    // the event carries the freshly incremented transaction number instead.
    def processPayment(userId: Int, txId: Int): Event = {
      uniqueTransactionNr += 1
      println(s"SIDE-EFFECT:\tProcessing Payment => ${self.path.name} - TxNumber: $uniqueTransactionNr")
      PaymentAuthorized(userId, uniqueTransactionNr)
    }

    def receiveCommand: Receive = {
      case cmd: SubmitPayment =>                                        // Receive SubmitPayment Command
        val paymentStatus = processPayment(cmd.userId, cmd.productId)   // Try to pay product
        persist(paymentStatus) { event =>                               // Try to persist Event
          context.system.eventStream.publish(event)                     // If successful, publish Event to Event Stream
        }
        println(s"COMMAND:\t\t$cmd => ${self.path.name}")
    }

    def receiveRecover: Receive = {
      case evt: PaymentAuthorized =>                                    // Replay the PaymentAuthorized events
        uniqueTransactionNr += 1
        println(s"EVENT (REPLAY):\t$evt => ${self.path.name} - TxNumber: $uniqueTransactionNr")

      case evt: PaymentDeclined =>                                      // Replay the PaymentDeclined events
        println(s"EVENT (REPLAY):\t$evt => ${self.path.name}")
    }
  }

  // =========================================================
  // Running the Order Management simulation
  // =========================================================
  val system = ActorSystem("OrderManagement")

  // Plumbing for "client"
  val clientInbox = Inbox.create(system)
  val client      = clientInbox.getRef()

  // Create the services (cheating with "DI" by exploiting enclosing object scope)
  val inventory = system.spawn(mkInventory, "Inventory").toUntyped
  val payment   = system.actorOf(Props(classOf[Payment]), "Payment")
  val orders    = system.spawn(mkOrders(client, inventory, payment), "Orders").toUntyped

  // Submit an order and wait for its outcome. The actor system is terminated in
  // `finally` so that a timeout or an unexpected reply cannot leave the (forked) JVM
  // hanging on live actor threads.
  try {
    clientInbox.send(orders, CreateOrder(9, 1337))                      // Send a CreateOrder Command to the Orders service
    clientInbox.receive(5.seconds) match {                              // Wait for the order's final Event
      case confirmation: OrderCompleted =>
        println(s"EVENT:\t\t\t$confirmation => Client")
      case failure: OrderFailed =>                                      // The Orders service reports failures this way
        println(s"EVENT:\t\t\t$failure => Client")
      case other =>                                                     // Inbox.receive is untyped; never throw MatchError
        println(s"UNEXPECTED:\t\t$other => Client")
    }
  } finally {
    system.terminate().foreach(_ => println("System has terminated"))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment