I've been interested in Command Query Responsibility Segregation and event sourcing since hearing Greg Young talk on the subject in early 2010. During the past seven years I've built an open-source Ruby CQRS library (rcqrs); worked professionally on .NET applications following the pattern; and more recently built an Event Store (eventstore) and CQRS library (commanded) in Elixir.
Building applications following domain-driven design and using CQRS feels really natural with the Elixir -- and Erlang -- actor model. An aggregate root fits well within an Elixir process: processes are driven by immutable messages delivered to their own mailboxes, allowing them to run concurrently and in isolation.
Segment Challenge is the web application I built to implement these ideas in Elixir. Building it helped me develop a resilient and robust CQRS/ES application.
- Background
- Command Query Responsibility Segregation (CQRS)
- Event sourcing
- Building a CQRS/ES application in Elixir
- Domain model
- Commands (write model)
- Querying (read model)
- Testing
- Deployment
- Conclusion
- Want to discover more CQRS/ES in Elixir?
Compete against your friends, teammates and fellow club riders — ride a different segment each month
If you're a keen cyclist or runner then you'll know about Strava. It's the social network for athletes, who record their rides and runs and upload them to the site.
Strava users create segments from sections of their routes. As an example, a segment will cover a climb up a hill, starting at the bottom and finishing at the top. Each segment gets its own leaderboard, displaying ranked attempts by each athlete who has cycled or run through it. The fastest man is King of the Mountain (KOM), and the fastest woman is Queen of the Mountain (QOM). Athletes can compare themselves against other Strava users who cycle or run along the same routes.
Segment Challenge allows an athlete to create a competition for a cycling club and its members. A different Strava segment is used each month. Points are accumulated based on each athlete's position at the end of the stage. The site uses Strava's API to fetch segment attempts by the club's members. It ranks their attempts and tallies their points at the end of each stage, replacing the tedium of manually tracking this information in a spreadsheet.
The site is entirely self-service. Any registered Strava user can create and host a challenge for a cycling club they are a member of. It was deployed at the end of 2016 and is now hosting active challenges for three local clubs. I'll be promoting the site to more clubs so they can host their own challenges. It's a fantastic way for them to encourage their members to go out and ride throughout the year, helping to bring out their competitive spirit.
At its simplest, CQRS is the separation of commands from queries. Commands are used to mutate state in a write model. Queries are used to retrieve a value from a read model.
- Commands are used to instruct an application to do something. They are named in the imperative: register account; transfer funds; mark fraudulent activity.
- Domain events indicate something of importance has occurred within a domain model. They are named in the past tense: account registered; funds transferred; fraudulent activity detected.
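As a sketch of this naming convention in Elixir, using hypothetical funds-transfer modules (the struct-per-message approach is shown for real later in this article):

defmodule Commands.TransferFunds do
  # a command is an imperative instruction; it may yet be rejected
  defstruct [:transfer_uuid, :debit_account, :credit_account, :amount]
end

defmodule Events.FundsTransferred do
  # a domain event records a fact that has already happened; it cannot be rejected
  defstruct [:transfer_uuid, :debit_account, :credit_account, :amount]
end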
The read and write models are different logical models. They may also be separated physically by using different database or storage mechanisms.
The read model is optimised for querying, using whatever technology is most appropriate: relational database; in-memory store; NoSQL database; full-text search index. Domain events from the write model are used to update the read model.
Specialised time-series storage -- an event store -- is used for event sourcing the write model.
Application state changes are modelled as domain events. They are persisted in order -- as a logical stream -- for each aggregate. An aggregate's current state is built by replaying its domain events.
A typical event sourcing example is an ecommerce shopping cart. In a CRUD system, only the cart's current state would be recorded: the cart contains one item. Using event sourcing, the events that took the cart from empty to its current state are recorded: item added to cart, item removed from cart, item added to cart.
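A minimal sketch of this idea, using a hypothetical ShoppingCart module whose current state is a left fold over its events:

defmodule ShoppingCart do
  defstruct items: []

  # each event is applied to a copy of the current state
  def apply(%ShoppingCart{items: items} = cart, {:item_added, item}),
    do: %ShoppingCart{cart | items: items ++ [item]}

  def apply(%ShoppingCart{items: items} = cart, {:item_removed, item}),
    do: %ShoppingCart{cart | items: List.delete(items, item)}

  # rebuild the current state by replaying every event from an empty cart
  def replay(events), do: Enum.reduce(events, %ShoppingCart{}, &apply(&2, &1))
end

ShoppingCart.replay([
  {:item_added, "socks"},
  {:item_removed, "socks"},
  {:item_added, "socks"}
])
# => %ShoppingCart{items: ["socks"]}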
The event stream is the canonical source of truth. It is a perfect audit log. All other state in the system may be rebuilt from these events. You can rebuild the read model by replaying every event from the beginning of time.
Domain events describe your system activity over time using a rich, domain-specific language. They are an immutable source of truth for the system. They support temporal queries.
A separate logical read model allows optimised and highly specialised query models to be built. You can scale the read and write models independently. The processing of commands and queries is asymmetrical, so you can dedicate the appropriate number of servers to each side as needed.
Events and their schema provide the ideal integration point for other systems. They allow migration of read-only data between persistence technologies by replaying and projecting all events.
Unfortunately, events also provide a permanent history of your poor design decisions, since events are immutable. Event sourcing is an alternative -- and less common -- approach to building applications than basic CRUD, and it demands a richer understanding of the domain being modelled. CQRS adds complexity and risk, and you must accept eventual consistency between the write and read models.
To build the application and website we require:
- A domain model containing our aggregates, commands, and domain events.
- Hosting of an aggregate root and a way to send it commands.
- An event store to persist the domain events.
- Read model store for querying.
- Event handlers to build and update the read model.
- A web front-end UI to display read model data and dispatch commands to the write model.
Segment Challenge uses the following libraries to help fulfil these requirements.
EventStore is an Elixir library using PostgreSQL as the underlying storage engine. It provides an API to append events to, and read events from, a logical event stream, and to subscribe to events.
Subscriptions to an individual stream, or to all event streams, allow handlers to be notified of persisted events. A subscription guarantees at-least-once delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped.
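A brief sketch of the EventStore API, simplified from its documentation (the stream identity and event below are illustrative):

alias EventStore.EventData

stream_uuid = "challenge-" <> UUID.uuid4()

events = [
  %EventData{
    event_type: "Elixir.SegmentChallenge.Events.ChallengeCreated",
    data: %SegmentChallenge.Events.ChallengeCreated{challenge_uuid: stream_uuid},
    metadata: %{}
  }
]

# append to the stream, expecting it to contain no existing events (version 0)
:ok = EventStore.append_to_stream(stream_uuid, 0, events)

# read the stream back, in the order the events were appended
{:ok, recorded_events} = EventStore.read_stream_forward(stream_uuid)

# subscribe to all streams; persisted events are sent to the subscriber process
{:ok, subscription} = EventStore.subscribe_to_all_streams("example-subscription", self())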
Commanded provides the building blocks for creating CQRS applications in Elixir. It has support for command registration and dispatch; hosting and delegation to aggregate roots; event handling; and long running process managers.
Ecto and a PostgreSQL database is used to build the read model for querying.
The Phoenix Framework is used for the web front-end. I've also implemented the forms using Elm, a functional language for building web apps that compiles to JavaScript.
For Segment Challenge I created an Elixir umbrella application.
mix new segment_challenge --module SegmentChallenge --umbrella
It contains the following apps.
- authorisation -- Policies to authorise command dispatch.
- challenges -- Core domain model, command router, process managers, read model projections, queries, and periodic tasks.
- commands -- Modules (structs) for each command (e.g. SegmentChallenge.Commands.ApproveChallenge).
- events -- Modules (structs) for each domain event (e.g. SegmentChallenge.Events.ChallengeApproved).
- infrastructure -- Serialization and command middleware.
- projections -- Ecto repository and database migrations to build the read model database schema.
- web -- Phoenix web front-end containing a router, controllers, plugs, templates, views, and static assets.
cd segment_challenge
cd apps
mix new authorisation --module SegmentChallenge.Authorisation
mix new challenges --module SegmentChallenge.Challenges --sup
mix new commands --module SegmentChallenge.Commands
mix new events --module SegmentChallenge.Events
mix new infrastructure --module SegmentChallenge.Infrastructure
mix new projections --module SegmentChallenge.Projections --sup
mix phoenix.new web --app web --module SegmentChallenge.Web --no-brunch --no-ecto
The challenges app is an ideal candidate to be further split up by logical area.
- Athletes
- Challenges
- Clubs
- Leaderboards
- Stages
An application's top-level structure should inform you of its intent (e.g. challenges, leaderboards), not the delivery mechanism or technology it uses (e.g. projections, queries, tasks). Uncle Bob describes this ideal architecture in his "Architecture the Lost Years" talk.
In domain-driven design, the domain model is a conceptual model of the core business domain in an application containing behaviour and data. It includes the aggregates, commands, domain events, and process managers that comprise the business logic.
An aggregate root must conform to the following behaviour to implement the event sourcing pattern.
- Each public function must accept a command and return any resultant domain events, or an error.
- Its internal state may only be modified by applying a domain event to its current state.
- Its internal state can be rebuilt from an initial empty state by replaying all domain events in the order they were created.
Building an event sourced aggregate in Elixir requires defining a module containing: its state; command functions; and state mutator functions. I use a struct for the aggregate root's state.
Aggregates can be defined without external dependencies. By using event sourcing they have no persistence concerns. Relationships between aggregate roots are by identity only. Orchestration of aggregate roots is handled by process managers, which respond to events from one aggregate root and dispatch commands to another.
The challenge aggregate root is used by Segment Challenge to track each hosted challenge.
Here's a snippet of the Challenge module that implements the aggregate root. You can see how the public command functions (e.g. create_challenge/2) accept the challenge state (a %Challenge{} struct) and a command (e.g. a %CreateChallenge{} struct). They return zero, one, or many domain events in response.
An aggregate root must protect itself against commands that would cause an invariant to be broken. As an example, attempting to start a challenge that has not been approved returns an error tagged tuple: {:error, :challenge_not_approved}. Pattern matching is used to validate the state of the aggregate. A finite state machine can be used to formalise the allowed state changes within an aggregate root.
defmodule SegmentChallenge.Challenges.Challenge do
  @moduledoc """
  Challenges are multi-stage competitions, hosted by a club.
  Athletes compete every month during the challenge to set the fastest time for the current stage.
  """

  defstruct [
    challenge_uuid: nil,
    name: nil,
    description: nil,
    start_date: nil,
    start_date_local: nil,
    challenge_state: nil,
    # ...
  ]

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    IncludeCompetitorsInChallenge,
    HostChallenge,
    StartChallenge,
    EndChallenge
  }
  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge,
    ChallengeHosted,
    ChallengeStarted,
    ChallengeEnded
  }
  alias SegmentChallenge.Challenges.Challenge

  @doc """
  Create a new challenge
  """
  def create_challenge(challenge, create_challenge)

  def create_challenge(%Challenge{challenge_state: nil}, %CreateChallenge{} = create_challenge) do
    %ChallengeCreated{
      challenge_uuid: create_challenge.challenge_uuid,
      name: create_challenge.name,
      description: create_challenge.description,
      # ...
    }
  end

  def create_challenge(%Challenge{}, %CreateChallenge{}), do: {:error, :challenge_already_created}

  @doc """
  Start the challenge, making it active
  """
  def start_challenge(challenge, start_challenge)

  def start_challenge(%Challenge{challenge_uuid: challenge_uuid, challenge_state: :approved} = challenge, %StartChallenge{}) do
    %ChallengeStarted{
      challenge_uuid: challenge_uuid,
      start_date: challenge.start_date,
      start_date_local: challenge.start_date_local
    }
  end

  def start_challenge(%Challenge{}, %StartChallenge{}), do: {:error, :challenge_not_approved}

  # state mutator functions

  def apply(%Challenge{} = challenge, %ChallengeCreated{challenge_uuid: challenge_uuid, name: name, description: description}) do
    %Challenge{challenge |
      challenge_uuid: challenge_uuid,
      name: name,
      description: description,
      challenge_state: :created,
      # ...
    }
  end

  def apply(%Challenge{} = challenge, %ChallengeStarted{}) do
    %Challenge{challenge |
      challenge_state: :active
    }
  end
end
Every domain event returned by the aggregate must have a corresponding apply/2 function. Its arguments are the aggregate root state and domain event. It is used to mutate the aggregate's state.
These functions are also used when the aggregate is rebuilt from an empty state by replaying its events. Apply functions must never fail. You cannot reject an event once it has occurred.
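Conceptually, rebuilding is a left fold of apply/2 over the aggregate's recorded events, starting from an empty struct (events here stands for the list of domain events read from the aggregate's stream):

current_state =
  Enum.reduce(events, %Challenge{}, fn event, challenge ->
    Challenge.apply(challenge, event)
  end)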
In Segment Challenge I implemented each command and domain event in its own module, with the modules located in the relevant commands or events app. You could combine the two into a single app (e.g. messages).
I use Elixir structs to define the fields, providing compile-time checks and default values.
The start challenge command transitions a hosted challenge into an active state. It is dispatched from a periodic task, scheduled using Quantum. Each challenge begins at midnight, so the scheduled task runs each day to locate any challenges that are ready to start, and dispatches this command for each of them.
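A sketch of how such a job might be configured with Quantum (the task module is hypothetical, and the exact config format depends on the Quantum version):

config :quantum, cron: [
  # run daily at midnight to start any challenges that are due to begin
  "0 0 * * *": {SegmentChallenge.Tasks.StartChallenges, :execute}
]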
defmodule SegmentChallenge.Commands.StartChallenge do
  defstruct [
    :challenge_uuid
  ]

  use Vex.Struct

  validates :challenge_uuid, uuid: true
end
Vex is used to provide basic command validation. These are simple presence, formatting, and data type validation rules. Business rule validation belongs within the aggregate.
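As an illustration, a command struct can be checked directly with Vex. Note the uuid rule above is a custom validator registered by the application; presence, format, and length are among the built-ins Vex provides.

alias SegmentChallenge.Commands.StartChallenge

# valid: the declared validation rules all pass
Vex.valid?(%StartChallenge{challenge_uuid: UUID.uuid4()})
# => true

# invalid: a missing challenge_uuid fails validation
Vex.valid?(%StartChallenge{challenge_uuid: nil})
# => false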
Commands are validated before being passed to the aggregate root using a command validation middleware.
The challenge aggregate root returns a ChallengeStarted event in response to the start command when in a valid state to begin.
defmodule SegmentChallenge.Events.ChallengeStarted do
  defstruct [
    :challenge_uuid,
    :start_date,
    :start_date_local
  ]
end
Events returned by an aggregate root are serialised to JSON using the Poison library and persisted to the event store. Persisted events are published to interested subscribers: process managers, read model projections, and event handlers.
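Conceptually, serialisation is equivalent to converting the event struct into a map of its fields and encoding that map as JSON; a sketch, not the library's actual serializer:

alias SegmentChallenge.Events.ChallengeStarted

event = %ChallengeStarted{
  challenge_uuid: "challenge-1234",
  start_date: ~N[2017-01-01 00:00:00],
  start_date_local: ~N[2017-01-01 00:00:00]
}

# encode the event's fields as JSON for persistence
json = event |> Map.from_struct() |> Poison.encode!()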
A process manager is responsible for coordinating one or more aggregate roots. It handles events and may dispatch commands in response. Each process manager has state used to track which aggregate roots are being orchestrated. They are vital for inter-aggregate communication, coordination, and long-running business processes.
A saga can be implemented using a process manager as an alternative to using a distributed transaction for managing a long-running business process. Each step of the business process has a defined compensating action. When the business process encounters an error condition and is unable to continue, it can execute the compensating actions for the steps that have already completed. This undoes the work completed so far to maintain the consistency of the system.
Typically, you would use a process manager to route messages between aggregates within a bounded context. You would use a saga to manage a long-running business process that spans multiple bounded contexts.
In Segment Challenge, a challenge and stage are separate aggregate roots. I use a process manager to allow a challenge to track which stages are included.
Commanded provides the building blocks for defining a process manager. It uses an interested?/1 routing function to indicate which events the process manager receives. The response is used to route the event to an existing process manager instance, or to create a new one.
In this example the process manager is started by a ChallengeCreated event. It uses the challenge_uuid to identify an instance of the process.
When a StageCreated event is received, an IncludeStageInChallenge command is sent to the corresponding challenge aggregate root. All other events are ignored by this process manager.
defmodule SegmentChallenge.Challenges.ChallengeStageProcessManager do
  @moduledoc """
  Track the stages that comprise a challenge.
  """

  defstruct [
    challenge_uuid: nil,
    stages: []
  ]

  defmodule Stage do
    defstruct [
      :stage_uuid,
      :stage_number,
      :name
    ]
  end

  alias SegmentChallenge.Events.{
    ChallengeCreated,
    StageCreated
  }
  alias SegmentChallenge.Commands.IncludeStageInChallenge
  alias SegmentChallenge.Challenges.ChallengeStageProcessManager
  alias ChallengeStageProcessManager.Stage

  def interested?(%ChallengeCreated{challenge_uuid: challenge_uuid}), do: {:start, challenge_uuid}
  def interested?(%StageCreated{challenge_uuid: challenge_uuid}), do: {:continue, challenge_uuid}
  def interested?(_event), do: false

  def handle(%ChallengeStageProcessManager{}, %ChallengeCreated{}), do: []

  def handle(%ChallengeStageProcessManager{challenge_uuid: challenge_uuid}, %StageCreated{stage_uuid: stage_uuid, stage_number: stage_number, name: name, start_date: start_date, start_date_local: start_date_local, end_date: end_date, end_date_local: end_date_local}) do
    %IncludeStageInChallenge{
      challenge_uuid: challenge_uuid,
      stage_uuid: stage_uuid,
      stage_number: stage_number,
      name: name,
      start_date: start_date,
      start_date_local: start_date_local,
      end_date: end_date,
      end_date_local: end_date_local
    }
  end

  # state mutator functions

  def apply(%ChallengeStageProcessManager{} = process_manager, %ChallengeCreated{challenge_uuid: challenge_uuid}) do
    %ChallengeStageProcessManager{process_manager |
      challenge_uuid: challenge_uuid
    }
  end

  def apply(%ChallengeStageProcessManager{stages: stages} = process_manager, %StageCreated{stage_uuid: stage_uuid, stage_number: stage_number, name: name}) do
    stage = %Stage{
      stage_uuid: stage_uuid,
      stage_number: stage_number,
      name: name
    }

    %ChallengeStageProcessManager{process_manager |
      stages: stages ++ [stage]
    }
  end
end
The state of a process manager instance is modified following the aggregate root approach. An apply/2 function must exist for each handled domain event. The process manager's state is mutated and returned.
This is a simple process manager example. Segment Challenge contains more complex process managers following the same principles. Examples include tracking club members, to include them in challenges hosted by the club and any active stages; and applying scores based on stage rank at the end of each stage, to update the overall challenge leaderboards.
Each process manager must be supervised to ensure it starts with the application and restarts on failure. Commanded provides the Commanded.ProcessManagers.ProcessRouter module to host a process manager.
defmodule SegmentChallenge.Challenges.Supervisor do
  use Supervisor

  alias SegmentChallenge.Challenges
  alias Challenges.{
    ChallengeStageProcessManager
  }

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_) do
    children = [
      # projections
      supervisor(SegmentChallenge.Challenges.Projections.Supervisor, []),

      # process managers
      worker(Commanded.ProcessManagers.ProcessRouter, ["ChallengeStageProcessManager", ChallengeStageProcessManager, Challenges.Router], id: :challenge_stage_process_manager)
    ]

    supervise(children, strategy: :one_for_one)
  end
end
Commanded takes care of creating a subscription to the event store for each process manager. It serialises a snapshot of an instance's state to the event store after every handled event to ensure its state can be rehydrated on restart.
Once the core domain model is built, it's time to provide an external interface to allow commands to be sent in.
The Commanded library provides aggregate root hosting, command registration, and dispatch. Every command has exactly one registered handler. It can be sent to a command handler module, or directly to the target aggregate root. Attempting to dispatch an unregistered command results in an exception.
A command router is used to configure which aggregate root or command handler responds to a command. This is analogous to routing in the Phoenix web framework. However, it sends commands to handlers rather than HTTP requests to controllers.
During command dispatch, an Elixir GenServer process is started to host the aggregate root instance. It will fetch the aggregate's event stream from the event store, and rebuild its state. Any returned domain events are appended to the event stream. The aggregate root host process remains alive, so subsequent commands routed to the same instance will not require rebuilding its state from the event store.
Segment Challenge defines a SegmentChallenge.Challenges.Router module that uses the Commanded.Commands.Router macro. Here I register every command available within the application. The snippet below shows the commands relating to the challenge aggregate root.
defmodule SegmentChallenge.Challenges.Router do
  use Commanded.Commands.Router

  alias SegmentChallenge.Challenges.{Challenge, ChallengeCommandHandler}

  middleware Commanded.Middleware.Auditing
  middleware Commanded.Middleware.Logger
  middleware SegmentChallenge.Infrastructure.Validation.Middleware

  dispatch [
    SegmentChallenge.Commands.CreateChallenge,
    SegmentChallenge.Commands.IncludeCompetitorsInChallenge,
    SegmentChallenge.Commands.RemoveCompetitorFromChallenge,
    SegmentChallenge.Commands.ExcludeCompetitorFromChallenge,
    SegmentChallenge.Commands.HostChallenge,
    SegmentChallenge.Commands.StartChallenge,
    SegmentChallenge.Commands.EndChallenge
  ], to: ChallengeCommandHandler, aggregate: Challenge, identity: :challenge_uuid
end
These commands are sent to the aggregate via the ChallengeCommandHandler module.
The router allows configuration of middleware. All dispatched commands pass through the middleware chain, in the order defined. A middleware can choose to halt execution. This provides the integration point for cross-cutting concerns, including command auditing, logging, and validation. These are concerns applicable to all commands.
A registered command handler module receives the target aggregate root state and the dispatched command. This allows additional processing to be done before delegating to the aggregate root.
The example below shows how a unique URL slug is created, using the challenge name, and included in the command.
defmodule SegmentChallenge.Challenges.ChallengeCommandHandler do
  @behaviour Commanded.Commands.Handler

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    StartChallenge
  }
  alias SegmentChallenge.Challenges.Challenge
  alias SegmentChallenge.Challenges.Services.UrlSlugs.UniqueSlugger

  def handle(%Challenge{} = challenge, %CreateChallenge{challenge_uuid: challenge_uuid, name: name} = create_challenge) do
    # assign a unique URL slug from the challenge name
    {:ok, url_slug} = UniqueSlugger.slugify(Challenge.slug_source, challenge_uuid, name)

    challenge
    |> Challenge.create_challenge(%CreateChallenge{create_challenge | url_slug: url_slug})
  end

  def handle(%Challenge{} = challenge, %StartChallenge{} = start_challenge) do
    challenge
    |> Challenge.start_challenge(start_challenge)
  end
end
A command is dispatched using the configured router module.
alias SegmentChallenge.Challenges.Router
alias SegmentChallenge.Commands.StartChallenge
:ok = Router.dispatch(%StartChallenge{challenge_uuid: challenge_uuid})
In Segment Challenge, all commands are sent to a single Phoenix controller. The Phoenix web router is configured to accept JSON requests posted to /api/commands, which are sent on to the API.CommandController controller.
defmodule SegmentChallenge.Web.Router do
  use SegmentChallenge.Web.Web, :router

  pipeline :api do
    plug :accepts, ["json"]
    plug :fetch_session
    plug :assign_current_user
  end

  scope "/api", SegmentChallenge.Web, as: :api do
    pipe_through :api

    post "/commands", API.CommandController, :dispatch
  end
end
The controller contains a single public dispatch function, which:

- Builds and populates the command struct defined by the command parameter, using the ExConstructor library.
- Authorises that the current user can dispatch the command, using the Canada library.
- Dispatches the command.
- Returns an appropriate HTTP response code, depending upon the outcome of the command dispatch.
defmodule SegmentChallenge.Web.API.CommandController do
  use SegmentChallenge.Web.Web, :controller

  import Canada.Can, only: [can?: 3]

  alias SegmentChallenge.Authorisation.User
  alias SegmentChallenge.Challenges.Router
  alias SegmentChallenge.Web.CommandBuilder

  @empty_json_response "{}"

  @doc """
  Dispatch the command defined in the `params`
  """
  def dispatch(conn, %{"command" => command} = params) do
    command = CommandBuilder.build(command, conn, Map.delete(params, "command"))
    user = current_user(conn)

    if can?(user, :dispatch, command) do
      case Router.dispatch(command) do
        :ok ->
          conn
          |> send_resp(201, @empty_json_response)
        {:error, :validation_failure, errors} ->
          conn
          |> put_status(:unprocessable_entity)
          |> render(:dispatch, errors: errors)
        {:error, _reason} ->
          conn
          |> send_resp(400, @empty_json_response)
      end
    else
      conn
      |> send_resp(403, @empty_json_response)
    end
  end

  # attempted to dispatch a missing command
  def dispatch(conn, _params) do
    conn
    |> send_resp(400, @empty_json_response)
  end

  defp current_user(conn) do
    case current_athlete_uuid(conn) do
      nil -> nil
      athlete_uuid -> %User{athlete_uuid: athlete_uuid}
    end
  end
end
Command validation errors are returned to the client as JSON. The CommandView view deals with formatting the data. By default, Phoenix uses Poison to serialise to JSON.
defmodule SegmentChallenge.Web.API.CommandView do
  use SegmentChallenge.Web.Web, :view

  def render("dispatch.json", %{errors: errors}) do
    Enum.map(errors, &to_json/1)
  end

  defp to_json({:error, field, _type, message}) do
    %{
      name: field,
      message: message
    }
  end
end
All dispatched commands are validated before being passed on to the target aggregate root. I use the following middleware, which verifies the simple validation rules defined in a command using Vex.
defmodule SegmentChallenge.Infrastructure.Validation.Middleware do
  @behaviour Commanded.Middleware

  require Logger

  alias Commanded.Middleware.Pipeline
  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case Vex.valid?(command) do
      true -> pipeline
      false -> failed_validation(pipeline)
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp failed_validation(%Pipeline{command: command} = pipeline) do
    errors = Vex.errors(command)

    Logger.warn(fn -> "Command #{inspect command.__struct__} failed validation, errors: #{inspect errors}, command: #{inspect command}" end)

    pipeline
    |> respond({:error, :validation_failure, errors})
    |> halt
  end
end
Middleware is registered in the command routing module using the Commanded.Commands.Router macro.
defmodule SegmentChallenge.Challenges.Router do
  use Commanded.Commands.Router

  middleware SegmentChallenge.Infrastructure.Validation.Middleware
end
Validation failures are returned to the command dispatcher and may be shown to the end user. The example Phoenix command controller demonstrates how these errors are handled.
For Segment Challenge, authentication is provided by Strava using the OAuth2 authentication protocol. The Strava library provides a strategy to generate the relevant Strava login URL and handle the authentication response.
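For illustration, the login flow follows the usual OAuth2 dance; a sketch assuming the Strava library's OAuth2 strategy helpers, where code is the query parameter Strava sends to the callback URL:

# generate the Strava login URL to redirect the user to (helper names assumed)
authorize_url = Strava.Auth.authorize_url!()

# on the OAuth callback, exchange the returned code for an access token
client = Strava.Auth.get_token!(code: code)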
Canada is used to implement authorisation and define permission rules.
An authenticated user is used to authorise command dispatch.
To use Canada, I implemented the Canada.Can protocol for each command dispatched from the web front-end. Any unconfigured command is disallowed.
defimpl Canada.Can, for: SegmentChallenge.Authorisation.User do
  alias SegmentChallenge.Authorisation.User
  alias SegmentChallenge.Authorisation.Policies.{
    ChallengePolicy,
    StagePolicy
  }
  alias SegmentChallenge.Commands.{
    CreateChallenge,
    CreateStage
  }

  def can?(%User{} = user, :dispatch, %CreateChallenge{} = command), do: ChallengePolicy.can?(user, :dispatch, command)
  def can?(%User{} = user, :dispatch, %CreateStage{} = command), do: StagePolicy.can?(user, :dispatch, command)

  def can?(_user, _action, _command), do: false
end
The above Canada.Can protocol implementation simply delegates to the appropriate policy module (e.g. ChallengePolicy) containing the permission rules. Elixir pattern matching provides a convenient way of defining rules. A catch-all can? clause provides the default disallow response.
In the snippet below, the host challenge command uses the read model projection to enforce the rules: the user must be the original creator of the challenge, and the challenge must be in a pending state.
defmodule SegmentChallenge.Authorisation.Policies.ChallengePolicy do
  alias SegmentChallenge.Authorisation.User
  alias SegmentChallenge.Commands.{
    HostChallenge
  }
  alias SegmentChallenge.Challenges.Projections.ChallengeProjection
  alias SegmentChallenge.Projections.Repo

  def can?(
    %User{athlete_uuid: athlete_uuid} = user,
    :dispatch,
    %HostChallenge{challenge_uuid: challenge_uuid, hosted_by_athlete_uuid: athlete_uuid} = command) do
    challenge = Repo.get(ChallengeProjection, challenge_uuid)

    can?(user, :dispatch, command, challenge)
  end

  def can?(_user, _action, _command), do: false

  def can?(
    %User{athlete_uuid: athlete_uuid},
    :dispatch,
    %HostChallenge{challenge_uuid: challenge_uuid, hosted_by_athlete_uuid: athlete_uuid},
    %ChallengeProjection{challenge_uuid: challenge_uuid, created_by_athlete_uuid: athlete_uuid, status: :pending}), do: true

  def can?(_user, _action, _command, _challenge), do: false
end
Reporting and querying the state of an application is handled by building a read model. In Segment Challenge I use the Ecto library with a dedicated PostgreSQL database as the read store.
Ecto provides a domain specific language for writing queries and interacting with databases in Elixir. It includes a mix command line tool to create database schema migrations and execute them. This is used to migrate the development and production databases.
The read model is optimised for querying. Data is duplicated and denormalised as required. Queries with table joins are infrequent.
The read store uses a single Ecto repository to execute queries against the database.
defmodule SegmentChallenge.Projections.Repo do
  use Ecto.Repo, otp_app: :projections
end
To create and migrate the database:
mix ecto.create -r SegmentChallenge.Projections.Repo
mix ecto.migrate -r SegmentChallenge.Projections.Repo
All read models are populated using projections.
A projection is an event handler that receives every persisted event from the event store. It executes queries against the database to add, update, and delete data. Event handlers run concurrently and are eventually consistent.
Each projection includes at least one Ecto schema definition and a Projector event handler module. The projector handles all events relevant to the read model it builds. In this example, this includes any event related to clubs.
defmodule SegmentChallenge.Challenges.Projections.Clubs do
  defmodule ClubProjection do
    use SegmentChallenge.Projections.Projection

    @primary_key {:club_uuid, :string, []}
    schema "clubs" do
      field :name, :string
      field :profile, :string

      timestamps()
    end
  end

  alias SegmentChallenge.Challenges.Projections.Clubs.ClubProjection

  defmodule Projector do
    import Ecto.Query, only: [from: 2]

    alias SegmentChallenge.Events.{
      ClubImported
    }

    @behaviour Commanded.Event.Handler

    @projection_name "club"

    def handle(
      %ClubImported{
        club_uuid: club_uuid,
        name: name,
        profile: profile
      },
      %{event_id: event_id}) do
      ClubProjection.update_projection(@projection_name, event_id, fn multi ->
        Ecto.Multi.insert(multi, :club, %ClubProjection{
          club_uuid: club_uuid,
          name: name,
          profile: profile
        })
      end)
    end

    # ignore all other events
    def handle(_event, _metadata), do: :ok
  end
end
I use Ecto.Multi to execute queries that insert, update, and delete data.
The SegmentChallenge.Projections.Projection macro ensures each event is only processed once. Event handlers may receive an event more than once. Each projection records its last seen event within the same database transaction as the data manipulation. Already seen events can then be ignored; the transaction containing the duplicate change gets rolled back.
defmodule SegmentChallenge.Projections.Projection do
  @moduledoc false

  defmacro __using__(_) do
    quote do
      use Ecto.Schema

      import Ecto.Changeset
      import Ecto.Query

      alias SegmentChallenge.Projections.{Repo, Projection, ProjectionVersion}

      def update_projection(projection_name, event_id, multi_fn) do
        multi =
          Ecto.Multi.new
          |> Ecto.Multi.run(:verify_projection_version, fn _ ->
            version = case Repo.get(ProjectionVersion, projection_name) do
              nil -> Repo.insert!(%ProjectionVersion{projection_name: projection_name, last_seen_event_id: 0})
              version -> version
            end

            if version.last_seen_event_id < event_id do
              {:ok, %{version: version}}
            else
              {:error, :already_seen_event}
            end
          end)
          |> Ecto.Multi.update(:projection_version, ProjectionVersion.changeset(%ProjectionVersion{projection_name: projection_name}, %{last_seen_event_id: event_id}))

        multi = apply(multi_fn, [multi])

        case Repo.transaction(multi) do
          {:ok, _changes} -> :ok
          {:error, :verify_projection_version, :already_seen_event, _changes_so_far} -> :ok
          {:error, _stage, reason, _changes_so_far} -> {:error, reason}
        end
      end
    end
  end
end
All read model projectors are supervised to ensure they start with the application and restart on failure.
defmodule SegmentChallenge.Challenges.Projections.Supervisor do
  use Supervisor

  alias SegmentChallenge.Challenges.Projections.Clubs

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_) do
    children = [
      # projections
      worker(Commanded.Event.Handler, ["ClubProjection", Clubs.Projector], id: :club_projection)
    ]

    supervise(children, strategy: :one_for_one)
  end
end
The read model is optimised for the queries required by the application. The read model projections contain denormalised data so every query can be fulfilled using predicates on indexed columns and without complex joins. This provides performant read querying.
Here the Ecto.Query domain specific query language is used to build a query to retrieve the challenges created by a given athlete. This uses an index on the created_by_athlete_uuid column.
defmodule SegmentChallenge.Challenges.Queries.Challenges.ChallengesCreatedByAthleteQuery do
  import Ecto.Query, only: [from: 2]

  alias SegmentChallenge.Challenges.Projections.ChallengeProjection

  def new(athlete_uuid) do
    from c in ChallengeProjection,
      where: c.created_by_athlete_uuid == ^athlete_uuid,
      order_by: [desc: c.start_date_local]
  end
end
The query is constructed and executed by the Ecto Repo module.
challenges = ChallengesCreatedByAthleteQuery.new(current_athlete_uuid(conn)) |> Repo.all
In event sourced systems the event stream is the canonical source of truth. This allows the read model to be entirely rebuilt, replaced, and significantly altered, assuming that the domain events contain the relevant information. You can repurpose the read model as future needs dictate.
To rebuild, you define the new schema, modify the affected projections, and replay all events from the beginning of time through the projectors. This approach also allows you to migrate data from one storage mechanism (e.g. a relational database) to another (e.g. a document database).
A rebuild and data migration can be done while the application is online. You configure the new projection and it will replay from the first event. Once caught up, you switch to use the new read model for reads from the application.
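As a sketch, following the supervisor pattern shown above: run a second copy of the projector under a new subscription name so it replays from the first event, while the original continues to serve reads (the v2 module is hypothetical):

children = [
  # the existing projection continues to serve reads
  worker(Commanded.Event.Handler, ["ClubProjection", Clubs.Projector], id: :club_projection),

  # a v2 projector under a new subscription name replays from the first event
  worker(Commanded.Event.Handler, ["ClubProjectionV2", Clubs.ProjectorV2], id: :club_projection_v2)
]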
Applications built using CQRS/ES are great for unit and integration testing.
Within the domain model, commands are the input and domain events are the output. Unit tests verify the expected events are produced.
I use ExMachina to define fixture data for tests in Segment Challenge.
defmodule SegmentChallenge.Factory do
  use ExMachina

  def challenge_factory do
    %{
      name: "Segment of the Month",
      start_date: ~N[2017-01-01 00:00:00],
      start_date_local: ~N[2017-01-01 00:00:00],
      end_date: ~N[2017-10-31 23:59:59],
      end_date_local: ~N[2017-10-31 23:59:59],
      hosted_by_club_uuid: "club-1234",
      hosted_by_club_name: "Example club",
      created_by_athlete_uuid: "athlete-5678",
      created_by_athlete_name: "Ben Smith",
      url_slug: "segment-of-the-month"
    }
  end
end
This is used by calling the build function, made available by importing the factory module into a test.
create_challenge = struct(CreateChallenge, build(:challenge, %{challenge_uuid: "1234"}))
Two unit test examples are shown in this aggregate root test. The second test -- excluding a competitor -- shows how an aggregate root's state must be mutated by applying any returned events.
defmodule SegmentChallenge.Challenges.ChallengeTest do
  use ExUnit.Case

  import SegmentChallenge.Factory
  import SegmentChallenge.Aggregate, only: [evolve: 2]

  alias SegmentChallenge.Commands.{
    CreateChallenge,
    IncludeCompetitorsInChallenge,
    ExcludeCompetitorFromChallenge
  }
  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge,
    CompetitorExcludedFromChallenge
  }
  alias SegmentChallenge.Challenges.Challenge

  defp create_challenge(challenge_uuid) do
    Challenge.create_challenge(%Challenge{}, struct(CreateChallenge, build(:challenge, %{challenge_uuid: challenge_uuid})))
  end

  @tag :unit
  test "create a challenge" do
    challenge_uuid = UUID.uuid4

    assert create_challenge(challenge_uuid) == struct(ChallengeCreated, build(:challenge, %{challenge_uuid: challenge_uuid}))
  end

  describe "exclude a competitor" do
    @tag :unit
    test "should remove competitor" do
      challenge_uuid = UUID.uuid4
      athlete_uuid = UUID.uuid4
      reason = "Not a paid club member"

      competitor_excluded =
        with challenge <- evolve(%Challenge{}, create_challenge(challenge_uuid)),
             challenge <- evolve(challenge, Challenge.include_competitors(challenge, %IncludeCompetitorsInChallenge{challenge_uuid: challenge_uuid, competitors: [%IncludeCompetitorsInChallenge.Competitor{athlete_uuid: athlete_uuid}]})),
        do: Challenge.exclude_competitor(challenge, %ExcludeCompetitorFromChallenge{challenge_uuid: challenge_uuid, athlete_uuid: athlete_uuid, reason: reason})

      assert competitor_excluded == %CompetitorExcludedFromChallenge{challenge_uuid: challenge_uuid, athlete_uuid: athlete_uuid, reason: reason}
    end
  end
end
I use Elixir's with keyword to chain the command functions. The evolve/2 function is a unit test helper that mutates the aggregate root state by calling the apply/2 function for each of the given events, starting from an empty state (e.g. %Challenge{}).
defmodule SegmentChallenge.Aggregate do
  def evolve(aggregate, events) do
    Enum.reduce(List.wrap(events), aggregate, &aggregate.__struct__.apply(&2, &1))
  end
end
I apply a :unit tag to all unit tests. This allows me to execute the very fast unit test suite on its own.
mix test --only unit
For integration tests I follow the same approach: use commands as input and verify the expected domain events are published.
In Segment Challenge I exercise the full application during integration tests, including the event store, aggregate root and process manager hosting, read store projections, and external HTTP requests. The event store and read store are both reset between each test.
Here's an example integration test to create a challenge. The command dispatch is hidden in the :create_challenge function call during setup, leaving the test to assert the expected domain event is received.
defmodule SegmentChallenge.Challenges.HostChallengeTest do
  use SegmentChallenge.StorageCase

  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.UseCases.CreateChallengeUseCase, only: [create_challenge: 1]

  alias SegmentChallenge.Events.{
    ChallengeCreated,
    CompetitorsJoinedChallenge
  }

  setup do
    HTTPoison.start
    :ok
  end

  describe "creating a challenge" do
    setup [:create_challenge]

    @tag :integration
    test "should create the challenge", context do
      assert_receive_event(ChallengeCreated, fn event ->
        assert event.challenge_uuid == context[:challenge_uuid]
        assert event.hosted_by_club_uuid == context[:club_uuid]
        assert event.url_slug == "segment-of-the-month"
      end)
    end

    @tag :integration
    test "should include competitors", context do
      assert_receive_event(CompetitorsJoinedChallenge, fn event ->
        assert event.challenge_uuid == context[:challenge_uuid]
        assert Enum.any?(event.competitors, fn competitor -> competitor.athlete_uuid == "athlete-5678" end)
      end)
    end
  end
end
The assert_receive_event function is provided by the Commanded.Assertions.EventAssertions module. It creates a new subscription to the event store. For each received event matching the given module (e.g. ChallengeCreated), it runs the provided assertion function. It waits until an event satisfying the assertions is received, failing if one does not arrive within a limited timeout period.
I've created test use case modules: reusable functions that cover an end-user scenario. The test above creates a challenge with the CreateChallengeUseCase module. ExUnit supports chaining function calls using setup inside a describe block. Each function receives a context map and may append new values to it on return, allowing functions to build upon the work done by those before them.
This example test use case makes an external HTTP call to the Strava API. I use ExVCR to record the initial response to disk, then replay the cached response for subsequent test runs. This guarantees my test works end-to-end, yet allows a short test feedback loop as the external request is only made when a cached request is not present.
- Read more: HTTP unit tests using ExVCR
defmodule SegmentChallenge.UseCases.CreateChallengeUseCase do
  use ExVCR.Mock, adapter: ExVCR.Adapter.Hackney

  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.Factory

  alias SegmentChallenge.Commands.{
    ImportClub,
    ImportClubMembers,
    CreateChallenge
  }
  alias SegmentChallenge.Events.{
    AthleteImported,
    ClubImported,
    CompetitorsJoinedChallenge
  }
  alias SegmentChallenge.Challenges.Router

  def create_challenge(_context) do
    strava_club_id = 1234
    club_uuid = UUID.uuid4
    challenge_uuid = UUID.uuid4
    athlete_uuid = "athlete-5678"
    strava_access_token = "<access token>"  # placeholder; in practice read from test config

    use_cassette "challenge/create_challenge##{strava_club_id}", match_requests_on: [:query] do
      :ok = Router.dispatch(%ImportClub{club_uuid: club_uuid, strava_id: strava_club_id, strava_access_token: strava_access_token})
      :ok = Router.dispatch(%ImportClubMembers{club_uuid: club_uuid, strava_id: strava_club_id, strava_access_token: strava_access_token})

      # wait for the athlete who will create, and the club that will host, the challenge to be imported
      wait_for_event ClubImported, fn event -> event.club_uuid == club_uuid end
      wait_for_event AthleteImported, fn event -> event.athlete_uuid == athlete_uuid end

      :ok = Router.dispatch(struct(CreateChallenge, build(:challenge, %{challenge_uuid: challenge_uuid, hosted_by_club_uuid: club_uuid})))

      wait_for_event CompetitorsJoinedChallenge, fn event -> event.challenge_uuid == challenge_uuid end
    end

    [
      strava_club_id: strava_club_id,
      club_uuid: club_uuid,
      challenge_uuid: challenge_uuid,
      athlete_uuid: athlete_uuid
    ]
  end
end
The keyword list returned by the function is merged into the context map. This is made available to the subsequent setup functions and to the test itself, allowing key-based access (e.g. context[:challenge_uuid]).
Integration tests for read model projections follow a similar pattern. I reuse the use cases for succinct tests containing assertions.
defmodule SegmentChallenge.Projections.Challenges.ChallengeProjectionTest do
  use SegmentChallenge.StorageCase

  import SegmentChallenge.Factory
  import Commanded.Assertions.EventAssertions
  import SegmentChallenge.UseCases.CreateChallengeUseCase, only: [create_challenge: 1]

  alias SegmentChallenge.Wait
  alias SegmentChallenge.Projections.Repo
  alias SegmentChallenge.Challenges.Projections.ChallengeProjection

  setup do
    HTTPoison.start
    :ok
  end

  describe "creating a challenge" do
    setup [:create_challenge]

    @tag :integration
    @tag :projection
    test "should create challenge projection", context do
      Wait.until fn ->
        challenge = Repo.get(ChallengeProjection, context[:challenge_uuid])

        assert challenge != nil
        assert challenge.name == "Segment of the Month"
        assert challenge.start_date == ~N[2017-01-01 00:00:00]
      end
    end
  end
end
The read model is eventually consistent, so I use the following Wait helper module to allow the projection to be built within a timeout period before the test fails.
defmodule SegmentChallenge.Wait do
  def until(fun), do: until(500, fun)

  def until(0, fun), do: fun.()

  def until(timeout, fun) do
    try do
      fun.()
    rescue
      ExUnit.AssertionError ->
        :timer.sleep(10)
        until(max(0, timeout - 10), fun)
    end
  end
end
Integration and projection tests are tagged with :integration and :projection. This allows me to execute these slower test suites on their own.
mix test --only integration
mix test --only projection
Segment Challenge uses Distillery to create the Elixir release. Build and deployment to the production host is handled by edeliver.
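Typical usage looks as follows; the exact workflow depends on your .deliver configuration:

mix release.init                           # one-off: generate the Distillery release configuration
mix edeliver build release                 # build the release on the configured build host
mix edeliver deploy release to production  # copy the built release to the production host
mix edeliver start production              # start (or restart) the application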
Deployment and administration of a production CQRS/ES application deserves its own full article. Subscribe to the mailing list below to be notified when new and relevant content is published.
Applying the Command Query Responsibility Segregation and event sourcing pattern to an Elixir and Phoenix web application is an unorthodox approach. I hope this case study has demonstrated why -- and briefly how -- you might do so.
The eventstore and commanded Elixir libraries provide the building blocks to help you. EventStore relies upon PostgreSQL for its persistence. Commanded uses OTP behaviours and supervision to provide concurrency, reliability, and resiliency.
Please get in touch with feedback, ideas, requests for further articles, and criticism. Subscribe to the mailing list below if you'd like to stay informed.