Last active
June 5, 2023 12:46
-
-
Save slashdotdash/1287f0c0735b49cca8afcc1b67665b42 to your computer and use it in GitHub Desktop.
Building projections with Ecto using Commanded event handlers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.Repo.Migrations.CreateProjectionVersions do
  @moduledoc """
  Creates the `projection_versions` table.

  Each row is keyed by `projection_name` and stores the `last_seen_event_id`
  recorded for that projection.
  """

  use Ecto.Migration

  def change do
    # The projection name is the primary key, so the default `id` column is disabled.
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_id, :bigint

      # Explicit parentheses: a bare `timestamps` is parsed as a variable
      # reference and is rejected by newer Elixir compilers.
      timestamps()
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.ExampleProjection do
  @moduledoc """
  Example read-model projection backed by the `examples` table.

  `use Projections.Projection` sets the module up as an Ecto schema and
  injects `update_projection/3`.
  """

  use Projections.Projection

  schema "examples" do
    field :name, :string

    timestamps()
  end
end

# Defined as a sibling (same fully-qualified name as the nested form) so that
# the `%ExampleProjection{}` struct is already compiled when it is expanded here.
defmodule Projections.ExampleProjection.Projector do
  @moduledoc """
  Commanded event handler that populates `Projections.ExampleProjection`.

  Inserts one row per `SomeEvent` and ignores every other event.
  """

  @behaviour Commanded.Event.Handler

  alias Projections.ExampleProjection

  @projection_name "example"

  # `SomeEvent` is a placeholder: it is not defined in this file and must be
  # replaced by (or aliased to) a real domain event struct.
  def handle(%SomeEvent{name: name}, %{event_id: event_id}) do
    ExampleProjection.update_projection(@projection_name, event_id, fn multi ->
      Ecto.Multi.insert(multi, :example, %ExampleProjection{name: name})
    end)
  end

  # ignore all other events
  def handle(_event, _metadata), do: :ok
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.Projection do
  @moduledoc false

  # `use Projections.Projection` turns the calling module into an Ecto schema
  # and injects `update_projection/3`, which applies projection changes and
  # records the handled event id in a single transaction.
  defmacro __using__(_) do
    quote do
      use Ecto.Schema

      import Ecto.Changeset
      import Ecto.Query

      alias Projections.{Repo, Projection, ProjectionVersion}

      @doc """
      Applies the changes for one event to the projection in a single transaction.

      `multi_fn` receives an `Ecto.Multi` that already contains the version
      check and the version bump, and must return the multi with the
      projection's own operations appended.

      Returns `:ok` when the transaction commits, and also when the event was
      skipped because `event_id` is not greater than the stored
      `last_seen_event_id`. Returns `{:error, reason}` when any other step of
      the multi fails.
      """
      def update_projection(projection_name, event_id, multi_fn) do
        multi =
          Ecto.Multi.new()
          |> Ecto.Multi.run(:verify_projection_version, fn _changes ->
            # Create the version row on first use, starting from event id 0.
            version =
              case Repo.get(ProjectionVersion, projection_name) do
                nil ->
                  Repo.insert!(%ProjectionVersion{
                    projection_name: projection_name,
                    last_seen_event_id: 0
                  })

                existing ->
                  existing
              end

            # Failing this step aborts the whole multi, so nothing is written
            # for an event that was already handled.
            if version.last_seen_event_id < event_id do
              {:ok, %{version: version}}
            else
              {:error, :already_seen_event}
            end
          end)
          |> Ecto.Multi.update(
            :projection_version,
            ProjectionVersion.changeset(
              %ProjectionVersion{projection_name: projection_name},
              %{last_seen_event_id: event_id}
            )
          )

        multi = multi_fn.(multi)

        case Repo.transaction(multi) do
          {:ok, _changes} ->
            :ok

          # An already-seen event is not a failure: report success to the caller.
          {:error, :verify_projection_version, :already_seen_event, _changes_so_far} ->
            :ok

          {:error, _stage, reason, _changes_so_far} ->
            {:error, reason}
        end
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.ProjectionVersion do
  @moduledoc """
  Ecto schema for the `projection_versions` table.

  A row is identified by `projection_name` (the primary key) and holds the
  `last_seen_event_id` recorded for that projection.
  """

  use Ecto.Schema

  import Ecto.Changeset
  import Ecto.Query

  @primary_key {:projection_name, :string, []}

  schema "projection_versions" do
    field :last_seen_event_id, :integer

    timestamps()
  end

  # Field names are atoms (`~w(...)a`), as expected by `Ecto.Changeset.cast/3`.
  @required_fields ~w(last_seen_event_id)a

  @doc """
  Builds a changeset that casts `last_seen_event_id` from `params`.

  `params` defaults to an empty map, the replacement for the deprecated
  `:empty` sentinel. No validations are applied.
  """
  def changeset(model, params \\ %{}) do
    cast(model, params, @required_fields)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.Repo do
  @moduledoc """
  Ecto repository for the projections, configured under the `:example` OTP application.
  """

  use Ecto.Repo, otp_app: :example
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Projections.Supervisor do
  @moduledoc """
  Supervises the projections repository and the example projection's event handler.
  """

  use Supervisor

  @doc """
  Starts the supervisor; `init/1` is invoked with `nil`.
  """
  def start_link, do: Supervisor.start_link(__MODULE__, nil)

  @doc """
  Builds the supervision spec: the repo worker is listed first, followed by the
  event handler worker for the example projection, under `:one_for_one`.
  """
  def init(_arg) do
    repo = worker(Projections.Repo, [])

    # projections
    example_projection =
      worker(
        Commanded.Event.Handler,
        ["ExampleProjection", Projections.ExampleProjection.Projector],
        id: :example_projection
      )

    supervise([repo, example_projection], strategy: :one_for_one)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment