Last active
May 9, 2018 17:47
-
-
Save benwilson512/db861d118545abd8cd92c11bf3ccf382 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Raven.Util.Transaction do
  @moduledoc """
  Claim a given transaction handle.

  Blocks if other callers have already claimed the handle. `claim/3` returns
  `:ok` if the current process wins the claim, otherwise it returns
  `{:error, :already_taken}`.

  ### Example Usage

  This would be more ergonomic if done `Repo.transaction`-style with a wrapping
  function, but the split-up explicit form used here makes it easier to do
  commanded stuff.

  ```
  email = "user@example.com"

  lookup_fun = fn ->
    User.find_by_email(email)
  end

  with :ok <- Transaction.claim(email, lookup_fun),
       {:ok, user} <- create_user(email: email) do
    :ok = Transaction.release(email)
    {:ok, user}
  else
    error ->
      :ok = Transaction.release(email)
      error
  end
  ```

  The idea here is that we ask for a claim with a specific handle; usually the
  handle is the value we want to have unique access to. The lookup fun is run
  from within the GenServer, and if it returns a truthy value then the value is
  already present and you get back `{:error, :already_taken}`. Running this from
  within the GenServer avoids the usual race condition around this check. If the
  value does not exist then you successfully claim the handle and get back `:ok`.

  With the claim in hand we go ahead doing whatever we want with the value. It is
  critical that whatever action we take be strongly consistent with respect to
  the check done by the lookup fun.

  After we have done what we want we release the transaction handle. This is
  where the lookup function comes back in. The GenServer runs the lookup function
  again and if it returns a truthy value then the transaction is considered to
  have been successful and any other processes waiting on a claim will receive
  `{:error, :already_taken}`. If the lookup function returns a falsey value then
  the next process gets a shot.

  One server process is started per handle, registered as `{:global, handle}`.
  It stops itself after being idle (no holder and no incoming messages) for
  #{5_000} ms.
  """

  use GenServer, restart: :temporary

  # Milliseconds a holder-less server waits for a message before stopping.
  @idle_timeout 5_000

  defstruct [
    :current,
    :lookup_fun,
    claimants: []
  ]

  @doc """
  Claims `handle` for the calling process.

  Starts the per-handle server under the `Raven.Util.Transactions`
  `DynamicSupervisor` (or reuses the one already running) and asks it for the
  claim. `lookup_fun` is a zero-arity function executed inside the server; a
  truthy result means the value already exists.

  Returns `:ok` when the caller holds the claim, or `{:error, :already_taken}`.
  The call blocks for up to `timeout` ms while another process holds the claim
  and exits if the timeout elapses.
  """
  @spec claim(term(), (-> any()), timeout()) :: :ok | {:error, :already_taken}
  def claim(handle, lookup_fun, timeout \\ 15_000) do
    pid =
      case DynamicSupervisor.start_child(Raven.Util.Transactions, {__MODULE__, handle}) do
        {:ok, pid} -> pid
        {:error, {:already_started, pid}} -> pid
      end

    GenServer.call(pid, {:claim, lookup_fun}, timeout)
  end

  @doc """
  Releases the claim on `handle`.

  Always replies `:ok`; a release from a process that does not hold the claim
  is a no-op. Exits if no server is registered for `handle`.
  """
  @spec release(term(), timeout()) :: :ok
  def release(handle, timeout \\ 5_000) do
    GenServer.call({:global, handle}, :release, timeout)
  end

  @doc """
  Starts the server for `handle`, registered globally as `{:global, handle}`.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(handle) do
    GenServer.start_link(__MODULE__, [], name: {:global, handle})
  end

  @impl true
  def init(_) do
    # If nobody actually makes a claim, the idle timeout stops the server.
    {:ok, %__MODULE__{}, @idle_timeout}
  end

  # The holder asking again: the claim is re-entrant.
  @impl true
  def handle_call({:claim, _fun}, {caller, _}, %{current: caller} = state) do
    {:reply, :ok, state}
  end

  # Nobody holds the claim: run the lookup and either reject or grant.
  def handle_call({:claim, fun}, {caller, _}, %{current: nil} = state) do
    if fun.() do
      # Still holder-less, so the idle timeout must be re-armed here; receiving
      # this call cancelled any timeout that was pending.
      reply({:error, :already_taken}, state)
    else
      Process.monitor(caller)
      {:reply, :ok, %{state | current: caller, lookup_fun: fun}}
    end
  end

  # Someone else holds the claim: queue the caller and reply later.
  def handle_call({:claim, _fun}, {caller, _} = claim, state) do
    Process.monitor(caller)
    # ++ is not the most efficient but the number of callers will generally be low.
    # consider fixing later.
    {:noreply, %{state | claimants: state.claimants ++ [claim]}}
  end

  # The holder releases: decide who (if anyone) goes next.
  def handle_call(:release, {caller, _}, %{current: caller} = state) do
    reply(:ok, next(state))
  end

  # A release from a non-holder changes nothing.
  def handle_call(:release, _, state) do
    reply(:ok, state)
  end

  # The holder died without releasing: treat it like a release.
  @impl true
  def handle_info({:DOWN, _, :process, pid, _}, %{current: pid} = state) do
    noreply(next(state))
  end

  def handle_info(:timeout, %{current: nil} = state) do
    {:stop, :normal, state}
  end

  def handle_info(:timeout, state) do
    {:noreply, state}
  end

  # A non-holder died (a queued claimant, or a past holder whose monitor is
  # still active): drop it from the queue.
  def handle_info({:DOWN, _, :process, dead_pid, _}, state) do
    claimants = Enum.reject(state.claimants, fn {pid, _} -> pid == dead_pid end)
    noreply(%{state | claimants: claimants})
  end

  # Builds a :reply tuple, arming the idle timeout whenever nobody holds the claim.
  defp reply(reply, %{current: nil} = state), do: {:reply, reply, state, @idle_timeout}
  defp reply(reply, state), do: {:reply, reply, state}

  # Builds a :noreply tuple, arming the idle timeout whenever nobody holds the claim.
  defp noreply(%{current: nil} = state), do: {:noreply, state, @idle_timeout}
  defp noreply(state), do: {:noreply, state}

  # Ends the current holder's claim and returns the updated state. If the
  # lookup is now truthy, every waiter is rejected; otherwise the head of the
  # queue (if any) is granted the claim.
  defp next(state) do
    if state.lookup_fun.() do
      Enum.each(state.claimants, &GenServer.reply(&1, {:error, :already_taken}))
      %{state | current: nil, claimants: []}
    else
      case state.claimants do
        [] ->
          %{state | current: nil}

        [{pid, _} = claim | rest] ->
          GenServer.reply(claim, :ok)
          %{state | current: pid, claimants: rest}
      end
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Raven.Ordering.Uniqueness do
  @moduledoc """
  Commanded middleware that serialises `CreateStaff` commands per email.

  Before dispatch it claims the command's email through
  `Raven.Util.Transaction`; if the claim is refused the pipeline is halted with
  `{:error, email: :already_taken}`. After dispatch (or failure) any claim
  recorded in the pipeline assigns is released.
  """

  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  alias Raven.Ordering.Commands.{CreateStaff}
  alias Raven.Util.Transaction
  alias Raven.Ordering

  # Claims the staff email; remembers it under :transaction_handle on success.
  def before_dispatch(%Pipeline{command: %CreateStaff{email: email}} = pipeline) do
    staff_lookup = fn -> Ordering.get_by_email(Ordering.Staff, email) end

    case Transaction.claim(email, staff_lookup) do
      :ok ->
        Pipeline.assign(pipeline, :transaction_handle, email)

      {:error, :already_taken} ->
        rejected = Pipeline.respond(pipeline, {:error, email: :already_taken})
        Pipeline.halt(rejected)
    end
  end

  # Every other command passes through untouched.
  def before_dispatch(pipeline), do: pipeline

  def after_dispatch(pipeline), do: release_claim(pipeline)

  def after_failure(pipeline), do: release_claim(pipeline)

  # Releases the claim taken in before_dispatch/1, if there is one, and hands
  # the pipeline back unchanged.
  defp release_claim(pipeline) do
    case pipeline.assigns[:transaction_handle] do
      nil -> nil
      false -> nil
      handle -> Transaction.release(handle)
    end

    pipeline
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that `Transaction.claim/3` expects a `DynamicSupervisor` named `Raven.Util.Transactions` to be running; this can be adjusted to suit your use case.