Last active
December 26, 2015 15:49
-
-
Save DrewEaster/7175926 to your computer and use it in GitHub Desktop.
An adapted version of Vaughn Vernon's ideas from his very useful blog post http://vaughnvernon.co/?p=770
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait AggregateType { | |
val description: String | |
val classType: Class[_ <: Actor] | |
} | |
object DomainModel { | |
def apply(name: String)(aggregateTypes: AggregateType*) = { | |
new DomainModel(name, aggregateTypes) | |
} | |
} | |
class DomainModel(name: String, aggregateTypes: Seq[AggregateType]) { | |
val system = ActorSystem(name) | |
val aggregateTypeRegistry = aggregateTypes.foldLeft(Map[Class[_ <: Actor], ActorRef]()) { | |
(acc, at) => acc + (at.classType -> system.actorOf(Props(new AggregateCache(at)), at.classType.getName)) | |
} | |
def aggregateOf(id: String, aggregateType: AggregateType) = { | |
if (aggregateTypeRegistry.contains(aggregateType.classType)) { | |
AggregateRef(id, aggregateTypeRegistry(aggregateType.classType)) | |
} else { | |
throw new IllegalArgumentException("The aggregate type is not supported by this domain model!") | |
} | |
} | |
def shutdown() = { | |
system.shutdown() | |
} | |
} | |
case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef) | |
case class AggregateRef(id: String, cache: ActorRef) { | |
def tell(message: Any)(implicit sender: ActorRef = null) { | |
cache ! CacheMessage(id, message, sender) | |
} | |
def !(message: Any)(implicit sender: ActorRef = null) { | |
cache ! CacheMessage(id, message, sender) | |
} | |
} | |
class AggregateCache(aggregateType: AggregateType) extends Actor { | |
def receive = { | |
case message: CacheMessage => | |
val aggregate = context.child(message.id).getOrElse { | |
context.actorOf(Props.create(aggregateType.classType), message.id) | |
} | |
aggregate.tell(message.actualMessage, message.sender) | |
} | |
} | |
// Prototype | |
case class OrderItem(id: String, product: String, price: BigDecimal) | |
case class CreateOrder(items: Seq[OrderItem]) | |
class Order extends Actor { | |
def receive = { | |
case o: CreateOrder => | |
println(s"Creating Order $o") | |
case a: Any => | |
println(s"message is $a") | |
} | |
} | |
object Order extends AggregateType { | |
val description = "Order Aggregate Root" | |
val classType = classOf[Order] | |
} | |
object DomainModelPrototype extends App { | |
val model = DomainModel("prototype") { | |
Order | |
// TODO: Register other aggregates here... | |
} | |
val order = model.aggregateOf("123", Order) | |
order ! CreateOrder(List(OrderItem("1234", "Banana", BigDecimal("10.99")))) | |
awaitCompletion() | |
model.shutdown() | |
println("DomainModelPrototype: is completed.") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have been pondering something similar recently and found both Vernon's blog and the akka-users discussion. There is one thing I don't quite get : why do you need to register the aggregate types before hand ?
wouldn't it be simpler to auto register on the first call of aggregateOf ?