@RayBenefield
Last active December 1, 2017 19:48
Replayable ‘transactions’ with event sourcing on DynamoDB

Original Article: https://www.atlassian.com/blog/archives/replayable-transactions-event-sourcing-dynamodb

In the Engineering Services team at Atlassian, we’re busily building out a microservice-based architecture for our applications. This is a massive change for us, and it is imperative that our changes are ‘safe’, i.e. we prove as much as possible that we cannot inadvertently destroy data, and we can recover from any data issues that we do encounter. This led us to implement an event sourcing model for our entity store in Scala, where we store full history of changes to entities so that we can recover to any point in time. We were able to build this on top of a highly available and scalable key-value store, Amazon’s DynamoDB, that has a basic API compared to a relational database.

Aside: What are we building?

We’re on a long journey re-architecting our applications to be a composition of stateless microservices; small pieces of functionality that we can compose together to create highly available and horizontally scalable applications. In terms of technology stack, we’re building our microservices in Scala using Functional Programming principles to reap the safety and productivity of immutability. Our microservices will be running in AWS infrastructure.

What is event sourcing?

Rather than storing the view of an entity that can be inserted, updated or deleted, event sourcing involves storing the sequence of events that produce the current view of the entity. An event represents an operation or transformation from one view of the entity to another. Events are therefore tied to a starting view or snapshot, thus making any transformation idempotent. While events may just be ‘sets’ or ‘deltas’, the normal principle would be to relate events back to the business domain (though I won’t get into the whole domain-driven design argument).

To get to the current view of an entity, one would only need to read in all events for that particular entity and then replay the transformations in order.
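As a minimal sketch of that replay step, using plain Scala collections rather than the scalaz-stream code shown later: the names Transform, Insert, Delete, step and replay here are illustrative assumptions, not the API from the repository.

```scala
object ReplaySketch {
  // Hypothetical event type for a value of type V: either set it or delete it.
  sealed trait Transform[+V]
  final case class Insert[V](value: V) extends Transform[V]
  case object Delete extends Transform[Nothing]

  // Apply one event to the current view (None means "no value").
  def step[V](view: Option[V], event: Transform[V]): Option[V] = event match {
    case Insert(v) => Some(v)
    case Delete    => None
  }

  // Replay all events in order, starting from the empty view.
  def replay[V](events: Seq[Transform[V]]): Option[V] =
    events.foldLeft(Option.empty[V])((view, event) => step(view, event))
}
```

Replaying a prefix of the same list gives the view as it was at that earlier point, which is the property the rest of the post relies on.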

The concept actually isn’t new; databases with their transaction logs have been doing it for years, and if you believe accountants they’ve been doing it even longer with double-entry bookkeeping. Now that space is (relatively) plentiful, we can afford to implement the concept at an entity level, and store events instead of just current snapshots of entities.

Why is event sourcing a good idea?

Event sourcing separates the storage of entities and querying of entities, and fits nicely into concepts like CQRS (command query responsibility segregation). This means that for a given store of events for an entity, any number of consumers can produce their own custom view of the entity most suitable for their application simply by providing their own way of interpreting the transformations upon replay (which is the reason why business-domain driven events are better than data-driven events).
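To make the ‘many views from one event store’ idea concrete, here is a small sketch with a made-up account entity. The event names and both view functions are hypothetical and only illustrate how two consumers might interpret the same events differently.

```scala
object ViewsSketch {
  // Hypothetical business-domain events for an account entity.
  sealed trait AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent
  final case class Withdrawn(amount: BigDecimal) extends AccountEvent

  // View 1: the current balance.
  def balance(events: Seq[AccountEvent]): BigDecimal =
    events.foldLeft(BigDecimal(0)) {
      case (total, Deposited(a)) => total + a
      case (total, Withdrawn(a)) => total - a
    }

  // View 2: the number of withdrawals, which a plain "set balance to X"
  // delta would not let us recover.
  def withdrawalCount(events: Seq[AccountEvent]): Int =
    events.count(_.isInstanceOf[Withdrawn])
}
```

Both functions read the same list of events; neither needs the other to exist, and a third view could be added later without touching what is stored.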

In the simplest case, and what we’re using for our project, event sourcing gives us the ability to compute the view of an entity at any given point in time. This is great if we ever need to recover data in case an entity was ever inadvertently deleted or modified. Also, we get a full audit trail of changes to entities for free.

Show me the code!

The code in this blog is in Scala, and is accessible at https://bitbucket.org/sshek/scalasyd-scalaz-stream/. The interesting files are EventSource.scala and MappingEventSource.scala in the eventsource package.

Let’s consider a simple entity – a key-value mapping, with the value containing most of an entity’s information. To start with, we have an EventSource[K, V] trait that provides an interface to an event source for a K key to V value mapping. An EventSource has the following types:

  • Event – wraps up an event with a unique EventId identifier (the K key, and a Sequence that is an incrementing number). An event is a Transform[V], which for our example just includes ‘inserting’ or ‘deleting’ a mapping but could also store some delta. Strictly speaking we could introduce a Commit type that wraps up a series of events, but for our case a single event is sufficient.
  • Snapshot – represents a view of a value V at an EventId.
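The types described above might look roughly like the following. This is a sketch under assumptions: the field names, the use of java.time.Instant for the timestamp, and the shape of Snapshot are guesses for illustration and may differ from EventSource.scala.

```scala
object ModelSketch {
  // An incrementing number that orders the events for one key.
  final case class Sequence(seq: Long)

  // Unique identifier of an event: the key plus its sequence number.
  final case class EventId[K](key: K, sequence: Sequence)

  // The operation an event carries; only 'insert' and 'delete' in this sketch.
  sealed trait Transform[+V]
  final case class Insert[V](value: V) extends Transform[V]
  case object Delete extends Transform[Nothing]

  final case class Event[K, V](id: EventId[K], time: java.time.Instant, operation: Transform[V])

  // A view of the value as of a given event; `at` is None before any event is applied.
  final case class Snapshot[K, V](at: Option[EventId[K]], value: Option[V])

  object Snapshot {
    // The 'empty' entity: no events applied, no value.
    def zero[K, V]: Snapshot[K, V] = Snapshot(None, None)

    // Apply a single event to a snapshot, producing the next snapshot.
    def update[K, V](s: Snapshot[K, V], e: Event[K, V]): Snapshot[K, V] = e.operation match {
      case Insert(v) => Snapshot(Some(e.id), Some(v))
      case Delete    => Snapshot(Some(e.id), None)
    }
  }
}
```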

Representing storage of Events

Events are stored and retrieved from implementations of Storage[F]:

trait Storage[F[_]] {
    def M: Monad[F]
    def C: Catchable[F]

    /**
     * Retrieve a stream of events from the underlying data store.
     * This stream should take care of pagination and cleanup of
     * any underlying resources (e.g. closing connections if required).
     * @param key The key
     * @return Stream of events.
     */
    def get(key: K): Process[F, Event]

    /**
     * Save the given event.
     * @param event The event to save.
     * @return Either an EventSourceError or the event that was saved.
     * Other non-specific errors should be available through the container F.
     */
    def put(event: Event): F[EventSourceError \/ Event]
…
}

Essentially, a Storage implementation needs to provide a get that returns a (scalaz-stream) ‘stream of Events’ for a given key, and a put for saving an event, which may return an error (primarily when a concurrent save with the same EventId is attempted). Generation of Events and EventIds is handled elsewhere (we’ll get to it in a minute). The F type parameter is just some container for wrapping operations on the underlying data store; it needs to be a Monad for sequencing, and Catchable to deal with errors in underlying calls to the data store. Typically F would be a scalaz.concurrent.Task which works nicely with scalaz-stream.

scalaz-stream for the win!

The really cool thing about Storage is that a series of Events for an entity is just a scalaz-stream, which provides a nice abstraction around streams from any source, minimises in-memory buffering, and comes with lots of cool features and examples from the scalaz-stream team for creating and processing streams. For example, the applyEvents function in Storage, which replays a stream of events to generate a Snapshot, is simply:

def applyEvents(events: Process[F, Event]): F[Snapshot] =
    events.pipe {
        process1.fold(Snapshot.zero)(Snapshot.update)
    }.runLastOr(Snapshot.zero)(M, C)

Describing scalaz-stream is worthy of multiple posts, but the crux of applyEvents is:

  • process1.fold produces a ‘single input transducer’. Basically when the transducer sees an input, it can produce zero or more outputs. In our case, our transducer takes an input of Event, and produces exactly one output of Snapshot. fold is what you would expect from a traversable fold; given a starting accumulator (in our case Snapshot.zero that returns an ‘empty’ entity), run the function Snapshot.update over each value in the stream, ‘adding’ each value to the accumulator.
  • pipe runs the stream of events through the transducer.
  • runLastOr ‘runs’ the stream, basically turning the stream into an F that can be ‘run’ to generate the Snapshot resulting from accumulating all the events. We use runLastOr to return the ‘empty’ snapshot in case there are no events. The M and C references are just explicit references to the Monad and Catchable instances that the stream functions require.

The nice thing about applyEvents is that it doesn’t care about how the stream of events is obtained, just that it is a stream of events. Importantly, it leaves implementation details like pagination to the implementor of the source of events (i.e. Storage). We’ll show an example of this when we get to the Dynamo implementation of it.

The API for consumers

To wrap up EventSource, the consumer accesses it through an implementation of API[F]:

/**
 * This is the main interface for consumers of the Event source.
 * Implementation will contain logic to create a transform given a value to
 * save. Upon construction of an API, a suitable Storage store needs to be
 * provided.
 *
 * @tparam F Container type for API operations. It needs to be a Monad and
 * a Catchable (e.g. scalaz Task)
 */
trait API[F[_]] {
    def M: Monad[F]
    def C: Catchable[F]

    /**
     * @return Underlying store of events
     */
    def store: Storage[F]

    /**
     * Create a suitable transform from ‘old’ to ‘newValue’.
     * @param old The old value
     * @param newValue The new value
     * @return The suitable transform.
     */
    def createTransform(old: Option[V], newValue: Option[V]): Transform[V]

    /**
     * Return the current view of the data for key ‘key’
     * @param key the key
     * @return current view of the data
     */
    def get(key: K): F[Option[V]] =
        getAtUsing(key, _ => true)

    /**
     * Return the view of the data for the key ‘key’ at the specified
     * sequence number.
     * @param key the key
     * @param seq the sequence number of the event at which we want
     * to see the view of the data.
     * @return view of the data at event with sequence ‘seq’
     */
    def getAt(key: K, seq: Sequence): F[Option[V]] =
        getAtUsing(key, { _.id.sequence.seq <= seq.seq })

    /**
     * Return the view of the data for the key ‘key’ at the specified timestamp.
     * @param key The key
     * @param time The timestamp at which we want to see the view of the data
     * @return view of the data with events up to the given time stamp.
     */
    def getAt(key: K, time: DateTime): F[Option[V]] = {
        import com.github.nscala_time.time.Implicits._
        getAtUsing(key, { _.time <= time })
    }

    /**
     * Save the given value for the given key.
     * @param key The key
     * @param value The value to save
     * @return The previous value if there was one.
     */
    def save(key: K, value: Option[V]): F[Option[V]] =
        saveUsing(key, value)
…
}

Consumers of the API can save or get values of type V for a given key K. They don’t have to handle anything to do with events (or scalaz-stream for that matter).

Implementations of API only need to provide createTransform (a way of generating an event or Transform from an existing view of a value to the new view that the consumer wants to publish to others i.e. save), and a Storage implementation.

Replaying events to get values – scalaz-stream for the win again!

There are several ‘get’ functions provided: get current value, getAt event Sequence, and getAt a timestamp. These all reference getAtUsing:

/**
 * All a ‘get’ is doing is taking events up to a condition
 * (e.g. sequence number or a date) and then applying
 * them in order. This is quite trivial using something like Scalaz Stream.
 * @param key The key for which to retrieve events.
 * @param cond Conditional function for filtering events.
 * @return View of the data obtained from applying all events in the stream
 * up until the given condition is not met.
 */
private[store] def getAtUsing(key: K, cond: Event => Boolean): F[Option[V]] =
    M.apply(store.applyEvents(store.get(key).takeWhile(cond)))
    { s: Snapshot => s.value }

A scalaz-stream Process provides a takeWhile(predicate) function that produces another stream that takes elements from the stream while the predicate is true. So in each of the getAt cases, we can stop at either the appropriate sequence or timestamp for an event, and then pass it to Storage.applyEvents to get a Snapshot.

No sign of any pagination logic, buffering, or caring about how the stream is stored or retrieved. How easy is that!
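The same ‘take events while a condition holds, then replay’ idea can be sketched without scalaz-stream using a standard-library Iterator. Everything here is an assumption for illustration: the simplified Event type, the use of java.time.Instant, and the function names. It also assumes events arrive in ascending sequence and time order.

```scala
object GetAtSketch {
  // Hypothetical minimal event: sequence number, timestamp, and the new value (None = deleted).
  final case class Event[V](seq: Long, time: java.time.Instant, value: Option[V])

  // Replay events only while `cond` holds; nothing more is pulled from the
  // source after the first event that fails the condition.
  def getAtUsing[V](events: Iterator[Event[V]])(cond: Event[V] => Boolean): Option[V] =
    events.takeWhile(cond).foldLeft(Option.empty[V])((_, e) => e.value)

  // Current view: accept every event.
  def get[V](events: Iterator[Event[V]]): Option[V] =
    getAtUsing(events)(_ => true)

  // View as of a sequence number (inclusive).
  def getAtSeq[V](events: Iterator[Event[V]], seq: Long): Option[V] =
    getAtUsing(events)(_.seq <= seq)

  // View as of a timestamp (inclusive).
  def getAtTime[V](events: Iterator[Event[V]], time: java.time.Instant): Option[V] =
    getAtUsing(events)(e => !e.time.isAfter(time))
}
```

The three public functions differ only in the predicate they pass, which mirrors how get and the two getAt variants all delegate to getAtUsing.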

Saving events

To save an event, we basically need to get the current view of the value, create a suitable transform, create a new Event with that transform and incremented EventId, and then save it provided no one else has come in in front of us (i.e. conditional put). If there was a collision, then retrying should eventually solve the problem:

/**
 * To save a new value, we need to get the latest snapshot in order to get the
 * existing view of data and the latest event Id. Then we create a suitable
 * transform and event and try to save it. Upon duplicate events,
 * try the operation again (highly unlikely that this situation would occur).
 * @param key The key
 * @param value The value to store
 * @return The previous view of the data.
 */
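private[store] def saveUsing(key: K, value: Option[V]): F[Option[V]] = {
    import scalaz.syntax.monad._
    implicit def MonadF = M
    for {
        // Assumed: replay the key's events to obtain the latest snapshot.
        latestSnapshot <- store.applyEvents(store.get(key))
        newCommit = Event.next(key, latestSnapshot, createTransform(latestSnapshot.value, value))
        // Assumed: conditional put of the new event.
        putResult <- store.put(newCommit)
        lastValue <- putResult match {
            case -\/(EventSourceError.DuplicateCommit) => save(key, value)
            case \/-(c) => latestSnapshot.value.point[F]
        }
    } yield lastValue
}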

There’s probably value in trampolining the recursive call, but given that it’s unlikely to happen more than once for a record, it shouldn’t be a big issue.
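The read, conditional put, retry cycle can be sketched against an in-memory stand-in for the event table. This is only an illustration under assumptions: the Store class, its putIfAbsent and latest methods, and the strict (non-monadic) save are all hypothetical, with a ConcurrentHashMap playing the role of DynamoDB’s conditional write.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec

object SaveSketch {
  // Hypothetical in-memory event table: (key, sequence) -> new value.
  final class Store[K, V] {
    private val rows = new ConcurrentHashMap[(K, Long), Option[V]]()

    // Conditional put: succeeds only if no event with this (key, sequence) exists yet.
    def putIfAbsent(key: K, seq: Long, value: Option[V]): Boolean =
      rows.putIfAbsent((key, seq), value) == null

    // Latest sequence number and value for a key, found by probing sequences 1, 2, 3, ...
    def latest(key: K): (Long, Option[V]) = {
      @tailrec
      def go(seq: Long, last: Option[V]): (Long, Option[V]) =
        if (rows.containsKey((key, seq + 1))) go(seq + 1, rows.get((key, seq + 1)))
        else (seq, last)
      go(0L, None)
    }
  }

  // Read the latest view, try to write the next event, and retry on a collision.
  // Returns the previous value, like the save described above.
  @tailrec
  def save[K, V](store: Store[K, V], key: K, value: Option[V]): Option[V] = {
    val (seq, previous) = store.latest(key)
    if (store.putIfAbsent(key, seq + 1, value)) previous
    else save(store, key, value) // another writer took seq + 1 first; re-read and try again
  }
}
```

Because this sketch is strict, the retry is a plain tail call and @tailrec applies. In the monadic version the recursive call sits inside a flatMap on F, so it is not a tail call, which is where trampolining would come in.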

DynamoDB implementation – pagination, what pagination?

The DynamoDB implementation can be found in MappingEventSource, most of which is DynamoDB-specific rigamarole to map between the Scala data types and DynamoDB columns. The really nice function is the get that handles pagination, shown below using Task as the concrete implementation of F just to highlight the important parts:

/**
 * To return a stream of events from Dynamo, we first need to execute a query,
 * then emit results, and then optionally recursively execute the next query
 * until there is nothing more to query.
 *
 * @param key The key
 * @return Stream of events.
 */
override def get(key: MappedKey): Process[Task, Event] = {
    val query = Query.forHash(key)
    import Process._

    def requestPage(q: Query[Event]): Task[Page[Event]] = Task.suspend {
        DynamoDB.query(q).run(client)
    }

    def loop(pt: Task[Page[Event]]): Process[Task, Event] =
        await(pt) { page =>
            emitAll(page.result) ++ {
                page.next match {
                    case None => halt
                    case Some(nextQuery) => loop(requestPage(nextQuery))
                }
            }
        }

    loop(requestPage(query))
}

Normally when running a query against DynamoDB, you get a list of result records (up to 1MB) and a lastEvaluatedKey that you can use to re-run the query to get the next batch. The result of a query is represented by Page.

To generate a scalaz-stream of Events, we basically want to run a query, emit all the Events from the query result, and then when we get asked for another Event beyond the current page, run a new query to get the next page of results.

Execution of the query is done by requestPage (strictly speaking requestPage provides a Task that when run will execute the query).

The loop of running the query, emitting, waiting and then running the next query is handled by the loop function. loop generates a scalaz-stream ‘in waiting’ (i.e. it doesn’t eagerly load anything from the data store). We use scalaz-stream’s Process.await to create a stream given:

  • A value generator (Task[Page[Event]]) – The generator is run when there is a request on the stream for some values, and there aren’t any existing values in memory. The generator then generates some interim value, in our case a Page[Event], or a page of events.
  • A value processor – Given the interim value, the value processor generates a suitable stream of actual event values via the Process.emitAll function. After all the events have been emitted, we may need to prepare to get the next page of events from the data store. A page has an optional query (next) to run to get the next page, so if there is a next query to run, we can generate another ‘waiting’ stream by calling loop again; otherwise, ‘halt’ is returned to signify the end of the stream of events. Note that loop appears recursive, but actually isn’t because it returns immediately with a ‘waiting’ stream; the next call to loop is run by the function that processes the entire stream.

Lots of pagination code completely hidden away from the caller of get! Also, if we did have to close down the database connection (DynamoDB doesn’t need this, but other implementations may), we can pass through a suitable ‘cleanup’ function to Process.await to be run whenever the stream is finished, either because there are no more events to emit, or the caller of get no longer needs the stream.
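For readers without scalaz-stream to hand, the same ‘fetch the next page only when asked’ behaviour can be approximated with the standard library’s Iterator.unfold (Scala 2.13 or later). This is a sketch, not the DynamoDB code: Page, the Int page token, and requestPage are hypothetical stand-ins for a query result, lastEvaluatedKey, and the actual query.

```scala
object PaginationSketch {
  // Hypothetical page of results: some items plus an optional token for the next page.
  final case class Page[A](result: List[A], next: Option[Int])

  // Lazily chain pages into one iterator. A page is requested only when the
  // caller asks for an element beyond the pages fetched so far.
  def stream[A](first: Int)(requestPage: Int => Page[A]): Iterator[A] =
    Iterator
      .unfold(Option(first)) { token =>
        // No token left means the previous page was the last one.
        token.map { t =>
          val page = requestPage(t)
          (page.result, page.next)
        }
      }
      .flatten
}
```

A caller that stops early, for example via takeWhile, never triggers the remaining page requests, which is the property the get above provides for replaying events up to a point in time.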

What about Cassandra? or Riak?

The nice thing about EventSource is that it doesn’t require anything from the underlying data store except for a conditional put or a way to deterministically resolve conflicts for colliding writes. We can wrap up implementation details, such as how gets and queries work with or without pagination, in the Storage implementation. This means we can replace our DynamoDB implementation with another store such as Cassandra or Riak.

NB If we were to use something like Cassandra or Riak, we perhaps could optimise EventSource.API.saveUsing to make use of CRDT/CRDT-like functionality provided by these stores, but that is for another day…

Performance considerations

Reading in all events and replaying them for each read (and write) sounds slow, and it can be in general, especially if retrieving all events requires multiple round trips to the data store.

For our particular use case, we’re not expecting a huge number of updates and with DynamoDB we can retrieve a bunch of events with a single query, so we’re very likely to need just one round trip to calculate a snapshot. We still need a query before a write, so that does increase the required read capacity units and latency slightly, but since the operations on DynamoDB are from our microservice running in AWS the actual impact that we’ve measured is not too bad. It’s a price to pay for having a log of all entity changes.

In the general case, we may instead want to store snapshots in addition to events. Instead of having to retrieve all events for every entity read, we would retrieve the latest snapshot of the entity and the events after that snapshot, thus significantly reducing the number of data store reads required. What’s cool is that:

  • We already have a model of snapshots in our code; we just need to expose a way of saving/retrieving them much like events.
  • Snapshots can be written asynchronously (e.g. by a scheduled task), and hence do not need to impact normal read/write operations. We will still generate the correct view of an entity even with an old snapshot; it will just require reading more events from the data store.
  • We don’t even need to store more than one snapshot, meaning snapshot retrieval is just a simple get instead of a query.
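A rough sketch of reading from a stored snapshot plus the events after it is shown here. The types and the eventsAfter function are hypothetical; in particular, how ‘events after sequence n’ would be queried from the data store is assumed, not taken from the repository.

```scala
object SnapshotSketch {
  // Hypothetical minimal event: sequence number and the new value (None = deleted).
  final case class Event[V](seq: Long, value: Option[V])

  // A stored snapshot: the view as of event `seq` (0 means "before any event").
  final case class Snapshot[V](seq: Long, value: Option[V])

  // Start from a (possibly stale) snapshot and apply only the events recorded after it.
  def current[V](snapshot: Snapshot[V])(eventsAfter: Long => Iterator[Event[V]]): Snapshot[V] =
    eventsAfter(snapshot.seq).foldLeft(snapshot)((_, e) => Snapshot(e.seq, e.value))
}
```

Starting from the empty snapshot or from a newer one yields the same result; an older snapshot only means more events are read, which is why snapshots can be written lazily in the background.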

Summary

Event sourcing, where all changes to entities are recorded instead of continually updating an entity, is a great model for storing data in today’s world of abundant storage. In our use case, it gives us the ability to replay transactions to recover state at a specific point in time, and for more sophisticated entities it would be possible to re-interpret events to ‘query’ data in ways beyond the original data model design. It also gives us a history, or audit trail, of changes for free. In this post, we described how easy it is to implement an event sourcing system on top of a key-value store with a basic API like DynamoDB. This is yet another example of immutability in action and its benefits.

scalaz-stream also gave us a great abstraction over accessing events from the data store so that consumers don’t need to worry about overhead such as resource management and pagination. We’re using scalaz-stream in a few other places, and while it is early days for the library, we’ve found it quite reliable and pleasant to use. It is definitely something to keep an eye on.
