Last active
September 18, 2020 10:31
-
-
Save rhomel/6b1f26e27915ffd8a85d1b5c94a59b8c to your computer and use it in GitHub Desktop.
Event Sourcing prototype with Elixir
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example Event Sourcing aggregate implementation in Elixir | |
# | |
# This is literally the first code that I wrote with Elixir. | |
# So this may not be optimal or idiomatic Elixir. | |
# | |
# Note that I heavily use Elixir's pattern matching features. | |
# | |
# Run with: | |
# elixir es.exs | |
# | |
# Or with iex REPL: | |
# iex es.exs | |
# # to hot reload the code in iex: | |
# c "es.exs" | |
# # ctrl+c twice to exit | |
# | |
# Or copy-paste to https://repl.it/languages/elixir to run in a web browser. | |
defmodule ExperimentConfig do
  @moduledoc """
  Configuration attached to an `Experiment`.

  Fields:

    * `name` - the experiment's name; defaults to the empty string.
  """

  defstruct [name: ""]
end
defmodule Experiment do
  @moduledoc """
  An experiment, described entirely by its `ExperimentConfig`.

  Fields:

    * `config` - an `%ExperimentConfig{}`; defaults to a config with all of
      its own defaults.
  """

  defstruct [config: %ExperimentConfig{}]
end
defmodule Event do
  @moduledoc """
  A named event with an arbitrary map of properties.

  Fields:

    * `name` - string identifying the kind of event; defaults to `"Unknown"`.
    * `properties` - map of event payload data; defaults to `%{}`.
  """

  defstruct [
    name: "Unknown",
    properties: %{}
  ]
end
defmodule Command do
  @moduledoc """
  A named command with an arbitrary map of properties.

  Fields:

    * `name` - string identifying the kind of command; defaults to `"Unknown"`.
    * `properties` - map of command payload data; defaults to `%{}`.
  """

  defstruct [
    name: "Unknown",
    properties: %{}
  ]
end
defmodule Aggregate do
  @moduledoc """
  Event-sourced aggregate for an experiment.

  State changes only by applying `Event`s through `new/2`. Commands are handled
  by `process/2`, which decides which events to emit and applies them. Every
  applied event is recorded in `events`, oldest first.

  Fields:

    * `events` - list of applied `%Event{}`s, oldest first.
    * `created` - `true` once a `"CreateExperiment"` event has been applied.
    * `draft` - the `%Experiment{}` carried by the last applied `"CreateDraft"`
      event, or `nil`.
    * `published` - defaults to `nil`; not written by any function in this
      module.
  """

  defstruct events: [], created: false, draft: nil, published: nil

  @doc """
  Logs `event` and returns `aggregate` with `event` added at the end of its
  `events` list. No other field is changed.
  """
  def append(aggregate = %__MODULE__{}, event = %Event{}) do
    log(event)
    %__MODULE__{aggregate | events: aggregate.events ++ [event]}
  end

  @doc """
  Returns an empty aggregate with all fields at their defaults.
  """
  def new() do
    %__MODULE__{}
  end

  @doc """
  Applies one event, or a list of events in order, to `aggregate` and returns
  the resulting aggregate.

    * `"CreateExperiment"` sets `created` to `true`.
    * `"CreateDraft"` whose properties hold `draft: %Experiment{}` sets `draft`.
    * Any other event is still recorded in `events`, but leaves the rest of
      the state untouched and prints a warning.

  An empty list returns `aggregate` unchanged.
  """
  def new(aggregate = %__MODULE__{}, events) when is_list(events) do
    Enum.reduce(events, aggregate, fn event, acc -> new(acc, event) end)
  end

  def new(aggregate = %__MODULE__{}, event = %Event{name: "CreateExperiment"}) do
    append(%__MODULE__{aggregate | created: true}, event)
  end

  def new(
        aggregate = %__MODULE__{},
        event = %Event{name: "CreateDraft", properties: %{draft: draft = %Experiment{}}}
      ) do
    append(%__MODULE__{aggregate | draft: draft}, event)
  end

  def new(aggregate = %__MODULE__{}, event = %Event{}) do
    # The event is recorded first (which logs "Apply"), then the warning is
    # printed, so the output order stays log line -> warning.
    next = append(aggregate, event)
    IO.puts " Warning: skipping unrecognized event: " <> event.name
    next
  end

  @doc """
  Handles `command` against `aggregate`.

  Only the `"CreateDraft"` command is implemented:

    * aggregate not yet created - emits `"CreateExperiment"` followed by
      `"CreateDraft"`;
    * aggregate created but without a draft - emits only `"CreateDraft"`;
    * otherwise (a draft already exists) - emits nothing.

  Returns `{:ok, updated_aggregate, emitted_events}` on success or
  `{:error, aggregate, []}` when the command is rejected. Any other command
  name has no matching clause and raises `FunctionClauseError`.
  """
  def process(aggregate = %__MODULE__{}, command = %Command{name: "CreateDraft"}) do
    log(command)
    draft_event = %Event{name: "CreateDraft", properties: command.properties}

    case aggregate do
      %__MODULE__{created: false} ->
        emit(aggregate, [%Event{name: "CreateExperiment"}, draft_event])

      %__MODULE__{created: true, draft: nil} ->
        # The experiment already exists: emitting "CreateExperiment" again
        # would record a second creation event in the log.
        emit(aggregate, [draft_event])

      _ ->
        # probably can replace the :error atom with a more useful error type
        {:error, aggregate, []}
    end
  end

  # Applies `events` to `aggregate` and wraps the outcome in the success tuple.
  defp emit(aggregate, events) do
    {:ok, new(aggregate, events), events}
  end

  # Prints a one-line trace for a processed command or an applied event.
  defp log(command = %Command{}) do
    IO.puts " Process: " <> command.name
  end

  defp log(event = %Event{}) do
    IO.puts " Apply: " <> event.name
  end
end
defmodule Test do
  @moduledoc """
  Hand-rolled test cases for `Aggregate`.

  Each test prints a heading and then asserts by pin-matching the actual value
  against the expected one, so a mismatch raises `MatchError`.
  """

  @doc """
  Processes a `"CreateDraft"` command against an empty aggregate and checks the
  resulting aggregate and emitted events. Returns the matched result.
  """
  def create do
    IO.puts("Test creating a draft from an empty aggregate")

    experiment = %Experiment{config: %ExperimentConfig{name: "NewExperiment"}}

    # The same two events are expected both in the aggregate's log and in the
    # list of events returned by the command.
    emitted = [
      %Event{name: "CreateExperiment", properties: %{}},
      %Event{name: "CreateDraft", properties: %{draft: experiment}}
    ]

    expected =
      {:ok,
       %Aggregate{created: true, draft: experiment, events: emitted, published: nil},
       emitted}

    ^expected =
      Aggregate.new()
      |> Aggregate.process(%Command{name: "CreateDraft", properties: %{draft: experiment}})
  end

  @doc """
  Replays a known and an unknown event onto an empty aggregate and checks the
  rebuilt state. Returns the matched aggregate.
  """
  def new do
    IO.puts("Test creating an aggregate from events")

    replayed = [%Event{name: "CreateExperiment"}, %Event{name: "Foo"}]

    expected = %Aggregate{
      created: true,
      draft: nil,
      events: [
        %Event{name: "CreateExperiment", properties: %{}},
        %Event{name: "Foo", properties: %{}}
      ],
      published: nil
    }

    ^expected = Aggregate.new() |> Aggregate.new(replayed)
  end

  @doc """
  Sends an `"UpdateDraft"` command, which `Aggregate.process/2` does not
  handle, to show what an unexpected failure looks like.
  """
  def update do
    IO.puts("Test updating an aggregate's draft")
    IO.puts(" This is intentionally not implemented to demonstrate unexpected code failures.")

    updated = %Experiment{config: %ExperimentConfig{name: "UpdatedExperiment"}}

    # no expectation set to show what happens when a process "crashes"
    Aggregate.new()
    |> Aggregate.process(%Command{name: "UpdateDraft", properties: %{draft: updated}})
  end
end
defmodule Tester do
  @moduledoc """
  Minimal test runner for the functions in `Test`.

  `run/1` builds the list of tests and hands it to a runner strategy, either
  `supervised/1` or `unsupervised/1`.
  """

  # Maximum time to wait for one supervised test, in milliseconds. This is the
  # same value `Task.yield/1` uses by default.
  @yield_timeout 5_000

  @doc """
  Calls `method` with the list of zero-arity test functions and returns
  whatever `method` returns.
  """
  def run(method) when is_function(method) do
    tests = [
      &Test.new/0,
      &Test.update/0,
      &Test.create/0
    ]

    method.(tests)
  end

  @doc """
  Run all tests sequentially as separate Task processes.
  Tasks that fail will not crash the caller.

  The outcome of each test is printed with `IO.inspect/1`: `{:ok, value}`,
  `{:exit, reason}`, or `nil` when the test did not finish in time. Returns
  `:ok`.
  """
  def supervised(tests) when is_list(tests) do
    # One supervisor is shared by all tests and stopped afterwards, instead of
    # starting (and leaking) a new linked supervisor for every test.
    {:ok, supervisor} = Task.Supervisor.start_link()

    try do
      Enum.each(tests, fn test ->
        task = Task.Supervisor.async_nolink(supervisor, test)

        # A test that outlives the timeout is shut down rather than left
        # running in the background.
        outcome = Task.yield(task, @yield_timeout) || Task.shutdown(task)

        IO.inspect(outcome)
        IO.puts("")
      end)
    after
      Supervisor.stop(supervisor)
    end
  end

  @doc """
  Run all tests in the current process.
  If one task fails the current process will stop.
  """
  def unsupervised(tests) when is_list(tests) do
    Enum.each(tests, fn test -> test.() end)
  end
end
# Script entry point: run every test in its own Task process, so a crashing
# test is reported without stopping the remaining ones.
isolated_runner = &Tester.supervised/1
Tester.run(isolated_runner)

# Alternative: run the tests in the current process instead
# (execution will stop when a failure is encountered)
# Tester.run(&Tester.unsupervised/1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Nice work!