// Gist jboner/4361c710d2e2386be8bddc39edb0528a, created November 10, 2017 09:06.
// Demo of an Event-driven Architecture in Akka and Java. Showcasing Events-first DDD,
// Event Sourced Aggregates, Process Manager, etc.
// GitHub notice: this file contains bidirectional Unicode text that may be interpreted or
// compiled differently than what appears below. To review, open the file in an editor that
// reveals hidden Unicode characters.
package sample.eventdriven.java;

import akka.actor.*;
import akka.persistence.AbstractPersistentActor;
import scala.concurrent.duration.Duration;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// ===============================================================
// Demo of an Event-driven Architecture in Akka and Java.
//
// Showcasing:
// 1. Events-first Domain-Driven Design using Commands and Events
// 2. Asynchronous communication through an Event Stream
// 3. Asynchronous Process Manager driving the workflow
// 4. Event-sourced Aggregates
//
// Used in my talk on 'How Events Are Reshaping Modern Systems'.
//
// NOTE: This is a very much simplified and dumbed-down sample
//       that is by no means a template for production use.
//       For example, in a real-world app I would not use
//       Serializable but JSON, Protobuf, Avro, or some other
//       good serialization library. I would not use Akka's
//       built-in EventStream either, but probably Kafka or
//       Kinesis, etc.
// ===============================================================
// =========================================================
// Commands
// =========================================================
/**
 * Marker interface implemented by every command message in this file.
 * Extends {@link Serializable} so that all commands are serializable.
 */
interface Command extends Serializable {
}
class CreateOrder implements Command { | |
final int userId; | |
final int productId; | |
CreateOrder(int userId, int productId) { | |
this.userId = userId; | |
this.productId = productId; | |
} | |
} | |
class ReserveProduct implements Command { | |
final int userId; | |
final int productId; | |
ReserveProduct(int userId, int productId) { | |
this.userId = userId; | |
this.productId = productId; | |
} | |
} | |
class SubmitPayment implements Command { | |
final int userId; | |
final int productId; | |
SubmitPayment(int userId, int productId) { | |
this.userId = userId; | |
this.productId = productId; | |
} | |
} | |
class ShipProduct implements Command { | |
final int userId; | |
final int txId; | |
ShipProduct(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
// =========================================================
// Events
// =========================================================
/**
 * Marker interface implemented by every event message in this file.
 * Extends {@link Serializable} so that all events are serializable.
 */
interface Event extends Serializable {
}
class ProductReserved implements Event { | |
final int userId; | |
final int txId; | |
ProductReserved(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
class ProductOutOfStock implements Event { | |
final int userId; | |
final int productId; | |
ProductOutOfStock(int userId, int productId) { | |
this.userId = userId; | |
this.productId = productId; | |
} | |
} | |
class PaymentAuthorized implements Event { | |
final int userId; | |
final int txId; | |
PaymentAuthorized(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
class PaymentDeclined implements Event { | |
final int userId; | |
final int txId; | |
PaymentDeclined(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
class ProductShipped implements Event { | |
final int userId; | |
final int txId; | |
ProductShipped(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
class OrderCompleted implements Event { | |
final int userId; | |
final int txId; | |
OrderCompleted(int userId, int txId) { | |
this.userId = userId; | |
this.txId = txId; | |
} | |
} | |
// =========================================================
// Top-level service functioning as a Process Manager,
// coordinating the workflow on behalf of the Client
// =========================================================
class Orders extends AbstractActor { | |
final ActorRef client; | |
final ActorRef inventory; | |
final ActorRef payment; | |
public Orders(ActorRef client, ActorRef inventory, ActorRef payment) { | |
this.client = client; | |
this.inventory = inventory; | |
this.payment = payment; | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(CreateOrder.class, cmd -> { | |
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name()); | |
inventory.tell(new ReserveProduct(cmd.userId, cmd.productId), getSelf()); | |
}) | |
.match(ProductReserved.class, evt -> { | |
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name()); | |
payment.tell(new SubmitPayment(evt.userId, evt.txId), getSelf()); | |
}) | |
.match(PaymentAuthorized.class, evt -> { | |
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name()); | |
inventory.tell(new ShipProduct(evt.userId, evt.txId), getSelf()); | |
}) | |
.match(ProductShipped.class, evt -> { | |
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name()); | |
client.tell(new OrderCompleted(evt.userId, evt.txId), getSelf()); | |
}) | |
.build(); | |
} | |
@Override | |
public void preStart() { | |
// Subscribe to Events from the Event Stream | |
getContext().system().eventStream().subscribe(getSelf(), ProductReserved.class); | |
getContext().system().eventStream().subscribe(getSelf(), ProductOutOfStock.class); | |
getContext().system().eventStream().subscribe(getSelf(), ProductShipped.class); | |
getContext().system().eventStream().subscribe(getSelf(), PaymentAuthorized.class); | |
getContext().system().eventStream().subscribe(getSelf(), PaymentDeclined.class); | |
} | |
} | |
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Inventory extends AbstractPersistentActor { | |
@Override | |
public String persistenceId() { | |
return "inventory"; | |
} | |
int nrOfProductsShipped = 0; // Mutable state, persisted in memory (AKA Memory Image) | |
Event reserveProduct(int userId, int productId) { | |
System.out.println("SIDE-EFFECT:\tReserving Product => " + getSelf().path().name()); | |
return new ProductReserved(userId, productId); | |
} | |
Event shipProduct(int userId, int txId) { | |
nrOfProductsShipped += 1; // Update internal state | |
System.out.println("SIDE-EFFECT:\tShipping Product => " + getSelf().path().name() + | |
" - ProductsShipped: " + nrOfProductsShipped); | |
return new ProductShipped(userId, txId); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(ReserveProduct.class, cmd -> { // Receive ReserveProduct Command | |
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name()); | |
Event productStatus = reserveProduct(cmd.userId, cmd.productId); // Try to reserve the product | |
persist(productStatus, evt -> { // Try to persist the Event | |
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream | |
}); | |
}) | |
.match(ShipProduct.class, cmd -> { // Receive ShipProduct Command | |
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name()); | |
Event shippingStatus = shipProduct(cmd.userId, cmd.txId); // Try to ship the product | |
persist(shippingStatus, evt -> { // Try to persist the Event | |
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream | |
}); | |
}) | |
.build(); | |
} | |
@Override | |
public Receive createReceiveRecover() { | |
return receiveBuilder() | |
.match(ProductReserved.class, evt -> { // Replay ProductReserved | |
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name()); | |
}) | |
.match(ProductShipped.class, evt -> { // Replay ProductShipped | |
nrOfProductsShipped += 1; // Update the internal state | |
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() + | |
" - ProductsShipped: " + nrOfProductsShipped); | |
}) | |
.build(); | |
} | |
} | |
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Payment extends AbstractPersistentActor { | |
@Override | |
public String persistenceId() { | |
return "payment"; | |
} | |
int uniqueTransactionNr = 0; // Mutable state, persisted in memory (AKA Memory Image) | |
Event processPayment(int userId, int txId) { | |
uniqueTransactionNr += 1; // Update the internal state | |
System.out.println("SIDE-EFFECT:\tProcessing payment => " + getSelf().path().name() + | |
" - TxNumber: " + uniqueTransactionNr); | |
return new PaymentAuthorized(userId, uniqueTransactionNr); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(SubmitPayment.class, cmd -> { // Receive SubmitPayment Command | |
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name()); | |
Event paymentStatus = processPayment(cmd.userId, cmd.productId); // Try to pay product | |
persist(paymentStatus, evt -> { // Try to persist the Event | |
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream | |
}); | |
}) | |
.build(); | |
} | |
@Override | |
public Receive createReceiveRecover() { | |
return receiveBuilder() | |
.match(PaymentAuthorized.class, evt -> { // Replay PaymentAuthorized | |
uniqueTransactionNr += 1; // Update the internal state | |
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() + | |
" - TxNumber: " + uniqueTransactionNr); | |
}) | |
.build(); | |
} | |
} | |
// =========================================================
// Running the Order Management simulation
// =========================================================
public class OrderManagement { | |
public static void main(String... args) throws Exception { | |
// Create the Order Management actor system | |
final ActorSystem system = ActorSystem.create("OrderManagement"); | |
// Plumbing for "client" | |
final Inbox clientInbox = Inbox.create(system); | |
final ActorRef client = clientInbox.getRef(); | |
// Create the services | |
final ActorRef inventory = system.actorOf(Props.create(Inventory.class), "Inventory"); | |
final ActorRef payment = system.actorOf(Props.create(Payment.class), "Payment"); | |
final ActorRef orders = system.actorOf(Props.create(Orders.class, client, inventory, payment), "Orders"); | |
// Send a CreateOrder Command to the Orders service | |
clientInbox.send(orders, new CreateOrder(9, 1337)); | |
try { | |
// Wait for the order confirmation | |
Object confirmation = clientInbox.receive(Duration.create(5, TimeUnit.SECONDS)); | |
System.out.println("EVENT:\t\t\t" + confirmation + " => Client"); | |
} catch (TimeoutException e) { | |
System.out.println("Waited 5 seconds for the OrderCompleted event, giving up..."); | |
} | |
System.out.println("Order completed. Shutting down system."); | |
system.terminate(); | |
} | |
} |
// GitHub page footer: sign up for free to join this conversation on GitHub, or sign in to comment.