Skip to content

Instantly share code, notes, and snippets.

@erikrozendaal
Created July 22, 2012 12:32
Show Gist options
  • Save erikrozendaal/3159521 to your computer and use it in GitHub Desktop.
Save erikrozendaal/3159521 to your computer and use it in GitHub Desktop.
Event sourcing example - part 2
/**
* The result of a commit attempt is either a `Conflict` or a successful `Commit`.
*
* Following the usual `Either` convention, the failure case (`Conflict`) is the
* `Left` and the success case (`Commit`) is the `Right`.
*/
type CommitResult[+Event] = Either[Conflict[Event], Commit[Event]]
/**
 * Record of events that were successfully committed to the stream `streamId`.
 *
 * @param storeRevision  revision of the event store associated with this commit
 * @param timestamp      time of the commit as a `Long` (presumably epoch
 *                       milliseconds - confirm against the committing store)
 * @param streamId       identifier of the stream the events were committed to
 * @param streamRevision revision of the stream associated with this commit
 * @param events         the committed events
 * @tparam Event         type of the committed events (covariant)
 */
case class Commit[+Event](
  storeRevision: StoreRevision,
  timestamp: Long,
  streamId: String,
  streamRevision: StreamRevision,
  events: Seq[Event])
/**
 * Describes why an attempted commit to the stream `streamId` was rejected.
 *
 * @param streamId    identifier of the stream the commit was attempted on
 * @param actual      the revision the stream really had
 * @param expected    the revision the committer assumed the stream had
 * @param conflicting commits involved in the conflict (presumably those made
 *                    to the stream after `expected` - confirm with the store)
 * @tparam Event      type of the events in the conflicting commits (covariant)
 */
case class Conflict[+Event](
  streamId: String,
  actual: StreamRevision,
  expected: StreamRevision,
  conflicting: Seq[Commit[Event]])
/**
 * Source of successful commits that interested parties can subscribe to.
 *
 * @tparam Event type of the events carried by the published commits
 */
trait CommitPublisher[Event] {

  /**
   * Registers `listener` so that it is told about every commit that happened
   * `since` the given store revision. Notification happens asynchronously.
   *
   * @param since    store revision after which commits are of interest
   * @param listener callback invoked with each commit
   * @return a handle that can be used to cancel the subscription
   */
  def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription
}
/**
 * Handle to an active subscription; lets the subscriber cancel it.
 */
trait Subscription {

  /** Cancels this subscription. */
  def cancel(): Unit
}
/**
 * Write side of an event store: commits events to it.
 *
 * @tparam Event type of the events that can be committed
 */
trait EventCommitter[Event] {

  /**
   * Attempts to commit `event` to the stream `streamId`.
   *
   * @param streamId identifier of the target stream
   * @param expected the revision the caller expects the stream to be at
   * @param event    the event to commit
   * @return a `Right` holding the `Commit` on success, or a `Left` holding
   *         the `Conflict` otherwise
   */
  def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event]
}
/**
 * Read side of an event store: gives access to stored commits and revisions.
 *
 * NOTE(review): whether the `since`/`to` bounds are inclusive or exclusive is
 * not specified by this trait - confirm against the implementations.
 *
 * @tparam Event type of the events carried by the commits
 */
trait CommitReader[Event] {

  /** The current revision of the event store. */
  def storeRevision: StoreRevision

  /** Reads the commits of the whole store between `since` and `to`. */
  def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]]

  /** The current revision of the stream `streamId`. */
  def streamRevision(streamId: String): StreamRevision

  /** Reads the commits of the stream `streamId` between `since` and `to`. */
  def readStream(streamId: String, since: StreamRevision, to: StreamRevision): Stream[Commit[Event]]
}
/**
 * The event store API: bundles the reading, committing and publishing
 * facets of a store and allows the store to be closed.
 *
 * @tparam Event type of the events kept in the store
 */
trait EventStore[Event] {

  /** Facet used to read stored commits. */
  def reader: CommitReader[Event]

  /** Facet used to commit new events. */
  def committer: EventCommitter[Event]

  /** Facet used to subscribe to successful commits. */
  def publisher: CommitPublisher[Event]

  /** Closes the event store. */
  def close(): Unit
}
/**
 * `EventStore` implementation that keeps all commits in STM references
 * instead of persistent storage. (Parts of the class are elided in this
 * excerpt, marked with `[...]`.)
 */
class FakeEventStore[Event] extends EventStore[Event] {
  // [...]

  // Every commit made to the store, plus commits keyed by their stream id.
  private[this] val commits = Ref(Vector.empty[Commit[Event]]).single
  private[this] val streams = Ref(Map.empty[String, Vector[Commit[Event]]]).single

  override object reader extends CommitReader[Event] {

    /** The store revision is derived from the number of commits held. */
    override def storeRevision = StoreRevision(commits().size)

    /**
     * Returns the slice of the commit vector delimited by the two revisions.
     * Revision values are capped at `Int.MaxValue` before being used as
     * vector indices.
     */
    override def readCommits(since: StoreRevision, to: StoreRevision): Stream[Commit[Event]] = {
      val snapshot = commits()
      val fromIndex = (since.value min Int.MaxValue).toInt
      val untilIndex = (to.value min Int.MaxValue).toInt
      snapshot.slice(fromIndex, untilIndex).toStream
    }
    // [...]
  }
  // [...]
}
/**
 * Attempts to commit `event` to the stream `streamId` in a single STM
 * transaction.
 *
 * - If `expected` is behind the stream's actual revision, nothing is stored
 *   and a `Conflict` carrying the commits read since `expected` is returned.
 * - If `expected` is ahead of the actual revision, the call fails with an
 *   `IllegalArgumentException`.
 * - Otherwise a new `Commit` is recorded in both the store-wide commit list
 *   and the per-stream list, and returned.
 *
 * Must not be called from inside an STM transaction (enforced by `require`).
 */
override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] = {
  require(Txn.findCurrent.isEmpty, "the fake event store cannot participate in an STM transaction, just like a real event store")
  atomic { implicit txn =>
    val actual = streamRevision(streamId)
    if (expected > actual) {
      // The caller claims to have seen a revision that does not exist yet.
      throw new IllegalArgumentException("expected revision %d greater than actual revision %d".format(expected.value, actual.value))
    } else if (expected < actual) {
      // The stream moved on since the caller last looked at it.
      Left(Conflict(streamId, actual, expected, readStream(streamId, since = expected)))
    } else {
      val committed = Commit(storeRevision.next, DateTimeUtils.currentTimeMillis, streamId, actual.next, Seq(event))
      commits.transform(all => all :+ committed)
      streams.transform { byStream =>
        val history = byStream.getOrElse(streamId, Vector.empty)
        byStream.updated(streamId, history :+ committed)
      }
      Right(committed)
    }
  }
}
// Flag checked by the subscription loops to decide when to stop
// (presumably set by `close()`, which is not shown in this excerpt).
private[this] val closed = Ref(false).single
// Thread pool on which each subscription's notification loop is run.
private[this] val executor = Executors.newCachedThreadPool
/**
 * Publishes commits to subscribers. Every subscription gets its own loop,
 * run on `executor`, that waits for new commits and hands them to the
 * listener.
 */
override object publisher extends CommitPublisher[Event] {

  /**
   * Starts notifying `listener` of the commits that follow `since`.
   *
   * The loop blocks (via STM `retry`) until commits are pending, the
   * subscription is cancelled, or the store is closed. Cancellation and
   * closing are only observed between batches of pending commits.
   *
   * @param since    store revision after which commits are delivered
   * @param listener callback invoked, in commit order, for each commit
   * @return a handle whose `cancel()` stops the notification loop
   */
  override def subscribe(since: StoreRevision)(listener: Commit[Event] => Unit): Subscription = {
    val cancelled = Ref(false).single
    val last = Ref(since).single
    executor.execute(new Runnable {
      @tailrec override def run(): Unit = {
        // Wait for new commits or subscription termination.
        val pending = atomic { implicit txn =>
          if (closed() || cancelled()) None else {
            // Clamp before narrowing to Int, exactly as `readCommits` does.
            // A plain `.toInt` on a revision above Int.MaxValue can wrap
            // to a negative number, which would make `drop` skip nothing
            // and replay the entire history to the listener.
            val alreadySeen = (last().value min Int.MaxValue).toInt
            val pending = commits().drop(alreadySeen)
            if (pending.isEmpty) retry
            else Some(pending)
          }
        }
        pending match {
          case None => // Stop.
          case Some(commits) =>
            // Notify listener and go back to run.
            commits.foreach { commit =>
              listener(commit)
              last() = commit.storeRevision
            }
            run()
        }
      }
    })
    // Return a subscription instance that can be used for cancellation.
    new Subscription {
      override def cancel() = cancelled.set(true)
      override def toString = "Subscription(" + last() + ", " + cancelled() + ", " + FakeEventStore.this + ")"
    }
  }
}
/**
 * A `MemoryImage` follows an underlying event store and folds every
 * committed `Commit` into an in-memory state, starting from `initialState`
 * and applying `update` once per commit.
 *
 * The constructor is private and subscribes to the event store as its last
 * step, so an instance starts tracking commits as soon as it is created.
 */
class MemoryImage[State, Event] private
  (eventStore: EventStore[Event])
  (initialState: State)
  (update: (State, Commit[Event]) => State)
  extends EventCommitter[Event] {

  // The projected state and the store revision it corresponds to.
  private[this] val state = Ref(initialState)
  private[this] val revision = Ref(StoreRevision.Initial)

  /**
   * The current state of the memory image. Blocks (via STM `retry`) until
   * the image has caught up with the store revision that was observed when
   * this method was called, so the result reflects at least every commit
   * present in the underlying event store at that moment.
   */
  def get: State = {
    val minimum = eventStore.reader.storeRevision
    atomic { implicit txn =>
      val caughtUp = !(revision() < minimum)
      if (caughtUp) state() else retry
    }
  }

  /**
   * Commits an event by delegating to the underlying event store. The memory
   * image itself is updated through its subscription to the store, not by
   * this method directly.
   */
  override def tryCommit(streamId: String, expected: StreamRevision, event: Event): CommitResult[Event] =
    eventStore.committer.tryCommit(streamId, expected, event)

  override def toString = "MemoryImage(%s, %s)" format (revision.single.get, eventStore)

  // Follow the underlying event store from its initial revision and fold
  // each published commit into the state with the provided `update`
  // function. Commits must arrive in strictly consecutive revision order.
  eventStore.publisher.subscribe(StoreRevision.Initial) { commit =>
    atomic { implicit txn =>
      val next = revision().next
      require(next == commit.storeRevision, "expected: " + next + ", got " + commit.storeRevision)
      state() = update(state(), commit)
      revision() = commit.storeRevision
    }
  }
}
class PostsController(memoryImage: MemoryImage[Posts, PostEvent]) extends Controller {
/**
* The current blog posts from the memory image.
*
* Delegates to `memoryImage.get`.
*/
def posts(): Posts = memoryImage.get
/**
 * Commits `event` through the memory image, using the event's post id as
 * the stream id.
 *
 * @param expected the stream revision the caller expects
 * @param event    the event to commit
 * @param f        builds the response from the successful `Commit`
 * @return the result of `f` on success; otherwise a conflict result
 */
private[this] def commit(expected: StreamRevision, event: PostEvent)
                        (f: Commit[PostEvent] => Result): Result = {
  val outcome = memoryImage.tryCommit(event.postId.toString, expected, event)
  // Left: the commit conflicted; Right: the commit succeeded.
  outcome.fold(_ => Conflict(todo()), f)
}
/**
 * Renders the edit view for the post `id`, pre-filled with its current
 * content, or responds with "not found" when no such post exists.
 */
def show(id: PostId) = Action { implicit request =>
  val lookup = posts().get(id)
  lookup match {
    case None =>
      NotFound(notFound(request, None))
    case Some(post) =>
      val filledForm = postContentForm.fill(post.content)
      Ok(views.html.posts.edit(post.id, post.revision, filledForm))
  }
}
/**
 * Handles submission of the edit form for the post `id`.
 *
 * An invalid form is re-rendered as a bad request. A valid form is committed
 * as a `PostEdited` event against the `expected` revision; on success the
 * client is redirected to the post with an informational flash message.
 */
def submit(id: PostId, expected: StreamRevision) = Action { implicit request =>
  postContentForm.bindFromRequest.fold(
    invalidForm => BadRequest(views.html.posts.edit(id, expected, invalidForm)),
    content =>
      commit(expected, PostEdited(id, content)) { _ =>
        Redirect(routes.PostsController.show(id)).flashing("info" -> "Post saved.")
      })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment