%% Source: GitHub gist boorad/151090, created July 21, 2009 03:17.
%% (The gist page noted that the file may contain bidirectional Unicode text.)
%%% -*- erlang-indent-level:2 -*- | |
%%%------------------------------------------------------------------- | |
%%% File: membership2.erl | |
%%% @author Cliff Moon <[email protected]> [] | |
%%% @copyright 2009 Cliff Moon | |
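%%% @doc
%%% Cluster membership server, registered locally as `membership'.
%%% Tracks the known nodes, the partition map, the storage servers
%%% registered per partition (kept in an ETS bag) and a vector-clock
%%% version; persists the node list to `<directory>/<node name>.world'
%%% and gossips changes to its replication partners.
%%% @end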
%%% | |
%%% @since 2009-05-04 by Cliff Moon | |
%%%------------------------------------------------------------------- | |
-module(membership2). | |
-author('[email protected]'). | |
-author('[email protected]'). | |
-behaviour(gen_server). | |
%% API | |
-export([start_link/2, start_link/3, register/2, servers_for_key/1, stop/1, | |
listen/1, partitions/0]). | |
%% gen_server callbacks | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
%% testing exports | |
-export([servers_to_list/1]). | |
%% includes | |
-include("../include/config.hrl"). | |
-include("../include/common.hrl"). | |
-include_lib("eunit/include/eunit.hrl"). | |
-define(GOSSIP_INFO, false). | |
%%==================================================================== | |
%% API | |
%%==================================================================== | |
%% @doc Starts the membership server, registered locally as `membership'.
%% Node is this node's identity and Nodes the initially known nodes.
%% start_link/2 is start_link/3 with an empty option list; the only
%% option init/1 reads is `{hints, Hints}'.
%% @end
%%--------------------------------------------------------------------
start_link(Node, Nodes) ->
  start_link(Node, Nodes, []).

start_link(Node, Nodes, Args) ->
  InitArgs = [Node, Nodes, Args],
  gen_server:start_link({local, membership}, ?MODULE, InitArgs, []).
%% @doc Asynchronously registers Pid as a server for Partition.
register(Partition, Pid) ->
  Msg = {register, Partition, Pid},
  gen_server:cast(membership, Msg).

%% @doc Returns the pids registered for the partition that Key hashes to.
servers_for_key(Key) ->
  Request = {servers_for_key, Key},
  gen_server:call(membership, Request).

%% @doc Asks Server to stop; asynchronous.
stop(Server) ->
  gen_server:cast(Server, stop).

%% @doc Subscribes Pid to partition-map updates (delivered later as
%% `{remap, Node, PMap}' casts) and returns the current partition map.
listen(Pid) ->
  Request = {listen, Pid},
  gen_server:call(membership, Request).

%% @doc Returns `{ok, PartitionMap}'.
partitions() ->
  gen_server:call(membership, partitions).
%%==================================================================== | |
%% gen_server callbacks | |
%%==================================================================== | |
%%--------------------------------------------------------------------
%% @spec init([Node, Nodes, Args]) -> {ok, State}
%% @doc Initialises the membership server.
%% Unions the node list persisted on disk with Nodes, joins the
%% replication partners (copying their servers into a fresh ETS bag and
%% merging their vector clocks), obtains or creates the partition map,
%% persists the resulting node list and loads the local storage servers.
%% @end
%%--------------------------------------------------------------------
init([Node, Nodes, Args]) ->
  process_flag(trap_exit, true),
  Config = configuration:get_config(),
  KnownNodes = lists:usort(Nodes ++ load(Node)),
  Partners = replication:partners(Node, Nodes, Config),
  ServerTab = ets:new(member_servers, [public, bag]),
  %% Hints = showroom_brain:hints(KnownNodes) TODO
  Hints = proplists:get_value(hints, Args, []),
  {JoinedVersion, RemoteNodes} = join_to(Node, ServerTab, Partners, Hints),
  World = lists:usort(KnownNodes ++ RemoteNodes),
  PMap = get_init_pmap(Node, Partners, World, Config),
  MyVersion = vector_clock:increment(pid_to_list(self()), JoinedVersion),
  State = #membership{node = Node,
                      nodes = World,
                      partitions = PMap,
                      version = MyVersion,
                      servers = ServerTab},
  save(State),
  ?infoMsg("Loading storage servers.~n"),
  MyPartitions = int_partitions_for_node(Node, State, all),
  storage_manager:load(Nodes, PMap, MyPartitions),
  {ok, State}.
%%--------------------------------------------------------------------
%% @spec handle_call(Request, From, State) -> {reply, Reply, State}
%% @doc Handles synchronous requests:
%% `{join, Node, Hints}' - adds Node to the partition map via
%%   partitions:join/3 and replies `{NewVersion, WorldNodes, ServerList}';
%%   if partitions:join/3 reports an error, replies
%%   `{error, {join_error, Error}}' and leaves the state unchanged.
%% `{listen, Pid}' - subscribes Pid to map updates, replies with the map.
%% `{servers_for_key, Key}' - replies with the pids registered for the
%%   partition that Key hashes to.
%% `state' - replies with the whole server state.
%% `partitions' - replies `{ok, PartitionMap}'.
%% @end
%%--------------------------------------------------------------------
%% join
handle_call({join, JoiningNode, Hints}, _From,
            State = #membership{partitions=Partitions}) ->
  case partitions:join(JoiningNode, Partitions, Hints) of
    {ok, PMap} ->
      accept_join(JoiningNode, PMap, State);
    {error, Error, _Table} ->
      %% Reply rather than throw/1: gen_server treats a value thrown from
      %% handle_call/3 as the callback's return value, so a throw here
      %% would terminate this server with {bad_return_value, ...} (and
      %% delete the ETS server table it owns).
      {reply, {error, {join_error, Error}}, State}
  end;
%% listen
handle_call({listen, Pid}, _From,
            State = #membership{listeners=Listeners, partitions=PMap}) ->
  {reply, PMap, State#membership{listeners=lists:usort([Pid|Listeners])}};
%% servers_for_key
handle_call({servers_for_key, Key}, _From,
            State = #membership{servers=Servers}) ->
  Config = configuration:get_config(),
  Hash = lib_misc:hash(Key),
  Partition = partitions:hash_to_partition(Hash, Config#config.q),
  ?debugFmt("getting for partition ~p", [Partition]),
  %% Only {Partition, Pid} rows are keyed by the partition; the monitor
  %% rows {Ref, Partition, Pid} are keyed by their reference.
  {_, Pids} = lists:unzip(ets:lookup(Servers, Partition)),
  {reply, Pids, State};
%% state
handle_call(state, _From, State) ->
  {reply, State, State};
%% partitions
handle_call(partitions, _From, State = #membership{partitions=Parts}) ->
  {reply, {ok, Parts}, State}.

%% Applies an accepted join of JoiningNode whose new partition map is PMap:
%% adds the node, bumps the version, reloads this node's storage servers,
%% gossips the new state to partners and builds the join reply.
accept_join(JoiningNode, PMap,
            State = #membership{version=Version, node=Node, nodes=Nodes,
                                servers=Servers, partitions=OldPMap}) ->
  Config = configuration:get_config(),
  WorldNodes = lists:usort(Nodes ++ [JoiningNode]),
  ServerList = servers_to_list(Servers),
  NewVersion = vector_clock:increment(pid_to_list(self()), Version),
  NewState = State#membership{nodes=WorldNodes, partitions=PMap,
                              version=NewVersion},
  %% clean up this node's storage servers, after JoiningNode just joined.
  storage_manager:load(Nodes, PMap,
                       int_partitions_for_node(Node, NewState, all)),
  Info = case ?GOSSIP_INFO of
           false -> [];
           _ ->
             [{type, join},
              {src, Node},
              {joining_node, JoiningNode},
              {old_pmap, OldPMap},
              {new_pmap, PMap},
              {old_version, Version},
              {new_version, NewVersion},
              {servers, Servers}]
         end,
  fire_gossip(Node, NewState, Config, Info),
  {reply, {NewVersion, WorldNodes, ServerList}, NewState}.
%%--------------------------------------------------------------------
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%%                                  {stop, normal, State}
%% @doc Handles asynchronous messages:
%% `{gossip, Version, Nodes, ServerList, PMap, Info}' - merges a peer's
%%   state via merge_state/5; if anything changed, reloads the local
%%   storage servers, re-gossips to partners and republishes the map to
%%   listeners.
%% `{register, Partition, Pid}' - monitors Pid, records it for Partition,
%%   bumps the version and gossips.
%% `stop' - stops the server normally.
%% @end
%%--------------------------------------------------------------------
%% gossip
handle_cast({gossip, Version, Nodes, ServerList, PMap, Info},
            State = #membership{node=Me, partitions=OldPMap,
                                version=OldVersion}) ->
  case ?GOSSIP_INFO of
    false -> ok;
    _ ->
      ?debugFmt("~nreceive gossip...~n"
                "Self : ~p~n"
                "Info : ~p~n"
                ,[self(), Info])
  end,
  case merge_state(Version, Nodes, ServerList, PMap, State) of
    {equal, Merged} ->
      {noreply, Merged};
    {merged, Merged} ->
      storage_manager:load(Nodes, Merged#membership.partitions,
                           int_partitions_for_node(Me, Merged, all)),
      %% old_* describe this node's state before the merge (as in the join
      %% handler), not the incoming gossip, so old/new can be compared.
      NewInfo = case ?GOSSIP_INFO of
                  false -> [];
                  _ ->
                    [{type, gossip},
                     {src, Me},
                     {old_pmap, OldPMap},
                     {new_pmap, Merged#membership.partitions},
                     {old_version, OldVersion},
                     {new_version, Merged#membership.version},
                     {servers, ServerList}]
                end,
      fire_gossip(Me, Merged, configuration:get_config(), NewInfo),
      publish_map_to_listeners(Merged),
      {noreply, Merged}
  end;
%% register
handle_cast({register, Partition, Pid},
            State = #membership{node=Me, servers=Servers, version=Version}) ->
  ?debugFmt("~nRegistering ~p~n", [{Partition, Pid}]),
  Ref = erlang:monitor(process, Pid),
  ets:insert(Servers, {Partition, Pid}),
  %% monitor row lets handle_info/2 find the registration on 'DOWN'
  ets:insert(Servers, {Ref, Partition, Pid}),
  NewState = State#membership{
               version=vector_clock:increment(pid_to_list(self()), Version)},
  Info = [{type, register},
          {src, Me},
          {servers, servers_to_list(Servers)}],
  fire_gossip(Me, NewState, configuration:get_config(), Info),
  {noreply, NewState};
%% stop
handle_cast(stop, State) ->
  {stop, normal, State}.
%%--------------------------------------------------------------------
%% @spec handle_info(Info, State) -> {noreply, State}
%% @doc Handles raw messages.
%% `{'DOWN', Ref, _, Pid, _}' for a monitor created by the register cast:
%%   removes Pid's rows from the servers table, bumps the version, gossips
%%   and keeps the bumped version. The Ref must have a monitor row in the
%%   table, otherwise the match crashes.
%% Anything else is logged and ignored.
%% @end
%%--------------------------------------------------------------------
handle_info({'DOWN', Ref, _, Pid, _},
            State = #membership{node=Me, servers=Servers,
                                version=Version}) ->
  erlang:demonitor(Ref),
  [{Ref, Partition, Pid}] = ets:lookup(Servers, Ref),
  ets:delete_object(Servers, {Ref, Partition, Pid}),
  ets:delete_object(Servers, {Partition, Pid}),
  ?debugFmt("Pid is down ~p", [{Ref, Partition, Pid}]),
  ?debugFmt("~p", [ets:lookup(Servers, Partition)]),
  NewState = State#membership{
               version=vector_clock:increment(pid_to_list(self()), Version)},
  Info = [{type, 'DOWN'},
          {src, Me},
          {servers, servers_to_list(Servers)}],
  fire_gossip(Me, NewState, configuration:get_config(), Info),
  %% Keep the incremented version that was just gossiped; returning the
  %% old State would leave this node's clock behind what partners received.
  {noreply, NewState};
handle_info(Info, State) ->
  ?debugFmt("~nInfo: ~p~n", [Info]),
  {noreply, State}.
%%--------------------------------------------------------------------
%% @spec terminate(Reason, State) -> ok
%% @doc gen_server termination callback. Performs no explicit cleanup;
%% gen_server ignores the return value.
%% @end
%%--------------------------------------------------------------------
terminate(_Why, _St) ->
  ok.
%%--------------------------------------------------------------------
%% @spec code_change(OldVsn, State, Extra) -> {ok, State}
%% @doc Hot-upgrade callback; the state is carried over unchanged.
%% @end
%%--------------------------------------------------------------------
code_change(_FromVsn, St, _Extra) ->
  {ok, St}.
%%-------------------------------------------------------------------- | |
%%% Internal functions | |
%%-------------------------------------------------------------------- | |
%% Returns the list of known nodes persisted by save/1 in
%% <config directory>/<node name>.world. Returns [] (after logging) when
%% the file cannot be read or does not hold exactly one list term.
load(Node) ->
  Config = configuration:get_config(),
  Filename = filename:join([Config#config.directory,
                            lists:concat([node:name(Node), ".world"])]),
  case file:consult(Filename) of
    {ok, [Terms]} when is_list(Terms) ->
      Terms;
    {ok, Other} ->
      %% Empty or unexpected contents: fall back like a missing file does,
      %% instead of crashing init/1 with a case_clause.
      ?infoFmt("Could not load state: unexpected contents ~p~n", [Other]),
      [];
    {error, Reason} ->
      ?infoFmt("Could not load state: ~p~n", [Reason]),
      []
  end.
%% Persists State's node list to <config directory>/<node name>.world as a
%% single term terminated by ".", the format load/1 reads back with
%% file:consult/1. Returns ok; crashes (badmatch) if the file cannot be
%% written.
save(State) ->
  Config = configuration:get_config(),
  Filename = filename:join([Config#config.directory,
                            lists:concat([node:name(State#membership.node),
                                          ".world"])]),
  %% One write_file call: no file descriptor is left open if formatting
  %% fails part-way, unlike a manual open/format/close sequence.
  ok = file:write_file(Filename,
                       io_lib:format("~w.~n", [State#membership.nodes])).
%% joining is bi-directional, as opposed to gossip which is unidirectional
%% we want to collect the list of known nodes to compute the partition map
%% which isn't necessarily the same as the list of running nodes
%%
%% join_to/4 asks every partner in turn to accept Node, copying each
%% partner's server list into the Servers table. Returns {Version, World}:
%% a fresh vector clock merged with every answering partner's clock, and
%% the sorted union of the nodes they reported. Partners that fail
%% (call_join/3 returned {'EXIT', _}) or reply with anything unexpected
%% are skipped.
join_to(Node, Servers, Partners, Hints) ->
  join_to(Node, Servers, Partners,
          {vector_clock:create(pid_to_list(self())), []}, Hints).

join_to(_Node, _Servers, [], Acc, _Hints) ->
  Acc;
join_to(Node, Servers, [Remote|Partners], {Version, World} = Acc, Hints) ->
  case call_join(Remote, Node, Hints) of
    {'EXIT', _} ->
      join_to(Node, Servers, Partners, Acc, Hints);
    {RemoteVersion, NewNodes, ServerList} ->
      server_list_into_table(ServerList, Servers),
      NewAcc = {vector_clock:merge(Version, RemoteVersion),
                lists:usort(World ++ NewNodes)},
      %% Hints must be passed explicitly: a 4-argument call would hit
      %% join_to/4 and restart with a fresh accumulator.
      join_to(Node, Servers, Partners, NewAcc, Hints);
    Val ->
      %% e.g. an {error, _} reply: log it and move on to the next partner
      %% rather than returning a value init/1 cannot match.
      ?debugFmt("join_to unexpected value: ~p~n", [Val]),
      join_to(Node, Servers, Partners, Acc, Hints)
  end.
%% Asks the membership server on Remote to accept Node. Any exception
%% (including from node:name/1) is turned into a value by catch, so the
%% caller sees {'EXIT', Reason} on failure.
call_join(Remote, Node, Hints) ->
  Request = {join, Node, Hints},
  catch gen_server:call({membership, node:name(Remote)}, Request).
%% Merges gossip received from a peer into State.
%% Returns {equal, State} when the two vector clocks compare equal.
%% Otherwise returns {merged, NewState}: the node lists are unioned, the
%% remote servers are inserted into the ETS table, the clocks are merged,
%% and the local partition map is kept only when the remote clock compares
%% as `less' (any other result adopts the remote map).
merge_state(RemoteVersion, RemoteNodes, RemoteServerList, RemotePMap,
            State = #membership{nodes=Nodes, version=LocalVersion,
                                servers=Servers, partitions=LocalPMap}) ->
  Order = vector_clock:compare(RemoteVersion, LocalVersion),
  if
    Order =:= equal ->
      {equal, State};
    true ->
      AllNodes = lists:usort(RemoteNodes ++ Nodes),
      server_list_into_table(RemoteServerList, Servers),
      Clock = vector_clock:merge(RemoteVersion, LocalVersion),
      PMap = if
               Order =:= less -> LocalPMap;
               true -> RemotePMap
             end,
      {merged, State#membership{nodes=AllNodes, version=Clock,
                                partitions=PMap}}
  end.
%% Gossips State to each replication partner of Me (computed with
%% replication:partners/3 over State's node list). Returns ok.
fire_gossip(Me, State = #membership{nodes = Nodes}, Config, Info) ->
  Targets = replication:partners(Me, Nodes, Config),
  _ = [gossip_with(Me, Target, State, Info) || Target <- Targets],
  ok.
%% Casts the gossip view of a membership state (version, nodes, the full
%% server list and partition map) to OtherNode.
gossip_with(_Me, OtherNode, State = #membership{}, Info) ->
  #membership{version=V, nodes=Ns, servers=Tab, partitions=PMap} = State,
  cast_gossip(OtherNode, V, Ns, servers_to_list(Tab), PMap, Info).
%% Fire-and-forget: casts {gossip, Version, Nodes, ServerPacket, PMap, Info}
%% to the process registered as `membership' on OtherNode, printing a debug
%% trace first when ?GOSSIP_INFO is enabled.
cast_gossip(OtherNode, Version, Nodes, ServerPacket, PMap, Info) ->
  Msg = {gossip, Version, Nodes, ServerPacket, PMap, Info},
  case ?GOSSIP_INFO of
    false ->
      ok;
    _ ->
      ?debugFmt("~nfire_gossip:~n"
                "Self : ~p~n"
                "Info : ~p~n"
                "Target: ~p~n"
                , [self(), Info, OtherNode])
  end,
  gen_server:cast({membership, OtherNode}, Msg).
%% Returns every {Partition, Pid} row in the Servers table (all known
%% servers, not just local ones) sorted by partition. The monitor rows
%% {Ref, Partition, Pid} are skipped.
servers_to_list(Servers) ->
  Collect = fun({_Partition, _Pid} = Entry, Acc) -> [Entry | Acc];
               ({_Ref, _Partition, _Pid}, Acc) -> Acc
            end,
  lists:keysort(1, ets:foldl(Collect, [], Servers)).
%% Inserts each {Partition, Pid} pair of ServerList into the Servers ETS
%% table; any other element shape raises function_clause. Returns ok.
server_list_into_table(ServerList, Servers) ->
  Insert = fun({_Partition, _Pid} = Entry) -> ets:insert(Servers, Entry) end,
  lists:foreach(Insert, ServerList).
%% Casts {remap, Node, PMap} to every listener pid subscribed via listen/1.
%% Returns ok.
publish_map_to_listeners(#membership{partitions=PMap, listeners=Listeners,
                                     node=Node}) ->
  Msg = {remap, Node, PMap},
  _ = [gen_server:cast(Pid, Msg) || Pid <- Listeners],
  ok.
%% int_partitions_for_node(Node, State, master): the partitions owned by
%% Node in State's partition map, in map order.
%% int_partitions_for_node(Node, State, all): Node's partitions plus those
%% of its replication partners, combined pairwise with lists:merge/2 (so
%% the result is only sorted if each per-node list already is).
int_partitions_for_node(Node, State, master) ->
  PMap = State#membership.partitions,
  lists:foldr(fun({Owner, Part}, Acc) when Owner == Node -> [Part | Acc];
                 ({_Owner, _Part}, Acc) -> Acc
              end, [], PMap);
int_partitions_for_node(Node, State, all) ->
  Config = configuration:get_config(),
  Partners = replication:partners(Node, State#membership.nodes, Config),
  Owners = lists:flatten([Node, Partners]),
  MergeOwner = fun(Owner, Acc) ->
                   lists:merge(Acc, int_partitions_for_node(Owner, State,
                                                            master))
               end,
  lists:foldl(MergeOwner, [], Owners).
%% @doc Gets the partition table/map from the first remote node that
%% answers, or creates a fresh one with partitions:create_partitions/3 when
%% there are no remote nodes or none of them answers.
%% @end
%% TODO: do vector clocks need to be here, to ensure we get most
%% up-to-date table?
get_init_pmap(Node, [], WorldNodes, Config) ->
  partitions:create_partitions(Config#config.q, Node, WorldNodes);
get_init_pmap(Node, RemoteNodes, WorldNodes, Config) ->
  case get_table_from_remotes(RemoteNodes) of
    {ok, Table} ->
      Table;
    _Error ->
      %% ok, we got some error, so we're gonna provide a fresh
      %% parts table. Hopefully gossip fixes this
      get_init_pmap(Node, [], WorldNodes, Config)
  end.
%% Asks each remote's `membership' server for its partition table, in
%% order, and returns the first `{ok, Table}' reply. Returns the atom
%% no_table_from_remotes when no remote supplies one.
get_table_from_remotes([]) ->
  no_table_from_remotes;
get_table_from_remotes([Remote|Rest]) ->
  %% Only the remote call is protected; the recursive calls in the `of'
  %% and `catch' sections run outside the try and are tail calls.
  try gen_server:call({membership, Remote}, partitions) of
    {ok, Table} ->
      {ok, Table};
    _Other ->
      get_table_from_remotes(Rest)
  catch
    _:_ ->
      %% Deliberately best-effort: an unreachable or failing remote just
      %% means we ask the next one.
      get_table_from_remotes(Rest)
  end.
%%
%% internal tests
%%
%% These exercise int_partitions_for_node(_, _, all), which reads the live
%% configuration and replication:partners/3; the expected values assume
%% the replication factor N noted in each TODO.
int_parts_for_node_self_test() ->
  %% TODO: effigy mock configuration N=3
  Me = a,
  St = #membership{node=Me, nodes=[a],
                   partitions=[{a,1},{a,2},{a,3}]},
  ?assertEqual([1,2,3], int_partitions_for_node(Me, St, all)),
  ok.

int_parts_for_node_partners_test() ->
  %% TODO: effigy mock configuration N=2
  Me = a,
  St = #membership{node=Me, nodes=[a,b],
                   partitions=[{a,1},{a,2},{b,3},{b,4},{c,5},{c,6}]},
  ?assertEqual([1,2,3,4], int_partitions_for_node(Me, St, all)),
  ok.

int_parts_for_node_bigring_test() ->
  %% TODO: effigy mock configuration N=3
  Me = a,
  St = #membership{node=Me, nodes=[a,b,c,d],
                   partitions=[{a,1},{a,2},{b,3},{b,4},
                               {c,5},{c,6},{d,7},{d,8}]},
  ?assertEqual([1,2,3,4,5,6], int_partitions_for_node(Me, St, all)),
  ok.
%% End of gist boorad/151090.