Created
May 28, 2012 16:00
-
-
Save sumerman/2819848 to your computer and use it in GitHub Desktop.
Primitive config distribution server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(sp_config_srv). | |
-behaviour(gen_server). | |
-define(NODEUP_PAUSE, 100). | |
%% ------------------------------------------------------------------ | |
%% API Function Exports | |
%% ------------------------------------------------------------------ | |
-export([ | |
start_link/2, | |
get/2, | |
replace/2, | |
value/2, | |
get_whole_scope/1, | |
set_post_hook/2 | |
]). | |
%% ------------------------------------------------------------------ | |
%% gen_server Function Exports | |
%% ------------------------------------------------------------------ | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
-export([global_name_resolver/3]). | |
%% ------------------------------------------------------------------ | |
%% Types | |
%% ------------------------------------------------------------------ | |
-type scope() :: atom() | pid(). | |
-type key() :: any(). | |
-type value() :: any(). | |
-type kvlist() :: [{key(), value()}]. | |
-type post_hook_f() :: fun(([key()]) -> any()). | |
-type mfa_hook() :: {module(), atom(), [any()]}. | |
-type post_hook() :: none | undefined | mfa_hook() | post_hook_f(). | |
-record(state, { | |
dat_tab :: ets:tab(), | |
scope :: scope(), | |
mode = local :: global | local, | |
hooks = [] :: [post_hook()] | |
}). | |
%% ------------------------------------------------------------------ | |
%% API Function Definitions | |
%% ------------------------------------------------------------------ | |
-type option() :: | |
{mode, local | global} | | |
{bootstrap, mfa_hook()}| | |
{post_hook, post_hook()}. | |
-type options() :: [option()]. | |
-spec start_link(scope(), options()) -> | |
{ok, pid()} | ignore | {error, term()}. | |
start_link(ScopeName, Opt) -> | |
gen_server:start_link({local, ScopeName}, ?MODULE, [{scope, ScopeName} | Opt], []). | |
-spec get(scope(), [key()]) -> kvlist(). | |
get(Scope, Keys) when is_list(Keys) -> | |
get_values(Scope, Keys). | |
-spec value(scope(), key()) -> value() | undefined. | |
value(Scope, K) -> | |
PL = ?MODULE:get(Scope, [K]), | |
proplists:get_value(K, PL, undefined). | |
-spec replace(scope(), kvlist()) -> | |
{ok, [node()]} | {ok, [node()], warning, [node()]}. | |
replace(Scope, KVs) -> | |
gen_server:call(Scope, {replace, KVs}). | |
-spec get_whole_scope(scope()) -> kvlist(). | |
get_whole_scope(Scope) -> | |
kvlist_for_scope(Scope). | |
-spec set_post_hook(scope(), post_hook()) -> ok. | |
set_post_hook(Scope, Post) -> | |
gen_server:call(Scope, {set_post_hook, Post}). | |
%% ------------------------------------------------------------------ | |
%% gen_server Function Definitions | |
%% ------------------------------------------------------------------ | |
init(Opt) -> | |
Name = proplists:get_value(scope, Opt), | |
Mode = proplists:get_value(mode, Opt, local), | |
Hooks = proplists:get_all_values(post_hook, Opt), | |
Data = ets:new(Name, [named_table, protected, {read_concurrency, true}]), | |
KVs = case proplists:get_value(bootstrap, Opt) of | |
undefined -> []; | |
{M, F, A} -> erlang:apply(M, F, A); | |
F when is_function(F) -> F() | |
end, | |
ets:insert(Data, KVs), | |
(Mode == global) andalso | |
(KVs /= []) andalso | |
try_reg(self()), | |
put('$_start_time', erlang:now()), | |
{ok, #state{ | |
dat_tab = Data, | |
scope = Name, | |
mode = Mode, | |
hooks = Hooks | |
}}. | |
handle_call({set_post_hook, Post}, _From, #state{ hooks=H } = State) -> | |
H1 = lists:usort([Post | H]), | |
{reply, ok, State#state{ hooks=H1 }}; | |
handle_call({replace, KVs}, _From, | |
#state{ mode=global, scope=Scope } = State) -> | |
R = case get_master(Scope) of | |
P when P == self() -> | |
set_values(State, KVs); | |
P when is_pid(P) -> | |
replace(P, KVs) | |
end, | |
{reply, R, State}; | |
handle_call({replace, KVs}, _From, State) -> | |
R = set_values(State, KVs), | |
{reply, R, State}; | |
handle_call({remote_insert, KVs}, From, State) -> | |
?LOG_INF("Config pushed from ~p", [From]), | |
update_values(State, KVs), | |
{reply, ok, State}; | |
handle_call(Request, _From, State) -> | |
error_logger:error_msg("[~p] Unexpected call ~p", [?MODULE, Request]), | |
{noreply, ok, State}. | |
handle_cast(try_reg, State) -> | |
net_kernel:monitor_nodes(true), | |
GName = global_name(State#state.scope), | |
ResMF = fun ?MODULE:global_name_resolver/3, | |
case global:register_name(GName, self(), ResMF) of | |
yes -> | |
?LOG_INF("~p elected as master for ~p.", | |
[node(), State#state.scope]), | |
[self() ! {nodeup, N} || N <- nodes()]; | |
no -> | |
void | |
end, | |
{noreply, State}; | |
handle_cast(Request, State) -> | |
error_logger:error_msg("[~p] Unexpected cast ~p", [?MODULE, Request]), | |
{noreply, State}. | |
handle_info({nodedown, _Node}, #state{ mode=global } = State) -> | |
try_reg(self()), | |
{noreply, State}; | |
handle_info({nodeup, _Node} = M, #state{ mode=global, scope=Scope } = State) -> | |
(get_master(Scope) == self()) andalso begin | |
KVs = kvlist_for_scope(Scope), | |
case send_values(Scope, KVs) of | |
{ok, _Nodes} -> ok; | |
{ok, _Nodes, warning, _FNodes} -> | |
erlang:send_after(?NODEUP_PAUSE, self(), M) | |
end | |
end, | |
{noreply, State}; | |
handle_info(Info, State) -> | |
error_logger:error_msg("[~p] Unexpected message ~p", [?MODULE, Info]), | |
{noreply, State}. | |
terminate(_Reason, _State) -> | |
ok. | |
code_change(_OldVsn, State, _Extra) -> | |
{ok, State}. | |
%% ------------------------------------------------------------------ | |
%% Internal Function Definitions | |
%% ------------------------------------------------------------------ | |
-spec post_commit(#state{}, kvlist()) -> [any()]. | |
post_commit(State, KVs) -> | |
Ks = [K || {K,_V} <- KVs], | |
Hs = State#state.hooks, | |
[notify(H, Ks) || H <- Hs]. | |
-spec notify(post_hook(), [key()]) -> any(). | |
notify(F, Ks) -> | |
try notify_(F, Ks) catch | |
E:W -> | |
?LOG_ERR("Notifier ~p failed with reason: ~p", [F, {E,W}]) | |
end. | |
notify_({M,F,A}, Ks) when is_list(A) -> | |
apply(M, F, A ++ [Ks]); | |
notify_(F, Ks) when is_function(F) -> | |
F(Ks); | |
notify_(_, _) -> ok. | |
-spec get_values(atom(), [any()]) -> [{any(), any()}]. | |
get_values(Scope, Keys) -> | |
lists:concat([get_value(Scope, Ki) || Ki <- Keys]). | |
-spec get_value(atom(), any()) -> [{any(), any()}]. | |
get_value(Scope, Key) -> | |
case ets:lookup(Scope, Key) of | |
[] -> [{Key, undefined}]; | |
R -> [{Key, Val} || {_K, Val} <- R] | |
end. | |
-spec update_values(#state{}, kvlist()) -> ok. | |
update_values(#state{ scope=Scope } = St, KVs) -> | |
Ks = [K || {K,_V} <- KVs], | |
?LOG_DBG("Inserting values for keys: ~p", [Ks]), | |
% build list 'undefined' of with keys of old values | |
Old = [{Ki, undefined} | |
|| {Ki,_V} <- kvlist_for_scope(Scope)], | |
% merge it | |
Res = lists:ukeymerge(1, lists:usort(KVs), lists:usort(Old)), | |
% publish changes *atomically* | |
ets:insert(Scope, Res), | |
% *eventually* get rid of 'undefined' values in ETS | |
% but it shouldn't change anything from the client's POV | |
% since the getter must return 'undefined' anyway | |
ets:match_delete(Scope, {'_', undefined}), | |
% do post-commit actions | |
post_commit(St, KVs), | |
ok. | |
-spec set_values(#state{}, kvlist()) -> | |
{ok, [node()]} | {ok, [node()], warning, [node()]}. | |
set_values(#state{ mode=global, scope=Scope } = St, KVs) -> | |
global:trans({{?MODULE, Scope}, self()}, | |
fun() -> | |
update_values(St, KVs), | |
spawn_link( | |
fun() -> send_values(Scope, KVs) end), | |
{ok} | |
end); | |
set_values(St, KVs) -> | |
update_values(St, KVs), | |
{ok, []}. | |
send_values(Scope, KVs) -> | |
N = nodes(), | |
{_OKs, BADs} = gen_server:multi_call( | |
N, Scope, | |
{remote_insert, KVs}), | |
case BADs of | |
[] -> | |
{ok, N}; | |
L -> | |
?LOG_ERR( | |
"Unable to push ~p config to the next nodes:~p", | |
[Scope, L]), | |
{ok, N--L, warning, L} | |
end. | |
kvlist_for_scope(Scope) -> | |
ets:tab2list(Scope). | |
try_reg(ID) -> | |
gen_server:cast(ID, try_reg). | |
start_time(Pid) when is_pid(Pid) -> | |
{dictionary, D} = rpc:call(node(Pid), erlang, process_info, [Pid, dictionary]), | |
{ok, proplists:get_value('$_start_time', D, {0, 0, 0})}; | |
start_time(ID) -> | |
start_time(whereis(ID)). | |
global_name_resolver(Name, P1, P2) -> | |
?LOG_INF("Resolving config master for ~p", [Name]), | |
{ok, T1} = start_time(P1), | |
{ok, T2} = start_time(P2), | |
if | |
T1 =< T2 -> P1; | |
true -> P2 | |
end. | |
global_name(Scope) -> {?MODULE, Scope}. | |
get_master(Scope) -> | |
global:sync(), | |
case global:whereis_name(global_name(Scope)) of | |
undefined -> | |
try_reg(self()), | |
get_master(Scope); | |
P when is_pid(P) -> P | |
end. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment