Created
August 3, 2009 22:46
-
-
Save boorad/160871 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%% -*- erlang-indent-level:2 -*- | |
%%%------------------------------------------------------------------- | |
%%% File: membership2.erl | |
%%% @author Cliff Moon <[email protected]> [] | |
%%% @copyright 2009 Cliff Moon | |
%%% @doc | |
%%% | |
%%% @end | |
%%% | |
%%% @since 2009-05-04 by Cliff Moon | |
%%%------------------------------------------------------------------- | |
-module(membership2). | |
-author('[email protected]'). | |
-author('[email protected]'). | |
-behaviour(gen_server). | |
%% API | |
-export([start_link/2, start_link/3, register/2, servers_for_key/1, stop/1, | |
partitions/0, servers/0, servers/1, servers_to_list/1]). | |
%% gen_server callbacks | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
%% includes | |
-include("../include/config.hrl"). | |
-include("../include/common.hrl"). | |
-include_lib("eunit/include/eunit.hrl"). | |
%% init,join,gossip,register,down | |
-define(GOSSIP_INFO, []). | |
%%==================================================================== | |
%% API | |
%%==================================================================== | |
%% @doc Starts the server | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% Convenience form of start_link/3 that passes an empty option list.
start_link(Node, Nodes) ->
  NoOptions = [],
  start_link(Node, Nodes, NoOptions).
%% Starts the gen_server, registered locally under the name `membership`.
%% Node, Nodes and Args are handed to init/1 as a three-element list.
start_link(Node, Nodes, Args) ->
  ServerName = {local, membership},
  InitArgs = [Node, Nodes, Args],
  gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%% Asynchronously tells the locally registered membership server that Pid
%% serves Partition (handled by the {register, _, _} clause of handle_cast/2).
register(Partition, Pid) ->
  Request = {register, Partition, Pid},
  gen_server:cast(membership, Request).
%% Asks Server to shut down by casting `stop` to it; returns immediately.
stop(Server) ->
  StopMsg = stop,
  gen_server:cast(Server, StopMsg).
%% Synchronously fetches the partition map from the local membership server.
%% The server replies with {ok, Partitions}.
partitions() ->
  Request = partitions,
  gen_server:call(membership, Request).
%% Shorthand for servers(all).
servers() ->
  Type = all,
  servers(Type).
%% Synchronously asks the local membership server for its registered servers.
%% Type `list` yields the sorted {Partition, Pid} list, `ets` the raw table
%% contents, and any other value yields both.
servers(Type) ->
  Request = {servers, Type},
  gen_server:call(membership, Request).
%% Synchronously asks the local membership server for the pids registered
%% under the partition that Key hashes to.
servers_for_key(Key) ->
  Request = {servers_for_key, Key},
  gen_server:call(membership, Request).
%% Returns every {Partition, Pid} entry held in the Servers ets table as a
%% sorted list. This covers everything we know of, not just local servers.
%% The {Ref, Partition, Pid} monitor bookkeeping entries are skipped.
servers_to_list(Servers) ->
  Collect =
    fun({_Partition, _Pid} = Entry, Acc) -> [Entry | Acc];
       ({_Ref, _Partition, _Pid}, Acc)   -> Acc
    end,
  Unsorted = ets:foldl(Collect, [], Servers),
  lists:sort(Unsorted).
%%==================================================================== | |
%% gen_server callbacks | |
%%==================================================================== | |
%%-------------------------------------------------------------------- | |
%% @spec init(Args) -> {ok, State} | | |
%% {ok, State, Timeout} | | |
%% ignore | | |
%% {stop, Reason} | |
%% @doc Initiates the server | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% Builds the initial #membership{} state for Node.
%%
%% The state file for Node is consulted first. When it carries a partition
%% map, the persisted nodes/partitions/version are used as-is and any hints
%% in Args are ignored (with a log line). Otherwise the node joins the
%% cluster through its replication partners and derives a partition map.
%% In both cases the resulting state is saved and the storage servers are
%% loaded before {ok, State} is returned.
init([Node, Nodes, Args]) ->
  process_flag(trap_exit, true),
  Config = configuration:get_config(),
  #membership{nodes=DiskNodes,
              partitions=DiskParts,
              version=DiskVersion} = load(Node),
  Servers = ets:new(member_servers, [public, bag]),
  Hints = proplists:get_value(hints, Args, []),
  %% Hints = showroom_brain:hints(PartialNodes) TODO
  {WorldNodes, PMap, Version} =
    case DiskParts of
      undefined ->
        init_without_disk_state(Node, Nodes, DiskNodes, Hints, Config);
      _ ->
        init_with_disk_state(DiskNodes, DiskParts, DiskVersion, Hints)
    end,
  State = #membership{node=Node,
                      nodes=WorldNodes,
                      partitions=PMap,
                      version=Version,
                      servers=Servers},
  save(State),
  ?infoMsg("Loading storage servers.~n"),
  LocalPartitions = int_partitions_for_node(Node, State, all),
  storage_manager:load(Nodes, Servers, PMap, LocalPartitions,
                       [{type,init},{dst,Node}]),
  {ok, State}.

%% No persistent partition map was found on disk: join via the (live)
%% partners and compute {WorldNodes, PartitionMap, Version} from the replies.
init_without_disk_state(Node, Nodes, DiskNodes, Hints, Config) ->
  PartialNodes = lists:usort(Nodes ++ DiskNodes),
  Partners = replication:partners(Node, Nodes, Config),
  PartnersPlus = partners_plus(Node, Partners, Nodes),
  {JoinedVersion, RemoteNodes} = join_to(Node, PartnersPlus, Hints),
  World = lists:usort(PartialNodes ++ RemoteNodes),
  PMap = get_init_pmap(Node, Partners, World, Config),
  {World, PMap, vector_clock:increment(node(), JoinedVersion)}.

%% Persistent state was found on disk: reuse it unchanged, and log when
%% caller-supplied hints are being dropped.
init_with_disk_state(DiskNodes, DiskParts, DiskVersion, Hints) ->
  case Hints of
    [] -> ok;
    _  -> ?infoFmt("~nPersistent State was loaded from disk. Hints ~p "
                   "ignored.", [Hints])
  end,
  {DiskNodes, DiskParts, DiskVersion}.
%%-------------------------------------------------------------------- | |
%% @spec | |
%% handle_call(Request, From, State) -> {reply, Reply, State} | | |
%% {reply, Reply, State, Timeout} | | |
%% {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, Reply, State} | | |
%% {stop, Reason, State} | |
%% @doc Handling call messages | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% join: a remote node asks to join. Adds it to the world, recomputes the
%% partition map, bumps and persists the version, reloads local storage
%% servers, gossips the new map, and replies {NewVersion, WorldNodes}.
handle_call({join, JoiningNode, Hints}, _From,
            #membership{version=Version, node=Node, nodes=Nodes,
                        servers=Servers, partitions=Partitions} = State) ->
  WorldNodes = lists:usort(Nodes ++ [JoiningNode]),
  PMap =
    case partitions:join(JoiningNode, Partitions, Hints) of
      {ok, Table} ->
        Table;
      {error, Error, _Table} ->
        throw({join_error, Error})
    end,
  NewVersion = vector_clock:increment(node(), Version),
  NewState = State#membership{nodes=WorldNodes,
                              partitions=PMap,
                              version=NewVersion},
  save(NewState),
  %% clean up this node's storage servers, after JoiningNode just joined.
  LocalPartitions = int_partitions_for_node(Node, NewState, all),
  storage_manager:load(Nodes, Servers, PMap, LocalPartitions,
                       [{type,join},{dst,Node}]),
  fire_gossip(Node, WorldNodes, gossip_join, {NewVersion, PMap}),
  {reply, {NewVersion, WorldNodes}, NewState};

%% servers_for_key: hash Key to its partition name and reply with every pid
%% registered under that partition.
handle_call({servers_for_key, Key}, _From,
            #membership{servers=Servers} = State) ->
  Config = configuration:get_config(),
  HexName = partitions:hash_to_hex(lib_misc:hash(Key), Config#config.q),
  Pids = proplists:get_all_values(list_to_atom(HexName),
                                  servers_to_list(Servers)),
  {reply, Pids, State};

%% state: reply with the full server state record.
handle_call(state, _From, State) ->
  {reply, State, State};

%% partitions: reply with the current partition map.
handle_call(partitions, _From, #membership{partitions=Parts} = State) ->
  {reply, {ok, Parts}, State};

%% servers: `list` -> sorted {Partition, Pid} list, `ets` -> raw table dump,
%% anything else -> both, as a proplist.
handle_call({servers, list}, _From, #membership{servers=Servers} = State) ->
  AsList = servers_to_list(Servers),
  {reply, {ok, AsList}, State};
handle_call({servers, ets}, _From, #membership{servers=Servers} = State) ->
  Raw = ets:tab2list(Servers),
  {reply, {ok, Raw}, State};
handle_call({servers, _}, _From, #membership{servers=Servers} = State) ->
  AsList = servers_to_list(Servers),
  Raw = ets:tab2list(Servers),
  {reply, {ok, [{list, AsList}, {ets, Raw}]}, State};

%% Any other request is answered with `ignored`.
handle_call(_Request, _From, State) ->
  {reply, ignored, State}.
%%-------------------------------------------------------------------- | |
%% @spec handle_cast(Msg, State) -> {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} | |
%% @doc Handling cast messages | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% register: monitor Pid, record both the {Partition, Pid} entry and the
%% {Ref, Partition, Pid} monitor entry, bump the version and gossip it.
handle_cast({register, Partition, Pid},
            #membership{node=Me, nodes=Nodes, servers=Servers,
                        version=Version} = State) ->
  %% ?debugFmt("~nRegistering ~p~n", [{Partition, Pid}]),
  Ref = erlang:monitor(process, Pid),
  ets:insert(Servers, {Partition, Pid}),
  ets:insert(Servers, {Ref, Partition, Pid}),
  NewVersion = vector_clock:increment(node(), Version),
  ServerList = servers_to_list(Servers),
  Gossip = {NewVersion, ServerList, Ref, Partition, Pid},
  fire_gossip(Me, Nodes, gossip_register, Gossip),
  {noreply, State#membership{version=NewVersion}};

%% gossip_join: merge a remote partition map with ours; when the merge
%% changed anything, adopt it and pass the gossip on.
handle_cast({gossip_join, {RemoteVersion, RemotePMap}, RemoteNode},
            #membership{version=LocalVersion, node=LocalNode,
                        partitions=LocalPMap, nodes=Nodes} = State) ->
  case merge_pmaps(RemoteVersion, RemotePMap, RemoteNode,
                   LocalVersion, LocalPMap, LocalNode) of
    {equal, _Version, _PMap} ->
      {noreply, State};
    {merged, MergedVersion, MergedPMap} ->
      fire_gossip(LocalNode, Nodes, gossip_join, {MergedVersion, MergedPMap}),
      {noreply, State#membership{version=MergedVersion,
                                 partitions=MergedPMap}}
  end;

%% gossip_register and gossip_down  TODO: guards for Type?
%% Applies the remote insert/delete to the local server table when the
%% remote clock is newer or concurrent, then re-gossips and reloads storage.
handle_cast({Type, {RemoteVersion, RemoteServerList, Ref, Part, Pid},
             RemoteNode},
            #membership{version=LocalVersion, node=LocalNode,
                        servers=LocalServers, nodes=Nodes} = State) ->
  Op =
    case Type of
      gossip_register -> insert;
      gossip_down     -> delete_object
    end,
  case merge_servers(RemoteVersion, RemoteServerList, RemoteNode,
                     Op, Ref, Part, Pid,
                     LocalVersion, LocalServers, LocalNode) of
    {equal, _Version, _ServerList} ->
      {noreply, State};
    {merged, MergedVersion, MergedServerList} ->
      ?debugFmt("~nMergedServerList: ~p~n", [MergedServerList]),
      server_list_into_table(MergedServerList, LocalServers),
      fire_gossip(LocalNode, Nodes, Type,
                  {MergedVersion, MergedServerList, Ref, Part, Pid}),
      NewState = State#membership{version=MergedVersion},
      storage_manager:load(Nodes,
                           LocalServers,
                           NewState#membership.partitions,
                           int_partitions_for_node(LocalNode, NewState, all),
                           [{type,Type},{dst,LocalNode}]),
      {noreply, NewState}
  end;

%% stop: terminate the server normally.
handle_cast(stop, State) ->
  {stop, normal, State}.
%%-------------------------------------------------------------------- | |
%% @spec handle_info(Info, State) -> {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} | |
%% @doc Handling all non call/cast messages | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% 'DOWN': a monitored storage server died. Drop its monitor entry and its
%% {Partition, Pid} entry from the table, bump the version and gossip the
%% removal. Crashes (badmatch) if Ref has no single entry in the table.
handle_info({'DOWN', Ref, _Type, Pid, _Reason},
            #membership{node=Me, nodes=Nodes, servers=Servers,
                        version=Version} = State) ->
  erlang:demonitor(Ref),
  [{Ref, Partition, Pid} = Monitored] = ets:lookup(Servers, Ref),
  ets:delete_object(Servers, Monitored),
  ets:delete_object(Servers, {Partition, Pid}),
  ?infoFmt("Storage Server Pid is down ~p~n", [Monitored]),
  NewVersion = vector_clock:increment(node(), Version),
  ServerList = servers_to_list(Servers),
  Gossip = {NewVersion, ServerList, Ref, Partition, Pid},
  fire_gossip(Me, Nodes, gossip_down, Gossip),
  {noreply, State#membership{version=NewVersion}};

%% Any other message is only logged at debug level.
handle_info(Info, State) ->
  ?debugFmt("~nInfo: ~p~n", [Info]),
  {noreply, State}.
%%-------------------------------------------------------------------- | |
%% @spec terminate(Reason, State) -> void() | |
%% @doc This function is called by a gen_server when it is about to | |
%% terminate. It should be the opposite of Module:init/1 and do any necessary | |
%% cleaning up. When it returns, the gen_server terminates with Reason. | |
%% The return value is ignored. | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% Nothing to clean up here; both arguments are ignored.
terminate(_Reason, _State) -> ok.
%%-------------------------------------------------------------------- | |
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} | |
%% @doc Convert process state when code is changed | |
%% @end | |
%%-------------------------------------------------------------------- | |
%% No state migration is performed: the state is returned unchanged.
code_change(_OldVsn, State, _Extra) ->
  Unchanged = State,
  {ok, Unchanged}.
%%-------------------------------------------------------------------- | |
%%% Internal functions | |
%%-------------------------------------------------------------------- | |
%% Reads the persisted membership state for Node from
%% <config directory>/<node name>.state.
%% Returns the single term stored in that file, or a #membership{} with an
%% empty node list (and a log line) when the file cannot be consulted.
load(Node) ->
  Config = configuration:get_config(),
  StateFile = filename:join([Config#config.directory,
                             lists:concat([node:name(Node), ".state"])]),
  case file:consult(StateFile) of
    {ok, [Persisted]} ->
      Persisted;
    {error, Reason} ->
      ?infoFmt("Could not load state: ~p~n", [Reason]),
      #membership{nodes=[]}
  end.
%% Persists State to <config directory>/<node name>.state as a single
%% Erlang term followed by a full stop, so that load/1 can file:consult/1 it.
%% Crashes (badmatch) when the file cannot be opened. Returns ok.
save(State) ->
  Config = configuration:get_config(),
  Filename = filename:join([Config#config.directory,
                            lists:concat([node:name(State#membership.node),
                                          ".state"])]),
  {ok, File} = file:open(Filename, [binary, write]),
  %% Close the handle even when formatting/writing raises, so a failed
  %% save cannot leave the file open.
  try
    io:format(File, "~w.~n", [State])
  after
    file:close(File)
  end.
%% joining is bi-directional, as opposed to gossip which is unidirectional.
%% We want to collect the list of known nodes to compute the partition map,
%% which isn't necessarily the same as the list of running nodes.
%%
%% Starts the fold over Partners with a fresh vector clock for this node and
%% an empty world list; returns {Version, WorldNodes}.
join_to(Node, Partners, Hints) ->
  Seed = {vector_clock:create(node()), []},
  join_to(Node, Partners, Seed, Hints).
%% Asks each remote in turn to let Node join, accumulating the merged
%% vector clock and the union of all node lists the remotes report.
%%
%% Node     - the joining (local) node
%% Partners - remotes still to be contacted
%% {Version, World} - accumulator: merged clock and sorted, de-duplicated nodes
%% Hints    - forwarded unchanged to every remote
%%
%% Returns {Version, World} once every partner has been tried. A remote whose
%% call fails, or that answers with something unexpected, is skipped.
join_to(_Node, [], {Version, World}, _Hints) ->
  {Version, World};
join_to(Node, [Remote|Partners], {Version, World}, Hints) ->
  case call_join(Remote, Node, Hints) of
    {'EXIT', _} ->
      join_to(Node, Partners, {Version, World}, Hints);
    {RemoteVersion, NewNodes} ->
      %% Keep recursing in the 4-arity form with Hints. Dropping Hints here
      %% would dispatch to join_to/3, reset the accumulator and send the
      %% accumulator tuple to the remaining remotes as hints.
      Merged = {vector_clock:merge(Version, RemoteVersion),
                lists:usort(World ++ NewNodes)},
      join_to(Node, Partners, Merged, Hints);
    Val ->
      %% Log and carry on, so the caller still receives {Version, World}
      %% rather than the logger's return value.
      ?debugFmt("join_to unexpected value: ~p~n", [Val]),
      join_to(Node, Partners, {Version, World}, Hints)
  end.
%% Sends a synchronous {join, Node, Hints} request to the membership server
%% on Remote. Any exception is trapped by `catch`, so a failed call comes
%% back as an {'EXIT', _} tuple instead of crashing the caller.
call_join(Remote, Node, Hints) ->
  Request = {join, Node, Hints},
  catch gen_server:call({membership, node:name(Remote)}, Request).
%% Decides which partition map wins by comparing vector clocks.
%% Returns {equal, LocalVersion, LocalPMap} when the remote clock is equal
%% or older, and {merged, MergedVersion, RemotePMap} when it is newer or
%% concurrent.
merge_pmaps(RemoteVersion, RemotePMap, _RemoteNode,
            LocalVersion, LocalPMap, _LocalNode) ->
  case vector_clock:compare(RemoteVersion, LocalVersion) of
    Order when Order =:= equal; Order =:= less ->
      {equal, LocalVersion, LocalPMap};
    Order when Order =:= greater; Order =:= concurrent ->
      %% TODO (concurrent case): use clocks/nodes to determine proper map
      %% to return
      {merged, vector_clock:merge(RemoteVersion, LocalVersion), RemotePMap}
  end.
%% Reconciles a gossiped server change with the local server table.
%%
%% Op is the ets function name to apply (`insert` or `delete_object`, as
%% chosen by handle_cast/2) to both the {Part, Pid} entry and the
%% {Ref, Part, Pid} monitor entry.
%%
%% Returns {equal, LocalVersion, LocalServerList} when the remote clock is
%% equal or older (table untouched), otherwise applies Op and returns
%% {merged, MergedVersion, MergedServerList}.
merge_servers(RemoteVersion, RemoteServerList, RemoteNode,
              Op, Ref, Part, Pid,
              LocalVersion, LocalServers, LocalNode) ->
  LocalServerList = servers_to_list(LocalServers),
  %% Shared by the greater and concurrent branches.
  %% TODO: separate fun, that this and the register cast use
  ApplyAndMerge =
    fun() ->
      ets:Op(LocalServers, {Part, Pid}),
      ets:Op(LocalServers, {Ref, Part, Pid}),
      Merged = merge_server_lists(RemoteServerList,
                                  servers_to_list(LocalServers)),
      {merged, vector_clock:merge(RemoteVersion, LocalVersion), Merged}
    end,
  case vector_clock:compare(RemoteVersion, LocalVersion) of
    equal ->
      ?debugMsg("remote clock EQUAL"),
      {equal, LocalVersion, LocalServerList};
    less ->
      ?debugMsg("remote clock LESS"),
      {equal, LocalVersion, LocalServerList};
    greater ->
      ?debugMsg("remote clock GREATER"),
      ApplyAndMerge();
    concurrent ->
      ?debugMsg("remote clock CONCURRENT"),
      %% TODO: use clocks/nodes to determine proper map to return
      ApplyAndMerge()
  end.
%% Combines two server lists into one sorted list without duplicates.
%% Both inputs are assumed to be sorted already (as servers_to_list/1
%% produces them).
merge_server_lists(Remote, Local) ->
  Combined = lists:merge(Local, Remote),
  lists:usort(Combined).
%% Casts {Type, Gossip, Me} to the membership server on each gossip partner.
%% The partners are Me's replication partners, narrowed by partners_plus/3.
%% Returns ok.
fire_gossip(Me, WorldNodes, Type, Gossip) ->
  Partners = replication:partners(Me, WorldNodes),
  Message = {Type, Gossip, Me},
  _ = [gen_server:cast({membership, Target}, Message)
       || Target <- partners_plus(Me, Partners, WorldNodes)],
  ok.
%% Returns the Partners that are currently connected (present in nodes()).
%% If every partner is down, walks the ring of remaining target nodes and
%% returns the first connected one (as a one-element list), or [] if none.
partners_plus(Node, Partners, WorldNodes) ->
  Down = Partners -- nodes(),
  case Partners -- Down of
    [] ->
      Targets = replication:target_list(Node, WorldNodes),
      walk_ring(Targets -- lists:flatten([Node, Partners]));
    Up ->
      %% at least one partner is up, so gossip w/ them
      Up
  end.
%% Returns [Node] for the first candidate that is currently connected
%% (a member of nodes()), or [] after logging when no candidate is.
walk_ring([Candidate|Others]) ->
  IsConnected = lists:member(Candidate, nodes()),
  case IsConnected of
    true -> [Candidate];
    _    -> walk_ring(Others)
  end;
walk_ring([]) ->
  %% TODO: should we be more forceful here and throw? not for now
  ?infoFmt("~p:walk_ring/1 - could not find node for gossip~n", [?MODULE]),
  [].
%% Inserts every {Partition, Pid} pair of ServerList into the Servers ets
%% table, in list order. Returns ok.
server_list_into_table([], _Servers) ->
  ok;
server_list_into_table([{Partition, Pid} | Rest], Servers) ->
  ets:insert(Servers, {Partition, Pid}),
  server_list_into_table(Rest, Servers).
%% Partitions associated with Node, read from State's {Node, Partition} map.
%%   master - only the partitions whose map entry names Node itself
%%   all    - the `master` partitions of Node and of each of its replication
%%            partners, combined with lists:merge/2
int_partitions_for_node(Node, State, master) ->
  Owned = lists:filter(fun({Owner, _}) -> Owner == Node end,
                       State#membership.partitions),
  [Partition || {_, Partition} <- Owned];
int_partitions_for_node(Node, State, all) ->
  Config = configuration:get_config(),
  Partners = replication:partners(Node, State#membership.nodes, Config),
  Owners = lists:flatten([Node, Partners]),
  AddMasters =
    fun(Owner, Acc) ->
      lists:merge(Acc, int_partitions_for_node(Owner, State, master))
    end,
  lists:foldl(AddMasters, [], Owners).
%% @doc get the partition table/map from some remote nodes, or
%% create a new one
%% @end
%% With no partners a fresh table is created straight away. Otherwise the
%% partners are asked for their table, and a fresh one is created only if
%% none of them supplies it.
%% TODO: do vector clocks need to be here, to ensure we get most
%% up-to-date table?
get_init_pmap(Node, [], WorldNodes, Config) ->
  fresh_pmap(Node, WorldNodes, Config);
get_init_pmap(Node, Partners, WorldNodes, Config) ->
  case get_table_from_remotes(Partners) of
    {ok, Table} ->
      Table;
    _Error ->
      %% ok, we got some error, so we're gonna provide a fresh
      %% parts table. Hopefully gossip fixes this
      fresh_pmap(Node, WorldNodes, Config)
  end.

%% Builds a brand-new partition table for Node using the configured q.
fresh_pmap(Node, WorldNodes, Config) ->
  partitions:create_partitions(Config#config.q, Node, WorldNodes).
%% Asks each remote membership server in turn for its partition table.
%% Returns {ok, Table} from the first remote that answers with one; a remote
%% that answers anything else, or whose call raises, is skipped on purpose
%% (best effort). Returns no_table_from_remotes when the list is exhausted.
get_table_from_remotes([Remote|Rest]) ->
  try gen_server:call({membership, Remote}, partitions) of
    {ok, Table} ->
      {ok, Table};
    _Other ->
      get_table_from_remotes(Rest)
  catch
    _:_ ->
      get_table_from_remotes(Rest)
  end;
get_table_from_remotes([]) ->
  no_table_from_remotes.
%% | |
%% internal tests | |
%% | |
%% A single-node ring: node `a` should see all of its own partitions.
int_parts_for_node_self_test() ->
  %% TODO: effigy mock configuration N=3
  State = #membership{node=a, nodes=[a],
                      partitions=[{a,1},{a,2},{a,3}]},
  ?assertEqual([1,2,3], int_partitions_for_node(a, State, all)),
  ok.
%% Two known nodes: node `a` is expected to see the partitions of a and b,
%% but not those mapped to the unknown node c.
int_parts_for_node_partners_test() ->
  %% TODO: effigy mock configuration N=2
  State = #membership{node=a, nodes=[a,b],
                      partitions=[{a,1},{a,2},{b,3},{b,4},{c,5},{c,6}]},
  ?assertEqual([1,2,3,4], int_partitions_for_node(a, State, all)),
  ok.
%% Four-node ring: node `a` is expected to see the partitions of a, b and c
%% only.
int_parts_for_node_bigring_test() ->
  %% TODO: effigy mock configuration N=3
  State = #membership{node=a, nodes=[a,b,c,d],
                      partitions=[{a,1},{a,2},{b,3},{b,4},
                                  {c,5},{c,6},{d,7},{d,8}]},
  ?assertEqual([1,2,3,4,5,6], int_partitions_for_node(a, State, all)),
  ok.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment