Created
September 2, 2010 19:26
-
-
Save muteor/562799 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
error_reporting(E_ALL | E_STRICT); | |
/** | |
* To runs this example you will need the UUID PHP Extension http://pecl.php.net/package/uuid | |
*/ | |
/** | |
* DomainModel Aggregate root for a Tweet | |
* | |
* The Aggregate root tracks all uncommitted events and can have old | |
* events replayed to it. | |
*/ | |
class Domain_Tweet | |
{ | |
protected $_id; | |
protected $_version = -1; | |
protected $_message; | |
protected $_who; | |
protected $_when; | |
protected $_uncommittedEvents = array(); | |
public function __construct($message, $who) | |
{ | |
$event = new Event_NewTweetPostedEvent( | |
uuid_create(UUID_TYPE_RANDOM), | |
$message, | |
$who, | |
time() | |
); | |
$this->_applyEvent($event); | |
} | |
protected function _OnNewTweetPosted(Event_NewTweetPostedEvent $e) | |
{ | |
$this->_id = $e->tweetId; | |
$this->_message = $e->message; | |
$this->_who = $e->who; | |
$this->_when = $e->when; | |
} | |
protected function _applyEvent($event, $fromHistory = false) | |
{ | |
// would find event handler, we have hard-coded here | |
$this->_OnNewTweetPosted($event); | |
$this->_version++; | |
if (false === $fromHistory) { | |
$this->_uncommittedEvents[] = $event; | |
} | |
} | |
public function initializeFromHistory($events) | |
{ | |
foreach ($events as $event) { | |
$this->_applyEvent($event, true); | |
} | |
} | |
public function getUncommittedEvents() | |
{ | |
return $this->_uncommittedEvents; | |
} | |
public function getId() | |
{ | |
return $this->_id; | |
} | |
} | |
/** | |
* DomainEvent a new tweet post, this will be stored in the | |
* event store and replayed to the aggregate root when it is | |
* called back into memory | |
*/ | |
class Event_NewTweetPostedEvent | |
{ | |
public $id; | |
public $eventTime; | |
public $tweetId; | |
public $message; | |
public $who; | |
public $when; | |
public function __construct($tweetId, $message, $who, $when) | |
{ | |
$this->id = uuid_create(UUID_TYPE_RANDOM); | |
$this->eventTime = time(); | |
$this->tweetId = $tweetId; | |
$this->message = $message; | |
$this->who = $who; | |
$this->when = $when; | |
} | |
} | |
/** | |
* Command to post a new tweet, holds all the required data | |
* for the command. This would be sent from the client to | |
* the command service. | |
*/ | |
class Command_PostNewTweetCommand | |
{ | |
public $message; | |
public $who; | |
public function __construct($message, $who) | |
{ | |
$this->message = $message; | |
$this->who = $who; | |
} | |
} | |
/** | |
* Matching Executor for the Command, this actually changes the state | |
* of our domain. Executors are called from the command service. | |
*/ | |
class Command_PostNewTweetExecutor | |
{ | |
protected $_cmdService; | |
public function __construct(CommandService $commandService) | |
{ | |
$this->_cmdService = $commandService; | |
} | |
public function execute($command) | |
{ | |
$tweet = new Domain_Tweet( | |
$command->message, | |
$command->who | |
); | |
$this->_cmdService->getRepository()->save($tweet); | |
} | |
} | |
/** | |
* Classic Domain Repository, handles the persisting of state and recall | |
* thereafter. In CQRS we dont end up with a bloated repository though! | |
*/ | |
class Repository | |
{ | |
protected $_eventStore; | |
protected $_eventBus; | |
public function __construct(EventStore $store, EventBus $bus) | |
{ | |
$this->_eventStore = $store; | |
$this->_eventBus = $bus; | |
} | |
public function getById($id, $type) | |
{ | |
// trick php to not call constructor | |
$aggregate = unserialize( | |
sprintf('O:%d:"%s":0:{}', strlen($type), $type) | |
); | |
$events = $this->_eventStore->getHistoryForAggregateId($id); | |
$aggregate->initializeFromHistory($events); | |
return $aggregate; | |
} | |
public function save($aggregate) | |
{ | |
$eventsToSave = $aggregate->getUncommittedEvents(); | |
foreach ($eventsToSave as $event) { | |
$this->_eventStore->store($aggregate->getId(), $event); | |
$this->_eventBus->enqueue($aggregate->getId(), $event); | |
} | |
$this->_eventBus->publish(); | |
} | |
} | |
/** | |
* An In-Memory store of events that have been applied to the Domain. | |
*/ | |
class EventStore | |
{ | |
protected $_events; | |
public function __construct() | |
{ | |
$this->_events = array(); | |
} | |
public function store($aggregateIdent, $event) | |
{ | |
if (!isset($this->_events[$aggregateIdent])) { | |
$this->_events[$aggregateIdent] = array(); | |
} | |
$this->_events[$aggregateIdent][] = $event; | |
} | |
public function getHistoryForAggregateId($id) | |
{ | |
if (!isset($this->_events[$id])) { | |
return null; | |
} | |
return $this->_events[$id]; | |
} | |
} | |
/** | |
* The event bus sends events over to the read model for processing. | |
* This will then create our de-normalized storage. | |
*/ | |
class EventBus | |
{ | |
protected $_db; | |
protected $_events = array(); | |
public function __construct($db) | |
{ | |
$this->_db = $db; | |
} | |
public function enqueue($aggregateIdent, $event) | |
{ | |
if (!isset($this->_events[$aggregateIdent])) { | |
$this->_events[$aggregateIdent] = array(); | |
} | |
$this->_events[$aggregateIdent] = $event; | |
} | |
public function publish() | |
{ | |
// find the denormalizer, hardcoded here... | |
$denormalizer = new Read_PostNewTweetDenormalizer($this->_db); | |
// write events to the readModel, this would be | |
// where we did transactions etc... | |
foreach ($this->_events as $ident => $event) { | |
$denormalizer->write($ident, $event); | |
} | |
// clear the processed events | |
$this->_events = array(); | |
} | |
} | |
/** | |
* The denormalizer for the PostNewTweet event, this will take the event | |
* and translate its data into a de-normalized form. | |
*/ | |
class Read_PostNewTweetDenormalizer | |
{ | |
protected $_db; | |
public function __construct($db) | |
{ | |
$this->_db = $db; | |
} | |
public function write($aggregateIdent, $event) | |
{ | |
$this->_db->exec("INSERT INTO tweets | |
VALUES('$aggregateIdent', '$event->message', '$event->who', '$event->when')"); | |
} | |
} | |
/** | |
* Command Service, directs the incoming commands to the correct | |
* Command Executor. | |
*/ | |
class CommandService | |
{ | |
protected $_repository; | |
public function __construct(Repository $repository) | |
{ | |
$this->_repository = $repository; | |
} | |
public function execute($command) | |
{ | |
// find the command executor, again we hardcode here... | |
$executor = new Command_PostNewTweetExecutor($this); | |
$executor->execute($command); | |
} | |
public function getRepository() | |
{ | |
return $this->_repository; | |
} | |
} | |
// in-memory read db | |
$db = new PDO('sqlite::memory:'); | |
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); | |
// create the readModel table, stores the denormalized data | |
$db->exec('CREATE TABLE tweets | |
( | |
`id` CHAR(36), | |
`message` VARCHAR(140), | |
`who` VARCHAR(128), | |
`when` TIMESTAMP, | |
PRIMARY KEY(`id`) | |
)'); | |
// setup all the components | |
$eventBus = new EventBus($db); | |
$eventStore = new EventStore(); | |
$repository = new Repository($eventStore, $eventBus); | |
$commandService = new CommandService($repository); | |
// adding a new tweet | |
$tweetCommand = new Command_PostNewTweetCommand('Another really important tweet', 'Keith Pope'); | |
$commandService->execute($tweetCommand); | |
// reading tweets | |
$stmt = $db->query('SELECT * FROM tweets'); | |
$result = $stmt->fetchAll(PDO::FETCH_ASSOC); | |
title('Tweets from ReadModel'); | |
foreach ($result as $tweet) { | |
$id = $tweet['id']; | |
echo $tweet['message'] . '(' . $tweet['who'] . ')' . ' - ' . $tweet['when'] . "\n"; | |
} | |
// whats in the event store? | |
title('Event Store'); | |
print_r($eventStore); | |
// recalling aggregates, this would never happen here but we haven't got an update event :) | |
title('Our Tweet'); | |
$recalled = $repository->getById($id, 'Domain_Tweet'); | |
print_r($recalled); | |
// Ignore this.... | |
function title($title) { | |
echo str_repeat('-', 70); | |
echo "\n$title\n"; | |
echo str_repeat('-', 70) . "\n"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example Output:
----------------------------------------------------------------------
Tweets from ReadModel
----------------------------------------------------------------------
Another really important tweet(Keith Pope) - 1283456671
----------------------------------------------------------------------
Event Store
----------------------------------------------------------------------
EventStore Object
(
[_events:protected] => Array
(
[E2069990-7EF8-4E0C-BCD8-3583D75BFD6C] => Array
(
[0] => Event_NewTweetPostedEvent Object
(
[id] => E4DA269B-8E73-41C3-96E9-186472F495CE
[eventTime] => 1283456671
[tweetId] => E2069990-7EF8-4E0C-BCD8-3583D75BFD6C
[message] => Another really important tweet
[who] => Keith Pope
[when] => 1283456671
)