Building a CQRS/ES web application in Elixir using Phoenix

Background

I've been interested in Command Query Responsibility Segregation and event sourcing since hearing Greg Young talk on the subject in early 2010. During the past seven years I've built an open-source Ruby CQRS library (rcqrs); worked professionally on .NET applications following the pattern; and more recently built an Event Store (eventstore) and CQRS library (commanded) in Elixir.

Building applications following domain-driven design and using CQRS feels really natural with the Elixir (and Erlang) actor model. An aggregate root fits well within an Elixir process: processes are driven by immutable messages delivered to their own mailboxes, allowing them to run concurrently and in isolation.

The web application I built to implement these ideas in Elixir was Segment Challenge. This helped me develop a resilient and robust CQRS/ES application.


About Segment Challenge

Segment Challenge

Compete against your friends, teammates and fellow club riders — ride a different segment each month

If you're a keen cyclist or runner then you'll know about Strava. It's the social network for athletes, who record their rides and runs, and upload them to the site.

Strava users create segments from sections of their routes. For example, a segment might cover a climb up a hill, starting at the bottom and finishing at the top. Each segment gets its own leaderboard, which displays ranked attempts by each athlete who has cycled or run through it. The fastest man is King of the Mountain (KOM), and the fastest woman is Queen of the Mountain (QOM). Athletes can compare themselves against other Strava users who cycle or run along the same routes.

Segment Challenge allows an athlete to create a competition for a cycling club and its members. A different Strava segment is used each month. Points are accumulated based on each athlete's position at the end of the stage. The site uses Strava's API to fetch segment attempts by the club's members. It ranks their attempts and tallies their points at the end of each stage, replacing the tedium of manually tracking this information in a spreadsheet.

The site is entirely self-service. Any registered Strava user can create and host a challenge for a cycling club they are a member of. It was deployed at the end of 2016 and is now hosting active challenges for three local clubs. I'll be promoting the site to other clubs so they can host their own challenges. It's a fantastic way for them to encourage their members to go out and ride throughout the year, helping to bring out their competitive spirit.

Command Query Responsibility Segregation (CQRS)

At its simplest, CQRS is the separation of commands from queries. Commands are used to mutate state in a write model. Queries are used to retrieve a value from a read model.

  • Commands are used to instruct an application to do something. They are named in the imperative: register account; transfer funds; mark fraudulent activity.

  • Domain events indicate something of importance has occurred within a domain model. They are named in the past tense: account registered; funds transferred; fraudulent activity detected.

The read and write models are different logical models. They may also be separated physically by using different database or storage mechanisms.

The read model is optimised for querying, using whatever technology is most appropriate: relational database; in-memory store; NoSQL database; full-text search index. Domain events from the write model are used to update the read model.

A specialised time series data store, the event store, is used to persist the event-sourced write model.

Event sourcing

Application state changes are modelled as domain events. They are persisted in order -- as a logical stream -- for each aggregate. An aggregate's current state is built by replaying its domain events.

A typical event sourcing example is an ecommerce shopping cart. In a CRUD system only the cart's current state would be recorded: the cart contains two items. Using event sourcing, the events that took the cart from an empty state to its current state are recorded instead: item added to cart, item added to cart, item removed from cart, item added to cart.
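
To make the replay idea concrete, here's a minimal Elixir sketch. The Cart module and its ItemAdded and ItemRemoved events are hypothetical, and the four-event sequence is purely illustrative. The cart's state is never stored: it is derived by folding its events, oldest first, over an empty cart.

```elixir
# Hypothetical cart: its state is derived purely from its events.
defmodule Cart do
  defmodule ItemAdded do
    defstruct [:item]
  end

  defmodule ItemRemoved do
    defstruct [:item]
  end

  defstruct items: []

  # Replay every event, oldest first, starting from an empty cart.
  def rebuild(events) do
    Enum.reduce(events, %__MODULE__{}, fn event, cart -> apply_event(cart, event) end)
  end

  defp apply_event(%__MODULE__{items: items} = cart, %ItemAdded{item: item}),
    do: %__MODULE__{cart | items: items ++ [item]}

  defp apply_event(%__MODULE__{items: items} = cart, %ItemRemoved{item: item}),
    do: %__MODULE__{cart | items: List.delete(items, item)}
end

events = [
  %Cart.ItemAdded{item: "helmet"},
  %Cart.ItemAdded{item: "gloves"},
  %Cart.ItemRemoved{item: "helmet"},
  %Cart.ItemAdded{item: "bottle"}
]

Cart.rebuild(events)
# => %Cart{items: ["gloves", "bottle"]}
```

Replaying a prefix of the same list gives the cart as it was at that point in time, which is what makes temporal queries possible.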

The event stream is the canonical source of truth. It is a perfect audit log. All other state in the system may be rebuilt from these events. You can rebuild the read model by replaying every event from the beginning of time.

Benefits & costs of using CQRS

Domain events describe your system activity over time using a rich, domain-specific language. They are an immutable source of truth for the system. They support temporal queries.

A separate logical read model allows optimised and highly specialised query models to be built. You can scale the read and write models independently: because the processing of commands and queries is asymmetrical, you can dedicate the appropriate number of servers to each side as needed.

Events and their schema provide the ideal integration point for other systems. They allow migration of read-only data between persistence technologies by replaying and projecting all events.

Unfortunately, events also provide a permanent history of your poor design decisions, because events are immutable. CQRS/ES is an alternative, and less common, approach to building applications than basic CRUD. It demands a richer understanding of the domain being modelled, adds complexity that carries its own risk, and forces you to deal with eventual consistency between the write and read models.

Building a CQRS/ES application in Elixir

To build the application and website we require:

  • A domain model containing our aggregates, commands, and domain events.
  • Hosting of an aggregate root and a way to send it commands.
  • An event store to persist the domain events.
  • A read model store for querying.
  • Event handlers to build and update the read model.
  • A web front-end UI to display read model data and dispatch commands to the write model.

Segment Challenge uses the following libraries to help fulfil these requirements.

Write model event store

EventStore is an Elixir library using PostgreSQL as the underlying storage engine. It provides an API to append events to, and read events from, a logical event stream, and to subscribe to events.

Subscriptions to an individual stream, or to all event streams, allow handlers to be notified of persisted events. A subscription guarantees at-least-once delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped.
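
The append, read, and resume-from-position semantics can be modelled in a few lines. This is a hypothetical in-memory sketch, not the EventStore library's API: the module and function names are invented for illustration. A real store persists to PostgreSQL and pushes events to subscribers rather than being polled.

```elixir
# Hypothetical in-memory model of event store semantics (NOT EventStore's API).
# Events are appended to named streams and to one global log, so a subscriber
# to all streams can resume from its last acknowledged position.
defmodule InMemoryEventStore do
  defstruct streams: %{}, all: []

  # Append events to a stream; each gets the next global position (1-based).
  def append(%__MODULE__{streams: streams, all: all} = store, stream_id, events) do
    recorded =
      events
      |> Enum.with_index(length(all) + 1)
      |> Enum.map(fn {data, position} ->
        %{position: position, stream_id: stream_id, data: data}
      end)

    %__MODULE__{
      store
      | streams: Map.update(streams, stream_id, recorded, &(&1 ++ recorded)),
        all: all ++ recorded
    }
  end

  # Read a single stream in the order its events were appended.
  def read_stream(%__MODULE__{streams: streams}, stream_id),
    do: Map.get(streams, stream_id, [])

  # Events from all streams after a subscriber's last acknowledged position.
  def unseen(%__MODULE__{all: all}, last_ack),
    do: Enum.filter(all, &(&1.position > last_ack))
end

store =
  %InMemoryEventStore{}
  |> InMemoryEventStore.append("challenge-1", [:challenge_created])
  |> InMemoryEventStore.append("stage-1", [:stage_created])

# A handler that acknowledged position 1 before pausing resumes after it.
store |> InMemoryEventStore.unseen(1) |> Enum.map(& &1.data)
# => [:stage_created]
```

Because a subscriber only advances its acknowledged position after handling an event, a crash before acknowledging means the same events are delivered again on resume. That is where at-least-once delivery comes from, and why handlers should tolerate duplicates.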

Aggregate host & command dispatch

Commanded provides the building blocks for creating CQRS applications in Elixir. It has support for command registration and dispatch; hosting and delegation to aggregate roots; event handling; and long running process managers.

Read model store

Ecto and a PostgreSQL database are used to build the read model for querying.

Web front-end

The Phoenix Framework is used for the web front-end. I've also implemented the forms using Elm, a functional language for building web apps that compiles to JavaScript.


Using an umbrella application

For Segment Challenge I created an Elixir umbrella application.

mix new segment_challenge --module SegmentChallenge --umbrella

It contains the following apps.

  • authorisation - Policies to authorise command dispatch.
  • challenges - Core domain model, command router, process managers, read model projections, queries, and periodic tasks.
  • commands - Modules (structs) for each command (e.g. SegmentChallenge.Commands.ApproveChallenge).
  • events - Modules (structs) for each domain event (e.g. SegmentChallenge.Events.ChallengeApproved).
  • infrastructure - Serialization and command middleware.
  • projections - Ecto repository and database migrations to build the read model database schema.
  • web - Phoenix web front-end containing a router, controllers, plugs, templates, views, and static assets.

Each app is created from within the umbrella's apps directory:

cd segment_challenge
cd apps
mix new authorisation --module SegmentChallenge.Authorisation
mix new challenges --module SegmentChallenge.Challenges --sup
mix new commands --module SegmentChallenge.Commands
mix new events --module SegmentChallenge.Events
mix new infrastructure --module SegmentChallenge.Infrastructure
mix new projections --module SegmentChallenge.Projections --sup
mix phoenix.new web --app web --module SegmentChallenge.Web --no-brunch --no-ecto

The challenges app is an ideal candidate to be further split up by logical area.

  • Athletes
  • Challenges
  • Clubs
  • Leaderboards
  • Stages

An application's top-level structure should inform you of its intent (e.g. challenges, leaderboards), not the delivery mechanism or technology it uses (e.g. projections, queries, tasks). Uncle Bob describes this ideal architecture in his "Architecture the Lost Years" talk.


Domain model

In domain-driven design, the domain model is a conceptual model of the core business domain in an application containing behaviour and data. It includes the aggregates, commands, domain events, and process managers that comprise the business logic.

An event sourced domain model

An aggregate root must conform to the following behaviour to implement the event sourcing pattern.

  • Each public function must accept a command and return any resultant domain events, or an error.
  • Its internal state may only be modified by applying a domain event to its current state.
  • Its internal state can be rebuilt from an initial empty state by replaying all domain events in the order they were created.
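
A minimal aggregate following these three rules might look like this. ImportClub and the Club aggregate are hypothetical. ClubImported borrows its name from the event handled by the club projection in this article, with its fields trimmed down for the sketch.

```elixir
# Hypothetical command and event for the sketch.
defmodule ImportClub do
  defstruct [:club_uuid, :name]
end

defmodule ClubImported do
  defstruct [:club_uuid, :name]
end

defmodule Club do
  defstruct club_uuid: nil, name: nil

  # Rule 1: a command function takes the current state and a command, and
  # returns domain events (never a new state) or an error.
  def import_club(%Club{club_uuid: nil}, %ImportClub{club_uuid: uuid, name: name}),
    do: [%ClubImported{club_uuid: uuid, name: name}]

  def import_club(%Club{}, %ImportClub{}), do: {:error, :club_already_imported}

  # Rule 2: state is only ever changed by applying a domain event.
  def apply(%Club{} = club, %ClubImported{club_uuid: uuid, name: name}),
    do: %Club{club | club_uuid: uuid, name: name}

  # Rule 3: state can be rebuilt from empty by replaying events in order.
  def rebuild(events), do: Enum.reduce(events, %Club{}, &Club.apply(&2, &1))
end

command = %ImportClub{club_uuid: "club-1", name: "VC Example"}
[event] = Club.import_club(%Club{}, command)
club = Club.rebuild([event])

Club.import_club(club, command)
# => {:error, :club_already_imported}
```

Note that the command function has no side effects and never touches storage; persisting the returned events is left entirely to whatever hosts the aggregate.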

Using Elixir for the domain model

Building an event sourced aggregate in Elixir requires defining a module containing: its state; command functions; and state mutator functions. I use a struct for the aggregate root's state.

Aggregates can be defined without external dependencies. Because they are event sourced, they have no persistence concerns. Relationships between aggregate roots are by identity only. Orchestration of aggregate roots is handled by process managers, which respond to events from one aggregate root and dispatch commands to another.

Example: Challenge aggregate root

The challenge aggregate root is used by Segment Challenge to track each hosted challenge.

Here's a snippet of the Challenge module that implements the aggregate root. You can see how the public command functions (e.g. create_challenge/2) accept the challenge state (a %Challenge{} struct) and a command (e.g. %CreateChallenge{} struct). They return zero, one, or many domain events in response.

An aggregate root must protect itself against commands that would cause an invariant to be broken. As an example, attempting to start a challenge that has not been approved returns an error tagged tuple: {:error, :challenge_not_approved}. Pattern matching is used to validate the state of the aggregate. A finite state machine can be used to formalise the allowed state changes within an aggregate root.

defmodule SegmentChallenge.Challenges.Challenge do
  @moduledoc """
  Challenges are multi-stage competitions, hosted by a club.
  Athletes compete every month during the challenge to set the fastest time for the current stage.
  """

  defstruct [
    challenge_uuid: nil,
    name: nil,
    description: nil,
    start_date: nil,
    start_date_local: nil,
    challenge_state: nil,
    # ...
  ]

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    IncludeCompetitorsInChallenge,
    HostChallenge,
    StartChallenge,
    EndChallenge,
  }

  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge,
    ChallengeHosted,
    ChallengeStarted,
    ChallengeEnded,
  }

  alias SegmentChallenge.Challenges.Challenge

  @doc """
  Create a new challenge
  """
  def create_challenge(challenge, create_challenge)

  def create_challenge(%Challenge{challenge_state: nil}, %CreateChallenge{} = create_challenge) do
    %ChallengeCreated{
      challenge_uuid: create_challenge.challenge_uuid,
      name: create_challenge.name,
      description: create_challenge.description,
      # ...
    }
  end

  def create_challenge(%Challenge{}, %CreateChallenge{}), do: {:error, :challenge_already_created}

  @doc """
  Start the challenge, making it active
  """
  def start_challenge(challenge, start_challenge)

  def start_challenge(%Challenge{challenge_uuid: challenge_uuid, challenge_state: :approved} = challenge, %StartChallenge{}) do
    %ChallengeStarted{
      challenge_uuid: challenge_uuid,
      start_date: challenge.start_date,
      start_date_local: challenge.start_date_local,
    }
  end

  def start_challenge(%Challenge{}, %StartChallenge{}), do: {:error, :challenge_not_approved}

  def apply(%Challenge{} = challenge, %ChallengeCreated{challenge_uuid: challenge_uuid, name: name, description: description}) do
    %Challenge{challenge |
      challenge_uuid: challenge_uuid,
      name: name,
      description: description,
      challenge_state: :created,
      # ...
    }
  end

  def apply(%Challenge{} = challenge, %ChallengeStarted{}) do
    %Challenge{challenge |
      challenge_state: :active,
    }
  end
end

Every domain event returned by the aggregate must have a corresponding apply/2 function. Its arguments are the aggregate root state and domain event. It is used to mutate the aggregate's state.

These functions are also used when the aggregate is rebuilt from an empty state by replaying its events. Apply functions must never fail. You cannot reject an event once it has occurred.
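
As mentioned earlier, a finite state machine can formalise the allowed state changes within an aggregate root. Here's one hedged way to express that as a transition table. The ChallengeLifecycle module is hypothetical, and the state names and their ordering are assumptions made for illustration; the real challenge aggregate may use different states.

```elixir
defmodule ChallengeLifecycle do
  # Assumed lifecycle: each state maps to the states it may move to next.
  @transitions %{
    nil => [:created],
    :created => [:approved],
    :approved => [:active],
    :active => [:ended]
  }

  # Returns :ok if moving from `from` to `to` is an allowed transition.
  def transition(from, to) do
    if to in Map.get(@transitions, from, []) do
      :ok
    else
      {:error, {:invalid_transition, from, to}}
    end
  end
end

ChallengeLifecycle.transition(:approved, :active)
# => :ok
ChallengeLifecycle.transition(:created, :active)
# => {:error, {:invalid_transition, :created, :active}}
```

A command function could check transition/2 before returning its events, and map a failure onto a domain-specific error such as {:error, :challenge_not_approved}, which is more useful to callers than a generic transition error.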

Commands & Events

In Segment Challenge I implemented each command and domain event in its own module, with the modules being located in the relevant commands or events app. You could combine the two into a single app (e.g. messages).

I use Elixir structs to define the fields, providing compile-time checks and default values.

Example: Start a challenge command

The start challenge command transitions a hosted challenge into an active state. It is dispatched from a periodic task, scheduled using Quantum. Each challenge begins at midnight, so the scheduled task runs each day to locate challenges that are ready to start, and dispatches this command for each one.

defmodule SegmentChallenge.Commands.StartChallenge do
  defstruct [
    :challenge_uuid,
  ]

  use Vex.Struct

  validates :challenge_uuid, uuid: true
end

Vex is used to provide basic command validation. These are simple presence, formatting, and data type validation rules. Business rule validation belongs within the aggregate.

Commands are validated before being passed to the aggregate root using a command validation middleware.
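
To show the kind of rule Vex checks, here is a hand-rolled equivalent of validates :challenge_uuid, uuid: true using a regular expression. This is a hypothetical sketch, not Vex's implementation. The Sketch.StartChallenge module is invented, and its error tuples copy the {:error, field, validator, message} shape that the CommandView's to_json/1 function pattern matches on.

```elixir
defmodule Sketch.StartChallenge do
  defstruct [:challenge_uuid]

  # Roughly equivalent to `validates :challenge_uuid, uuid: true`:
  # checks presence, type, and format only, no business rules.
  def errors(%__MODULE__{challenge_uuid: uuid}) do
    if is_binary(uuid) and
         Regex.match?(
           ~r/\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/i,
           uuid
         ) do
      []
    else
      [{:error, :challenge_uuid, :uuid, "must be a valid UUID"}]
    end
  end

  def valid?(%__MODULE__{} = command), do: errors(command) == []
end

Sketch.StartChallenge.valid?(%Sketch.StartChallenge{
  challenge_uuid: "0b1f8e7c-9a4d-4c55-8f8e-2a1b3c4d5e6f"
})
# => true

Sketch.StartChallenge.errors(%Sketch.StartChallenge{challenge_uuid: "not-a-uuid"})
# => [{:error, :challenge_uuid, :uuid, "must be a valid UUID"}]
```

Whether the challenge is actually approved and ready to start is deliberately not checked here: that is a business rule, and it belongs to the aggregate.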

Example: Challenge started event

The challenge aggregate root returns a ChallengeStarted event in response to the start command when in a valid state to begin.

defmodule SegmentChallenge.Events.ChallengeStarted do
  defstruct [
    :challenge_uuid,
    :start_date,
    :start_date_local,
  ]
end

Events returned by an aggregate root are serialised to JSON using the Poison library and persisted to the Event Store. Persisted events are published out to interested subscribers: process managers; read model projections; and event handlers.

Process managers

A process manager is responsible for coordinating one or more aggregate roots. It handles events and may dispatch commands in response. Each process manager has state used to track which aggregate roots are being orchestrated. They are vital for inter-aggregate communication, coordination, and long-running business processes.

A saga can be implemented using a process manager as an alternative to using a distributed transaction for managing a long-running business process. Each step of the business process has a defined compensating action. When the business process encounters an error condition and is unable to continue, it can execute the compensating actions for the steps that have already completed. This undoes the work completed so far to maintain the consistency of the system.

Typically, you would use a process manager to route messages between aggregates within a bounded context. You would use a saga to manage a long-running business process that spans multiple bounded contexts.
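
The compensation logic described above can be sketched as a plain function. The Saga module and the reserve, charge, and ship steps are hypothetical. A real saga would be a process manager reacting to events and dispatching compensating commands, rather than calling functions directly.

```elixir
defmodule Saga do
  # Each step is {name, action, compensate}; action returns :ok or {:error, reason}.
  def run(steps), do: run(steps, [])

  defp run([], _completed), do: :ok

  defp run([{name, action, compensate} | rest], completed) do
    case action.() do
      :ok ->
        # Prepend, so `completed` is always newest-first.
        run(rest, [{name, compensate} | completed])

      {:error, reason} ->
        # Undo the completed steps in reverse order of completion.
        undone =
          Enum.map(completed, fn {done, undo} ->
            undo.()
            done
          end)

        {:error, name, reason, undone}
    end
  end
end

steps = [
  {:reserve, fn -> :ok end, fn -> :released end},
  {:charge, fn -> :ok end, fn -> :refunded end},
  {:ship, fn -> {:error, :out_of_stock} end, fn -> :noop end}
]

Saga.run(steps)
# => {:error, :ship, :out_of_stock, [:charge, :reserve]}
```

The failed step itself is not compensated, since it never completed; only the steps that succeeded before it are undone.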

Example: Include stages in a challenge

In Segment Challenge, a challenge and stage are separate aggregate roots. I use a process manager to allow a challenge to track which stages are included.

Commanded provides the building block for defining a process manager. It uses an interested?/1 routing function to indicate which events the process manager receives. The response is used to route the event to an existing process manager instance, or create a new one.

In this example the process manager is started by a ChallengeCreated event. It uses the challenge_uuid to identify an instance of the process.

When a StageCreated event is received, an IncludeStageInChallenge command is sent to the corresponding challenge aggregate root. All other events are ignored by this process manager.

defmodule SegmentChallenge.Challenges.ChallengeStageProcessManager do
  @moduledoc """
  Track the stages that comprise a challenge.
  """

  defstruct [
    challenge_uuid: nil,
    stages: [],
  ]

  defmodule Stage do
    defstruct [
      :stage_uuid,
      :stage_number,
      :name,
    ]
  end

  alias SegmentChallenge.Events.{
    ChallengeCreated,
    StageCreated,
  }
  alias SegmentChallenge.Commands.{IncludeStageInChallenge}
  alias SegmentChallenge.Challenges.ChallengeStageProcessManager
  alias ChallengeStageProcessManager.Stage

  def interested?(%ChallengeCreated{challenge_uuid: challenge_uuid}), do: {:start, challenge_uuid}
  def interested?(%StageCreated{challenge_uuid: challenge_uuid}), do: {:continue, challenge_uuid}
  def interested?(_event), do: false

  def handle(%ChallengeStageProcessManager{}, %ChallengeCreated{}), do: []

  def handle(%ChallengeStageProcessManager{challenge_uuid: challenge_uuid}, %StageCreated{stage_uuid: stage_uuid, stage_number: stage_number, name: name, start_date: start_date, start_date_local: start_date_local, end_date: end_date, end_date_local: end_date_local}) do
    %IncludeStageInChallenge{
      challenge_uuid: challenge_uuid,
      stage_uuid: stage_uuid,
      stage_number: stage_number,
      name: name,
      start_date: start_date,
      start_date_local: start_date_local,
      end_date: end_date,
      end_date_local: end_date_local,
    }
  end

  def apply(%ChallengeStageProcessManager{} = process_manager, %ChallengeCreated{challenge_uuid: challenge_uuid}) do
    %ChallengeStageProcessManager{process_manager |
      challenge_uuid: challenge_uuid
    }
  end

  def apply(%ChallengeStageProcessManager{stages: stages} = process_manager, %StageCreated{stage_uuid: stage_uuid, stage_number: stage_number, name: name}) do
    stage = %Stage{
      stage_uuid: stage_uuid,
      stage_number: stage_number,
      name: name
    }

    %ChallengeStageProcessManager{process_manager |
      stages: stages ++ [stage]
    }
  end
end

The state of a process manager instance is modified following the aggregate root approach. An apply/2 function must exist for each handled domain event. The process manager's state is mutated and returned.

This is a simple process manager example. Segment Challenge contains more complex process managers following the same principles. Examples include tracking club members so they can be included in challenges hosted by the club, and in any active stages; and applying scores based on stage rank at the end of each stage to update the overall challenge leaderboards.
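
To illustrate how the answers from interested?/1 could route events to process manager instances, here's a hedged in-memory sketch. PMRouter and StagesPM are hypothetical, and events are plain tuples for brevity. Unlike Commanded's ProcessRouter, it keeps instances in a map rather than in supervised processes with persisted state, and it assumes that :continue events for an unknown instance are simply ignored.

```elixir
defmodule PMRouter do
  # instances: map of process uuid => process manager state.
  # Returns {updated_instances, commands_to_dispatch}.
  def route(pm, instances, event) do
    case pm.interested?(event) do
      {:start, uuid} ->
        handle(pm, Map.put(instances, uuid, struct(pm)), uuid, event)

      {:continue, uuid} when is_map_key(instances, uuid) ->
        handle(pm, instances, uuid, event)

      # false, or :continue for an unknown instance (assumed: ignored)
      _ ->
        {instances, []}
    end
  end

  defp handle(pm, instances, uuid, event) do
    state = Map.fetch!(instances, uuid)
    commands = List.wrap(pm.handle(state, event))
    {Map.put(instances, uuid, pm.apply(state, event)), commands}
  end
end

defmodule StagesPM do
  defstruct challenge_uuid: nil, stages: []

  def interested?({:challenge_created, uuid}), do: {:start, uuid}
  def interested?({:stage_created, uuid, _stage}), do: {:continue, uuid}
  def interested?(_event), do: false

  def handle(_pm, {:challenge_created, _uuid}), do: []

  def handle(%StagesPM{challenge_uuid: uuid}, {:stage_created, _, stage}),
    do: {:include_stage_in_challenge, uuid, stage}

  def apply(pm, {:challenge_created, uuid}), do: %StagesPM{pm | challenge_uuid: uuid}
  def apply(pm, {:stage_created, _, stage}), do: %StagesPM{pm | stages: pm.stages ++ [stage]}
end

{instances, []} = PMRouter.route(StagesPM, %{}, {:challenge_created, "c1"})
{_instances, commands} = PMRouter.route(StagesPM, instances, {:stage_created, "c1", "Box Hill"})
commands
# => [{:include_stage_in_challenge, "c1", "Box Hill"}]
```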

Supervision

Each process manager must be supervised to ensure it starts with the application and restarts on failure. Commanded provides the Commanded.ProcessManagers.ProcessRouter module to host a process manager.

defmodule SegmentChallenge.Challenges.Supervisor do
  use Supervisor

  alias SegmentChallenge.Challenges
  alias Challenges.{
    ChallengeStageProcessManager,
  }

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_) do
    children = [
      # projections
      supervisor(SegmentChallenge.Challenges.Projections.Supervisor, []),

      # process managers
      worker(Commanded.ProcessManagers.ProcessRouter, ["ChallengeStageProcessManager", ChallengeStageProcessManager, Challenges.Router], id: :challenge_stage_process_manager),
    ]

    supervise(children, strategy: :one_for_one)
  end
end

Commanded takes care of creating a subscription to the event store for each process manager. It serialises a snapshot of an instance's state to the event store after every handled event to ensure its state can be rehydrated on restart.


Commands (write model)

Once the core domain model is built, it's time to provide an external interface to allow commands to be sent in.

The Commanded library provides aggregate root hosting, command registration, and dispatch. Every command has exactly one registered handler. It can be sent to a command handler module, or directly to the target aggregate root. Attempting to dispatch an unregistered command results in an exception.

A command router is used to configure which aggregate root or command handler responds to a command. This is analogous to routing in the Phoenix web framework. However, it sends commands to handlers rather than HTTP requests to controllers.

During command dispatch, an Elixir GenServer process is started to host the aggregate root instance. It will fetch the aggregate's event stream from the event store, and rebuild its state. Any returned domain events are appended to the event stream. The aggregate root host process remains alive, so subsequent commands routed to the same instance will not require rebuilding its state from the event store.
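
The hosting behaviour can be sketched with a GenServer. This is an assumption-laden illustration, not Commanded's implementation: the stored events are passed in at start instead of being read from an event store, "appending" simply keeps them in the process state, and Counter is a hypothetical aggregate.

```elixir
defmodule Counter do
  defstruct value: 0

  # Command function: returns events or an error, never new state.
  def increment(%Counter{}, by) when by > 0, do: [{:incremented, by}]
  def increment(%Counter{}, _by), do: {:error, :invalid_amount}

  def apply(%Counter{value: value} = counter, {:incremented, by}),
    do: %Counter{counter | value: value + by}
end

defmodule AggregateHost do
  use GenServer

  def start_link({module, initial_state, stored_events}),
    do: GenServer.start_link(__MODULE__, {module, initial_state, stored_events})

  # command_fun receives the current state and returns events or {:error, reason}.
  def execute(pid, command_fun), do: GenServer.call(pid, {:execute, command_fun})
  def state(pid), do: GenServer.call(pid, :state)

  @impl true
  def init({module, initial_state, stored_events}) do
    # Rebuild once, on start, by replaying the stored events in order.
    state = Enum.reduce(stored_events, initial_state, &module.apply(&2, &1))
    {:ok, %{module: module, state: state, events: stored_events}}
  end

  @impl true
  def handle_call(:state, _from, host), do: {:reply, host.state, host}

  def handle_call({:execute, command_fun}, _from, %{module: module, state: state} = host) do
    case command_fun.(state) do
      {:error, _reason} = error ->
        {:reply, error, host}

      events ->
        events = List.wrap(events)
        # A real host would append these to the event store before applying them.
        new_state = Enum.reduce(events, state, &module.apply(&2, &1))
        {:reply, :ok, %{host | state: new_state, events: host.events ++ events}}
    end
  end
end

{:ok, pid} = AggregateHost.start_link({Counter, %Counter{}, [{:incremented, 2}]})
:ok = AggregateHost.execute(pid, &Counter.increment(&1, 3))
AggregateHost.state(pid)
# => %Counter{value: 5}
```

Because the process stays alive between calls, the second and later commands run against the cached state without replaying the stream again.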

Command routing and dispatch

Segment Challenge defines a SegmentChallenge.Challenges.Router module that uses the Commanded.Commands.Router macro. Here I register every command available within the application.

Example: Router

The snippet below shows the commands relating to the challenge aggregate root.

defmodule SegmentChallenge.Challenges.Router do
  use Commanded.Commands.Router

  alias SegmentChallenge.Challenges.{Challenge,ChallengeCommandHandler}

  middleware Commanded.Middleware.Auditing
  middleware Commanded.Middleware.Logger
  middleware SegmentChallenge.Infrastructure.Validation.Middleware

  dispatch [
    SegmentChallenge.Commands.CreateChallenge,
    SegmentChallenge.Commands.IncludeCompetitorsInChallenge,
    SegmentChallenge.Commands.RemoveCompetitorFromChallenge,
    SegmentChallenge.Commands.ExcludeCompetitorFromChallenge,
    SegmentChallenge.Commands.HostChallenge,
    SegmentChallenge.Commands.StartChallenge,
    SegmentChallenge.Commands.EndChallenge,
    ], to: ChallengeCommandHandler, aggregate: Challenge, identity: :challenge_uuid
end

These commands are sent to the aggregate via the ChallengeCommandHandler module.

Middleware

The router allows configuration of middleware. All dispatched commands pass through the middleware chain, in the order defined. A middleware can choose to halt execution. This provides the integration point for cross-cutting concerns, including command auditing, logging, and validation. These are concerns applicable to all commands.
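
A middleware chain that can be halted maps naturally onto Enum.reduce_while/3. MiddlewareSketch, its Pipeline struct, and RequireUuid are hypothetical simplifications loosely modelled on the validation middleware in this article. Commanded's real pipeline also has after_dispatch and after_failure callbacks, which this sketch omits.

```elixir
defmodule MiddlewareSketch do
  defmodule Pipeline do
    defstruct command: nil, halted: false, response: nil

    def halt(%__MODULE__{} = p, response), do: %__MODULE__{p | halted: true, response: response}
  end

  # Example middleware: halt when the command has no challenge_uuid.
  defmodule RequireUuid do
    alias MiddlewareSketch.Pipeline

    def before_dispatch(%Pipeline{command: %{challenge_uuid: nil}} = p),
      do: Pipeline.halt(p, {:error, :validation_failure, [:challenge_uuid]})

    def before_dispatch(%Pipeline{} = p), do: p
  end

  # Run middleware in the order given; call the handler only if none halted.
  def dispatch(command, middleware, handler) do
    pipeline =
      Enum.reduce_while(middleware, %Pipeline{command: command}, fn mw, p ->
        p = mw.before_dispatch(p)
        if p.halted, do: {:halt, p}, else: {:cont, p}
      end)

    if pipeline.halted, do: pipeline.response, else: handler.(command)
  end
end

handler = fn _command -> :ok end

MiddlewareSketch.dispatch(%{challenge_uuid: "c1"}, [MiddlewareSketch.RequireUuid], handler)
# => :ok
MiddlewareSketch.dispatch(%{challenge_uuid: nil}, [MiddlewareSketch.RequireUuid], handler)
# => {:error, :validation_failure, [:challenge_uuid]}
```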

Command handler to aggregate root

A registered command handler module receives the target aggregate root state and the dispatched command. This allows additional processing to be done before delegating to the aggregate root.

Example: Challenge command handler

The example below shows how a unique URL slug is created, using the challenge name, and included in the command.

defmodule SegmentChallenge.Challenges.ChallengeCommandHandler do
  @behaviour Commanded.Commands.Handler

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    StartChallenge,
  }
  alias SegmentChallenge.Challenges.Challenge
  alias SegmentChallenge.Challenges.Services.UrlSlugs.UniqueSlugger

  def handle(%Challenge{} = challenge, %CreateChallenge{challenge_uuid: challenge_uuid, name: name} = create_challenge) do
    # assign a unique URL slug from the challenge name
    {:ok, url_slug} = UniqueSlugger.slugify(Challenge.slug_source, challenge_uuid, name)

    challenge
    |> Challenge.create_challenge(%CreateChallenge{create_challenge | url_slug: url_slug})
  end

  def handle(%Challenge{} = challenge, %StartChallenge{} = start_challenge) do
    challenge
    |> Challenge.start_challenge(start_challenge)
  end
end

Example: Command dispatch

A command is dispatched using the configured router module.

alias SegmentChallenge.Challenges.Router
alias SegmentChallenge.Commands.StartChallenge

:ok = Router.dispatch(%StartChallenge{challenge_uuid: challenge_uuid})

Command dispatching web controller

In Segment Challenge, all commands are sent to a single Phoenix controller. The Phoenix web router is configured to accept JSON requests posted to /api/commands, which are handled by the dispatch action of API.CommandController.

defmodule SegmentChallenge.Web.Router do
  use SegmentChallenge.Web.Web, :router

  pipeline :api do
    plug :accepts, ["json"]
    plug :fetch_session
    plug :assign_current_user
  end

  scope "/api", SegmentChallenge.Web, as: :api do
    pipe_through :api

    post "/commands", API.CommandController, :dispatch
  end
end

Example: Phoenix command controller

The controller contains a single public dispatch function which:

  • Builds and populates the command struct defined by the command parameter using the ExConstructor library.
  • Checks that the current user is authorised to dispatch the command, using the Canada library.
  • Dispatches the command.
  • Returns an appropriate HTTP response code, depending upon the outcome of the command dispatch.

defmodule SegmentChallenge.Web.API.CommandController do
  use SegmentChallenge.Web.Web, :controller

  import Canada.Can, only: [can?: 3]

  alias SegmentChallenge.Authorisation.User
  alias SegmentChallenge.Challenges.Router
  alias SegmentChallenge.Web.CommandBuilder

  @empty_json_response "{}"

  @doc """
  Dispatch the command defined in the `params`
  """
  def dispatch(conn, %{"command" => command} = params) do
    command = CommandBuilder.build(command, conn, Map.delete(params, "command"))
    user = current_user(conn)

    if can?(user, :dispatch, command) do
      case Router.dispatch(command) do
        :ok ->
          conn
          |> send_resp(201, @empty_json_response)

        {:error, :validation_failure, errors} ->
          conn
          |> put_status(:unprocessable_entity)
          |> render(:dispatch, errors: errors)

        {:error, _reason} ->
          conn
          |> send_resp(400, @empty_json_response)
      end
    else
      conn
      |> send_resp(403, @empty_json_response)
    end
  end

  # Attempted to dispatch a missing command
  def dispatch(conn, _params) do
    conn
    |> send_resp(400, @empty_json_response)
  end

  defp current_user(conn) do
    case current_athlete_uuid(conn) do
      nil -> nil
      athlete_uuid -> %User{athlete_uuid: athlete_uuid}
    end
  end
end

Command validation errors are returned to the client as JSON. The view CommandView deals with formatting the data. By default Phoenix uses Poison to serialize to JSON.

defmodule SegmentChallenge.Web.API.CommandView do
  use SegmentChallenge.Web.Web, :view

  def render("dispatch.json", %{errors: errors}) do
    Enum.map(errors, &to_json/1)
  end

  defp to_json({:error, field, _type, message}) do
    %{
      name: field,
      message: message,
    }
  end
end

Command validation

All dispatched commands are validated before being passed on to the target aggregate root. I use the following middleware, which verifies the simple validation rules defined in a command using Vex.

defmodule SegmentChallenge.Infrastructure.Validation.Middleware do
  @behaviour Commanded.Middleware

  require Logger

  alias Commanded.Middleware.Pipeline
  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case Vex.valid?(command) do
      true -> pipeline
      false -> failed_validation(pipeline)
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp failed_validation(%Pipeline{command: command} = pipeline) do
    errors = Vex.errors(command)

    Logger.warn(fn -> "Command #{inspect command.__struct__} failed validation, errors: #{inspect errors}, command: #{inspect command}" end)

    pipeline
    |> respond({:error, :validation_failure, errors})
    |> halt
  end
end

Middleware is registered in the command router module using the middleware macro provided by Commanded.Commands.Router.

defmodule SegmentChallenge.Challenges.Router do
  use Commanded.Commands.Router

  middleware SegmentChallenge.Infrastructure.Validation.Middleware
end

Validation failures are returned to the command dispatcher and may be shown to the end user. The example Phoenix command controller demonstrates how these errors are handled.

Command authorisation

For Segment Challenge, authentication is provided by Strava using the OAuth2 authentication protocol. The Strava library provides a strategy to generate the relevant Strava login URL and handle the authentication response.

Canada is used to implement authorisation and define permission rules.

An authenticated user is used to authorise command dispatch.

Example: Authorisation module

To use Canada, I implemented the Canada.Can protocol for the User struct, with a can?/3 clause for each command dispatched from the web front-end. Any command without a matching clause is disallowed.

defimpl Canada.Can, for: SegmentChallenge.Authorisation.User do
  alias SegmentChallenge.Authorisation.User

  alias SegmentChallenge.Authorisation.Policies.{
    ChallengePolicy,
    StagePolicy,
  }

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    CreateStage,
    HostChallenge,
  }

  def can?(%User{} = user, :dispatch, %CreateChallenge{} = command), do: ChallengePolicy.can?(user, :dispatch, command)
  def can?(%User{} = user, :dispatch, %HostChallenge{} = command), do: ChallengePolicy.can?(user, :dispatch, command)
  def can?(%User{} = user, :dispatch, %CreateStage{} = command), do: StagePolicy.can?(user, :dispatch, command)

  def can?(_user, _action, _command), do: false
end

The above Canada.Can protocol implementation simply delegates to the appropriate policy module (e.g. ChallengePolicy) containing the permission rules.

Elixir pattern matching provides a convenient way of defining rules. A final catch-all can? clause, matching any arguments, provides the default disallow response.

Example: Challenge policy module

In the snippet below, the policy for the host challenge command uses the read model projection to enforce its rules: the user must be the original creator of the challenge, and the challenge must be in a pending state.

defmodule SegmentChallenge.Authorisation.Policies.ChallengePolicy do
  alias SegmentChallenge.Authorisation.User
  alias SegmentChallenge.Commands.{
    HostChallenge,
  }
  alias SegmentChallenge.Challenges.Projections.ChallengeProjection
  alias SegmentChallenge.Projections.Repo

  def can?(
    %User{athlete_uuid: athlete_uuid} = user,
    :dispatch,
    %HostChallenge{challenge_uuid: challenge_uuid, hosted_by_athlete_uuid: athlete_uuid} = command)
  do
    challenge = Repo.get(ChallengeProjection, challenge_uuid)

    can?(user, :dispatch, command, challenge)
  end

  def can?(_user, _action, _command), do: false

  def can?(
    %User{athlete_uuid: athlete_uuid},
    :dispatch,
    %HostChallenge{challenge_uuid: challenge_uuid, hosted_by_athlete_uuid: athlete_uuid},
    %ChallengeProjection{challenge_uuid: challenge_uuid, created_by_athlete_uuid: athlete_uuid, status: :pending}), do: true

  def can?(_user, _action, _command, _challenge), do: false
end

Querying (read model)

Reporting and querying the state of an application is handled by building a read model. I use the Ecto library and a dedicated read store PostgreSQL database in Segment Challenge.

Ecto provides a domain-specific language for writing queries and interacting with databases in Elixir. It includes mix tasks to generate database schema migrations and run them, which are used to migrate both the development and production databases.

The read model is optimised for querying. Data is duplicated and denormalised as required. Queries with table joins are infrequent.

Example: Ecto repo

The read store uses a single Ecto repository to execute queries against the database.

defmodule SegmentChallenge.Projections.Repo do
  use Ecto.Repo, otp_app: :projections
end

To create and migrate the database:

mix ecto.create -r SegmentChallenge.Projections.Repo
mix ecto.migrate -r SegmentChallenge.Projections.Repo

Projections

All read models are populated using projections.

A projection is an event handler that receives every persisted event from the event store. It executes queries against the database to add, update, and delete data. Event handlers run concurrently and are eventually consistent.

Example: Club projection

Each projection includes at least one Ecto schema definition and a Projector event handler module. The projector handles all events relevant to the read model it builds. In this example, this includes any event related to clubs.

defmodule SegmentChallenge.Challenges.Projections.Clubs do
  defmodule ClubProjection do
    use SegmentChallenge.Projections.Projection

    @primary_key {:club_uuid, :string, []}

    schema "clubs" do
      field :name, :string
      field :profile, :string

      timestamps()
    end
  end

  alias SegmentChallenge.Challenges.Projections.Clubs.ClubProjection

  defmodule Projector do
    import Ecto.Query, only: [from: 2]

    alias SegmentChallenge.Events.{
      ClubImported,
    }

    @behaviour Commanded.Event.Handler
    @projection_name "club"

    def handle(
      %ClubImported{
        club_uuid: club_uuid,
        name: name,
        profile: profile,
      },
      %{event_id: event_id})
    do
      ClubProjection.update_projection(@projection_name, event_id, fn multi ->
        Ecto.Multi.insert(multi, :club, %ClubProjection{
          club_uuid: club_uuid,
          name: name,
          profile: profile,
        })
      end)
    end

    # ignore all other events
    def handle(_event, _metadata), do: :ok
  end
end

I use Ecto.Multi to group the inserts, updates, and deletes for an event into a single database transaction.

Event handlers may receive an event more than once, so the SegmentChallenge.Projections.Projection macro ensures each event is processed only once. Each projection records the ID of its last seen event within the same database transaction as its data changes. An already seen event is then ignored: the transaction containing the duplicate change is rolled back.

defmodule SegmentChallenge.Projections.Projection do
  @moduledoc false

  defmacro __using__(_) do
    quote do
      use Ecto.Schema

      import Ecto.Changeset
      import Ecto.Query

      alias SegmentChallenge.Projections.{Repo,Projection,ProjectionVersion}

      def update_projection(projection_name, event_id, multi_fn) do
        multi =
          Ecto.Multi.new
          |> Ecto.Multi.run(:verify_projection_version, fn _ ->
            version = case Repo.get(ProjectionVersion, projection_name) do
              nil -> Repo.insert!(%ProjectionVersion{projection_name: projection_name, last_seen_event_id: 0})
              version -> version
            end

            if version.last_seen_event_id < event_id do
              {:ok, %{version: version}}
            else
              {:error, :already_seen_event}
            end
          end)
          |> Ecto.Multi.update(:projection_version, ProjectionVersion.changeset(%ProjectionVersion{projection_name: projection_name}, %{last_seen_event_id: event_id}))

        multi = apply(multi_fn, [multi])

        case Repo.transaction(multi) do
          {:ok, _changes} -> :ok
          {:error, :verify_projection_version, :already_seen_event, _changes_so_far} -> :ok
          {:error, _stage, reason, _changes_so_far} -> {:error, reason}
        end
      end
    end
  end
end
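
The version check at the heart of update_projection/3 can be illustrated without a database. This is a minimal in-memory sketch under stated assumptions: ProjectionVersionSketch is hypothetical, the `rows` map stands in for the read model table, and a single struct field stands in for the projection_versions row.

```elixir
# Hypothetical in-memory model of the "last seen event" check.
defmodule ProjectionVersionSketch do
  defstruct last_seen_event_id: 0, rows: %{}

  # Apply `change_fn` to the rows only when `event_id` is newer than the last
  # seen event, recording the new version alongside the change. A duplicate
  # leaves the state untouched, like the rolled-back transaction above.
  def update_projection(%__MODULE__{} = state, event_id, change_fn) do
    if state.last_seen_event_id < event_id do
      %__MODULE__{state | rows: change_fn.(state.rows), last_seen_event_id: event_id}
    else
      state
    end
  end
end
```

Redelivering an event with the same ID is then harmless: the second call for event 1 returns the state unchanged, so its change never reaches the rows.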

Example: Projection supervision

All read model projectors are supervised to ensure they start with the application and restart on failure.

defmodule SegmentChallenge.Challenges.Projections.Supervisor do
  use Supervisor

  alias SegmentChallenge.Challenges.Projections.Clubs

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_) do
    children = [
      # projections
      worker(Commanded.Event.Handler, ["ClubProjection", Clubs.Projector], id: :club_projection),
    ]

    supervise(children, strategy: :one_for_one)
  end
end
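
The worker/3 helper above comes from Supervisor.Spec, which later Elixir versions deprecate in favour of child specifications. As a sketch of the same one_for_one setup on a newer Elixir version, this uses a hypothetical ProjectorSketch GenServer in place of Commanded's event handler, since the handler's start arguments differ between Commanded versions.

```elixir
# Hypothetical stand-in for a projector; a real one would subscribe to the
# event store in init/1.
defmodule ProjectorSketch do
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name), do: {:ok, name}
end

# Hypothetical supervisor using child specs instead of worker/3.
defmodule ProjectionsSupervisorSketch do
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children = [
      # {module, arg} child spec, with an explicit id as in the original.
      Supervisor.child_spec({ProjectorSketch, :club_projector}, id: :club_projection)
    ]

    # :one_for_one restarts only the projector that crashed.
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Killing the `:club_projector` process causes the supervisor to start a replacement under the same registered name.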

Queries

The read model is optimised for the queries required by the application. Its projections contain denormalised data, so every query can be fulfilled using predicates on indexed columns, without complex joins. This keeps read queries fast.

Example: Challenges created by athlete query

Here the Ecto.Query domain specific language is used to build a query that retrieves the challenges created by a given athlete, latest start date first. It relies on an index on the created_by_athlete_uuid column.

defmodule SegmentChallenge.Challenges.Queries.Challenges.ChallengesCreatedByAthleteQuery do
  import Ecto.Query, only: [from: 2]
  alias SegmentChallenge.Challenges.Projections.ChallengeProjection

  def new(athlete_uuid) do
    from c in ChallengeProjection,
      where: c.created_by_athlete_uuid == ^athlete_uuid,
      order_by: [desc: c.start_date_local]
  end
end

The query module builds the query, and the Ecto Repo module executes it:

challenges = ChallengesCreatedByAthleteQuery.new(current_athlete_uuid(conn)) |> Repo.all

Rebuild the read model

In event sourced systems the event stream is the canonical source of truth. This allows the read model to be entirely rebuilt, replaced, and significantly altered, assuming that the domain events contain the relevant information. You can repurpose the read model as future needs dictate.

To rebuild, you define the new schema, modify the affected projections, and replay all events from the beginning of time through the projectors. This approach also allows you to migrate the data from one storage mechanism (e.g. a relational database) to another (e.g. a document database).

A rebuild and data migration can be done while the application is online. You configure the new projection, which replays from the first event. Once it has caught up, you switch the application to read from the new read model.
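
Because a projection is essentially a fold over the event stream, the replay can be illustrated in plain Elixir. In this sketch the event shapes (plain maps with a `:type` key) and both projector functions are hypothetical. The second read model tracks something the first never stored, yet it is derived entirely by replaying the same events from the start.

```elixir
# Hypothetical projectors over a simplified, map-based event stream.
defmodule RebuildSketch do
  # Existing read model: club name by club UUID.
  def club_names(events) do
    Enum.reduce(events, %{}, fn
      %{type: :club_imported, club_uuid: uuid, name: name}, acc -> Map.put(acc, uuid, name)
      _other, acc -> acc
    end)
  end

  # New read model added later: member count per club, rebuilt from event one.
  def member_counts(events) do
    Enum.reduce(events, %{}, fn
      %{type: :member_joined, club_uuid: uuid}, acc -> Map.update(acc, uuid, 1, &(&1 + 1))
      _other, acc -> acc
    end)
  end
end
```

Each projector ignores events it doesn't care about, just as the Club projector's catch-all handle/2 clause does.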

Testing

Applications built using CQRS/ES are great for unit and integration testing.

Unit testing an aggregate root

Within the domain model, commands are the input and domain events are the output. Unit tests verify the expected events are produced.

I use ExMachina to define fixture data for tests in Segment Challenge.

defmodule SegmentChallenge.Factory do
  use ExMachina

  def challenge_factory do
    %{
      name: "Segment of the Month",
      start_date: ~N[2017-01-01 00:00:00],
      start_date_local: ~N[2017-01-01 00:00:00],
      end_date: ~N[2017-10-31 23:59:59],
      end_date_local: ~N[2017-10-31 23:59:59],
      hosted_by_club_uuid: "club-1234",
      hosted_by_club_name: "Example club",
      created_by_athlete_uuid: "athlete-5678",
      created_by_athlete_name: "Ben Smith",
      url_slug: "segment-of-the-month",
    }
  end
end

To use it, import the factory module into a test and call its build function:

create_challenge = struct(CreateChallenge, build(:challenge, %{challenge_uuid: "1234"}))

Example: Challenge aggregate root unit tests

This aggregate root test module contains two unit tests. The second, excluding a competitor, shows how the aggregate root's state must be mutated by applying the events returned by each command before the next command is executed.

defmodule SegmentChallenge.Challenges.ChallengeTest do
  use ExUnit.Case

  import SegmentChallenge.Factory
  import SegmentChallenge.Aggregate, only: [evolve: 2]

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    IncludeCompetitorsInChallenge,
    ExcludeCompetitorFromChallenge,
  }
  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge,
    CompetitorExcludedFromChallenge,    
  }
  alias SegmentChallenge.Challenges.Challenge

  defp create_challenge(challenge_uuid) do
    Challenge.create_challenge(%Challenge{}, struct(CreateChallenge, build(:challenge, %{challenge_uuid: challenge_uuid})))
  end

  @tag :unit
  test "create a challenge" do
    challenge_uuid = UUID.uuid4

    assert create_challenge(challenge_uuid) == struct(ChallengeCreated, build(:challenge, %{challenge_uuid: challenge_uuid}))
  end

  describe "exclude a competitor" do
    @tag :unit
    test "should remove competitor" do
      challenge_uuid = UUID.uuid4
      athlete_uuid = UUID.uuid4
      reason = "Not a paid club member"

      competitor_excluded =
        with challenge <- evolve(%Challenge{}, create_challenge(challenge_uuid)),
             challenge <- evolve(challenge, Challenge.include_competitors(challenge, %IncludeCompetitorsInChallenge{challenge_uuid: challenge_uuid, competitors: [ %IncludeCompetitorsInChallenge.Competitor{athlete_uuid: athlete_uuid} ]})),
        do: Challenge.exclude_competitor(challenge, %ExcludeCompetitorFromChallenge{challenge_uuid: challenge_uuid, athlete_uuid: athlete_uuid, reason: reason})

      assert competitor_excluded == %CompetitorExcludedFromChallenge{challenge_uuid: challenge_uuid, athlete_uuid: athlete_uuid, reason: reason}
    end
  end
end

I use Elixir's with special form to chain the command functions. The evolve/2 function is a unit test helper that updates the aggregate root state by calling the aggregate's apply/2 function for each of the given events, starting from the given state (e.g. an empty %Challenge{}).

defmodule SegmentChallenge.Aggregate do
  def evolve(aggregate, events) do
    Enum.reduce(List.wrap(events), aggregate, &aggregate.__struct__.apply(&2, &1))
  end
end
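
To see the command/event cycle and evolve/2 working together outside Segment Challenge, here is a minimal sketch built around a hypothetical Membership aggregate. Its commands, events, and the AggregateSketch module are illustrative only. Command functions return an event, or an empty list when nothing should happen, and apply/2 folds each event into the state.

```elixir
# Same idea as SegmentChallenge.Aggregate.evolve/2, written with an explicit fn.
defmodule AggregateSketch do
  def evolve(aggregate, events) do
    module = aggregate.__struct__

    Enum.reduce(List.wrap(events), aggregate, fn event, state -> module.apply(state, event) end)
  end
end

# Hypothetical aggregate: commands in, events out.
defmodule Membership do
  defstruct members: MapSet.new()

  defmodule MemberJoined do
    defstruct [:athlete_uuid]
  end

  defmodule MemberLeft do
    defstruct [:athlete_uuid]
  end

  # Commands: decide which events (if any) to produce from the current state.
  def join(%__MODULE__{members: members}, uuid) do
    if MapSet.member?(members, uuid), do: [], else: %MemberJoined{athlete_uuid: uuid}
  end

  def leave(%__MODULE__{members: members}, uuid) do
    if MapSet.member?(members, uuid), do: %MemberLeft{athlete_uuid: uuid}, else: []
  end

  # State mutators: apply an event to produce the next state.
  def apply(%__MODULE__{members: members} = state, %MemberJoined{athlete_uuid: uuid}),
    do: %__MODULE__{state | members: MapSet.put(members, uuid)}

  def apply(%__MODULE__{members: members} = state, %MemberLeft{athlete_uuid: uuid}),
    do: %__MODULE__{state | members: MapSet.delete(members, uuid)}
end
```

A unit test then only needs to compare the events a command returns, evolving the state between commands exactly as the competitor exclusion test does.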

I apply a :unit tag to all unit tests. This allows me to execute the very fast unit test suite on its own.

mix test --only unit

Integration testing the application

For integration tests I follow the same approach: use commands as input and verify the expected domain events are published.

In Segment Challenge I run the full application during integration tests, including the event store, aggregate root and process manager hosting, read store projections, and external HTTP requests. The event and read stores are both reset between tests.

Example: Host challenge integration test

Here's an example integration test to create a challenge. The command dispatch happens inside the create_challenge setup function, leaving the test itself to assert that the expected domain events are received.

defmodule SegmentChallenge.Challenges.HostChallengeTest do
  use SegmentChallenge.StorageCase

  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.UseCases.CreateChallengeUseCase, only: [create_challenge: 1]

  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge,
  }

  setup do
    HTTPoison.start
    :ok
  end

  describe "creating a challenge" do
    setup [:create_challenge]

    @tag :integration
    test "should create the challenge", context do
      assert_receive_event(ChallengeCreated, fn event ->
        assert event.challenge_uuid == context[:challenge_uuid]
        assert event.hosted_by_club_uuid == context[:club_uuid]
        assert event.url_slug == "segment-of-the-month"
      end)
    end

    @tag :integration
    test "should include competitors", context do
      assert_receive_event(CompetitorsJoinedChallenge, fn event ->
        assert event.challenge_uuid == context[:challenge_uuid]
        assert Enum.any?(event.competitors, fn competitor -> competitor.athlete_uuid == "athlete-5678" end)
      end)
    end
  end
end

The assert_receive_event function is provided by the Commanded.Assertions.EventAssertions module. It creates a new subscription to the event store and, for each received event matching the given module (e.g. ChallengeCreated), attempts to verify it using the provided assertion function. It waits until a matching event passes, and fails if none does within a limited timeout period.
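
The wait-until-match-or-timeout behaviour can be sketched with plain process messages. This is not Commanded's implementation: it assumes events arrive at the test process as `{:event, event}` messages rather than through an event store subscription, it takes a boolean predicate instead of an assertion function, and both EventWaitSketch and ChallengeCreatedSketch are hypothetical.

```elixir
# Hypothetical event struct used to demonstrate the wait helper.
defmodule ChallengeCreatedSketch do
  defstruct [:challenge_uuid]
end

# Hypothetical helper: wait for the first event of `type` that satisfies
# `predicate`, raising if none arrives before the deadline.
defmodule EventWaitSketch do
  def wait_for_event(type, predicate, timeout \\ 1_000) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(type, predicate, deadline)
  end

  defp do_wait(type, predicate, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      # Only events of the requested struct type are taken from the mailbox;
      # non-matching instances of that type are skipped.
      {:event, %{__struct__: ^type} = event} ->
        if predicate.(event), do: event, else: do_wait(type, predicate, deadline)
    after
      remaining -> raise "no matching #{inspect(type)} event within timeout"
    end
  end
end
```

Using a single deadline, rather than restarting the timeout after each skipped event, keeps the total wait bounded however many unrelated events arrive.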

Example: Create challenge test use case

I've created test use case modules: reusable functions that each cover an end-user scenario. The test above creates a challenge using the CreateChallengeUseCase module. Inside a describe block, ExUnit's setup accepts a list of function names, which are called in order. Each function receives the test context map and may return new values to add to it, allowing later functions to build upon the work done by earlier ones.

This example use case makes external HTTP calls to the Strava API. I use ExVCR to record the initial responses to disk, then replay the cached responses on subsequent test runs. The test still exercises the application end-to-end, yet the feedback loop stays short because an external request is only made when no cached response is present.
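
The record-then-replay idea can be illustrated with a file-backed cache. This sketch is not ExVCR's API or storage format (ExVCR intercepts the HTTP adapter and stores cassettes as JSON); CassetteSketch is hypothetical, and it simply stores whatever the request function returns as an Erlang term.

```elixir
# Hypothetical record/replay helper illustrating the cassette idea.
defmodule CassetteSketch do
  # First run: call `request_fn` and save its result to `path`.
  # Later runs: replay the saved result without calling `request_fn`.
  def use_cassette(path, request_fn) do
    if File.exists?(path) do
      path |> File.read!() |> :erlang.binary_to_term()
    else
      response = request_fn.()
      File.mkdir_p!(Path.dirname(path))
      File.write!(path, :erlang.term_to_binary(response))
      response
    end
  end
end
```

Deleting the cassette file forces the next run to hit the real API again, which is also how you refresh a recording with ExVCR.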

defmodule SegmentChallenge.UseCases.CreateChallengeUseCase do
  use ExVCR.Mock, adapter: ExVCR.Adapter.Hackney

  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.Factory

  alias SegmentChallenge.Commands.{
    ImportClub,
    ImportClubMembers,
    CreateChallenge,
  }
  alias SegmentChallenge.Events.{
    AthleteImported,
    ClubImported,
    CompetitorsJoinedChallenge,
  }
  alias SegmentChallenge.Challenges.Router

  def create_challenge(_context) do
    strava_club_id = 1234
    club_uuid = UUID.uuid4
    challenge_uuid = UUID.uuid4
    athlete_uuid = "athlete-5678"

    use_cassette "challenge/create_challenge##{strava_club_id}", match_requests_on: [:query] do
      # strava_access_token/0 is assumed to be a test helper defined elsewhere (not shown)
      :ok = Router.dispatch(%ImportClub{club_uuid: club_uuid, strava_id: strava_club_id, strava_access_token: strava_access_token()})
      :ok = Router.dispatch(%ImportClubMembers{club_uuid: club_uuid, strava_id: strava_club_id, strava_access_token: strava_access_token()})

      # wait for the athlete who will create and club who will host the challenge to be imported
      wait_for_event ClubImported, fn event -> event.club_uuid == club_uuid end
      wait_for_event AthleteImported, fn event -> event.athlete_uuid == athlete_uuid end

      :ok = Router.dispatch(struct(CreateChallenge, build(:challenge, %{challenge_uuid: challenge_uuid, hosted_by_club_uuid: club_uuid})))

      wait_for_event CompetitorsJoinedChallenge, fn event -> event.challenge_uuid == challenge_uuid end
    end

    [
      strava_club_id: strava_club_id,
      club_uuid: club_uuid,
      challenge_uuid: challenge_uuid,
      athlete_uuid: athlete_uuid,
    ]
  end
end

The keyword list returned by the function is merged into the context map, which is passed to the subsequent setup functions and to the test, allowing key-based access (e.g. context[:challenge_uuid]).
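
A simplified model of this merging, using anonymous functions in place of named setup functions, looks like the sketch below. SetupChainSketch is hypothetical and is not how ExUnit is implemented internally; it only mirrors the documented return values a setup function may use (`:ok`, `{:ok, values}`, or a keyword list or map).

```elixir
# Hypothetical model of chained setup functions building a test context.
defmodule SetupChainSketch do
  def run(setup_fns, context \\ %{}) do
    Enum.reduce(setup_fns, context, fn setup_fn, acc ->
      case setup_fn.(acc) do
        # Nothing to add to the context.
        :ok -> acc
        # New values are merged in, so later functions can read them.
        {:ok, extra} -> Map.merge(acc, Map.new(extra))
        extra when is_list(extra) or is_map(extra) -> Map.merge(acc, Map.new(extra))
      end
    end)
  end
end
```

Here a function that creates a challenge could read the `:club_uuid` returned by an earlier club setup function, just as each step in a real describe block builds on the previous ones.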

Example: Challenge projection integration test

Integration tests for read model projections follow a similar pattern. Reusing the use cases keeps these tests succinct: little more than the assertions.

defmodule SegmentChallenge.Projections.Challenges.ChallengeProjectionTest do
  use SegmentChallenge.StorageCase

  import SegmentChallenge.Factory
  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.UseCases.CreateChallengeUseCase, only: [create_challenge: 1]

  alias SegmentChallenge.Wait
  alias SegmentChallenge.Projections.Repo
  alias SegmentChallenge.Challenges.Projections.ChallengeProjection

  setup do
    HTTPoison.start
    :ok
  end

  describe "creating a challenge" do
    setup [:create_challenge]

    @tag :integration
    @tag :projection
    test "should create challenge projection", context do
      Wait.until fn ->
        challenge = Repo.get(ChallengeProjection, context[:challenge_uuid])

        assert challenge != nil
        assert challenge.name == "Segment of the Month"
        assert challenge.start_date == ~N[2017-01-01 00:00:00]
      end
    end
  end
end

Because the read model is eventually consistent, I use the following Wait helper module. It retries the assertions every 10ms, giving the projection roughly 500ms to be built before the test fails.

defmodule SegmentChallenge.Wait do
  def until(fun), do: until(500, fun)

  def until(0, fun), do: fun.()

  def until(timeout, fun) do
    try do
      fun.()
    rescue
      ExUnit.AssertionError ->
        :timer.sleep(10)
        until(max(0, timeout - 10), fun)
    end
  end
end

Integration and projection tests are tagged with :integration and :projection. This allows me to execute these slower test suites on their own.

mix test --only integration
mix test --only projection
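
If you'd rather have a plain mix test run only the fast unit suite, one option is to exclude the slower tags by default in test/test_helper.exs. This is a suggestion, not how Segment Challenge is configured. mix test --only integration still runs the excluded tests, because --only also includes the given tag.

```elixir
# test/test_helper.exs
# Suggested configuration (an assumption, not Segment Challenge's setup):
# a plain `mix test` skips the slower suites, while
# `mix test --only integration` still runs them.
ExUnit.start(exclude: [:integration, :projection])
```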

Deployment

Segment Challenge uses Distillery to create the Elixir release. Build and deployment to the production host is handled by edeliver.

Deployment and administration of a production CQRS/ES application deserves its own full article. Subscribe to the mailing list below to be notified when new and relevant content is published.

Conclusion

Applying the Command Query Responsibility Segregation and event sourcing pattern to an Elixir and Phoenix web application is an unorthodox approach. I hope this case study has demonstrated why -- and briefly how -- you might do so.

The eventstore and commanded Elixir libraries provide the building blocks to help you. The eventstore library relies on PostgreSQL for its persistence. Commanded uses OTP behaviours and supervision to provide concurrency, reliability, and resiliency.

Please get in touch with feedback, ideas, requests for further articles, and criticism. Subscribe to the mailing list below if you'd like to stay informed.

@slashdotdash

slashdotdash commented Jan 20, 2017

@jhosteny Finally applied your edits to the published article. Thanks again for taking the time to edit and improve.
