Skip to content

Instantly share code, notes, and snippets.

@rhomel
Last active September 18, 2020 10:31
Show Gist options
  • Save rhomel/6b1f26e27915ffd8a85d1b5c94a59b8c to your computer and use it in GitHub Desktop.
Save rhomel/6b1f26e27915ffd8a85d1b5c94a59b8c to your computer and use it in GitHub Desktop.
Event Sourcing prototype with Elixir
# Example Event Sourcing aggregate implementation in Elixir
#
# This is literally the first code that I wrote with Elixir.
# So this may not be optimal or idiomatic Elixir.
#
# Note that I heavily use Elixir's pattern matching features.
#
# Run with:
# elixir es.exs
#
# Or with iex REPL:
# iex es.exs
# # to hot reload the code in iex:
# c "es.exs"
# # ctrl+c twice to exit
#
# Or copy-paste to https://repl.it/languages/elixir to run in a web browser.
defmodule ExperimentConfig do
  @moduledoc """
  Configuration of an experiment.

  Holds a single field, `name`, which defaults to the empty string.
  """

  @type t :: %__MODULE__{name: String.t()}

  defstruct [name: ""]
end
defmodule Experiment do
  @moduledoc """
  An experiment, described by its configuration.

  The `config` field defaults to an empty `%ExperimentConfig{}`.
  """

  @type t :: %__MODULE__{config: %ExperimentConfig{}}

  defstruct [config: %ExperimentConfig{}]
end
defmodule Event do
  @moduledoc """
  A named event carrying a map of properties.

  Defaults: `name` is `"Unknown"` and `properties` is an empty map.
  """

  @type t :: %__MODULE__{name: String.t(), properties: map()}

  defstruct [name: "Unknown", properties: %{}]
end
defmodule Command do
  @moduledoc """
  A named command carrying a map of properties.

  Defaults: `name` is `"Unknown"` and `properties` is an empty map.
  """

  @type t :: %__MODULE__{name: String.t(), properties: map()}

  defstruct [name: "Unknown", properties: %{}]
end
defmodule Aggregate do
  @moduledoc """
  Event-sourced aggregate.

  The aggregate state is rebuilt by applying `%Event{}` values one at a time
  (`new/2`), and `%Command{}` values are turned into new events by `process/2`.

  Fields:

    * `events`    - every event applied so far, oldest first
    * `created`   - `true` once a `"CreateExperiment"` event has been applied
    * `draft`     - the `%Experiment{}` from the last applied `"CreateDraft"` event, or `nil`
    * `published` - defaults to `nil`; not written by any function in this module
  """

  defstruct events: [], created: false, draft: nil, published: nil

  @doc """
  Appends `event` to the end of the aggregate's event history and logs it.

  Returns the updated aggregate. No other field is changed.
  """
  def append(aggregate = %Aggregate{}, event = %Event{}) do
    log(event)
    %Aggregate{aggregate | events: aggregate.events ++ [event]}
  end

  @doc """
  Returns an empty aggregate.
  """
  def new(), do: %Aggregate{}

  @doc """
  Applies one event, or a list of events in order, to `aggregate`.

    * a list is folded left to right; an empty list returns `aggregate` unchanged
    * `"CreateExperiment"` sets `created` to `true`
    * `"CreateDraft"` carrying an `%Experiment{}` under `properties.draft`
      stores that experiment in `draft`
    * any other event is still appended to the history, and a warning is printed

  Returns the updated aggregate.
  """
  def new(aggregate = %Aggregate{}, events) when is_list(events) do
    Enum.reduce(events, aggregate, fn event, acc -> new(acc, event) end)
  end

  def new(aggregate = %Aggregate{}, event = %Event{name: "CreateExperiment"}) do
    %Aggregate{aggregate | created: true}
    |> append(event)
  end

  def new(
        aggregate = %Aggregate{},
        event = %Event{name: "CreateDraft", properties: %{draft: %Experiment{}}}
      ) do
    %Aggregate{aggregate | draft: event.properties.draft}
    |> append(event)
  end

  def new(aggregate = %Aggregate{}, event = %Event{}) do
    next = append(aggregate, event)
    IO.puts " Warning: skipping unrecognized event: " <> event.name
    next
  end

  @doc """
  Processes a command against `aggregate`.

  Only the `"CreateDraft"` command is handled; any other command raises
  `FunctionClauseError`.

  Returns `{:ok, updated_aggregate, emitted_events}` when the aggregate has not
  been created yet or has no draft, and `{:error, aggregate, []}` otherwise.
  """
  def process(aggregate = %Aggregate{}, command = %Command{name: "CreateDraft"}) do
    log(command)
    create_draft = %Event{name: "CreateDraft", properties: command.properties}

    case aggregate do
      %Aggregate{created: false} ->
        events = [%Event{name: "CreateExperiment"}, create_draft]
        {:ok, new(aggregate, events), events}

      %Aggregate{created: true, draft: nil} ->
        # The experiment already exists, so only the draft event is emitted;
        # emitting "CreateExperiment" again would duplicate it in the history.
        events = [create_draft]
        {:ok, new(aggregate, events), events}

      _ ->
        # probably can replace the :error atom with a more useful error type
        {:error, aggregate, []}
    end
  end

  # Prints a trace line for the command being processed or the event being applied.
  defp log(%Command{name: name}), do: IO.puts(" Process: " <> name)
  defp log(%Event{name: name}), do: IO.puts(" Apply: " <> name)
end
defmodule Test do
  @moduledoc """
  Hand-rolled checks for `Aggregate`.

  Each function prints a heading, exercises the aggregate and pin-matches the
  outcome against the expected value, so a mismatch raises `MatchError`.
  """

  @doc """
  Processes a `"CreateDraft"` command against an empty aggregate and checks
  both the resulting aggregate and the emitted events.
  """
  def create do
    IO.puts("Test creating a draft from an empty aggregate")

    draft = %Experiment{config: %ExperimentConfig{name: "NewExperiment"}}

    emitted = [
      %Event{name: "CreateExperiment", properties: %{}},
      %Event{name: "CreateDraft", properties: %{draft: draft}}
    ]

    expected =
      {:ok, %Aggregate{created: true, draft: draft, events: emitted, published: nil}, emitted}

    outcome =
      Aggregate.process(
        Aggregate.new(),
        %Command{name: "CreateDraft", properties: %{draft: draft}}
      )

    ^expected = outcome
  end

  @doc """
  Rebuilds an aggregate from a list of events, one of which is unrecognized,
  and checks the resulting state.
  """
  def new do
    IO.puts("Test creating an aggregate from events")

    history = [
      %Event{name: "CreateExperiment", properties: %{}},
      %Event{name: "Foo", properties: %{}}
    ]

    expected = %Aggregate{created: true, draft: nil, events: history, published: nil}

    rebuilt = Aggregate.new(Aggregate.new(), history)
    ^expected = rebuilt
  end

  @doc """
  Sends an `"UpdateDraft"` command, which `Aggregate.process/2` does not
  handle, to show what an unexpected failure looks like.
  """
  def update do
    IO.puts("Test updating an aggregate's draft")
    IO.puts(" This is intentionally not implemented to demonstrate unexpected code failures.")

    updated = %Experiment{config: %ExperimentConfig{name: "UpdatedExperiment"}}
    request = %Command{name: "UpdateDraft", properties: %{draft: updated}}

    # no expectation set to show what happens when a process "crashes"
    Aggregate.process(Aggregate.new(), request)
  end
end
defmodule Tester do
  @moduledoc """
  Minimal test runner offering a supervised and an unsupervised strategy.
  """

  # Upper bound, in milliseconds, to wait for a single supervised test.
  # Equal to the default timeout of `Task.yield/2`.
  @test_timeout 5_000

  @doc """
  Passes the list of test functions to `method`, which decides how to run them.

  `method` is a one-argument function such as `&Tester.supervised/1` or
  `&Tester.unsupervised/1`. Returns whatever `method` returns.
  """
  def run(method) when is_function(method) do
    tests = [
      &Test.new/0,
      &Test.update/0,
      &Test.create/0
    ]

    method.(tests)
  end

  @doc """
  Run all tests sequentially as separate Task processes.
  Tasks that fail will not crash the caller.

  The outcome of every test is printed with `IO.inspect/1`: `{:ok, value}`,
  `{:exit, reason}`, or `nil` when the test did not finish in time.
  Returns `:ok`.
  """
  def supervised(tests) when is_list(tests) do
    # A single supervisor serves every test and is stopped at the end, so no
    # supervisor process stays linked to the caller after the run.
    {:ok, supervisor} = Task.Supervisor.start_link()

    Enum.each(tests, fn test ->
      task = Task.Supervisor.async_nolink(supervisor, test)

      # A test that exceeds the timeout is shut down instead of being left
      # running in the background.
      (Task.yield(task, @test_timeout) || Task.shutdown(task))
      |> IO.inspect()

      IO.puts("")
    end)

    Supervisor.stop(supervisor)
  end

  @doc """
  Run all tests in the current process.
  If one task fails the current process will stop.
  """
  def unsupervised(tests) when is_list(tests) do
    Enum.each(tests, fn test -> test.() end)
  end
end
# Script entry point: run every test in its own isolated Task process, so a
# failing test does not crash the caller and the remaining tests still run.
Tester.run(&Tester.supervised/1)
# Alternative: run the tests in the current process instead
# (execution will stop when a failure is encountered).
# Uncomment the next line to try it:
# Tester.run(&Tester.unsupervised/1)
@ggarek
Copy link

ggarek commented Sep 18, 2020

Nice work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment