Skip to content

Instantly share code, notes, and snippets.

@sumerman
Created May 28, 2012 16:00
Show Gist options
  • Save sumerman/2819848 to your computer and use it in GitHub Desktop.
Save sumerman/2819848 to your computer and use it in GitHub Desktop.
Primitive config distribution server
-module(sp_config_srv).
-behaviour(gen_server).
-define(NODEUP_PAUSE, 100).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/2,
get/2,
replace/2,
value/2,
get_whole_scope/1,
set_post_hook/2
]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([global_name_resolver/3]).
%% ------------------------------------------------------------------
%% Types
%% ------------------------------------------------------------------
-type scope() :: atom() | pid().
-type key() :: any().
-type value() :: any().
-type kvlist() :: [{key(), value()}].
-type post_hook_f() :: fun(([key()]) -> any()).
-type mfa_hook() :: {module(), atom(), [any()]}.
-type post_hook() :: none | undefined | mfa_hook() | post_hook_f().
-record(state, {
dat_tab :: ets:tab(),
scope :: scope(),
mode = local :: global | local,
hooks = [] :: [post_hook()]
}).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
-type option() ::
{mode, local | global} |
{bootstrap, mfa_hook()}|
{post_hook, post_hook()}.
-type options() :: [option()].
-spec start_link(scope(), options()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(ScopeName, Opt) ->
gen_server:start_link({local, ScopeName}, ?MODULE, [{scope, ScopeName} | Opt], []).
-spec get(scope(), [key()]) -> kvlist().
get(Scope, Keys) when is_list(Keys) ->
get_values(Scope, Keys).
-spec value(scope(), key()) -> value() | undefined.
value(Scope, K) ->
PL = ?MODULE:get(Scope, [K]),
proplists:get_value(K, PL, undefined).
-spec replace(scope(), kvlist()) ->
{ok, [node()]} | {ok, [node()], warning, [node()]}.
replace(Scope, KVs) ->
gen_server:call(Scope, {replace, KVs}).
-spec get_whole_scope(scope()) -> kvlist().
get_whole_scope(Scope) ->
kvlist_for_scope(Scope).
-spec set_post_hook(scope(), post_hook()) -> ok.
set_post_hook(Scope, Post) ->
gen_server:call(Scope, {set_post_hook, Post}).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Opt) ->
Name = proplists:get_value(scope, Opt),
Mode = proplists:get_value(mode, Opt, local),
Hooks = proplists:get_all_values(post_hook, Opt),
Data = ets:new(Name, [named_table, protected, {read_concurrency, true}]),
KVs = case proplists:get_value(bootstrap, Opt) of
undefined -> [];
{M, F, A} -> erlang:apply(M, F, A);
F when is_function(F) -> F()
end,
ets:insert(Data, KVs),
(Mode == global) andalso
(KVs /= []) andalso
try_reg(self()),
put('$_start_time', erlang:now()),
{ok, #state{
dat_tab = Data,
scope = Name,
mode = Mode,
hooks = Hooks
}}.
handle_call({set_post_hook, Post}, _From, #state{ hooks=H } = State) ->
H1 = lists:usort([Post | H]),
{reply, ok, State#state{ hooks=H1 }};
handle_call({replace, KVs}, _From,
#state{ mode=global, scope=Scope } = State) ->
R = case get_master(Scope) of
P when P == self() ->
set_values(State, KVs);
P when is_pid(P) ->
replace(P, KVs)
end,
{reply, R, State};
handle_call({replace, KVs}, _From, State) ->
R = set_values(State, KVs),
{reply, R, State};
handle_call({remote_insert, KVs}, From, State) ->
?LOG_INF("Config pushed from ~p", [From]),
update_values(State, KVs),
{reply, ok, State};
handle_call(Request, _From, State) ->
error_logger:error_msg("[~p] Unexpected call ~p", [?MODULE, Request]),
{noreply, ok, State}.
handle_cast(try_reg, State) ->
net_kernel:monitor_nodes(true),
GName = global_name(State#state.scope),
ResMF = fun ?MODULE:global_name_resolver/3,
case global:register_name(GName, self(), ResMF) of
yes ->
?LOG_INF("~p elected as master for ~p.",
[node(), State#state.scope]),
[self() ! {nodeup, N} || N <- nodes()];
no ->
void
end,
{noreply, State};
handle_cast(Request, State) ->
error_logger:error_msg("[~p] Unexpected cast ~p", [?MODULE, Request]),
{noreply, State}.
handle_info({nodedown, _Node}, #state{ mode=global } = State) ->
try_reg(self()),
{noreply, State};
handle_info({nodeup, _Node} = M, #state{ mode=global, scope=Scope } = State) ->
(get_master(Scope) == self()) andalso begin
KVs = kvlist_for_scope(Scope),
case send_values(Scope, KVs) of
{ok, _Nodes} -> ok;
{ok, _Nodes, warning, _FNodes} ->
erlang:send_after(?NODEUP_PAUSE, self(), M)
end
end,
{noreply, State};
handle_info(Info, State) ->
error_logger:error_msg("[~p] Unexpected message ~p", [?MODULE, Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec post_commit(#state{}, kvlist()) -> [any()].
post_commit(State, KVs) ->
Ks = [K || {K,_V} <- KVs],
Hs = State#state.hooks,
[notify(H, Ks) || H <- Hs].
-spec notify(post_hook(), [key()]) -> any().
notify(F, Ks) ->
try notify_(F, Ks) catch
E:W ->
?LOG_ERR("Notifier ~p failed with reason: ~p", [F, {E,W}])
end.
notify_({M,F,A}, Ks) when is_list(A) ->
apply(M, F, A ++ [Ks]);
notify_(F, Ks) when is_function(F) ->
F(Ks);
notify_(_, _) -> ok.
-spec get_values(atom(), [any()]) -> [{any(), any()}].
get_values(Scope, Keys) ->
lists:concat([get_value(Scope, Ki) || Ki <- Keys]).
-spec get_value(atom(), any()) -> [{any(), any()}].
get_value(Scope, Key) ->
case ets:lookup(Scope, Key) of
[] -> [{Key, undefined}];
R -> [{Key, Val} || {_K, Val} <- R]
end.
-spec update_values(#state{}, kvlist()) -> ok.
update_values(#state{ scope=Scope } = St, KVs) ->
Ks = [K || {K,_V} <- KVs],
?LOG_DBG("Inserting values for keys: ~p", [Ks]),
% build list 'undefined' of with keys of old values
Old = [{Ki, undefined}
|| {Ki,_V} <- kvlist_for_scope(Scope)],
% merge it
Res = lists:ukeymerge(1, lists:usort(KVs), lists:usort(Old)),
% publish changes *atomically*
ets:insert(Scope, Res),
% *eventually* get rid of 'undefined' values in ETS
% but it shouldn't change anything from the client's POV
% since the getter must return 'undefined' anyway
ets:match_delete(Scope, {'_', undefined}),
% do post-commit actions
post_commit(St, KVs),
ok.
-spec set_values(#state{}, kvlist()) ->
{ok, [node()]} | {ok, [node()], warning, [node()]}.
set_values(#state{ mode=global, scope=Scope } = St, KVs) ->
global:trans({{?MODULE, Scope}, self()},
fun() ->
update_values(St, KVs),
spawn_link(
fun() -> send_values(Scope, KVs) end),
{ok}
end);
set_values(St, KVs) ->
update_values(St, KVs),
{ok, []}.
send_values(Scope, KVs) ->
N = nodes(),
{_OKs, BADs} = gen_server:multi_call(
N, Scope,
{remote_insert, KVs}),
case BADs of
[] ->
{ok, N};
L ->
?LOG_ERR(
"Unable to push ~p config to the next nodes:~p",
[Scope, L]),
{ok, N--L, warning, L}
end.
kvlist_for_scope(Scope) ->
ets:tab2list(Scope).
try_reg(ID) ->
gen_server:cast(ID, try_reg).
start_time(Pid) when is_pid(Pid) ->
{dictionary, D} = rpc:call(node(Pid), erlang, process_info, [Pid, dictionary]),
{ok, proplists:get_value('$_start_time', D, {0, 0, 0})};
start_time(ID) ->
start_time(whereis(ID)).
global_name_resolver(Name, P1, P2) ->
?LOG_INF("Resolving config master for ~p", [Name]),
{ok, T1} = start_time(P1),
{ok, T2} = start_time(P2),
if
T1 =< T2 -> P1;
true -> P2
end.
global_name(Scope) -> {?MODULE, Scope}.
get_master(Scope) ->
global:sync(),
case global:whereis_name(global_name(Scope)) of
undefined ->
try_reg(self()),
get_master(Scope);
P when is_pid(P) -> P
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment