Skip to content

Instantly share code, notes, and snippets.

@shinnya
Created May 8, 2020 00:50
Show Gist options
  • Save shinnya/ccf085d9fae4d74f3a83281b0064d38e to your computer and use it in GitHub Desktop.
Save shinnya/ccf085d9fae4d74f3a83281b0064d38e to your computer and use it in GitHub Desktop.
Implementing the weakest failure detector for solving consensus Mikel Larrea, Antonio Fernández Anta, Sergio Arévalo
%% This module implements f-Resilient ♦S failure detector described in
%% "Implementing the weakest failure detector for solving consensus
%% Mikel Larrea, Antonio Fernández Anta, Sergio Arévalo"
-module(failure_detector).
-behaviour(gen_statem).
%% API
-export([start_link/3,
get_leader/0,
get_suspected/0
]).
%% gen_statem callbacks
-export([callback_mode/0, init/1, terminate/3, code_change/4]).
-export([handle_event/4]).
-define(SERVER, ?MODULE).
-type member() :: pid().
-type member_index() :: non_neg_integer().
-type option() :: #{tolerable_faults => non_neg_integer()}.
%% Process state of the f-resilient ♦S failure detector.
-record(data, {
me :: member(),                                    % this process (the `Me' passed to start_link/3)
my_index :: member_index(),                        % 1-based position of `me' in `members'
trusting :: member_index(),                        % index of the member currently trusted; equals my_index when leader
members :: [member()],                             % all members; a lower index has a higher leader priority
tolerable_faults :: non_neg_integer(),             % f (option, default 1); per the Task comments only indices 1..f+1 are candidates
trusting_timer :: timer:tref() | undefined,        % one-shot timer delivering trusting_timeout; undefined while leader
heartbeat_timer :: timer:tref() | undefined,       % interval timer delivering heartbeat_timeout while leader
counts :: #{member_index() => non_neg_integer()},  % NEW-LEADER counter per member index (initially 0)
timeouts :: #{member_index() => non_neg_integer()} % per-member timeout in ms (initially 100), grown after a wrong suspicion
}).
%% @doc Starts the detector as a locally registered gen_statem.
%% `Me' must be an element of `Members' (init/1 looks up its position).
-spec start_link(pid(), [pid()], map()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Me, Members, Option) ->
    InitArgs = [Me, Members, Option],
    gen_statem:start_link({local, ?SERVER}, ?MODULE, InitArgs, []).

%% @doc Asks the local detector which member it currently trusts.
-spec get_leader() -> pid().
get_leader() ->
    Request = get_leader,
    gen_statem:call(?SERVER, Request).

%% @doc Asks the local detector which members it currently suspects.
-spec get_suspected() -> [pid()].
get_suspected() ->
    Request = get_suspected,
    gen_statem:call(?SERVER, Request).
%% Every event, in every state, is dispatched to handle_event/4.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    handle_event_function.
%% gen_statem init callback. Builds the initial state (f defaults to 1,
%% every member starts with a 100 ms timeout and a zero count) and starts
%% out trusting member 1; become_trusting/2 turns that into `trusted' when
%% this process is member 1.
init([Me, Members, Option]) ->
    process_flag(trap_exit, true),
    Initial = #data{ me = Me,
                     my_index = my_index(Me, Members),
                     members = Members,
                     tolerable_faults = maps:get(tolerable_faults, Option, 1),
                     trusting_timer = undefined,
                     heartbeat_timer = undefined,
                     timeouts = init_timeouts(Members, 100),
                     counts = init_counts(Members) },
    {StartState, StartData} = become_trusting(Initial, 1),
    {ok, StartState, StartData}.
%% gen_statem event handler (callback mode handle_event_function).
%% Gets a trusted process
handle_event({call, From}, get_leader, State, Data) ->
    Reply = get_trusted(Data),
    %% The reply action is {reply, From, Reply}; {ok, ...} is not a valid
    %% gen_statem action and makes the server crash.
    {next_state, State, Data, [{reply, From, Reply}]};
%% Gets suspected processes
handle_event({call, From}, get_suspected, State, Data) ->
    Reply = get_suspected(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
%% Periodic tick while leader: send I-AM-THE-LEADER.
handle_event(info, heartbeat_timeout, State, Data) ->
    broadcast_heartbeat(Data),
    {next_state, State, Data};
%% The trusted member stayed silent for its whole timeout.
handle_event(info, trusting_timeout, State, Data) ->
    {NextState, NewData} = handle_trusting_timeout(State, Data),
    {next_state, NextState, NewData};
%% I-AM-THE-LEADER from the member with index Sender.
handle_event(info, {heartbeat, Sender}, State, Data) ->
    {NextState, NewData} = handle_heartbeat(State, Data, Sender),
    {next_state, NextState, NewData};
%% NEW-LEADER announcement from the member with index Sender.
handle_event(info, {new_leader, Sender, SenderCount}, State, Data) ->
    {NextState, NewData} = handle_new_leader(State, Data, Sender, SenderCount),
    {next_state, NextState, NewData};
%% Any other call is answered with ok.
handle_event({call, From}, _Msg, State, Data) ->
    {next_state, State, Data, [{reply, From, ok}]};
%% Drop anything else (casts, unexpected or malformed info messages such as
%% 'EXIT' signals received because init/1 traps exits) instead of crashing
%% with function_clause.
handle_event(_EventType, _EventContent, State, Data) ->
    {next_state, State, Data}.
%% No cleanup is performed; gen_statem ignores the return value.
terminate(_Reason, _State, _Data) ->
    void.

%% No state migration between code versions: state and data are kept as is.
code_change(_OldVsn, CurrentState, CurrentData, _Extra) ->
    {ok, CurrentState, CurrentData}.
%% Makes this process the leader. It cancels any pending timers, starts the
%% periodic heartbeat_timeout tick (every 100 ms) and announces itself via
%% broadcast_new_leader/1. Returns {trusted, NewData}.
become_trusted(#data{my_index = MyIndex} = Data) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    {ok, Tick} = timer:send_interval(100, self(), heartbeat_timeout),
    Leader = Data#data{ trusting = MyIndex,
                        trusting_timer = undefined,
                        heartbeat_timer = Tick },
    broadcast_new_leader(Leader),
    {trusted, Leader}.
%% Moves trust to the next member in ring order (index wraps from n to 1).
become_trusting(Data) ->
    NextTrusting = (Data#data.trusting rem length(Data#data.members)) + 1,
    become_trusting(Data, NextTrusting).

%% Trusts member `Trusting'. If that is this process, it becomes the leader.
%% Otherwise both timers are stopped and a one-shot trusting_timeout is armed
%% for timeout_of(Data, Trusting) ms. Returns {NextState, NewData}.
become_trusting(Data, Trusting) when Data#data.my_index =:= Trusting ->
    become_trusted(Data);
become_trusting(Data, Trusting) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    %% timeout_of/2 takes the state first and the member index second.
    Timeout = timeout_of(Data, Trusting),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    {trusting, Data#data{ trusting = Trusting,
                          trusting_timer = TrustingTimer,
                          heartbeat_timer = undefined }}.
%% Handles expiry of the trusting timer.
%% Non-candidates (p_i, i = f+2...n, i.e. i > f+1) never change whom they
%% trust on a timeout; they only follow NEW-LEADER announcements.
handle_trusting_timeout(State, Data)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    {State, Data};
%% Task 2: for every process p_i,i=1...f+1
%% when (trusted_i < i) and (did not receive I-AM-THE-LEADER from P_trusted_i),
%% during the last timeout(i,trusted_i) time units
handle_trusting_timeout(trusting, Data) when Data#data.trusting < Data#data.my_index ->
    become_trusting(Data);
handle_trusting_timeout(State, Data) ->
    {State, Data}.
%% Handles an I-AM-THE-LEADER heartbeat from the member with index Sender.
%% Non-candidates (p_i, i = f+2...n, i.e. i > f+1) ignore heartbeats.
handle_heartbeat(State, Data, _Sender)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    {State, Data};
%% Task 3: for every process p_i,i=1...f+1
%% when (received I-AM-THE-LEADER from P_j) and (j < trusted_i)
%% we wrongly gave up on p_j: trust it again with a longer timeout.
handle_heartbeat(_State, Data, Sender) when Sender < Data#data.trusting ->
    Timeout = maps:get(Sender, Data#data.timeouts),
    NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
    become_trusting(Data#data{ timeouts = NewTimeouts }, Sender);
%% A heartbeat from the member already trusted proves it is alive, so the
%% trusting timer restarts. Without this, the one-shot timer armed when trust
%% was given would expire even while heartbeats keep arriving.
handle_heartbeat(trusting, Data, Sender) when Sender =:= Data#data.trusting ->
    cancel_timer(Data#data.trusting_timer),
    Timeout = maps:get(Sender, Data#data.timeouts),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    {trusting, Data#data{ trusting_timer = TrustingTimer }};
handle_heartbeat(State, Data, _Sender) ->
    {State, Data}.
%% Task 1: for every process p_i,i=f+2...n
%% Non-candidates learn the leader only from NEW-LEADER announcements. They
%% adopt Sender when its (count, index) pair is greater than that of the
%% member currently trusted, and remember Sender's announced count.
handle_new_leader(State, Data, Sender, SenderCount)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    Trusting = Data#data.trusting,
    case {count_of(Data, Trusting), Trusting} < {SenderCount, Sender} of
        false ->
            %% Callers expect {State, Data}, not a bare Data.
            {State, Data};
        true ->
            NewCounts = maps:put(Sender, SenderCount, Data#data.counts),
            become_trusting(Data#data{ counts = NewCounts }, Sender)
    end;
%% Task 4: for every process p_i,i=1...f+1
%% when (R-deliver(NEW-LEADER, count_j) from p_j) and ((count_i,i) < (count_j,j))
%% raise count_i above count_j and re-announce. A correct leader's
%% announcement then supersedes one from a process that wrongly took over.
%% broadcast_new_leader/1 only sends while this process is the leader.
handle_new_leader(State, Data, Sender, SenderCount) ->
    MyIndex = Data#data.my_index,
    case {count_of(Data, MyIndex), MyIndex} < {SenderCount, Sender} of
        false ->
            {State, Data};
        true ->
            Counts0 = maps:put(Sender, SenderCount, Data#data.counts),
            Counts = maps:put(MyIndex, SenderCount + 1, Counts0),
            NewData = Data#data{ counts = Counts },
            broadcast_new_leader(NewData),
            {State, NewData}
    end.
%% Maps every member index 1..length(Members) to DefaultTimeout.
init_timeouts(Members, DefaultTimeout) ->
    maps:from_list([{Index, DefaultTimeout} || Index <- lists:seq(1, length(Members))]).

%% Maps every member index 1..length(Members) to a count of 0.
init_counts(Members) ->
    maps:from_list([{Index, 0} || Index <- lists:seq(1, length(Members))]).
%% Current timeout (ms) of the member at MemberIndex; crashes if unknown.
timeout_of(#data{timeouts = Timeouts}, MemberIndex) ->
    maps:get(MemberIndex, Timeouts).

%% Current NEW-LEADER count of the member at MemberIndex; crashes if unknown.
count_of(#data{counts = Counts}, MemberIndex) ->
    maps:get(MemberIndex, Counts).
%% Non-candidates (p_i, i = f+2...n, i.e. i > f+1) never send heartbeats.
broadcast_heartbeat(Data) when Data#data.my_index > Data#data.tolerable_faults + 1 -> ok;
%% Task 1: for every process p_i,i=1...f+1
%% repeat periodically
%% if trusted_i = i then send I-AM-THE-LEADER to p_i+1 ... p_f+1
broadcast_heartbeat(Data) when Data#data.trusting =:= Data#data.my_index ->
    MyIndex = Data#data.my_index,
    %% p_i+1 .. p_f+1 are the f+1-i members following this one. The first
    %% clause guarantees this is >= 0; sublist truncates when f+1 > n.
    Count = Data#data.tolerable_faults + 1 - MyIndex,
    Receivers = lists:sublist(Data#data.members, MyIndex + 1, Count),
    [erlang:send(M, {heartbeat, MyIndex}) || M <- Receivers],
    ok;
broadcast_heartbeat(_Data) -> ok.
%% Non-candidates (p_i, i = f+2...n, i.e. i > f+1) never announce themselves.
broadcast_new_leader(Data) when Data#data.my_index > Data#data.tolerable_faults + 1 -> ok;
%% for every process p_i,i=1...f+1
%% if trusted_i = i then R-broadcast(NEW-LEADER, count_i)
%% NOTE(review): this is a plain send to every other member, not a reliable
%% broadcast (receivers do not relay).
broadcast_new_leader(Data) when Data#data.trusting =:= Data#data.my_index ->
    MyIndex = Data#data.my_index,
    %% Data#data.my_index is the field's value; #data.my_index would be the
    %% field's tuple position.
    MyCount = count_of(Data, MyIndex),
    Receivers = lists:delete(Data#data.me, Data#data.members),
    %% Tagged new_leader so handle_event/4 routes it to handle_new_leader/4.
    [erlang:send(M, {new_leader, MyIndex, MyCount}) || M <- Receivers],
    ok;
broadcast_new_leader(_Data) -> ok.
%% Returns the member (pid) currently trusted.
get_trusted(#data{trusting = Index, members = Members}) ->
    lists:nth(Index, Members).

%% Suspects every member except the trusted one.
get_suspected(Data) ->
    lists:delete(get_trusted(Data), Data#data.members).
%% Returns the 1-based position of the first occurrence of Me in Members.
%% Crashes (via lists:nth/2) if Me is not a member.
my_index(Me, Members) -> my_index(Me, Members, 1).

%% Scans Members from position N onwards.
my_index(Me, Members, N) ->
    case lists:nth(N, Members) of
        Me -> N;
        _Other -> my_index(Me, Members, N + 1)
    end.
%% Cancels a timer:tref(); `undefined' (no timer running) is a no-op.
cancel_timer(MaybeTimer) ->
    case MaybeTimer of
        undefined -> ok;
        TRef -> timer:cancel(TRef)
    end.
%% This module implements ♦P failure detector described in
%% "Implementing the weakest failure detector for solving consensus
%% Mikel Larrea, Antonio Fernández Anta, Sergio Arévalo"
-module(failure_detector).
-behaviour(gen_statem).
%% API
-export([start_link/3,
get_leader/0,
get_suspected/0
]).
%% gen_statem callbacks
-export([callback_mode/0, init/1, terminate/3, code_change/4]).
-export([handle_event/4]).
-define(SERVER, ?MODULE).
-type member() :: pid().
-type member_index() :: non_neg_integer().
-type option() :: #{
heartbeat_time_millis => non_neg_integer(),
alive_waiting_time_millis => non_neg_integer()
}.
%% Process state of the ♦P failure detector.
-record(data, {
me :: member(),                                    % this process (the `Me' passed to start_link/3)
my_index :: member_index(),                        % 1-based position of `me' in `members'
trusting :: member_index(),                        % index of the member currently trusted; equals my_index when leader
members :: [member()],                             % all members; a lower index has a higher leader priority
trusting_timer :: timer:tref() | undefined,        % one-shot timer delivering trusting_timeout; undefined while leader
heartbeat_time_millis :: non_neg_integer(),        % heartbeat_timeout period while leader (option, default 100)
heartbeat_timer :: timer:tref() | undefined,       % interval timer delivering heartbeat_timeout (leader: heartbeats, otherwise: alive messages)
suspected :: sets:set(member()),                   % members currently suspected (adopted from the leader's heartbeats when not leader)
alive_waiting_time_millis :: non_neg_integer(),    % first alive-waiting timeout armed by become_trusted/1 (option, default 100)
alive_waiting_timers :: #{member_index() => timer:tref()}, % leader only: per-member timers waiting for an alive message
timeouts :: #{member_index() => non_neg_integer()} % per-member timeout in ms (initially 100), grown after a wrong suspicion
}).
%% @doc Starts the detector as a locally registered gen_statem.
%% `Me' must be an element of `Members' (init/1 looks up its position).
-spec start_link(pid(), [pid()], map()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Me, Members, Option) ->
    InitArgs = [Me, Members, Option],
    gen_statem:start_link({local, ?SERVER}, ?MODULE, InitArgs, []).

%% @doc Asks the local detector which member it currently trusts.
-spec get_leader() -> pid().
get_leader() ->
    Request = get_leader,
    gen_statem:call(?SERVER, Request).

%% @doc Asks the local detector for its current set of suspected members.
get_suspected() ->
    Request = get_suspected,
    gen_statem:call(?SERVER, Request).
%% Every event, in every state, is dispatched to handle_event/4.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    handle_event_function.
%% gen_statem init callback. Reads both periods from Option (default 100 ms
%% each), gives every member a 100 ms timeout, starts with nobody suspected
%% and begins by trusting member 1.
init([Me, Members, Option]) ->
    process_flag(trap_exit, true),
    Initial = #data{ me = Me,
                     my_index = my_index(Me, Members),
                     members = Members,
                     heartbeat_time_millis = maps:get(heartbeat_time_millis, Option, 100),
                     heartbeat_timer = undefined,
                     trusting_timer = undefined,
                     timeouts = init_timeouts(Members, 100),
                     alive_waiting_time_millis = maps:get(alive_waiting_time_millis, Option, 100),
                     alive_waiting_timers = #{},
                     suspected = sets:new() },
    {StartState, StartData} = become_trusting(Initial, 1),
    {ok, StartState, StartData}.
%% gen_statem event handler (callback mode handle_event_function).
%% Gets a trusted process
handle_event({call, From}, get_leader, State, Data) ->
    Reply = get_trusted(Data),
    %% The reply action is {reply, From, Reply}; {ok, ...} is not a valid
    %% gen_statem action and makes the server crash.
    {next_state, State, Data, [{reply, From, Reply}]};
%% Gets suspected processes
handle_event({call, From}, get_suspected, State, Data) ->
    Reply = get_suspected(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
%% Periodic tick: heartbeats when leader, alive messages otherwise.
handle_event(info, heartbeat_timeout, State, Data) ->
    handle_heartbeat_timeout(Data),
    {next_state, State, Data};
%% The trusted member stayed silent for its whole timeout.
handle_event(info, trusting_timeout, State, Data) ->
    {NextState, NewData} = handle_trusting_timeout(State, Data),
    {next_state, NextState, NewData};
%% The leader waited too long for an alive message from member Member.
handle_event(info, {alive_waiting_timeout, Member}, State, Data) ->
    {NextState, NewData} = handle_alive_waiting_timeout(State, Data, Member),
    {next_state, NextState, NewData};
%% I-AM-THE-LEADER with the leader's suspected set.
handle_event(info, {heartbeat, Sender, SenderSuspected}, State, Data) ->
    {NextState, NewData} = handle_heartbeat(State, Data, Sender, SenderSuspected),
    {next_state, NextState, NewData};
%% I-AM-ALIVE from the member with index Sender.
handle_event(info, {alive, Sender}, State, Data) ->
    {NextState, NewData} = handle_alive(State, Data, Sender),
    {next_state, NextState, NewData};
%% Any other call is answered with ok.
handle_event({call, From}, _Msg, State, Data) ->
    {next_state, State, Data, [{reply, From, ok}]};
%% Drop anything else (casts, unexpected or malformed info messages such as
%% 'EXIT' signals received because init/1 traps exits) instead of crashing
%% with function_clause.
handle_event(_EventType, _EventContent, State, Data) ->
    {next_state, State, Data}.
%% No cleanup is performed; gen_statem ignores the return value.
terminate(_Reason, _State, _Data) ->
    void.

%% No state migration between code versions: state and data are kept as is.
code_change(_OldVsn, CurrentState, CurrentData, _Extra) ->
    {ok, CurrentState, CurrentData}.
%% Makes this process the leader. It cancels every pending timer, starts the
%% periodic heartbeat_timeout tick, initially suspects all lower-indexed
%% members and arms an alive-waiting timer per member.
%% Returns {trusted, NewData}.
become_trusted(#data{my_index = MyIndex, members = Members} = Data) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    cancel_alive_waiting_timers(Data#data.alive_waiting_timers),
    {ok, Tick} = timer:send_interval(Data#data.heartbeat_time_millis, self(), heartbeat_timeout),
    LowerRanked = sets:from_list(lists:sublist(Members, 1, MyIndex - 1)),
    AliveTimers = init_alive_waiting_timers(Members, Data#data.alive_waiting_time_millis),
    {trusted, Data#data{ trusting = MyIndex,
                         heartbeat_timer = Tick,
                         trusting_timer = undefined,
                         suspected = LowerRanked,
                         alive_waiting_timers = AliveTimers }}.
%% Moves trust to the next member in ring order (index wraps from n to 1).
become_trusting(Data) ->
    NextTrusting = (Data#data.trusting rem length(Data#data.members)) + 1,
    become_trusting(Data, NextTrusting).

%% Trusts member `Trusting'. If that is this process, it becomes the leader.
%% Otherwise every timer is stopped, a one-shot trusting_timeout is armed for
%% the trusted member's timeout, and a periodic heartbeat_timeout tick is
%% started (handle_heartbeat_timeout/1 then sends I-AM-ALIVE to the trusted
%% member). Returns {NextState, NewData}.
become_trusting(Data, Trusting) when Data#data.my_index =:= Trusting ->
    become_trusted(Data);
become_trusting(Data, Trusting) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    cancel_alive_waiting_timers(Data#data.alive_waiting_timers),
    %% timeout_of/2 takes the state first and the member index second.
    Timeout = timeout_of(Data, Trusting),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    %% Send I-AM-ALIVE at the configured fixed period rather than at the
    %% growing timeout we use for the trusted member. The leader's adaptive
    %% per-member timeouts can then eventually exceed the sending period.
    {ok, HeartbeatTimer} = timer:send_interval(Data#data.heartbeat_time_millis, self(), heartbeat_timeout),
    {trusting, Data#data{ trusting = Trusting,
                          trusting_timer = TrustingTimer,
                          heartbeat_timer = HeartbeatTimer,
                          alive_waiting_timers = #{} }}.
%% Task 2: when the trusting timer expires, a process that trusts a
%% lower-indexed member moves on to the next member in ring order.
%% In any other situation the timeout is ignored.
handle_trusting_timeout(trusting, #data{trusting = Trusting, my_index = MyIndex} = Data)
  when Trusting < MyIndex ->
    become_trusting(Data);
handle_trusting_timeout(State, Data) ->
    {State, Data}.
%% Task 3: handles I-AM-THE-LEADER, which carries the leader's suspected set,
%% from member Sender. Heartbeats from members ranked after the trusted one
%% are ignored.
handle_heartbeat(_State, Data, Sender, SenderSuspected) when Sender < Data#data.trusting ->
    %% We wrongly gave up on Sender: trust it again with a longer timeout.
    Timeout = maps:get(Sender, Data#data.timeouts),
    NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
    NewData = Data#data{ timeouts = NewTimeouts, suspected = SenderSuspected },
    become_trusting(NewData, Sender);
handle_heartbeat(State, Data, Sender, SenderSuspected) when Sender =:= Data#data.trusting ->
    %% The trusted member is alive: adopt its suspicions and restart the
    %% trusting timer, so it only fires after a full timeout of silence.
    NewData = restart_trusting_timer(Data#data{ suspected = SenderSuspected }),
    {State, NewData};
handle_heartbeat(State, Data, _Sender, _SenderSuspected) ->
    {State, Data}.

%% Re-arms the one-shot trusting_timeout for the currently trusted member.
%% It is a no-op when no trusting timer is running (i.e. this process leads).
restart_trusting_timer(#data{ trusting_timer = undefined } = Data) ->
    Data;
restart_trusting_timer(Data) ->
    cancel_timer(Data#data.trusting_timer),
    Timeout = maps:get(Data#data.trusting, Data#data.timeouts),
    {ok, Timer} = timer:send_after(Timeout, self(), trusting_timeout),
    Data#data{ trusting_timer = Timer }.
%% Task 4: the leader got no I-AM-ALIVE from member WaitingMemberIndex within
%% its timeout. It suspects that member and keeps waiting with a re-armed
%% timer. Only members ranked after the leader are monitored:
%% - become_trusted/1 arms a timer for every index, including the leader's own;
%% - the leader never sends I-AM-ALIVE to itself;
%% - lower-ranked members are already suspected.
%% Timeouts for the leader itself and for lower-ranked members are therefore
%% dropped instead of making the leader suspect itself.
handle_alive_waiting_timeout(State, Data, WaitingMemberIndex)
  when Data#data.trusting =:= Data#data.my_index,
       WaitingMemberIndex > Data#data.my_index ->
    WaitingMember = member_of(Data, WaitingMemberIndex),
    cancel_timer(maps:get(WaitingMemberIndex, Data#data.alive_waiting_timers, undefined)),
    Timeout = timeout_of(Data, WaitingMemberIndex),
    {ok, NewTimer} = timer:send_after(Timeout, self(), {alive_waiting_timeout, WaitingMemberIndex}),
    NewAliveWaitingTimers = maps:put(WaitingMemberIndex, NewTimer, Data#data.alive_waiting_timers),
    NewSuspected = sets:add_element(WaitingMember, Data#data.suspected),
    NewData = Data#data{ suspected = NewSuspected, alive_waiting_timers = NewAliveWaitingTimers },
    {State, NewData};
handle_alive_waiting_timeout(State, Data, _) ->
    {State, Data}.
%% Handles I-AM-ALIVE from member Sender while this process is the leader.
%% A suspected sender was suspected by mistake: stop suspecting it and
%% lengthen its timeout. In every case the sender's alive-waiting timer is
%% restarted, so it only fires after a full timeout without I-AM-ALIVE.
handle_alive(State, Data, Sender) when Data#data.trusting =:= Data#data.my_index ->
    Member = member_of(Data, Sender),
    Data1 =
        case sets:is_element(Member, Data#data.suspected) of
            true ->
                Timeout = maps:get(Sender, Data#data.timeouts),
                NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
                %% del_element: adding an element already present changed nothing.
                NewSuspected = sets:del_element(Member, Data#data.suspected),
                Data#data{ suspected = NewSuspected, timeouts = NewTimeouts };
            false ->
                Data
        end,
    {State, restart_alive_waiting_timer(Data1, Sender)};
handle_alive(State, Data, _Sender) ->
    {State, Data}.

%% Cancels the pending alive-waiting timer of member Index (if any) and arms
%% a new one for that member's current timeout.
restart_alive_waiting_timer(Data, Index) ->
    Timers = Data#data.alive_waiting_timers,
    cancel_timer(maps:get(Index, Timers, undefined)),
    Timeout = maps:get(Index, Data#data.timeouts),
    {ok, Timer} = timer:send_after(Timeout, self(), {alive_waiting_timeout, Index}),
    Data#data{ alive_waiting_timers = maps:put(Index, Timer, Timers) }.
%% Maps every member index 1..length(Members) to DefaultTimeout.
init_timeouts(Members, DefaultTimeout) ->
    maps:from_list([{Index, DefaultTimeout} || Index <- lists:seq(1, length(Members))]).
%% Cancels every timer stored in an alive-waiting timer map (index => tref).
cancel_alive_waiting_timers(Timers) ->
    lists:foreach(fun cancel_timer/1, maps:values(Timers)).
%% Arms one alive-waiting timer per member index 1..length(Members).
%% Each timer delivers {alive_waiting_timeout, Index} to this process after
%% Timeout ms; that is the shape handle_event/4 matches (a bare atom would
%% not be routed to handle_alive_waiting_timeout/3).
%% Returns #{Index => TRef}.
init_alive_waiting_timers(Members, Timeout) ->
    maps:from_list(
      [begin
           {ok, Timer} = timer:send_after(Timeout, self(), {alive_waiting_timeout, Index}),
           {Index, Timer}
       end || Index <- lists:seq(1, length(Members))]).
%% Current timeout (ms) of the member at MemberIndex; crashes if unknown.
timeout_of(#data{timeouts = Timeouts}, MemberIndex) ->
    maps:get(MemberIndex, Timeouts).

%% The member (pid) at 1-based position MemberIndex.
member_of(#data{members = Members}, MemberIndex) ->
    lists:nth(MemberIndex, Members).
%% Task 1: the periodic heartbeat_timeout tick.
%% Leader: sends I-AM-THE-LEADER, carrying its suspected set, to every
%% member ranked after it (P_i+1 ... P_n) that it does not suspect.
%% Otherwise: sends I-AM-ALIVE to the trusted member.
handle_heartbeat_timeout(#data{trusting = MyIndex, my_index = MyIndex} = Data) ->
    Later = lists:nthtail(MyIndex, Data#data.members),
    Receivers = Later -- sets:to_list(Data#data.suspected),
    Msg = {heartbeat, MyIndex, Data#data.suspected},
    [erlang:send(Receiver, Msg) || Receiver <- Receivers];
handle_heartbeat_timeout(Data) ->
    erlang:send(get_trusted(Data), {alive, Data#data.my_index}).
%% Returns the member (pid) currently trusted.
get_trusted(#data{trusting = Index, members = Members}) ->
    lists:nth(Index, Members).

%% Returns the current set of suspected members (a sets:set()).
get_suspected(#data{suspected = Suspected}) ->
    Suspected.
%% Returns the 1-based position of the first occurrence of Me in Members.
%% Crashes (via lists:nth/2) if Me is not a member.
my_index(Me, Members) -> my_index(Me, Members, 1).

%% Scans Members from position N onwards.
my_index(Me, Members, N) ->
    case lists:nth(N, Members) of
        Me -> N;
        _Other -> my_index(Me, Members, N + 1)
    end.
%% Cancels a timer:tref(); `undefined' (no timer running) is a no-op.
cancel_timer(MaybeTimer) ->
    case MaybeTimer of
        undefined -> ok;
        TRef -> timer:cancel(TRef)
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment