Created
May 8, 2020 00:50
-
-
Save shinnya/ccf085d9fae4d74f3a83281b0064d38e to your computer and use it in GitHub Desktop.
Implementing the weakest failure detector for solving consensus Mikel Larrea, Antonio Fernández Anta, Sergio Arévalo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% This module implements the f-resilient ◇S failure detector described in
%% "Implementing the Weakest Failure Detector for Solving Consensus"
%% by Mikel Larrea, Antonio Fernández Anta and Sergio Arévalo.
-module(failure_detector). | |
-behaviour(gen_statem). | |
%% API | |
-export([start_link/3, | |
get_leader/0, | |
get_suspected/0 | |
]). | |
%% gen_statem callbacks | |
-export([callback_mode/0, init/1, terminate/3, code_change/4]). | |
-export([handle_event/4]). | |
-define(SERVER, ?MODULE). | |
-type member() :: pid(). | |
-type member_index() :: non_neg_integer(). | |
-type option() :: #{tolerable_faults => non_neg_integer()}. | |
%% Server data kept by the gen_statem between events.
-record(data, {
    %% Local member, as passed to start_link/3.
    me :: member(),
    %% 1-based position of `me' inside `members'.
    my_index :: member_index(),
    %% Index of the member currently trusted as leader.
    trusting :: member_index(),
    %% Ordered list of all members; positions are the member indices.
    members :: [member()],
    %% The `f' of the algorithm, read from the `tolerable_faults' option.
    tolerable_faults :: non_neg_integer(),
    %% One-shot timer that delivers `trusting_timeout' (followers only).
    trusting_timer :: timer:tref() | undefined,
    %% Interval timer that delivers `heartbeat_timeout' (leader only).
    heartbeat_timer :: timer:tref() | undefined,
    %% Per-member counters, keyed by member index.
    counts :: #{member_index() => non_neg_integer()},
    %% Per-member timeouts in milliseconds, keyed by member index.
    timeouts :: #{member_index() => non_neg_integer()}
}).
%% Starts the failure detector linked to the caller and registers it
%% locally under ?SERVER. `Me', `Members' and `Option' are handed to
%% init/1 unchanged.
start_link(Me, Members, Option) ->
    InitArgs = [Me, Members, Option],
    gen_statem:start_link({local, ?SERVER}, ?MODULE, InitArgs, []).
%% Synchronously asks the locally registered detector which member it
%% currently trusts.
get_leader() ->
    Request = get_leader,
    gen_statem:call(?SERVER, Request).
%% Synchronously asks the locally registered detector which members it
%% currently suspects.
get_suspected() ->
    Request = get_suspected,
    gen_statem:call(?SERVER, Request).
%% Every event is routed through the single handle_event/4 callback.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    handle_event_function.
%% gen_statem init callback.
%% Builds the initial #data{} (tolerable_faults defaults to 1, every
%% timeout starts at 100 ms, every count at 0) and begins by trusting
%% the member at index 1.
init([Me, Members, Option]) ->
    process_flag(trap_exit, true),
    Index = my_index(Me, Members),
    Faults = maps:get(tolerable_faults, Option, 1),
    Timeouts = init_timeouts(Members, 100),
    Counts = init_counts(Members),
    Initial = #data{
        me = Me,
        my_index = Index,
        members = Members,
        tolerable_faults = Faults,
        heartbeat_timer = undefined,
        trusting_timer = undefined,
        timeouts = Timeouts,
        counts = Counts
    },
    {FirstState, FirstData} = become_trusting(Initial, 1),
    {ok, FirstState, FirstData}.
%% gen_statem event handler (handle_event_function mode).
%%
%% Calls:
%%   get_leader    -> replies with the currently trusted member
%%   get_suspected -> replies with the list of suspected members
%%   anything else -> replies `ok'
%% Info messages heartbeat_timeout, trusting_timeout, {heartbeat, Sender}
%% and {new_leader, Sender, SenderCount} are forwarded to their handlers.
%% Every other event is ignored.

%% Gets a trusted process.
%% The action must be tagged `reply'; `{ok, From, Reply}' is not a valid
%% gen_statem action and would terminate the server.
handle_event({call, From}, get_leader, State, Data) ->
    Reply = get_trusted(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
%% Gets suspected processes.
handle_event({call, From}, get_suspected, State, Data) ->
    Reply = get_suspected(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
handle_event(info, heartbeat_timeout, State, Data) ->
    broadcast_heartbeat(Data),
    {next_state, State, Data};
handle_event(info, trusting_timeout, State, Data) ->
    {NextState, NewData} = handle_trusting_timeout(State, Data),
    {next_state, NextState, NewData};
handle_event(info, {heartbeat, Sender}, State, Data) ->
    {NextState, NewData} = handle_heartbeat(State, Data, Sender),
    {next_state, NextState, NewData};
handle_event(info, {new_leader, Sender, SenderCount}, State, Data) ->
    {NextState, NewData} = handle_new_leader(State, Data, Sender, SenderCount),
    {next_state, NextState, NewData};
handle_event({call, From}, _Msg, State, Data) ->
    {next_state, State, Data, [{reply, From, ok}]};
%% init/1 enables trap_exit, so unrelated messages (e.g. 'EXIT' tuples)
%% can reach the mailbox; drop them instead of crashing on function_clause.
handle_event(_EventType, _Unexpected, State, Data) ->
    {next_state, State, Data}.
%% gen_statem terminate callback: nothing is cleaned up and all
%% arguments are ignored.
terminate(_Why, _StateName, _StateData) ->
    void.
%% gen_statem code_change callback: state name and data are carried
%% over to the new code version untouched.
code_change(_FromVsn, StateName, StateData, _Opaque) ->
    Unchanged = {ok, StateName, StateData},
    Unchanged.
%% Switches this process to the `trusted' (leader) state.
%% Stops any running timers, starts a 100 ms interval timer that
%% delivers `heartbeat_timeout', records itself as the trusted member
%% and announces the change through broadcast_new_leader/1.
%% Returns {trusted, NewData}.
become_trusted(Data) ->
    OldTrustingTimer = Data#data.trusting_timer,
    OldHeartbeatTimer = Data#data.heartbeat_timer,
    cancel_timer(OldTrustingTimer),
    cancel_timer(OldHeartbeatTimer),
    {ok, Ticker} = timer:send_interval(100, self(), heartbeat_timeout),
    Leader = Data#data{
        trusting = Data#data.my_index,
        heartbeat_timer = Ticker,
        trusting_timer = undefined
    },
    broadcast_new_leader(Leader),
    {trusted, Leader}.
%% Moves trust to the member after the currently trusted one, wrapping
%% from the last index back to 1.
become_trusting(Data) ->
    MemberCount = length(Data#data.members),
    Successor = 1 + (Data#data.trusting rem MemberCount),
    become_trusting(Data, Successor).
%% Starts trusting the member at index `Trusting'.
%% If that index is our own, we become the leader via become_trusted/1.
%% Otherwise both timers are stopped and a one-shot `trusting_timeout'
%% is scheduled using the timeout stored for that member.
%% Returns {StateName, NewData}.
become_trusting(Data, Trusting) when Data#data.my_index =:= Trusting ->
    become_trusted(Data);
become_trusting(Data, Trusting) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    %% timeout_of/2 takes (Data, MemberIndex); the reversed order raised
    %% badrecord for every process that is not the trusted one.
    Timeout = timeout_of(Data, Trusting),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    {trusting, Data#data{
        trusting = Trusting,
        trusting_timer = TrustingTimer,
        heartbeat_timer = undefined
    }}.
%% Handles expiry of the trusting timer. Returns {StateName, NewData}.
%%
%% Processes p_i with i = f+2 ... n do not run Task 2 and ignore it.
%% The guard is `i > f + 1'; the previous `f + 2 < i' let p_(f+2)
%% behave like a candidate.
handle_trusting_timeout(State, Data)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    {State, Data};
%% Task 2: for every process p_i,i=1...f+1
%% when (trusted_i < i) and (did not receive I-AM-THE-LEADER from P_trusted_i),
%% during the last timeout(i,trusted_i) time units
handle_trusting_timeout(trusting, Data) when Data#data.trusting < Data#data.my_index ->
    become_trusting(Data);
handle_trusting_timeout(State, Data) ->
    {State, Data}.
%% Handles an I-AM-THE-LEADER heartbeat from the member at index
%% `Sender'. Returns {StateName, NewData}.
%%
%% Processes p_i with i = f+2 ... n ignore heartbeats. The guard is
%% `i > f + 1'; the previous `f + 2 < i' was off by one.
handle_heartbeat(State, Data, _Sender)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    {State, Data};
%% Task 3: for every process p_i,i=1...f+1
%% when (received I-AM-THE-LEADER from P_j) and (j < trusted_i)
handle_heartbeat(_State, Data, Sender) when Sender < Data#data.trusting ->
    Timeout = maps:get(Sender, Data#data.timeouts),
    NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
    become_trusting(Data#data{timeouts = NewTimeouts}, Sender);
%% Heartbeat from the leader we already trust: re-arm the one-shot
%% trusting timer. Without this the timer fires after the first timeout
%% even while the leader keeps sending heartbeats.
handle_heartbeat(trusting, Data, Sender) when Sender =:= Data#data.trusting ->
    become_trusting(Data, Sender);
handle_heartbeat(State, Data, _Sender) ->
    {State, Data}.
%% Handles a (NEW-LEADER, SenderCount) announcement from the member at
%% index `Sender'. Always returns {StateName, NewData}.

%% Task 1: for every process p_i,i=f+2...n
%% (guard is `i > f + 1'; the previous `f + 2 < i' skipped p_(f+2)).
handle_new_leader(State, Data, Sender, SenderCount)
  when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    %% NOTE(review): Task 4 is documented as comparing (count, index);
    %% this clause compares (index, count). Left as is - confirm against
    %% the paper.
    Mine = {Data#data.my_index, count_of(Data, Data#data.trusting)},
    case Mine < {Sender, SenderCount} of
        false ->
            %% Must be a {State, Data} pair; returning bare Data made the
            %% caller fail with badmatch.
            {State, Data};
        true ->
            %% Remember the count announced by the sender (the sender's
            %% index used to be stored here in place of its count).
            NewCounts = maps:put(Sender, SenderCount, Data#data.counts),
            NewData = Data#data{counts = NewCounts},
            %% NOTE(review): this advances to the next index rather than
            %% to Sender - confirm this is intended.
            become_trusting(NewData)
    end;
%% Task 4: for every process p_i,i=1...f+1
%% when (R-deliver(NEW-LEADER, count_j) from p_j) and ((count_i,i) < (count_j,j))
handle_new_leader(State, Data, Sender, SenderCount) ->
    MyIndex = Data#data.my_index,
    %% Tuples compare lexicographically, so the count must come first to
    %% match the documented (count, index) ordering.
    case {count_of(Data, MyIndex), MyIndex} < {SenderCount, Sender} of
        false ->
            {State, Data};
        true ->
            Count = maps:get(Sender, Data#data.counts),
            NewCounts = maps:put(Sender, Count + 1, Data#data.counts),
            NewData = Data#data{counts = NewCounts},
            broadcast_new_leader(NewData),
            {State, NewData}
    end.
%% Builds the initial timeout table: one entry per member index
%% (1..length(Members)), each set to `DefaultTimeout'.
init_timeouts(Members, DefaultTimeout) ->
    LastIndex = length(Members),
    maps:from_list([{Index, DefaultTimeout} || Index <- lists:seq(1, LastIndex)]).
%% Builds the initial counter table: one entry per member index
%% (1..length(Members)), each starting at 0.
init_counts(Members) ->
    LastIndex = length(Members),
    maps:from_list([{Index, 0} || Index <- lists:seq(1, LastIndex)]).
%% Looks up the timeout stored for the member at `MemberIndex'.
%% Fails if the index has no entry.
timeout_of(Data, MemberIndex) ->
    Table = Data#data.timeouts,
    maps:get(MemberIndex, Table).
%% Looks up the counter stored for the member at `MemberIndex'.
%% Fails if the index has no entry.
count_of(Data, MemberIndex) ->
    Table = Data#data.counts,
    maps:get(MemberIndex, Table).
%% Sends the periodic I-AM-THE-LEADER heartbeat. Always returns `ok'.
%%
%% Processes p_i with i = f+2 ... n never send heartbeats. The guard is
%% `i > f + 1'; the previous `f + 2 < i' was off by one.
broadcast_heartbeat(Data) when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    ok;
%% Task 1: for every process p_i,i=1...f+1
%% repeat periodically
%% if trusted_i = i then send I-AM-THE-LEADER to p_i+1 ... p_f+1
broadcast_heartbeat(Data) when Data#data.trusting =:= Data#data.my_index ->
    MyIndex = Data#data.my_index,
    %% p_(i+1) ... p_(f+1) is f + 1 - i members; a fixed length of f
    %% reached past p_(f+1). The first clause guarantees i =< f + 1, so
    %% the length is never negative.
    Count = Data#data.tolerable_faults + 1 - MyIndex,
    Successors = lists:sublist(Data#data.members, MyIndex + 1, Count),
    Receivers = lists:delete(Data#data.me, Successors),
    lists:foreach(fun(M) -> erlang:send(M, {heartbeat, MyIndex}) end, Receivers);
broadcast_heartbeat(_Data) ->
    ok.
%% Announces this process as the new leader to every other member.
%% Always returns `ok'.
%%
%% Processes p_i with i = f+2 ... n never announce. The guard is
%% `i > f + 1'; the previous `f + 2 < i' was off by one.
broadcast_new_leader(Data) when Data#data.my_index > Data#data.tolerable_faults + 1 ->
    ok;
%% for every process p_i,i=1...f+1
%% if trusted_i = i then R-broadcast(NEW-LEADER, count_i)
broadcast_new_leader(Data) when Data#data.trusting =:= Data#data.my_index ->
    MyIndex = Data#data.my_index,
    Others = lists:delete(Data#data.me, Data#data.members),
    %% `#data.my_index' (no variable) is the field's position in the
    %% record tuple, not our index; read the value from Data.
    MyCount = count_of(Data, MyIndex),
    %% Receivers match {new_leader, Sender, Count}; the old `heartbeat'
    %% tag with three elements matched no handle_event clause.
    Announcement = {new_leader, MyIndex, MyCount},
    lists:foreach(fun(M) -> erlang:send(M, Announcement) end, Others);
broadcast_new_leader(_Data) ->
    ok.
%% Returns the member stored at the currently trusted index.
get_trusted(Data) ->
    TrustedIndex = Data#data.trusting,
    lists:nth(TrustedIndex, Data#data.members).
%% Returns the suspected members: everyone in the member list except
%% the member that is currently trusted.
get_suspected(Data) ->
    Everyone = Data#data.members,
    Trusted = lists:nth(Data#data.trusting, Everyone),
    lists:delete(Trusted, Everyone).
%% Returns the 1-based position of `Me' in `Members'.
%% Raises error({not_a_member, Me}) when `Me' is not in the list.
my_index(Me, Members) ->
    my_index(Me, Members, 1).

%% Single pass over the remaining members; `N' is the index of the head.
%% (Repeated lists:nth/2 lookups made the search quadratic.)
my_index(Me, [Me | _], N) ->
    N;
my_index(Me, [_ | Rest], N) ->
    my_index(Me, Rest, N + 1);
my_index(Me, [], _N) ->
    error({not_a_member, Me}).
%% Cancels a timer created through the `timer' module.
%% `undefined' means no timer is running and yields `ok'; otherwise the
%% result of timer:cancel/1 is returned.
cancel_timer(Timer) ->
    case Timer of
        undefined -> ok;
        _ -> timer:cancel(Timer)
    end.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% This module implements the ◇P failure detector described in
%% "Implementing the Weakest Failure Detector for Solving Consensus"
%% by Mikel Larrea, Antonio Fernández Anta and Sergio Arévalo.
-module(failure_detector). | |
-behaviour(gen_statem). | |
%% API | |
-export([start_link/3, | |
get_leader/0, | |
get_suspected/0 | |
]). | |
%% gen_statem callbacks | |
-export([callback_mode/0, init/1, terminate/3, code_change/4]). | |
-export([handle_event/4]). | |
-define(SERVER, ?MODULE). | |
-type member() :: pid(). | |
-type member_index() :: non_neg_integer(). | |
-type option() :: #{ | |
heartbeat_time_millis => non_neg_integer(), | |
alive_waiting_time_millis => non_neg_integer() | |
}. | |
%% Server data kept by the gen_statem between events.
-record(data, {
    %% Local member, as passed to start_link/3.
    me :: member(),
    %% 1-based position of `me' inside `members'.
    my_index :: member_index(),
    %% Index of the member currently trusted as leader.
    trusting :: member_index(),
    %% Ordered list of all members; positions are the member indices.
    members :: [member()],
    %% One-shot timer that delivers `trusting_timeout'.
    trusting_timer :: timer:tref() | undefined,
    %% Heartbeat period used while this process is the leader.
    heartbeat_time_millis :: non_neg_integer(),
    %% Interval timer that delivers `heartbeat_timeout'.
    heartbeat_timer :: timer:tref() | undefined,
    %% Members currently suspected.
    suspected :: sets:set(member()),
    %% Initial wait for an `alive' message after becoming the leader.
    alive_waiting_time_millis :: non_neg_integer(),
    %% Running alive-wait timers, keyed by member index.
    alive_waiting_timers :: #{member_index() => timer:tref()},
    %% Per-member timeouts in milliseconds, keyed by member index.
    timeouts :: #{member_index() => non_neg_integer()}
}).
%% Starts the failure detector linked to the caller and registers it
%% locally under ?SERVER. `Me', `Members' and `Option' are handed to
%% init/1 unchanged.
start_link(Me, Members, Option) ->
    InitArgs = [Me, Members, Option],
    gen_statem:start_link({local, ?SERVER}, ?MODULE, InitArgs, []).
%% Synchronously asks the locally registered detector which member it
%% currently trusts.
get_leader() ->
    Request = get_leader,
    gen_statem:call(?SERVER, Request).
%% Synchronously asks the locally registered detector for its set of
%% suspected members.
get_suspected() ->
    Request = get_suspected,
    gen_statem:call(?SERVER, Request).
%% Every event is routed through the single handle_event/4 callback.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    handle_event_function.
%% gen_statem init callback.
%% Builds the initial #data{} (heartbeat and alive-wait periods default
%% to 100 ms, every timeout starts at 100 ms, nobody is suspected) and
%% begins by trusting the member at index 1.
init([Me, Members, Option]) ->
    process_flag(trap_exit, true),
    Index = my_index(Me, Members),
    HeartbeatMillis = maps:get(heartbeat_time_millis, Option, 100),
    Timeouts = init_timeouts(Members, 100),
    AliveWaitMillis = maps:get(alive_waiting_time_millis, Option, 100),
    Initial = #data{
        me = Me,
        my_index = Index,
        members = Members,
        heartbeat_time_millis = HeartbeatMillis,
        heartbeat_timer = undefined,
        trusting_timer = undefined,
        timeouts = Timeouts,
        alive_waiting_time_millis = AliveWaitMillis,
        alive_waiting_timers = #{},
        suspected = sets:new()
    },
    {FirstState, FirstData} = become_trusting(Initial, 1),
    {ok, FirstState, FirstData}.
%% gen_statem event handler (handle_event_function mode).
%%
%% Calls:
%%   get_leader    -> replies with the currently trusted member
%%   get_suspected -> replies with the set of suspected members
%%   anything else -> replies `ok'
%% Info messages heartbeat_timeout, trusting_timeout,
%% {alive_waiting_timeout, Member}, {heartbeat, Sender, SenderSuspected}
%% and {alive, Sender} are forwarded to their handlers.
%% Every other event is ignored.

%% Gets a trusted process.
%% The action must be tagged `reply'; `{ok, From, Reply}' is not a valid
%% gen_statem action and would terminate the server.
handle_event({call, From}, get_leader, State, Data) ->
    Reply = get_trusted(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
%% Gets suspected processes.
handle_event({call, From}, get_suspected, State, Data) ->
    Reply = get_suspected(Data),
    {next_state, State, Data, [{reply, From, Reply}]};
handle_event(info, heartbeat_timeout, State, Data) ->
    handle_heartbeat_timeout(Data),
    {next_state, State, Data};
handle_event(info, trusting_timeout, State, Data) ->
    {NextState, NewData} = handle_trusting_timeout(State, Data),
    {next_state, NextState, NewData};
handle_event(info, {alive_waiting_timeout, Member}, State, Data) ->
    {NextState, NewData} = handle_alive_waiting_timeout(State, Data, Member),
    {next_state, NextState, NewData};
handle_event(info, {heartbeat, Sender, SenderSuspected}, State, Data) ->
    {NextState, NewData} = handle_heartbeat(State, Data, Sender, SenderSuspected),
    {next_state, NextState, NewData};
handle_event(info, {alive, Sender}, State, Data) ->
    {NextState, NewData} = handle_alive(State, Data, Sender),
    {next_state, NextState, NewData};
handle_event({call, From}, _Msg, State, Data) ->
    {next_state, State, Data, [{reply, From, ok}]};
%% init/1 enables trap_exit, so unrelated messages (e.g. 'EXIT' tuples)
%% can reach the mailbox; drop them instead of crashing on function_clause.
handle_event(_EventType, _Unexpected, State, Data) ->
    {next_state, State, Data}.
%% gen_statem terminate callback: nothing is cleaned up and all
%% arguments are ignored.
terminate(_Why, _StateName, _StateData) ->
    void.
%% gen_statem code_change callback: state name and data are carried
%% over to the new code version untouched.
code_change(_FromVsn, StateName, StateData, _Opaque) ->
    Unchanged = {ok, StateName, StateData},
    Unchanged.
%% Switches this process to the `trusted' (leader) state.
%% Stops every running timer, starts the heartbeat interval timer,
%% marks all members positioned before us as suspected and starts an
%% alive-wait timer per member. Returns {trusted, NewData}.
become_trusted(Data) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    cancel_alive_waiting_timers(Data#data.alive_waiting_timers),
    Period = Data#data.heartbeat_time_millis,
    {ok, Ticker} = timer:send_interval(Period, self(), heartbeat_timeout),
    Self = Data#data.my_index,
    Everyone = Data#data.members,
    Predecessors = sets:from_list(lists:sublist(Everyone, 1, Self - 1)),
    AliveTimers = init_alive_waiting_timers(Everyone, Data#data.alive_waiting_time_millis),
    Leader = Data#data{
        trusting = Self,
        heartbeat_timer = Ticker,
        trusting_timer = undefined,
        suspected = Predecessors,
        alive_waiting_timers = AliveTimers
    },
    {trusted, Leader}.
%% Moves trust to the member after the currently trusted one, wrapping
%% from the last index back to 1.
become_trusting(Data) ->
    MemberCount = length(Data#data.members),
    Successor = 1 + (Data#data.trusting rem MemberCount),
    become_trusting(Data, Successor).
%% Starts trusting the member at index `Trusting'.
%% If that index is our own, we become the leader via become_trusted/1.
%% Otherwise every running timer is stopped, a one-shot
%% `trusting_timeout' is scheduled and an interval timer delivering
%% `heartbeat_timeout' is started, both using the timeout stored for
%% that member. Returns {StateName, NewData}.
become_trusting(Data, Trusting) when Data#data.my_index =:= Trusting ->
    become_trusted(Data);
become_trusting(Data, Trusting) ->
    cancel_timer(Data#data.trusting_timer),
    cancel_timer(Data#data.heartbeat_timer),
    cancel_alive_waiting_timers(Data#data.alive_waiting_timers),
    %% timeout_of/2 takes (Data, MemberIndex); the reversed order raised
    %% badrecord for every process that is not the trusted one.
    Timeout = timeout_of(Data, Trusting),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    {ok, HeartbeatTimer} = timer:send_interval(Timeout, self(), heartbeat_timeout),
    {trusting, Data#data{
        trusting = Trusting,
        trusting_timer = TrustingTimer,
        heartbeat_timer = HeartbeatTimer,
        alive_waiting_timers = #{}
    }}.
%% Task 2:
%% Handles expiry of the trusting timer. While in the `trusting' state
%% and trusting a member positioned before us, trust moves on to the
%% next member; in every other situation nothing changes.
handle_trusting_timeout(State, Data) ->
    case State of
        trusting when Data#data.trusting < Data#data.my_index ->
            become_trusting(Data);
        _ ->
            {State, Data}
    end.
%% Task 3:
%% Handles a heartbeat from the member at index `Sender', which carries
%% the sender's suspected set. Returns {StateName, NewData}.
%%
%% A sender positioned before the trusted member takes over as trusted
%% member and its timeout grows by one.
handle_heartbeat(_State, Data, Sender, SenderSuspected) when Sender < Data#data.trusting ->
    Timeout = maps:get(Sender, Data#data.timeouts),
    NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
    NewData = Data#data{timeouts = NewTimeouts, suspected = SenderSuspected},
    become_trusting(NewData, Sender);
%% Heartbeat from the leader we already trust: adopt its suspected set
%% and re-arm the one-shot trusting timer. Without the re-arm the timer
%% fires after the first timeout even while heartbeats keep arriving.
handle_heartbeat(trusting, Data, Sender, SenderSuspected) when Sender =:= Data#data.trusting ->
    cancel_timer(Data#data.trusting_timer),
    Timeout = timeout_of(Data, Sender),
    {ok, TrustingTimer} = timer:send_after(Timeout, self(), trusting_timeout),
    {trusting, Data#data{suspected = SenderSuspected, trusting_timer = TrustingTimer}};
handle_heartbeat(State, Data, Sender, SenderSuspected) when Sender =:= Data#data.trusting ->
    {State, Data#data{suspected = SenderSuspected}};
%% Senders positioned after the trusted member are ignored.
handle_heartbeat(State, Data, _Sender, _SenderSuspected) ->
    {State, Data}.
%% Task 4:
%% Handles expiry of the alive-wait timer of the member at
%% `WaitingMemberIndex'. Only the leader reacts: the member is added to
%% the suspected set and its timer is restarted with the member's
%% current timeout. Returns {StateName, NewData}.
handle_alive_waiting_timeout(State, Data, WaitingMemberIndex)
  when Data#data.trusting =:= Data#data.my_index ->
    Silent = member_of(Data, WaitingMemberIndex),
    Running = Data#data.alive_waiting_timers,
    cancel_timer(maps:get(WaitingMemberIndex, Running, undefined)),
    Wait = timeout_of(Data, WaitingMemberIndex),
    Expiry = {alive_waiting_timeout, WaitingMemberIndex},
    {ok, Restarted} = timer:send_after(Wait, self(), Expiry),
    Updated = Data#data{
        suspected = sets:add_element(Silent, Data#data.suspected),
        alive_waiting_timers = maps:put(WaitingMemberIndex, Restarted, Running)
    },
    {State, Updated};
handle_alive_waiting_timeout(State, Data, _WaitingMemberIndex) ->
    {State, Data}.
%% Handles an `alive' message from the member at index `Sender'.
%% Only the leader reacts. Returns {StateName, NewData}.
%%
%% A suspected sender was suspected wrongly: it is removed from the
%% suspected set and its timeout grows by one. In either case the
%% sender's alive-wait timer is re-armed, otherwise the timer started
%% earlier would still fire and suspect a member that is answering.
handle_alive(State, Data, Sender) when Data#data.trusting =:= Data#data.my_index ->
    Member = member_of(Data, Sender),
    case sets:is_element(Member, Data#data.suspected) of
        true ->
            Timeout = maps:get(Sender, Data#data.timeouts),
            NewTimeouts = maps:put(Sender, Timeout + 1, Data#data.timeouts),
            %% del_element, not add_element: adding a member that is
            %% already in the set left it suspected forever.
            NewSuspected = sets:del_element(Member, Data#data.suspected),
            NewData = Data#data{suspected = NewSuspected, timeouts = NewTimeouts},
            {State, rearm_alive_waiting_timer(NewData, Sender)};
        false ->
            {State, rearm_alive_waiting_timer(Data, Sender)}
    end;
handle_alive(State, Data, _Sender) ->
    {State, Data}.

%% Cancels the alive-wait timer of `MemberIndex' (if any) and starts a
%% fresh one using that member's current timeout.
rearm_alive_waiting_timer(Data, MemberIndex) ->
    Timers = Data#data.alive_waiting_timers,
    cancel_timer(maps:get(MemberIndex, Timers, undefined)),
    Timeout = timeout_of(Data, MemberIndex),
    {ok, Timer} = timer:send_after(Timeout, self(), {alive_waiting_timeout, MemberIndex}),
    Data#data{alive_waiting_timers = maps:put(MemberIndex, Timer, Timers)}.
%% Builds the initial timeout table: one entry per member index
%% (1..length(Members)), each set to `DefaultTimeout'.
init_timeouts(Members, DefaultTimeout) ->
    LastIndex = length(Members),
    maps:from_list([{Index, DefaultTimeout} || Index <- lists:seq(1, LastIndex)]).
%% Cancels every timer stored in the `Timers' map (member index ->
%% timer reference). Returns `ok'.
cancel_alive_waiting_timers(Timers) ->
    Indices = maps:keys(Timers),
    lists:foreach(
        fun(Index) ->
            Running = maps:get(Index, Timers, undefined),
            cancel_timer(Running)
        end,
        Indices
    ).
%% Starts one alive-wait timer per member index (1..length(Members)),
%% each firing after `Timeout' milliseconds, and returns a map from
%% member index to timer reference.
init_alive_waiting_timers(Members, Timeout) ->
    Indices = lists:seq(1, length(Members)),
    lists:foldl(
        fun(N, AccIn) ->
            %% The message must carry the member index: handle_event/4
            %% only matches {alive_waiting_timeout, Member}, so the bare
            %% atom sent previously matched no clause.
            {ok, Timer} = timer:send_after(Timeout, self(), {alive_waiting_timeout, N}),
            maps:put(N, Timer, AccIn)
        end,
        #{},
        Indices
    ).
%% Looks up the timeout stored for the member at `MemberIndex'.
%% Fails if the index has no entry.
timeout_of(Data, MemberIndex) ->
    Table = Data#data.timeouts,
    maps:get(MemberIndex, Table).
%% Returns the member stored at the 1-based position `MemberIndex'.
member_of(Data, MemberIndex) ->
    Everyone = Data#data.members,
    lists:nth(MemberIndex, Everyone).
%% Task 1:
%% Runs on every heartbeat tick.
%% The leader sends {heartbeat, Index, Suspected} to every member
%% positioned after it that is not suspected; any other process sends
%% {alive, Index} to the member it trusts.
handle_heartbeat_timeout(Data) when Data#data.trusting =:= Data#data.my_index ->
    Self = Data#data.my_index,
    Everyone = Data#data.members,
    %% P_i+1 ... P_n
    Successors = lists:sublist(Everyone, Self + 1, length(Everyone)),
    Receivers = Successors -- sets:to_list(Data#data.suspected),
    Beat = {heartbeat, Self, Data#data.suspected},
    [Receiver ! Beat || Receiver <- Receivers];
handle_heartbeat_timeout(Data) ->
    Leader = get_trusted(Data),
    Leader ! {alive, Data#data.my_index}.
%% Returns the member stored at the currently trusted index.
get_trusted(Data) ->
    TrustedIndex = Data#data.trusting,
    lists:nth(TrustedIndex, Data#data.members).
%% Returns the set of suspected members exactly as stored in the
%% server data.
get_suspected(Data) ->
    Suspected = Data#data.suspected,
    Suspected.
%% Returns the 1-based position of `Me' in `Members'.
%% Raises error({not_a_member, Me}) when `Me' is not in the list.
my_index(Me, Members) ->
    my_index(Me, Members, 1).

%% Single pass over the remaining members; `N' is the index of the head.
%% (Repeated lists:nth/2 lookups made the search quadratic.)
my_index(Me, [Me | _], N) ->
    N;
my_index(Me, [_ | Rest], N) ->
    my_index(Me, Rest, N + 1);
my_index(Me, [], _N) ->
    error({not_a_member, Me}).
%% Cancels a timer created through the `timer' module.
%% `undefined' means no timer is running and yields `ok'; otherwise the
%% result of timer:cancel/1 is returned.
cancel_timer(Timer) ->
    case Timer of
        undefined -> ok;
        _ -> timer:cancel(Timer)
    end.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment