Original Article: https://www.atlassian.com/blog/archives/replayable-transactions-event-sourcing-dynamodb
In the Engineering Services team at Atlassian, we’re busily building out a microservice-based architecture for our applications. This is a massive change for us, and it is imperative that our changes are ’safe’, i.e. we prove as much as possible that we cannot inadvertently destroy data, and we can recover from any data issues that we do encounter. This led us to implement an event sourcing model for our entity store in Scala, where we store full history of changes to entities so that we can recover to any point in time. We were able to build this on top of a highly available and scalable key-value store, Amazon’s DynamoDB, that has a basic API compared to a relational database.
We’re on a long journey re-architecting our applications to be a composition of stateless microservices; small pieces of functionality that we can compose together to create highly available and horizontally scalable applications. In terms of technology stack, we’re building our microservices in Scala using Functional Programming principles to reap the safety and productivity of immutability. Our microservices will be running in AWS infrastructure.
Rather than storing the view of an entity that can be inserted, updated or deleted, event sourcing involves storing the sequence of events that produce the current view of the entity. An event represents an operation or transformation from one view of the entity to another. Events are therefore tied to a starting view or snapshot, thus making any transformation idempotent. While events may just be ‘sets’ or ‘deltas’, the normal principle would be to relate events back to the business domain (though I won’t get into the whole domain-driven design argument).
To get to the current view of an entity, one would only need to read in all events for that particular entity and then replay the transformations in order.
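To get a concrete feel for what ‘replaying the transformations in order’ means, here is a minimal, self-contained sketch in plain Scala. The event type and the values are made up purely for illustration; they are not the model used later in this post.

sealed trait MappingEvent
case class Put(key: String, value: String) extends MappingEvent
case class Remove(key: String) extends MappingEvent

object ReplaySketch {
  // Replay: fold the events, oldest first, into the current view of the mapping.
  def replay(events: List[MappingEvent]): Map[String, String] =
    events.foldLeft(Map.empty[String, String]) {
      case (view, Put(k, v)) => view.updated(k, v)
      case (view, Remove(k)) => view - k
    }

  def main(args: Array[String]): Unit = {
    val history = List(Put("a", "1"), Put("a", "2"), Remove("a"), Put("a", "3"))
    println(replay(history))          // Map(a -> 3): the current view
    println(replay(history.take(2)))  // Map(a -> 2): the view after the second event
  }
}

Replaying a prefix of the history (history.take(2) above) is exactly the ‘view at a point in time’ property we rely on later.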
The concept actually isn’t new; databases with their transaction logs have been doing it for years, and if you believe accountants they’ve been doing it even longer with double-entry bookkeeping. Now that space is (relatively) plentiful, we can afford to implement the concept at an entity level, and store events instead of just current snapshots of entities.
Event sourcing separates the storage of entities from the querying of entities, and fits nicely into concepts like CQRS (command query responsibility segregation). This means that for a given store of events for an entity, any number of consumers can produce their own custom view of the entity most suitable for their application, simply by providing their own way of interpreting the transformations upon replay (which is the reason why business-domain-driven events are better than data-driven events).
In the simplest case, and what we’re using for our project, event sourcing gives us the ability to compute the view of an entity at any given point in time. This is great if we ever need to recover data in case an entity was ever inadvertently deleted or modified. Also, we get a full audit trail of changes to entities for free.
The code in this blog is in Scala, and is accessible at https://bitbucket.org/sshek/scalasyd-scalaz-stream/. The interesting files are EventSource.scala and MappingEventSource.scala in the eventsource package.
Let’s consider a simple entity – a key-value mapping, with the value containing most of an entity’s information. To start with, we have an EventSource[K, V] trait that provides an interface to an event source for a K key to V value mapping. An EventSource has the following types:
- Event – wraps up an event with a unique EventId identifier (the K key, and a Sequence that is an incrementing number). An event is a Transform[V], which for our example just includes ‘inserting’ or ‘deleting’ a mapping but could also store some delta. Strictly speaking we could introduce a Commit type that wraps up a series of events, but for our case a single event is sufficient.
- Snapshot – represents a view of a value V at an EventId.
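The actual definitions live in EventSource.scala; the following is only a rough sketch of the shapes described above (the names, type parameters and fields are approximations for illustration, not the real code):

import org.joda.time.DateTime

case class Sequence(seq: Long)
case class EventId[K](key: K, sequence: Sequence)

sealed trait Transform[V]                        // how to get from one view to the next
case class Insert[V](value: V) extends Transform[V]
case class Delete[V]() extends Transform[V]

case class Event[K, V](id: EventId[K], time: DateTime, operation: Transform[V])

// A view of the value at a given EventId; a value of None means "no mapping".
case class Snapshot[K, V](id: Option[EventId[K]], value: Option[V])

object Snapshot {
  def zero[K, V]: Snapshot[K, V] = Snapshot(None, None)

  // Apply one event to a snapshot to get the next snapshot.
  def update[K, V](s: Snapshot[K, V], e: Event[K, V]): Snapshot[K, V] =
    Snapshot(Some(e.id), e.operation match {
      case Insert(v) => Some(v)
      case Delete()  => None
    })
}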
Events are stored and retrieved from implementations of Storage[F]:
trait Storage[F[_]] {
  def M: Monad[F]
  def C: Catchable[F]

  /**
   * Retrieve a stream of events from the underlying data store.
   * This stream should take care of pagination and cleanup of
   * any underlying resources (e.g. closing connections if required).
   * @param key The key
   * @return Stream of events.
   */
  def get(key: K): Process[F, Event]

  /**
   * Save the given event.
   * @param event The event to save.
   * @return Either an EventSourceError or the event that was saved.
   *         Other non-specific errors should be available through the container F.
   */
  def put(event: Event): F[EventSourceError \/ Event]

  …
}
Essentially a Storage implementation needs to provide a get that returns a (scalaz-stream) ‘stream of Events’ for a given key, and a put for saving an event, which may return an error (primarily when a concurrent save with the same EventId is attempted). Generation of Events and EventIds is handled elsewhere (we’ll get to it in a minute). The F type parameter is just some container for wrapping operations on the underlying data store; it needs to be a Monad for sequencing, and Catchable to deal with errors in underlying calls to the data store. Typically F would be a scalaz.concurrent.Task, which works nicely with scalaz-stream.
The really cool thing about Storage is that a series of Events for an entity is just a scalaz-stream, which provides a nice abstraction around streams from any source, minimising in-memory buffering, plus lots of cool features and examples that the scalaz-stream team provide for creating and processing streams. For example, the applyEvents function in Storage that replays a stream of events to generate a Snapshot is simply:
def applyEvents(events: Process[F, Event]): F[Snapshot] =
  events.pipe {
    process1.fold(Snapshot.zero)(Snapshot.update)
  }.runLastOr(Snapshot.zero)(M, C)
Describing scalaz-stream is worthy of multiple posts, but the crux of applyEvents is:
- process1.fold produces a ‘single input transducer’. Basically, when the transducer sees an input it can produce zero or more outputs. In our case, our transducer takes an input of Event, and produces exactly one output of Snapshot. fold is what you would expect from a traversable fold; given a starting accumulator (in our case Snapshot.zero, which returns an ‘empty’ entity), run the function Snapshot.update over each value in the stream, ‘adding’ each value to the accumulator.
- pipe runs the stream of events through the transducer.
- runLastOr ‘runs’ the stream, basically turning the stream into an F that can be ‘run’ to generate the Snapshot resulting from accumulating all the events. We use runLastOr to return the ‘empty’ snapshot in case there are no events. The M and C references are just explicit references to the Monad and Catchable instances that the stream functions require.
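To see the same pipe/fold/runLastOr combination in isolation, here is a small, self-contained sketch (assuming a scalaz-stream version in which Process0 has toSource; the numbers stand in for events, and the sum stands in for the snapshot):

import scalaz.concurrent.Task
import scalaz.stream.{Process, process1}

object FoldSketch {
  def main(args: Array[String]): Unit = {
    // Accumulate a stream of inputs into a single value, just like applyEvents does.
    val total: Task[Int] =
      Process.emitAll(Seq(1, 2, 3)).toSource   // Process[Task, Int]
        .pipe(process1.fold(0)(_ + _))          // transducer: fold the inputs into an accumulator
        .runLastOr(0)                           // last emitted value, or 0 if the stream is empty

    println(total.run)  // 6
  }
}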
The nice thing about applyEvents is that it doesn’t care about how the stream of events is obtained, just that it is a stream of events. Importantly, it leaves implementation details like pagination to the implementor of the source of events (i.e. Storage). We’ll show an example of this when we get to the Dynamo implementation of it.
To wrap up EventSource, the consumer accesses it through an implementation of API[F]:
/**
 * This is the main interface for consumers of the Event source.
 * Implementation will contain logic to create a transform given a value to
 * save. Upon construction of an API, a suitable Storage store needs to be
 * provided.
 *
 * @tparam F Container type for API operations. It needs to be a Monad and
 *           a Catchable (e.g. scalaz Task)
 */
trait API[F[_]] {
  def M: Monad[F]
  def C: Catchable[F]

  /**
   * @return Underlying store of events
   */
  def store: Storage[F]

  /**
   * Create a suitable transform from 'old' to 'newValue'.
   * @param old The old value
   * @param newValue The new value
   * @return The suitable transform.
   */
  def createTransform(old: Option[V], newValue: Option[V]): Transform[V]

  /**
   * Return the current view of the data for key 'key'
   * @param key the key
   * @return current view of the data
   */
  def get(key: K): F[Option[V]] =
    getAtUsing(key, _ => true)

  /**
   * Return the view of the data for the key 'key' at the specified
   * sequence number.
   * @param key the key
   * @param seq the sequence number of the event at which we want
   *            to see the view of the data.
   * @return view of the data at event with sequence 'seq'
   */
  def getAt(key: K, seq: Sequence): F[Option[V]] =
    getAtUsing(key, { _.id.sequence.seq <= seq.seq })

  /**
   * Return the view of the data for the key 'key' at the specified timestamp.
   * @param key The key
   * @param time The timestamp at which we want to see the view of the data
   * @return view of the data with events up to the given time stamp.
   */
  def getAt(key: K, time: DateTime): F[Option[V]] = {
    import com.github.nscala_time.time.Implicits._
    getAtUsing(key, { _.time <= time })
  }

  /**
   * Save the given value for the given key.
   * @param key The key
   * @param value The value to save
   * @return The previous value if there was one.
   */
  def save(key: K, value: Option[V]): F[Option[V]] =
    saveUsing(key, value)

  …
}
Consumers of the API can save or get values of type V for a given key K. They don’t have to handle anything to do with events (or scalaz-stream, for that matter).
Implementations of API only need to provide createTransform (a way of generating an event or Transform from an existing view of a value to the new view that the consumer wants to publish to others, i.e. save), and a Storage implementation.
There are several ‘get’ functions provided: get the current value, getAt an event Sequence, and getAt a timestamp. These all reference getAtUsing:
/**
 * All a 'get' is doing is taking events up to a condition
 * (e.g. sequence number or a date) and then applying
 * them in order. This is quite trivial using something like Scalaz Stream.
 * @param key The key for which to retrieve events.
 * @param cond Conditional function for filtering events.
 * @return View of the data obtained from applying all events in the stream
 *         up until the given condition is not met.
 */
private[store] def getAtUsing(key: K, cond: Event => Boolean): F[Option[V]] =
  M.apply(store.applyEvents(store.get(key).takeWhile(cond))) { s: Snapshot => s.value }
A scalaz-stream Process provides a takeWhile(predicate) function that produces another stream that takes elements from the stream while the predicate is true. So in each of the getAt cases, we can stop at either the appropriate sequence or timestamp for an event, and then pass it to Storage.applyEvents to get a Snapshot.
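As a tiny self-contained illustration of takeWhile on its own (the numbers stand in for event sequence numbers, and are made up for this example):

import scalaz.stream.Process

object TakeWhileSketch {
  def main(args: Array[String]): Unit = {
    // Keep emitting while the predicate holds, then halt; later elements are never touched.
    val upToSeq2 = Process.emitAll(Seq(1, 2, 3, 4)).takeWhile(_ <= 2)
    println(upToSeq2.toList)  // List(1, 2)
  }
}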
No sign of any pagination logic, buffering, or caring about how the stream is stored or retrieved. How easy is that!
To save an event, we basically need to get the current view of the value, create a suitable transform, create a new Event with that transform and an incremented EventId, and then save it provided no one else has come in ahead of us (i.e. a conditional put). If there was a collision, then retrying should eventually solve the problem:
/**
 * To save a new value, we need to get the latest snapshot in order to get the
 * existing view of data and the latest event Id. Then we create a suitable
 * transform and event and try to save it. Upon duplicate events,
 * try the operation again (highly unlikely that this situation would occur).
 * @param key The key
 * @param value The value to store
 * @return The previous view of the data.
 */
private[store] def saveUsing(key: K, value: Option[V]): F[Option[V]] = {
  import scalaz.syntax.monad._
  implicit def MonadF = M
  for {
    latestSnapshot <- store.applyEvents(store.get(key))
    newCommit       = Event.next(key, latestSnapshot, createTransform(latestSnapshot.value, value))
    putResult      <- store.put(newCommit)
    lastValue      <- putResult match {
                        case -\/(EventSourceError.DuplicateCommit) => save(key, value)
                        case \/-(c)                                => latestSnapshot.value.point[F]
                      }
  } yield lastValue
}
There’s probably value in making the recursive call trampoline, but given it’s unlikely to happen more than once for a record, it shouldn’t be a big issue.
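The conditional put is the only thing the data store has to provide for this scheme to be safe under concurrent writers. As a hedged illustration of the idea only (not the project’s actual DynamoDB wrapper), here is roughly what it looks like against the raw AWS Java SDK (v1); the table name, attribute names and key schema are made up:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ConditionalCheckFailedException, PutItemRequest}
import scala.collection.JavaConverters._

object ConditionalPutSketch {
  val client = AmazonDynamoDBClientBuilder.standard().build()

  // Hypothetical table "mapping_events" with hash key "key" and range key "seq".
  // The condition rejects the write if an item with this exact (key, seq) already
  // exists, which is what turns a concurrent save into a retriable duplicate error.
  def putEvent(key: String, seq: Long, payload: String): Either[String, Unit] = {
    val item = Map(
      "key"       -> new AttributeValue().withS(key),
      "seq"       -> new AttributeValue().withN(seq.toString),
      "transform" -> new AttributeValue().withS(payload)
    ).asJava

    val request = new PutItemRequest()
      .withTableName("mapping_events")
      .withItem(item)
      .withConditionExpression("attribute_not_exists(#s)")
      .withExpressionAttributeNames(Map("#s" -> "seq").asJava)

    try { client.putItem(request); Right(()) }
    catch { case _: ConditionalCheckFailedException => Left("duplicate event id, retry") }
  }
}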
The DynamoDB implementation can be found in MappingEventSource, most of which is DynamoDB-specific rigamarole to map between the Scala data types and DynamoDB columns. The really nice function is the get that handles pagination, shown below using Task as the concrete implementation of F, just to highlight the important parts:
/**
 * To return a stream of events from Dynamo, we first need to execute a query,
 * then emit results, and then optionally recursively execute the next query
 * until there is nothing more to query.
 *
 * @param key The key
 * @return Stream of events.
 */
override def get(key: MappedKey): Process[Task, Event] = {
  val query = Query.forHash(key)
  import Process._

  def requestPage(q: Query[Event]): Task[Page[Event]] = Task.suspend {
    DynamoDB.query(q).run(client)
  }

  def loop(pt: Task[Page[Event]]): Process[Task, Event] =
    await(pt) { page =>
      emitAll(page.result) ++ {
        page.next match {
          case None            => halt
          case Some(nextQuery) => loop(requestPage(nextQuery))
        }
      }
    }

  loop(requestPage(query))
}
Normally when running a query against DynamoDB, you get a list of result records (up to 1MB) and a lastEvaluatedKey that you can use to re-run the query to get the next batch. The result of a query is represented by Page.
To generate a scalaz-stream of Events, we basically want to run a query, emit all the Events from the query result, and then, when we get asked for another Event beyond the current page, run a new query to get the next page of results.
Execution of the query is done by requestPage (strictly speaking, requestPage provides a Task that, when run, will execute the query).
The loop of running the query, emitting, waiting and then running the next query is handled by the loop function. loop generates a scalaz-stream ‘in waiting’ (i.e. it doesn’t eagerly load anything from the data store). We use scalaz-stream’s Process.await to create a stream given:
- A value generator (Task[Page[Event]]) – The generator is run when there is a request on the stream for some values and there aren’t any existing values in memory. The generator then generates some interim value, in our case a Page[Event], or a page of events.
- A value processor – Given the interim value, the value processor generates a suitable stream of actual event values via the Process.emitAll function. After all the events have been emitted, we may need to prepare to get the next page of events from the data store. A page has an optional query (next) to run to get the next page, so if there is a next query to run, we can generate another ‘waiting’ stream by calling loop again; otherwise ‘halt’ is returned to signify the end of the stream of events. Note that loop appears recursive, but actually isn’t, because it returns immediately with a ‘waiting’ stream; the next call to loop is run by the function that processes the entire stream.
Lots of pagination code completely hidden away from the caller of get! Also, if we did have to close down the database connection (DynamoDB doesn’t need this, but other implementations may), we can pass through a suitable ‘cleanup’ function to Process.await to be run whenever the stream is finished, either because there are no more events to emit, or because the caller of get no longer needs the stream.
The nice thing about EventSource is that it doesn’t require anything from the underlying data store except for a conditional put or a way to deterministically resolve conflicts for colliding writes. We can wrap up implementation details, such as how gets and queries work with or without pagination, in the Storage implementation. This means we can replace our DynamoDB implementation with another store such as Cassandra or Riak.
NB: If we were to use something like Cassandra or Riak, we perhaps could optimise EventSource.API.saveUsing to make use of CRDT/CRDT-like functionality provided by these stores, but that is for another day…
Reading in all events and replaying them for each read (and write) sounds slow, and it can be in general, especially if retrieving all events requires multiple round trips to the data store.
For our particular use case, we’re not expecting a huge number of updates and with DynamoDB we can retrieve a bunch of events with a single query, so we’re very likely to need just one round trip to calculate a snapshot. We still need a query before a write, so that does increase the required read capacity units and latency slightly, but since the operations on DynamoDB are from our microservice running in AWS the actual impact that we’ve measured is not too bad. It’s a price to pay for having a log of all entity changes.
In the general case, we may instead want to store snapshots in addition to events. Instead of having to retrieve all events for every entity read, we would retrieve the latest snapshot of the entity and the events after that snapshot, thus significantly reducing the number of data store reads required. What’s cool is that:
- We already have a model of snapshots in our code; we just need to expose a way of saving/retrieving them, much like events.
- Snapshots can be written asynchronously (e.g. by a scheduled task), and hence do not need to impact normal read/write operations. We will still generate the correct view of an entity even with an old snapshot; it will just require reading more events from the data store.
- We don’t even need to store more than one snapshot, meaning snapshot retrieval is just a simple get instead of a query.
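Here is a hedged sketch of that read path, using a simplified standalone model (the types and function names are illustrative, not the project’s): load the latest stored snapshot if there is one, then replay only the events recorded after its sequence.

// Simplified, illustrative types: a stored snapshot and the events after it.
case class StoredSnapshot(seq: Long, value: Option[String])
case class StoredEvent(seq: Long, value: Option[String])   // Some(v) = insert, None = delete

object SnapshotRead {
  // Current view = latest snapshot (or the empty one) plus the events after its sequence.
  def currentView(latest: Option[StoredSnapshot],
                  eventsAfter: Long => List[StoredEvent]): Option[String] = {
    val start = latest.getOrElse(StoredSnapshot(0L, None))
    eventsAfter(start.seq)
      .foldLeft(start)((s, e) => StoredSnapshot(e.seq, e.value))
      .value
  }

  def main(args: Array[String]): Unit = {
    val snapshot = Some(StoredSnapshot(2L, Some("b")))
    val tail     = List(StoredEvent(3L, Some("c")))   // only events with seq > 2 need reading
    println(currentView(snapshot, _ => tail))         // Some(c)
  }
}

An out-of-date snapshot only means the eventsAfter call returns a longer list; the resulting view is still correct.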
Event sourcing, where all changes to entities are recorded instead of continually updating an entity, is a great model for storing data in today’s world of abundant storage. In our use case, it gives us the ability to replay transactions to recover state at a specific point in time, and for more sophisticated entities it would be possible to re-interpret events to ‘query’ data in ways beyond the original data model design. It also gives us a history, or audit trail of changes, for free. In this post, we described how easy it is to implement an event sourcing system on top of a key-value store with a basic API like DynamoDB. This is yet another example of immutability in action and its benefits.
scalaz-stream also gave us a great abstraction over accessing events from the data store, so that consumers don’t need to worry about overhead such as resource management and pagination. We’re using scalaz-stream in a few other places, and while it is early days for the library, we’ve found it quite reliable and pleasant to use. It is definitely something to keep an eye on.