Skip to content

Instantly share code, notes, and snippets.

@KirinDave
Created August 22, 2008 04:49
Show Gist options
  • Save KirinDave/6754 to your computer and use it in GitHub Desktop.
Save KirinDave/6754 to your computer and use it in GitHub Desktop.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%
%% $Id: gen_leader.erl,v 1.2 2005/07/04 06:55:56 hanssv Exp $
%%
%% @author Hans Svensson <[email protected]>
%% @author Thomas Arts <[email protected]>
%% @author Ulf Wiger <[email protected]>
%% @author Dave Fayram <[email protected]>
%%
%% (Maintainer's Note: Dave Fayram's modifications occured significantly
%% after the original release of this file.)
%%
%% @doc Leader election behavior.
%% <p>This application implements a leader election behavior modeled after
%% gen_server. This behavior intends to make it reasonably
%% straightforward to implement a fully distributed server with
%% master-slave semantics.</p>
%% <p>The gen_leader behavior supports nearly everything that gen_server
%% does (some functions, such as multicall() and the internal timeout,
%% have been removed), and adds a few callbacks and API functions to
%% support leader election etc.</p>
%% <p>Also included is an example program, a global dictionary, based
%% on the modules gen_leader and dict. The callback implementing the
%% global dictionary is called 'test_cb', for no particularly logical
%% reason.</p>
%% <p><b>New version:</b> The internal leader election algorithm was faulty
%% and has been replaced with a new version based on a different leader
%% election algorithm. As a consequence of this the query functions
%% <tt>alive</tt> and <tt>down</tt> can no longer be provided.
%% The new algorithm also make use of an incarnation parameter, by
%% default written to disk in the function <tt>incarnation</tt>. This
%% implies that only one <tt>gen_leader</tt> per node is permitted, if
%% used in a diskless environment, <tt>incarnation</tt> must be adapted.
%% </p>
%% @end
%%
%% @type election() = tuple(). Opaque state of the gen_leader behaviour.
%% @type node() = atom(). A node name.
%% @type name() = atom(). A locally registered name.
%% @type serverRef() = Name | {name(),node()} | {global,Name} | pid().
%% See gen_server.
%% @type callerRef() = {pid(), reference()}. See gen_server.
%%
-module(gen_leader).
% Time between rounds of query from the leader
-define(TAU,250).
% Exports for quickcheck
%-export([safe_loop/4,loop/4]).
-export([start/6,
start_link/6,
leader_call/2, leader_call/3, leader_cast/2, worker_announce/2,
call/2, call/3, cast/2,
reply/2]).
%% Query functions
-export([%% alive/1,
%% down/1,
candidates/1,
workers/1]).
-export([
system_continue/3,
system_terminate/4,
system_code_change/4,
format_status/2
]).
-export([behaviour_info/1]).
%% Internal exports
-export([init_it/6, print_event/3
%%, safe_send/2
]).
-import(error_logger , [format/2]).
-import(lists, [foldl/3,
foreach/2,
member/2,
keydelete/3,
keysearch/3]).
% Include for QuickCheck
% -include("eqc.hrl").
-record(election,{leader = none,
name,
leadernode = none,
candidate_nodes = [],
worker_nodes = [],
alive = [],
down = [],
monitored = [],
buffered = [],
status,
elid,
acks = [],
work_down = [],
pendack,
incarn,
nextel
}).
-record(server, {parent,
mod,
state,
debug}).
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
%% @hidden
behaviour_info(callbacks) ->
[{init,1},
{elected,2},
{surrendered,3},
{handle_leader_call,4},
{handle_leader_cast,3},
{from_leader,3},
{handle_call,3},
{handle_cast,2},
{handle_DOWN,3},
{handle_info,2},
{terminate,2},
{code_change,4}];
behaviour_info(_Other) ->
undefined.
%% @spec start(Name::node(), CandidateNodes::[node()],
%% Workers::[node()], Mod::atom(), Arg, Options::list()) ->
%% {ok,pid()}
%%
%% @doc Starts a gen_leader process without linking to the parent.
%%
start(Name, CandidateNodes, Workers, Mod, Arg, Options) when is_atom(Name) ->
gen:start(?MODULE, nolink, {local,Name},
Mod, {CandidateNodes, Workers, Arg}, Options).
%% @spec start_link(Name::atom(), CandidateNodes::[atom()],
%% Workers::[atom()], Mod::atom(), Arg, Options::list()) ->
%% {ok, pid()}
%%
%% @doc Starts a gen_leader process.
%% <table>
%% <tr><td>Name</td><td>The locally registered name of the process</td></tr>
%% <tr><td>CandidateNodes</td><td>The names of nodes capable of assuming
%% a leadership role</td></tr>
%% <tr><td>Workers</td>
%% <td>The names of nodes that will be part of the "cluster",
%% but cannot ever assume a leadership role.</td></tr>
%% <tr><td>Mod</td><td>The name of the callback module</td></tr>
%% <tr><td>Arg</td><td>Argument passed on to <code>Mod:init/1</code></td></tr>
%% <tr><td>Options</td><td>Same as gen_server's Options</td></tr>
%% </table>
%%
%% <p>The list of candidates needs to be known from the start. Workers
%% can be added at runtime.</p>
%% @end
start_link(Name, CandidateNodes, Workers,
Mod, Arg, Options) when is_atom(Name) ->
% Random delay for QuickCheck
% timer:sleep(random:uniform(400)),
gen:start(?MODULE, link, {local,Name}, Mod,
{CandidateNodes, Workers, Arg}, Options).
%% Query functions to be used from the callback module
%% alive(#election{alive = Alive}) ->
%% Alive.
%% down(#election{down = Down}) ->
%% Down.
%% @spec candidates(E::election()) -> [node()]
%%
%% @doc Returns a list of known candidates.
%%
candidates(#election{candidate_nodes = Cands}) ->
Cands.
%% @spec workers(E::election()) -> [node()]
%%
%% @doc Returns a list of known workers.
%%
workers(#election{worker_nodes = Workers}) ->
Workers.
%% Used by dynamically added workers.
%% @hidden
worker_announce(Name, Pid) ->
Name ! {add_worker, Pid}.
%
% Make a call to a generic server.
% If the server is located at another node, that node will
% be monitored.
% If the client is trapping exits and is linked server termination
% is handled here (? Shall we do that here (or rely on timeouts) ?).
%
%% @spec call(Name::serverRef(), Request) -> term()
%%
%% @doc Equivalent to <code>gen_server:call/2</code>, but with a slightly
%% different exit reason if something goes wrong. This function calls
%% the <code>gen_leader</code> process exactly as if it were a gen_server
%% (which, for practical purposes, it is.)
%% @end
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, local_call, [Name, Request]}})
end.
%% @spec call(Name::serverRef(), Request, Timeout::integer()) ->
%% Reply
%%
%% Reply = term()
%%
%% @doc Equivalent to <code>gen_server:call/3</code>, but with a slightly
%% different exit reason if something goes wrong. This function calls
%% the <code>gen_leader</code> process exactly as if it were a gen_server
%% (which, for practical purposes, it is.)
%% @end
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
{ok,Res} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, local_call, [Name, Request, Timeout]}})
end.
%% @spec leader_call(Name::name(), Request::term())
%% -> Reply
%%
%% Reply = term()
%%
%% @doc Makes a call (similar to <code>gen_server:call/2</code>) to the
%% leader. The call is forwarded via the local gen_leader instance, if
%% that one isn't actually the leader. The client will exit if the
%% leader dies while the request is outstanding.
%% <p>This function uses <code>gen:call/3</code>, and is subject to the
%% same default timeout as e.g. <code>gen_server:call/2</code>.</p>
%% @end
%%
leader_call(Name, Request) ->
case catch gen:call(Name, '$leader_call', Request) of
{ok,{leader,reply,Res}} ->
Res;
{ok,{error, leader_died}} ->
exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
{'EXIT',Reason} ->
exit({Reason, {?MODULE, leader_call, [Name, Request]}})
end.
%% @spec leader_call(Name::name(), Request::term(), Timeout::integer())
%% -> Reply
%%
%% Reply = term()
%%
%% @doc Makes a call (similar to <code>gen_server:call/3</code>) to the
%% leader. The call is forwarded via the local gen_leader instance, if
%% that one isn't actually the leader. The client will exit if the
%% leader dies while the request is outstanding.
%% @end
%%
leader_call(Name, Request, Timeout) ->
case catch gen:call(Name, '$leader_call', Request, Timeout) of
{ok,{leader,reply,Res}} ->
Res;
{'EXIT',Reason} ->
exit({Reason, {?MODULE, leader_call, [Name, Request, Timeout]}})
end.
%% @equiv gen_server:cast/2
cast(Name, Request) ->
catch do_cast('$gen_cast', Name, Request),
ok.
%% @spec leader_cast(Name::name(), Msg::term()) -> ok
%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
%% the leader via the local gen_leader instance.
leader_cast(Name, Request) ->
catch do_cast('$leader_cast', Name, Request),
ok.
do_cast(Tag, Name, Request) when atom(Name) ->
Name ! {Tag, Request};
do_cast(Tag, Pid, Request) when pid(Pid) ->
Pid ! {Tag, Request}.
%% @spec reply(From::callerRef(), Reply::term()) -> Void
%% @equiv gen_server:reply/2
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply}.
%%% ---------------------------------------------------
%%% Initiate the new process.
%%% Register the name using the Rfunc function
%%% Calls the Mod:init/Args function.
%%% Finally an acknowledge is sent to Parent and the main
%%% loop is entered.
%%% ---------------------------------------------------
%%% @hidden
init_it(Starter, self, Name, Mod, {CandidateNodes, Workers, Arg}, Options) ->
io:format("In little init_it!~n"),
init_it(Starter, self(), Name, Mod,
{CandidateNodes, Workers, Arg}, Options);
init_it(Starter,Parent,Name,Mod,{CandidateNodes,Workers,Arg},Options) ->
io:format("In big init_it!~n"),
%% The following row is needed in case of trace analysis,
%% starting tracing is too slow otherwise!
receive after 100 -> ok end,
Debug = debug_options(Name, Options),
AmCandidate = member(node(), CandidateNodes),
Election = #election{candidate_nodes = CandidateNodes,
worker_nodes = Workers,
name = Name,
nextel = 0},
case {catch Mod:init(Arg), AmCandidate} of
{{stop, Reason},_} ->
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
{ignore,_} ->
proc_lib:init_ack(Starter, ignore),
exit(normal);
{{'EXIT', Reason},_} ->
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
{{ok, State}, true} ->
NewE = startStage1(Election#election{incarn = incarnation(node())}),
proc_lib:init_ack(Starter, {ok, self()}),
safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug},
candidate, NewE,{init});
{{ok, State}, false} ->
proc_lib:init_ack(Starter, {ok, self()}),
case lists:member(self(), Workers) of
false ->
rpc:multicall(CandidateNodes, gen_leader, worker_announce, [Name, node(self())]);
_ -> nop
end,
safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug},
waiting_worker, Election,{init});
Else ->
Error = {bad_return_value, Else},
proc_lib:init_ack(Starter, {error, Error}),
exit(Error)
end.
%%% ---------------------------------------------------
%%% The MAIN loops.
%%% ---------------------------------------------------
safe_loop(#server{mod = Mod, state = State} = Server, Role,
#election{name = Name} = E, _PrevMsg) ->
% Event for QuickCheck
% ?EVENT({Role,E}),
receive
{system, From, Req} ->
#server{parent = Parent, debug = Debug} = Server,
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[safe, Server, Role, E]);
{'EXIT', _, Reason} = Msg ->
terminate(Reason, Msg, Server, Role, E);
{halt,T,From} = Msg ->
NewE = halting(E,T,From),
From ! {ackLeader,T,self()},
safe_loop(Server,Role,NewE,Msg);
{hasLeader,Ldr,T,_} = Msg ->
NewE1 = mon_node(E,Ldr),
case ( (E#election.status == elec2) and (E#election.acks /= []) ) of
true ->
lists:foreach(
fun(Node) ->
{Name,Node} ! {hasLeader,Ldr,T,self()}
end,E#election.acks);
false ->
ok
end,
NewE = NewE1#election{elid = T,
status = wait,
leadernode = node(Ldr),
down = E#election.down -- [node(Ldr)],
acks = []},
Ldr ! {isLeader,T,self()},
safe_loop(Server,Role,NewE,Msg);
{isLeader,T,From} = Msg ->
From ! {notLeader,T,self()},
safe_loop(Server,Role,E,Msg);
{notLeader,T,_} = Msg ->
case ( (E#election.status == wait) and (E#election.elid == T) ) of
true ->
NewE = startStage1(E);
false ->
NewE = E
end,
safe_loop(Server,Role,NewE,Msg);
{ackLeader,T,From} = Msg ->
case ( (E#election.status == elec2) and (E#election.elid == T) and
(E#election.pendack == node(From)) ) of
true ->
NewE = continStage2(E#election{acks = [node(From)|E#election.acks]});
false ->
NewE = E
end,
hasBecomeLeader(NewE,Server,Msg);
{ldr,Synch,T,From, LeaderE} = Msg ->
case ( (E#election.status == wait) and (E#election.elid == T) ) of
true ->
NewE1 = mon_node(E,From),
NewE = NewE1#election{leader = From,
leadernode = node(From),
status = norm,
worker_nodes = LeaderE#election.worker_nodes},
{ok,NewState} = Mod:surrendered(State,Synch,NewE),
loop(Server#server{state = NewState},surrendered,NewE,Msg);
false ->
safe_loop(Server,Role,E,Msg)
end;
{normQ,T,From} = Msg ->
case ( (E#election.status == elec1) or
( (E#election.status == wait) and (E#election.elid == T))) of
true ->
NewE = halting(E,T,From),
From ! {notNorm,T,self()};
false ->
NewE = E
end,
safe_loop(Server,Role,NewE,Msg);
{notNorm,_,_} = Msg ->
safe_loop(Server,Role,E,Msg);
{workerAlive,T,From} = Msg ->
case E#election.leadernode == none of
true -> % We should initiate activation, monitor the possible leader!
NewE = mon_node(E#election{leadernode = node(From),
elid = T},
From),
From ! {workerIsAlive,T,self()};
false ->
% We should acutally ignore this, the present activation
% will complete or abort first...
NewE = E
end,
safe_loop(Server,Role,NewE,Msg);
{workerIsAlive,_,_} = Msg ->
% If this happens, the activation process should abort
% This process is no longer the leader!
% The sender will notice this via a DOWN message
safe_loop(Server,Role,E,Msg);
{activateWorker,T,Synch,From,OldE} = Msg ->
case ( (T == E#election.elid) and (node(From) == E#election.leadernode)) of
true ->
NewE = E#election{ leader = From,
status = worker, worker_nodes = OldE#election.worker_nodes },
{ok,NewState} = Mod:surrendered(State,Synch,NewE),
loop(Server#server{state = NewState},worker,NewE,Msg);
false ->
% This should be a VERY special case...
% But doing nothing is the right thing!
% A DOWN message should arrive to solve this situation
safe_loop(Server,Role,E,Msg)
end;
{tau_timeout} = Msg ->
safe_loop(Server,Role,E,Msg);
{'DOWN',_Ref,process,From,_Reason} = Msg when Role == waiting_worker ->
% We are only monitoring one proc, the leader!
Node = case From of
{Name,_Node} -> _Node;
_ when pid(From) -> node(From)
end,
case Node == E#election.leadernode of
true ->
NewE = E#election{ leader = none, leadernode = none,
status = waiting_worker,
monitored = []};
false ->
NewE = E
end,
safe_loop(Server, Role, NewE,Msg);
{'DOWN',Ref,process,From,_Reason} = Msg ->
Node = case From of
{Name,_Node} -> _Node;
_ when pid(From) -> node(From)
end,
NewMon = E#election.monitored -- [{Ref,Node}],
case lists:member(Node,E#election.candidate_nodes) of
true ->
NewDown = [Node | E#election.down],
E1 = E#election{down = NewDown, monitored = NewMon},
case ( pos(Node,E#election.candidate_nodes) <
pos(node(),E#election.candidate_nodes) ) of
true ->
Lesser = lesser(node(),E#election.candidate_nodes),
LesserIsSubset = (Lesser -- NewDown) == [],
case ((E#election.status == wait) and (Node == E#election.leadernode)) of
true ->
NewE = startStage1(E1);
false ->
case ((E#election.status == elec1) and LesserIsSubset) of
true ->
NewE = startStage2(E1#election{down = Lesser});
false ->
NewE = E1
end
end;
false ->
case ( (E#election.status == elec2) and (Node == E#election.pendack) ) of
true ->
NewE = continStage2(E1);
false ->
case ( (E#election.status == wait) and
(Node == E#election.leadernode)) of
true ->
NewE = startStage1(E1);
false ->
NewE = E1
end
end
end
end,
hasBecomeLeader(NewE,Server,Msg)
end.
loop(#server{parent = Parent,
mod = Mod,
state = State,
debug = Debug} = Server, Role, #election{name = Name} = E, _PrevMsg) ->
% Event for QuickCheck
% ?EVENT({Role,E}),
receive
Msg ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[normal, Server, Role, E]);
{'EXIT', Parent, Reason} ->
terminate(Reason, Msg, Server, Role, E);
{halt,_,From} ->
From ! {hasLeader,E#election.leader,E#election.elid,self()},
loop(Server,Role,E,Msg);
{hasLeader,_,_,_} ->
loop(Server,Role,E,Msg);
{isLeader,T,From} ->
case (self() == E#election.leader) of
true ->
NewE = mon_node(E#election{down = E#election.down -- [node(From)]},
From),
{ok,Synch,NewState} = Mod:elected(State,NewE),
From ! {ldr,Synch,E#election.elid,self(), NewE},
loop(Server#server{state = NewState},Role,NewE,Msg);
false ->
From ! {notLeader,T,self()},
loop(Server,Role,E,Msg)
end;
{ackLeader,_,_} ->
loop(Server,Role,E,Msg);
{notLeader,_,_} ->
loop(Server,Role,E,Msg);
{ack,_,_} ->
loop(Server,Role,E,Msg);
{ldr,_,_,_,_} ->
loop(Server,Role,E,Msg);
{normQ,_,_} ->
loop(Server,Role,E,Msg);
{notNorm,T,From} ->
case ( (E#election.leader == self()) and (E#election.elid == T) ) of
true ->
NewE = mon_node(E#election{down = E#election.down -- [node(From)]},
From),
{ok,Synch,NewState} = Mod:elected(State,NewE),
From ! {ldr,Synch,E#election.elid,self(), NewE},
loop(Server#server{state = NewState},Role,NewE,Msg);
false ->
loop(Server,Role,E,Msg)
end;
{workerAlive,_,_} ->
% Do nothing if we get this from a new leader
% We will soon notice that the prev leader has died, and
% get the same message again when we are back in safe_loop!
loop(Server,Role,E,Msg);
{activateWorker,_,_,_} ->
% We ignore this, we are already active...
% It must be an old message!
loop(Server,Role,E,Msg);
{workerIsAlive,T,From} ->
case ((T == E#election.elid) and (self() == E#election.leader)
% and iselem(node(From),E#election.monitored)
) of
true ->
NewE = mon_node(
E#election{work_down = E#election.work_down -- [node(From)]},
From),
% NewE = E#election{work_down = E#election.work_down -- [node(From)]},
{ok,Synch,NewState} = Mod:elected(State,NewE),
From ! {activateWorker,T,Synch,self(), NewE},
loop(Server#server{state = NewState},Role,NewE,Msg);
false ->
loop(Server,Role,E,Msg)
end;
{tau_timeout} ->
case (E#election.leader == self()) of
true ->
lists:foreach(
fun(Node) ->
{Name,Node} ! {normQ,E#election.elid,self()}
end,E#election.down -- [lists:nth(1,E#election.candidate_nodes)]),
lists:foreach(
fun(Node) ->
{Name,Node} ! {workerAlive,E#election.elid,self()}
end,E#election.work_down),
timer:send_after(?TAU,{tau_timeout});
false ->
ok
end,
loop(Server,Role,E,Msg);
{'DOWN',_Ref,process,From,_Reason} when Role == worker ->
% We are only monitoring one proc, the leader!
Node = case From of
{Name,_Node} -> _Node;
_ when pid(From) -> node(From)
end,
case Node == E#election.leadernode of
true ->
NewE = E#election{ leader = none, leadernode = none,
status = waiting_worker,
monitored = []},
safe_loop(Server, waiting_worker, NewE,Msg);
false ->
loop(Server, Role, E,Msg)
end;
{'DOWN',Ref,process,From,_Reason} ->
Node = case From of
{Name,_Node} -> _Node;
_ when pid(From) -> node(From)
end,
NewMon = E#election.monitored -- [{Ref,Node}],
case lists:member(Node,E#election.candidate_nodes) of
true ->
NewDown = [Node | E#election.down],
E1 = E#election{down = NewDown, monitored = NewMon},
case (Node == E#election.leadernode) of
true ->
NewE = startStage1(E1),
safe_loop(Server, candidate, NewE,Msg);
false ->
loop(Server, Role, E1,Msg)
end;
false ->
% I am the leader, make sure the dead worker is in work_down.
E1 = E#election{monitored = NewMon,
work_down = [Node | (E#election.work_down -- [Node])]
},
loop(Server, Role, E1,Msg)
end;
{add_worker, WorkerNode} ->
case {node(self()) =:= node(self()),
lists:member(WorkerNode, E#election.worker_nodes)} of
{false, _} ->
loop(Server, Role, E, Msg);
{true, false} ->
{WNodes, DNodes} = {E#election.worker_nodes, E#election.work_down},
loop(Server, Role, E#election{worker_nodes=[WorkerNode|WNodes],
work_down=[WorkerNode|DNodes]},
Msg);
{true, true} -> % Redundancy, meet the mirror
loop(Server, Role, E, Msg)
end;
_Msg when Debug == [] ->
handle_msg(Msg, Server, Role, E);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
E#election.name, {in, Msg}),
handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
end
end.
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
%% @hidden
system_continue(_Parent, _Debug, [safe, Server, Role, E]) ->
safe_loop(Server, Role, E,{});
system_continue(_Parent, _Debug, [normal, Server, Role, E]) ->
loop(Server, Role, E,{}).
%% @hidden
system_terminate(Reason, _Parent, _Debug, [_Mode, Server, Role, E]) ->
terminate(Reason, [], Server, Role, E).
%% @hidden
system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
#server{mod = Mod, state = State} = Server,
case catch Mod:code_change(OldVsn, State, E, Extra) of
{ok, NewState} ->
NewServer = Server#server{state = NewState},
{ok, [Mode, NewServer, Role, E]};
{ok, NewState, NewE} ->
NewServer = Server#server{state = NewState},
{ok, [Mode, NewServer, Role, NewE]};
Else -> Else
end.
%%-----------------------------------------------------------------
%% Format debug messages. Print them as the call-back module sees
%% them, not as the real erlang messages. Use trace for that.
%%-----------------------------------------------------------------
%% @hidden
print_event(Dev, {in, Msg}, Name) ->
case Msg of
{'$gen_call', {From, _Tag}, Call} ->
io:format(Dev, "*DBG* ~p got local call ~p from ~w~n",
[Name, Call, From]);
{'$leader_call', {From, _Tag}, Call} ->
io:format(Dev, "*DBG* ~p got global call ~p from ~w~n",
[Name, Call, From]);
{'$gen_cast', Cast} ->
io:format(Dev, "*DBG* ~p got local cast ~p~n",
[Name, Cast]);
{'$leader_cast', Cast} ->
io:format(Dev, "*DBG* ~p got global cast ~p~n",
[Name, Cast]);
_ ->
io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
end;
print_event(Dev, {out, Msg, To, State}, Name) ->
io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
[Name, Msg, To, State]);
print_event(Dev, {noreply, State}, Name) ->
io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
print_event(Dev, Event, Name) ->
io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
handle_msg({'$leader_call', From, Request} = Msg,
#server{mod = Mod, state = State} = Server, elected = Role, E) ->
case catch Mod:handle_leader_call(Request, From, State, E) of
{reply, Reply, NState} ->
NewServer = reply(From, {leader,reply,Reply},
Server#server{state = NState}, Role, E),
loop(NewServer, Role, E,Msg);
{reply, Reply, Broadcast, NState} ->
NewE = broadcast({from_leader,Broadcast}, E),
NewServer = reply(From, {leader,reply,Reply},
Server#server{state = NState}, Role,
NewE),
loop(NewServer, Role, NewE,Msg);
{noreply, NState} = Reply ->
NewServer = handle_debug(Server#server{state = NState},
Role, E, Reply),
loop(NewServer, Role, E,Msg);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Msg,
Server#server{state = NState},
Role, E)),
reply(From, Reply),
exit(R);
Other ->
handle_common_reply(Other, Msg, Server, Role, E)
end;
handle_msg({from_leader, Cmd} = Msg,
#server{mod = Mod, state = State} = Server, Role, E) ->
handle_common_reply(catch Mod:from_leader(Cmd, State, E),
Msg, Server, Role, E);
handle_msg({'$leader_call', From, Request} = Msg, Server, Role,
#election{buffered = Buffered, leader = Leader} = E) ->
Ref = make_ref(),
Leader ! {'$leader_call', {self(),Ref}, Request},
NewBuffered = [{Ref,From}|Buffered],
loop(Server, Role, E#election{buffered = NewBuffered},Msg);
handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,
#election{buffered = Buffered} = E) ->
{value, {_,From}} = keysearch(Ref,1,Buffered),
NewServer = reply(From, {leader,reply,Reply}, Server, Role,
E#election{buffered = keydelete(Ref,1,Buffered)}),
loop(NewServer, Role, E, Msg);
handle_msg({'$gen_call', From, Request} = Msg,
#server{mod = Mod, state = State} = Server, Role, E) ->
case catch Mod:handle_call(Request, From, State) of
{reply, Reply, NState} ->
NewServer = reply(From, Reply,
Server#server{state = NState}, Role, E),
loop(NewServer, Role, E, Msg);
{noreply, NState} = Reply ->
NewServer = handle_debug(Server#server{state = NState},
Role, E, Reply),
loop(NewServer, Role, E, Msg);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Msg, Server#server{state = NState},
Role, E)),
reply(From, Reply),
exit(R);
Other ->
handle_common_reply(Other, Msg, Server, Role, E)
end;
handle_msg({'$gen_cast',Msg} = Cast,
#server{mod = Mod, state = State} = Server, Role, E) ->
handle_common_reply(catch Mod:handle_cast(Msg, State),
Cast, Server, Role, E);
handle_msg(Msg,
#server{mod = Mod, state = State} = Server, Role, E) ->
handle_common_reply(catch Mod:handle_info(Msg, State),
Msg, Server, Role, E).
handle_common_reply(Reply, Msg, Server, Role, E) ->
case Reply of
{ok, NState} ->
NewServer = handle_debug(Server#server{state = NState},
Role, E, Reply),
loop(NewServer, Role, E, Msg);
{stop, Reason, NState} ->
terminate(Reason, Msg, Server#server{state = NState}, Role, E);
{'EXIT', Reason} ->
terminate(Reason, Msg, Server, Role, E);
_ ->
terminate({bad2_return_value, Reply}, Msg, Server, Role, E)
end.
reply({To, Tag}, Reply, #server{state = State} = Server, Role, E) ->
reply({To, Tag}, Reply),
handle_debug(Server, Role, E, {out, Reply, To, State}).
handle_debug(#server{debug = []} = Server, _Role, _E, _Event) ->
Server;
handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
E#election.name, Event),
Server#server{debug = Debug1}.
%%% ---------------------------------------------------
%%% Terminate the server.
%%% ---------------------------------------------------
terminate(Reason, Msg, #server{mod = Mod,
state = State,
debug = Debug} = _Server, _Role,
#election{name = Name} = _E) ->
case catch Mod:terminate(Reason, State) of
{'EXIT', R} ->
error_info(R, Name, Msg, State, Debug),
exit(R);
_ ->
case Reason of
normal ->
exit(normal);
shutdown ->
exit(shutdown);
_ ->
error_info(Reason, Name, Msg, State, Debug),
exit(Reason)
end
end.
%% Maybe we shouldn't do this? We have the crash report...
error_info(Reason, Name, Msg, State, Debug) ->
format("** Generic leader ~p terminating \n"
"** Last message in was ~p~n"
"** When Server state == ~p~n"
"** Reason for termination == ~n** ~p~n",
[Name, Msg, State, Reason]),
sys:print_log(Debug),
ok.
%%% ---------------------------------------------------
%%% Misc. functions.
%%% ---------------------------------------------------
opt(Op, [{Op, Value}|_]) ->
{ok, Value};
opt(Op, [_|Options]) ->
opt(Op, Options);
opt(_, []) ->
false.
debug_options(Name, Opts) ->
case opt(debug, Opts) of
{ok, Options} -> dbg_options(Name, Options);
_ -> dbg_options(Name, [])
end.
dbg_options(Name, []) ->
Opts =
case init:get_argument(generic_debug) of
error ->
[];
_ ->
[log, statistics]
end,
dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
dbg_opts(Name, Opts).
dbg_opts(Name, Opts) ->
case catch sys:debug_options(Opts) of
{'EXIT',_} ->
format("~p: ignoring erroneous debug options - ~p~n",
[Name, Opts]),
[];
Dbg ->
Dbg
end.
%%-----------------------------------------------------------------
%% Status information
%%-----------------------------------------------------------------
%% @hidden
format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug, [_Mode, Server, _Role, E]] = StatusData,
Header = lists:concat(["Status for generic server ", E#election.name]),
Log = sys:get_debug(log, Debug, []),
#server{mod = Mod, state = State} = Server,
Specific =
case erlang:function_exported(Mod, format_status, 2) of
true ->
case catch apply(Mod, format_status, [Opt, [PDict, State]]) of
{'EXIT', _} -> [{data, [{"State", State}]}];
Else -> Else
end;
_ ->
[{data, [{"State", State}]}]
end,
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log}]} |
Specific].
%%-----------------------------------------------------------------
%% Leader-election functions
%%-----------------------------------------------------------------
%% Corresponds to startStage1 in Figure 1 in the Stoller-article
startStage1(E) ->
Elid = {pos(node(),E#election.candidate_nodes),E#election.incarn,E#election.nextel},
NewE = E#election{
elid = Elid,
nextel = E#election.nextel + 1,
down = [],
status = elec1},
case ( pos(node(),E#election.candidate_nodes) == 1) of
true ->
startStage2(NewE);
false ->
mon_nodes(NewE,lesser(node(),E#election.candidate_nodes))
end.
%% Corresponds to startStage2
startStage2(E) ->
continStage2(E#election{
status = elec2,
pendack = node(),
acks = []}).
continStage2(E) ->
case pos(E#election.pendack,E#election.candidate_nodes) < length(E#election.candidate_nodes) of
true ->
Pendack = next(E#election.pendack,E#election.candidate_nodes),
NewE = mon_nodes(E,[Pendack]),
{E#election.name,Pendack} ! {halt,E#election.elid,self()},
NewE#election{pendack = Pendack};
false ->
% I am the leader
% io:format("I am the leader (Node ~w) ~n", [node()]),
E#election{leader = self(),
leadernode = node(),
status = norm}
end.
%% corresponds to Halting
halting(E,T,From) ->
NewE = mon_node(E,From),
NewE#election{elid = T,
status = wait,
leadernode = node(From),
down = E#election.down -- [node(From)]
}.
%% Start monitor a bunch of nodes
mon_nodes(E,Nodes) ->
foldl(
fun(Node,_E) ->
mon_node(_E,{_E#election.name,Node})
end,E,Nodes).
%% Star monitoring one Process
mon_node(E,Proc) ->
Node = case Proc of
{_Name,Node_} ->
Node_;
Pid when pid(Pid) ->
node(Pid)
end,
case iselem(Node,E#election.monitored) of
true ->
E;
false ->
Ref = erlang:monitor(process,Proc),
E#election{monitored = [{Ref,Node} | E#election.monitored]}
end.
%% Stop monitoring of a bunch of nodes
%demon_nodes(E) ->
% foreach(fun({R,_}) ->
% erlang:demonitor(R)
% end,E#election.monitored),
% E#election{monitored = []}.
%% checks if the proc has become the leader, if so switch to loop
hasBecomeLeader(E,Server,Msg) ->
case ((E#election.status == norm) and (E#election.leader == self())) of
true ->
{ok,Synch,NewState} = (Server#server.mod):elected(Server#server.state,E),
lists:foreach(
fun(Node) ->
{E#election.name,Node} !
{ldr, Synch, E#election.elid, self(), E}
end,E#election.acks),
% Make sure we will try to contact all workers!
NewE = E#election{work_down = E#election.worker_nodes},
% Set the internal timeout (corresponds to Periodically)
timer:send_after(?TAU,{tau_timeout}), %It's meaningful only when I am the leader!
loop(Server#server{state = NewState},elected,NewE,Msg);
false ->
safe_loop(Server,candidate,E,Msg)
end.
%%%
%
% incarnation should return an integer value for the next
% incarnation of this node. We create a file for each node,
% this file contains a counter. When starting the system for the
% first time, the files should be intialized with 0 incarnation
% counter for all nodes orelse be removed, since we create
% files if not present with counter 1.
%
% Atomicity: This approach is safe as long as there is only
% one gen_leader running per node.
%
incarnation(Node) ->
case file:read_file_info(Node) of
{error,_Reason} ->
ok = file:write_file(Node,term_to_binary(1)),
0;
{ok,_} ->
{ok,Bin} = file:read_file(Node),
Incarn = binary_to_term(Bin),
ok = file:write_file(Node,term_to_binary(Incarn+1)),
Incarn
end.
broadcast(Msg, #election{monitored = Monitored} = E) ->
%% This function is used for broadcasts,
%% and we make sure only to broadcast to already known nodes.
ToNodes = [N || {_,N} <- Monitored],
broadcast(Msg, ToNodes, E).
broadcast({from_leader, Msg}, ToNodes, E) ->
foreach(
fun(Node) ->
{E#election.name,Node} ! {from_leader, Msg}
end,ToNodes),
E.
iselem(_,[]) ->
false;
iselem(P,[{_,P}|_]) ->
true;
iselem(P,[_ | Ns]) ->
iselem(P,Ns).
lesser(_,[]) ->
[];
lesser(N,[N|_]) ->
[];
lesser(N,[M|Ms]) ->
[M|lesser(N,Ms)].
next(_,[]) ->
no_val;
next(N,[N|Ms]) ->
lists:nth(1,Ms);
next(N,[_|Ms]) ->
next(N,Ms).
pos(N1,[N1|_]) ->
1;
pos(N1,[_|Ns]) ->
1+pos(N1,Ns).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment