Created
July 22, 2012 12:32
-
-
Save erikrozendaal/3159521 to your computer and use it in GitHub Desktop.
Event sourcing example - part 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Outcome of a commit attempt: a `Left` holds the `Conflict` that prevented
 * the commit, a `Right` holds the successful `Commit`.
 */
type CommitResult[+Event] = Either[Conflict[Event], Commit[Event]]
/**
 * Describes one successful commit to the stream identified by `streamId`.
 *
 * @param storeRevision  store-wide revision assigned to this commit (the fake
 *                       event store uses the store's next revision)
 * @param timestamp      commit time; the fake event store fills this from
 *                       `DateTimeUtils.currentTimeMillis`
 * @param streamId       identifier of the stream that was committed to
 * @param streamRevision revision of the stream assigned to this commit (the
 *                       fake event store uses the stream's next revision)
 * @param events         the events contained in this commit
 */
case class Commit[+Event](
  storeRevision: StoreRevision,
  timestamp: Long,
  streamId: String,
  streamRevision: StreamRevision,
  events: Seq[Event])
/**
 * Describes why a commit attempt on the stream `streamId` was rejected.
 *
 * @param streamId    identifier of the stream the commit was aimed at
 * @param actual      the revision the stream really had at commit time
 * @param expected    the revision the committer assumed the stream had
 * @param conflicting commits that conflict with the attempt; the fake event
 *                    store fills this with the stream's commits since `expected`
 */
case class Conflict[+Event](
  streamId: String,
  actual: StreamRevision,
  expected: StreamRevision,
  conflicting: Seq[Commit[Event]])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Delivers successful commits to interested subscribers.
 */
trait CommitPublisher[Event] {

  /**
   * Registers `listener` to be told about every commit that happened `since`
   * the given store revision. Notifications are delivered asynchronously.
   *
   * @param since    store revision after which commits are of interest
   * @param listener callback invoked with each commit
   * @return a handle that can be used to cancel the subscription
   */
  def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription
}
/**
 * Handle for a subscription; call `cancel()` to end it.
 */
trait Subscription {

  /** Cancels this subscription. */
  def cancel(): Unit
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Write side of an event store: commits events to it.
 */
trait EventCommitter[Event] {

  /**
   * Attempts to commit `event` to the stream `streamId`.
   *
   * @param streamId identifier of the stream to commit to
   * @param expected the revision the caller believes the stream currently has
   * @param event    the event to commit
   * @return either the `Conflict` that prevented the commit or the successful `Commit`
   */
  def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Read side of an event store: gives access to stored commits.
 */
trait CommitReader[Event] {

  /** The revision of the event store as a whole. */
  def storeRevision: StoreRevision

  /**
   * Reads commits across all streams, bounded by the store revisions `since`
   * and `to`.
   * NOTE(review): whether the bounds are inclusive is implementation-defined;
   * the fake event store treats them as slice indices - confirm for other stores.
   */
  def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]]

  /** The revision of the stream identified by `streamId`. */
  def streamRevision(streamId: String): StreamRevision

  /**
   * Reads the commits of the single stream `streamId`, bounded by the stream
   * revisions `since` and `to`.
   */
  def readStream(streamId: String, since: StreamRevision, to: StreamRevision): Stream[Commit[Event]]
}
/**
 * Entry point of the event store API. Bundles the three facets of a store
 * (reading, committing and publishing) and allows the store to be closed.
 */
trait EventStore[Event] {

  /** Access to the commits held by this store. */
  def reader: CommitReader[Event]

  /** Used to commit new events to this store. */
  def committer: EventCommitter[Event]

  /** Used to subscribe to the commits of this store. */
  def publisher: CommitPublisher[Event]

  /** Closes this event store. */
  def close(): Unit
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * An `EventStore` that keeps all commits in STM references instead of
 * persistent storage. Parts of the implementation are elided (`[...]`).
 */
class FakeEventStore[Event] extends EventStore[Event] {
  // [...]

  // All commits in the order they were made.
  private[this] val commits = Ref(Vector.empty[Commit[Event]]).single
  // The same commits, grouped by the id of the stream they belong to.
  private[this] val streams = Ref(Map.empty[String, Vector[Commit[Event]]]).single

  override object reader extends CommitReader[Event] {

    /** The store revision equals the number of commits made so far. */
    override def storeRevision = StoreRevision(commits().size)

    /**
     * Returns the commits located between the positions given by `since`
     * (inclusive index) and `to` (exclusive index) of the commit log.
     */
    override def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]] = {
      // Revisions are capped at `Int.MaxValue` so they can serve as vector indices.
      def indexOf(revision: StoreRevision): Int = revision.value.min(Int.MaxValue).toInt

      val from = indexOf(since)
      val until = indexOf(to)
      commits().slice(from, until).toStream
    }

    // [...]
  }

  // [...]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Attempts to append `event` to the stream `streamId`.
 *
 * The whole check-and-append runs in a single STM transaction:
 *  - when `expected` is behind the stream's actual revision, a `Conflict`
 *    listing the commits since `expected` is returned;
 *  - when `expected` is ahead of the actual revision, an
 *    `IllegalArgumentException` is thrown;
 *  - otherwise a new `Commit` is recorded in both the global commit log and
 *    the per-stream log, and returned.
 *
 * Must not be called from inside an STM transaction.
 */
override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
  require(Txn.findCurrent.isEmpty, "the fake event store cannot participate in an STM transaction, just like a real event store")
  atomic { implicit txn =>
    val actual = streamRevision(streamId)

    // The caller claims to know a revision that does not exist yet.
    if (expected > actual) {
      throw new IllegalArgumentException("expected revision %d greater than actual revision %d".format(expected.value, actual.value))
    }

    if (expected < actual) {
      // The stream moved on since the caller last looked at it.
      Left(Conflict(streamId, actual, expected, readStream(streamId, since = expected)))
    } else {
      val now = DateTimeUtils.currentTimeMillis
      val committed = Commit(storeRevision.next, now, streamId, actual.next, Seq(event))
      commits.transform(log => log :+ committed)
      streams.transform { byStream =>
        val previous = byStream.getOrElse(streamId, Vector.empty)
        byStream.updated(streamId, previous :+ committed)
      }
      Right(committed)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Set once the store is closed; running subscriptions observe it and stop.
private[this] val closed = Ref(false).single
// Runs one notification loop per subscription.
private[this] val executor = Executors.newCachedThreadPool

/**
 * Publishes commits to subscribers. Every subscription gets its own task on
 * `executor` that blocks (via STM `retry`) until new commits are available,
 * the subscription is cancelled, or the store is closed.
 */
override object publisher extends CommitPublisher[Event] {

  /**
   * Starts notifying `listener` of all commits made after `since`.
   *
   * @param since    store revision of the last commit the subscriber has seen
   * @param listener callback invoked, on an executor thread, for each commit
   * @return a subscription whose `cancel()` stops further notification rounds
   */
  override def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription = {
    val cancelled = Ref(false).single
    // Store revision of the last commit handed to the listener.
    val last = Ref(since).single

    executor.execute(new Runnable {
      @tailrec override def run(): Unit = {
        // Wait for new commits or subscription termination.
        val pending = atomic { implicit txn =>
          if (closed() || cancelled()) None else {
            // Cap the revision at `Int.MaxValue` before using it as an index,
            // as `reader.readCommits` does. A plain `toInt` would wrap around
            // for larger revisions and deliver old commits again.
            val seen = (last().value min Int.MaxValue).toInt
            val unseen = commits().drop(seen)
            if (unseen.isEmpty) retry
            else Some(unseen)
          }
        }
        pending match {
          case None => // Stop.
          case Some(unseen) =>
            // Notify listener and go back to run.
            unseen.foreach { commit =>
              listener(commit)
              last() = commit.storeRevision
            }
            run()
        }
      }
    })

    // Return a subscription instance that can be used for cancellation.
    new Subscription {
      override def cancel() = cancelled.set(true)
      override def toString = "Subscription(" + last() + ", " + cancelled() + ", " + FakeEventStore.this + ")"
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A `MemoryImage` follows an underlying event store and folds every commit
 * into an in-memory state, starting from `initialState` and applying `update`
 * for each commit.
 *
 * The constructor is private; instances are presumably created through a
 * factory defined elsewhere.
 */
class MemoryImage[State, Event] private
  (eventStore: EventStore[Event])
  (initialState: State)
  (update: (State, Commit[Event]) => State)
  extends EventCommitter[Event] {

  // The projected state and the store revision of the last commit applied to it.
  private[this] val state = Ref(initialState)
  private[this] val revision = Ref(StoreRevision.Initial)

  /**
   * Returns the state of the memory image once it reflects at least every
   * commit that the underlying event store held when this method was called.
   * Blocks (through STM `retry`) while the image is still behind.
   */
  def get: State = {
    val required = eventStore.reader.storeRevision
    atomic { implicit txn =>
      // Not caught up yet: wait until `revision` changes and check again.
      if (revision() < required) retry
      state()
    }
  }

  /**
   * Forwards the commit attempt to the underlying event store. The image is
   * not updated here; it is updated by the subscription set up below.
   */
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] =
    eventStore.committer.tryCommit(streamId, expected, event)

  override def toString = "MemoryImage(%s, %s)".format(revision.single(), eventStore)

  // Follow the event store from its initial revision. Each commit must be the
  // direct successor of the last applied one; it is applied to the state and
  // the revision is advanced in the same transaction.
  eventStore.publisher.subscribe(StoreRevision.Initial) { commit =>
    atomic { implicit txn =>
      val next = revision().next
      require(next == commit.storeRevision, "expected: " + next + ", got " + commit.storeRevision)
      state() = update(state(), commit)
      revision() = commit.storeRevision
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PostsController(memoryImage: MemoryImage[Posts, PostEvent]) extends Controller { | |
/**
 * Reads the current blog posts out of the memory image.
 */
def posts(): Posts = memoryImage.get
/**
 * Tries to commit `event` through the memory image, using the event's post id
 * as the stream id. When the commit succeeds, the callback `f` turns it into
 * the result; when it conflicts, a `Conflict` result is returned instead.
 */
private[this] def commit(expected: StreamRevision, event: PostEvent)
                        (f: Commit[PostEvent] => Result): Result = {
  val streamId = event.postId.toString
  val outcome = memoryImage.tryCommit(streamId, expected, event)
  // The conflict details are not used yet; `todo()` is defined elsewhere.
  outcome.fold(_ => Conflict(todo()), f)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Renders the edit page for the post `id`, with the form pre-filled with the
 * post's current content, or responds with "not found" when no such post exists.
 */
def show(id: PostId) = Action { implicit request =>
  posts().get(id)
    .map { found =>
      val filledForm = postContentForm.fill(found.content)
      Ok(views.html.posts.edit(found.id, found.revision, filledForm))
    }
    .getOrElse(NotFound(notFound(request, None)))
}
/**
 * Handles a submitted edit of the post `id`. A form with errors is shown again
 * with a "bad request" status; valid content is committed as a `PostEdited`
 * event against the `expected` revision, after which the browser is redirected
 * to the post's page.
 */
def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
  val submitted = postContentForm.bindFromRequest
  submitted.fold(
    invalid => BadRequest(views.html.posts.edit(id, expected, invalid)),
    content =>
      commit(expected, PostEdited(id, content)) { _ =>
        Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved.")
      })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment