Last active
August 29, 2019 10:45
-
-
Save maxlapshin/ea632f04f79188aaba46d3a92406f8f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(events_router). | |
| -export([start_link/0]). | |
| -export([init/1, handle_info/2, handle_call/3, terminate/2]). | |
| -export([sync/0]). | |
| -export([deliver/2, remote_deliver/1]). | |
| -export([load_config/1, add_handler/1, remove_handler/1]). | |
| -export([suitable_events/1, match_event/2]). | |
| -include("event_pt.hrl"). | |
%% One row of the `events_routes' ETS bag: a single (event, handler) pairing.
-record(route, {
    event = undefined,    % event name; key position of the events_routes table
    name = undefined,     % handler name, also used as the supervisor child id
    only = undefined,     % whitelist conditions (list of {Key, Value} lists)
    except = undefined,   % blacklist conditions (list of {Key, Value} lists)
    verbose = undefined,  % minimum log level, or undefined for no level filter
    pid = undefined       % sink process, undefined while no sink is running
}).
%% @doc Publish `Event' together with its metadata (a map or a proplist).
%%
%% The metadata is stamped with a fresh `event_id', the current time/date and
%% the event name; unless the event is `text' or the caller already supplied
%% both `message' and `loglevel', a formatted message and the level declared
%% for the event are added as well.
%%
%% Returns the generated event id, or `false' when `known_events' does not
%% know the event.
deliver(Event, Props) when is_list(Props) ->
    deliver(Event, maps:from_list(Props));
deliver(Event, Props) ->
    Id = erlang:unique_integer([positive, monotonic]),
    Tagged = Props#{event_id => Id},
    case known_events:find(Event) of
        false ->
            false;
        #event{level = Level} = Desc ->
            {Date, Time, UtcMs} = print_now(),
            Stamped = Tagged#{time => Time, date => Date, utc_ms => UtcMs, event => Event},
            deliver_dispatch(Event, deliver_with_message(Event, Desc, Level, Stamped)),
            Id
    end.

%% Make sure the metadata carries `message' and `loglevel'.
%% `text' events and metadata that already has both keys pass through as is.
deliver_with_message(text, _Desc, _Level, Meta) ->
    Meta;
deliver_with_message(_Event, _Desc, _Level, #{message := _, loglevel := _} = Meta) ->
    Meta;
deliver_with_message(_Event, Desc, Level, Meta) ->
    Text =
        try
            events_util:format_message(Desc, Meta)
        catch
            % a broken format must never prevent delivery
            _:_ -> <<"failed to format">>
        end,
    Meta#{message => Text, loglevel => Level}.

%% Hand the event to the registered `events_proxy' process when there is one,
%% otherwise route it locally.
deliver_dispatch(Event, Meta) ->
    case whereis(events_proxy) of
        undefined -> local_deliver(Event, Meta);
        Proxy -> Proxy ! {'$event', Meta}
    end.
%% Current time as {PrintedDate, PrintedTime, UtcMilliseconds}; the printed
%% parts are whatever minute:log_time/1 produces for that timestamp.
print_now() ->
    NowMs = minute:now_ms(),
    {PrintedDate, PrintedTime} = minute:log_time(NowMs),
    {PrintedDate, PrintedTime, NowMs}.
%% @doc Route an already prepared metadata map (it must contain `event')
%% to the local routes.
remote_deliver(#{event := Name} = Prepared) ->
    local_deliver(Name, Prepared).
%% Send the event to every route registered for it in `events_routes'.
%% Returns the event id. Only the ETS lookup is protected: when it fails,
%% non-debug events are printed to stdout and debug events are dropped.
local_deliver(Event, #{event_id := Id, loglevel := Level, time := Time} = Meta) ->
    try ets:lookup(events_routes, Event) of
        Routes ->
            lists:foreach(fun(Route) -> deliver_to_route(Route, Meta) end, Routes),
            Id
    catch
        _:_ ->
            case Level of
                debug ->
                    ok;
                _ ->
                    io:format("~s ~p ~p\n", [Time, Level, maps:get(message, Meta, undefined)])
            end
    end.
%% Offer one event to one route.
%% Returns `not_started' when the route has no sink pid, `overloaded' when the
%% sink reports overload (the handler's dropped counter is bumped), `nomatch'
%% when the route filters reject the event, otherwise the message that was sent.
deliver_to_route(#route{pid = undefined}, _Meta) ->
    not_started;
deliver_to_route(#route{name = Handler} = Route, Meta) ->
    case events_sink:overloaded(Handler) of
        false ->
            deliver_if_matching(Route, Meta);
        true ->
            ets:update_counter(events_handlers, Handler, [{#handler.dropped, 1}]),
            overloaded
    end.

%% Forward the event to the sink only when the route filters accept it.
deliver_if_matching(#route{pid = Sink, event = Event} = Route, Meta) ->
    case match_event(Route, Meta) of
        true -> Sink ! {'$event', Meta#{event => Event}};
        false -> nomatch
    end.
%% Numeric rank of a log level; `undefined' is passed through unchanged.
%% Any other value fails with `function_clause'.
loglevel(undefined) ->
    undefined;
loglevel(Level) ->
    Ranks = [
        {none, -1},
        {debug, 0},
        {info, 1},
        {notice, 2},
        {warning, 3},
        {error, 4},
        {critical, 5},
        {alert, 6},
        {emergency, 7}
    ],
    case lists:keyfind(Level, 1, Ranks) of
        {_, Rank} -> Rank;
        false -> erlang:error(function_clause, [Level])
    end.
%% @doc Decide whether a route accepts an event.
%%
%% `Metadata' must contain `loglevel'. The event is rejected when the route
%% has a `verbose' level and that level ranks above the event's level;
%% otherwise the decision is delegated to the `except'/`only' filters, which
%% are matched against the metadata with `event' set to the route's event.
match_event(#route{only = Only, except = Except, event = Event, verbose = Verbose}, Metadata) ->
    VerboseCode = loglevel(Verbose),
    LoglevelCode = loglevel(maps:get(loglevel, Metadata)),
    % A route without `verbose' never filters by level. (The original `if' had
    % a second, unreachable branch that also tested Verbose == undefined.)
    % When the event's level is `undefined' its code is the atom `undefined';
    % in Erlang term order any number is smaller than any atom, so the
    % comparison is false and the event is not skipped.
    Skip = Verbose =/= undefined andalso VerboseCode > LoglevelCode,
    case Skip of
        true -> false;
        false -> match_event(Except, Only, Metadata#{event => Event})
    end.
%% Apply the blacklist, then the whitelist.
%% An event is rejected when any `Except' condition matches; otherwise it is
%% accepted when `Only' is empty or any `Only' condition matches.
match_event(Except, Only, Meta) ->
    AnyMatches = fun(Conditions) ->
        lists:any(fun(Condition) -> match_meta(Condition, Meta) end, Conditions)
    end,
    case AnyMatches(Except) of
        true -> false;
        false -> Only =:= [] orelse AnyMatches(Only)
    end.
%% True when every {Key, Expected} pair of the condition holds for `Meta'.
%% A list `Expected' means "value is one of these"; the binary <<"*">> means
%% "key is present with any value other than undefined"; anything else must
%% be exactly equal to the stored value (a missing key reads as undefined).
match_meta(Condition, Meta) ->
    lists:all(fun(Pair) -> match_meta_pair(Pair, Meta) end, Condition).

match_meta_pair({Key, Allowed}, Meta) when is_list(Allowed) ->
    lists:member(maps:get(Key, Meta, undefined), Allowed);
match_meta_pair({Key, Expected}, Meta) ->
    Actual = maps:get(Key, Meta, undefined),
    (Actual =/= undefined andalso Expected == <<"*">>) orelse Actual =:= Expected.
%% @doc Replace the configured handler set with `Config' (a map keyed by
%% handler name) via a synchronous call to the router.
load_config(Config) when is_map(Config) ->
    Request = {install_notifies, Config},
    gen_server:call(?MODULE, Request).
%% @doc Register an extra handler at runtime; the spec must contain `name'.
%% The router replies with the handler name.
add_handler(#{name := _} = Spec) ->
    Request = {add_handler, Spec},
    gen_server:call(?MODULE, Request).
%% @doc Remove a handler previously registered with add_handler/1.
remove_handler(HandlerName) ->
    Request = {remove_handler, HandlerName},
    gen_server:call(?MODULE, Request).
%% @doc Wait until the router and every running sink have processed what was
%% sent to them before this call. Always returns `ok'.
%%
%% supervisor:which_children/1 reports `undefined' (child not running) or
%% `restarting' instead of a pid for some children; sending to such an atom
%% raises `badarg', which the `exit:_' handler does not catch, so only real
%% pids are synced.
sync() ->
    gen_server:call(?MODULE, sync),
    Sinks = [
        Pid
     || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(events_manager_sup),
        is_pid(Pid)
    ],
    lists:foreach(fun sync_sink/1, Sinks),
    ok.

%% Ask one sink to stop throttling and wait for it to answer `sync'.
%% A sink that exits (or times out) during the call is ignored.
sync_sink(Pid) ->
    Pid ! throttle_stop,
    try
        gen_server:call(Pid, sync)
    catch
        exit:_ -> ok
    end.
%% @doc Start the router, registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArg = [],
    gen_server:start_link(ServerName, ?MODULE, InitArg, []).
%% gen_server state of the router.
-record(state, {
    refresh_timer = undefined,  % pending delayed `refresh' timer, if any
    notifies = #{},             % handlers installed via load_config/1, by name
    extra_notifies = #{}        % handlers added via add_handler/1, by name
}).
%% Create the two public named ETS tables used on the delivery path:
%% `events_routes' (bag keyed by event) and `events_handlers' (keyed by name).
init([]) ->
    Common = [public, named_table, {read_concurrency, true}],
    ets:new(events_routes, Common ++ [bag, {keypos, #route.event}]),
    ets:new(events_handlers, Common ++ [{keypos, #handler.name}]),
    {ok, #state{}}.
%% Account for a sink process that went down.
%% Returns `false' when no handler in `Known' owns `Pid'. Otherwise the
%% handler's child and routes are removed and the updated map is returned:
%% after a `normal' exit the handler is forgotten, after any other exit it is
%% kept without `pid'/`ref'.
handle_stop(Pid, Reason, Known) ->
    Owners = [Handler || #{pid := P} = Handler <- maps:values(Known), P == Pid],
    case Owners of
        [] ->
            false;
        [#{name := Name} = Owner] ->
            shutdown_old(Name),
            if
                Reason =:= normal -> maps:remove(Name, Known);
                true -> Known#{Name => maps:without([pid, ref], Owner)}
            end
    end.
%% 'DOWN': a monitored sink died - clean it up and make sure a refresh is
%%         scheduled (2 seconds from now unless one is already pending).
%% refresh: cancel the pending timer, if any, and refresh all handlers.
%% Anything else stops the server with {bad_msg, Msg}.
handle_info({'DOWN', _Ref, _Type, Pid, Reason}, #state{} = State) ->
    Cleaned = forget_stopped(Pid, Reason, State),
    {noreply, schedule_refresh(Cleaned)};
handle_info(refresh, #state{refresh_timer = Timer} = State) ->
    Timer =:= undefined orelse erlang:cancel_timer(Timer),
    {ok, Refreshed} = refresh(State#state{refresh_timer = undefined}),
    {noreply, Refreshed};
handle_info(Msg, State) ->
    {stop, {bad_msg, Msg}, State}.

%% Look for the dead pid first among the configured handlers, then among the
%% extra ones; the state is returned unchanged when neither owns it.
forget_stopped(Pid, Reason, #state{notifies = Notifies, extra_notifies = Extra} = State) ->
    case handle_stop(Pid, Reason, Notifies) of
        false ->
            case handle_stop(Pid, Reason, Extra) of
                false -> State;
                Extra1 -> State#state{extra_notifies = Extra1}
            end;
        Notifies1 ->
            State#state{notifies = Notifies1}
    end.

%% Arm the delayed refresh unless a timer is already stored in the state.
schedule_refresh(#state{refresh_timer = undefined} = State) ->
    State#state{refresh_timer = erlang:send_after(2000, self(), refresh)};
schedule_refresh(State) ->
    State.
%% sync                       -> ok (a barrier: earlier messages were handled)
%% {remove_handler, Name}     -> ok
%% {add_handler, Notify}      -> Name
%% {install_notifies, Config} -> ok
%% handlers                   -> {Notifies, Extra}
%% anything else              -> {bad_call, Call}
handle_call(sync, _From, #state{} = State) ->
    {reply, ok, State};
handle_call({remove_handler, Name}, _From, #state{extra_notifies = Extra} = State) ->
    shutdown_old(Name),
    {ok, Refreshed} = refresh(State#state{extra_notifies = maps:remove(Name, Extra)}),
    {reply, ok, Refreshed};
handle_call({add_handler, #{name := Name, sink := Sink} = Notify}, _From, #state{extra_notifies = Extra} = State) ->
    Merged = maps:merge(Notify, running_worker(Name, Sink, Extra)),
    {ok, Refreshed} = refresh(State#state{extra_notifies = Extra#{Name => Merged}}),
    {reply, Name, Refreshed};
handle_call({install_notifies, Notifies}, _From, #state{notifies = Previous} = State) ->
    Merged = maps:map(
        fun(Name, #{sink := Sink} = Notify) ->
            maps:merge(Notify, running_worker(Name, Sink, Previous))
        end,
        Notifies
    ),
    {ok, Refreshed} = refresh(State#state{notifies = Merged}),
    {reply, ok, Refreshed};
handle_call(handlers, _From, #state{extra_notifies = Extra, notifies = Notifies} = State) ->
    {reply, {Notifies, Extra}, State};
handle_call(Call, _From, State) ->
    {reply, {bad_call, Call}, State}.

%% What a new definition of handler `Name' may inherit from the known one:
%% its `pid'/`ref' when the sink is unchanged (the running worker is reused),
%% nothing when the sink changed (the old worker is shut down) or the handler
%% is new.
running_worker(Name, Sink, Known) ->
    case maps:get(Name, Known, #{}) of
        #{sink := Sink} = Old ->
            maps:with([pid, ref], Old);
        #{sink := _} ->
            shutdown_old(Name),
            #{};
        _ ->
            #{}
    end.
%% Nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%% Bring supervisor children and the routes table in line with the state:
%%  1. shut down children that belong to no known handler,
%%  2. start a sink for every handler that has no pid yet,
%%  3. (re)install the routes of all handlers,
%%  4. delete routes left over from earlier configurations.
%% Returns {ok, State} with the handler maps updated with pids/refs.
refresh(#state{notifies = Notifies, extra_notifies = Extra} = State) ->
    Orphans = [
        Id
     || {Id, _, _, _} <- supervisor:which_children(events_manager_sup),
        not (maps:is_key(Id, Notifies) orelse maps:is_key(Id, Extra))
    ],
    lists:foreach(fun shutdown_old/1, Orphans),
    Started = maps:map(fun ensure_started/2, Notifies),
    StartedExtra = maps:map(fun ensure_started/2, Extra),
    Installed =
        install_notifies(maps:values(Started)) ++
            install_notifies(maps:values(StartedExtra)),
    Stale = [Row || Row <- ets:tab2list(events_routes), not lists:member(Row, Installed)],
    lists:foreach(fun(Row) -> ets:delete_object(events_routes, Row) end, Stale),
    {ok, State#state{notifies = Started, extra_notifies = StartedExtra}}.

%% Keep a handler that already has a sink pid, launch a sink for the others.
ensure_started(_Name, #{pid := Pid} = Handler) when is_pid(Pid) ->
    Handler;
ensure_started(_Name, #{} = Handler) ->
    launch_new(Handler).
%% Remove every trace of handler `Name': its supervisor child, its rows in
%% `events_routes' and its entry in `events_handlers'. Results of the
%% supervisor calls are ignored, so calling this for an unknown name is fine.
shutdown_old(Name) ->
    supervisor:terminate_child(events_manager_sup, Name),
    supervisor:delete_child(events_manager_sup, Name),
    Owned = lists:filter(
        fun
            (#route{name = N}) -> N == Name;
            (_) -> false
        end,
        ets:tab2list(events_routes)
    ),
    lists:foreach(fun(Route) -> ets:delete_object(events_routes, Route) end, Owned),
    ets:delete(events_handlers, Name),
    ok.
%% Start a sink process for the handler under `events_manager_sup' and
%% monitor it. Returns the handler map extended with `pid' and `ref', or the
%% map unchanged when the sink has no protocol (no process is started then).
launch_new(#{name := Name, sink := Sink} = Notify) ->
    case sink_module(events_util:sink_protocol(Sink)) of
        undefined ->
            Notify;
        Mod ->
            ChildSpec = #{
                id => Name,
                start => {events_sink, start_link, [Mod, Notify]},
                restart => transient,
                shutdown => 500
            },
            {ok, Pid} = supervisor:start_child(events_manager_sup, ChildSpec),
            Ref = erlang:monitor(process, Pid),
            Notify#{pid => Pid, ref => Ref}
    end.

%% Callback module implementing a sink protocol; `undefined' maps to itself.
sink_module(Proto) ->
    case Proto of
        {plugin, Plugin} -> list_to_atom("events_sink_" ++ atom_to_list(Plugin));
        undefined -> undefined;
        console -> events_sink_console;
        log -> events_sink_log;
        lua -> events_sink_lua;
        pid -> events_sink_pid;
        erl -> events_sink_erl;
        http -> events_sink_http
    end.
%% Names of all events declared in `known_events'; entries that are not
%% #event{} records are skipped.
events() ->
    lists:filtermap(
        fun
            (#event{name = Name}) -> {true, Name};
            (_) -> false
        end,
        known_events:all()
    ).
%% Insert the routes of every handler into `events_routes', tell each running
%% sink about its current options, and return all inserted routes in handler
%% order.
install_notifies(Notifies) ->
    lists:flatmap(fun install_notify/1, Notifies).

%% Build and insert one #route{} per suitable event of a single handler.
install_notify(#{name := Name} = Notify) ->
    OnlyMaps = maps:get(only, Notify, []),
    ExceptMaps = maps:get(except, Notify, []),
    Events = suitable_events(Notify),
    Verbose = maps:get(verbose, Notify, undefined),
    Sink = maps:get(pid, Notify, undefined),
    Rows = lists:map(
        fun(Evt) ->
            #route{
                event = Evt,
                name = Name,
                verbose = Verbose,
                only = [atomize(maps:to_list(O)) || O <- OnlyMaps],
                except = [atomize(maps:to_list(E)) || E <- ExceptMaps],
                pid = Sink
            }
        end,
        Events
    ),
    ets:insert(events_routes, Rows),
    case Sink of
        undefined -> ok;
        _ -> Sink ! {update_options, Notify}
    end,
    Rows.
%% Convert binary values of the `application', `module' and `event' keys of a
%% condition (a {Key, Value} list) to atoms, so they compare equal to the
%% atoms carried in event metadata. Other pairs are kept as they are.
%%
%% Every clause recurses on the tail: previously the first converted pair
%% ended the traversal, leaving later `event'/`module' values as binaries
%% that could never match.
%%
%% NOTE(review): a <<"*">> value under these three keys is atomized as well
%% and then no longer hits the wildcard branch of match_meta/2 - confirm
%% whether wildcards are meant to be supported for these keys.
atomize([{application, Name} | Rest]) when is_binary(Name) ->
    [{application, binary_to_atom(Name, latin1)} | atomize(Rest)];
atomize([{module, Name} | Rest]) when is_binary(Name) ->
    [{module, binary_to_atom(Name, latin1)} | atomize(Rest)];
atomize([{event, Name} | Rest]) when is_binary(Name) ->
    [{event, binary_to_atom(Name, latin1)} | atomize(Rest)];
atomize([{event, Names} | Rest]) when is_list(Names) ->
    [{event, [binary_to_atom(Name, latin1) || Name <- Names]} | atomize(Rest)];
atomize([{Key, Value} | Rest]) ->
    [{Key, Value} | atomize(Rest)];
atomize([]) ->
    [].
%% @doc Events a handler should get routes for.
%%
%% Starts from all known events (without `crashed' and `text' when the
%% handler has no `verbose' level). If the `only' filters yield a non-empty
%% whitelist, that whitelist is the result; otherwise the events blacklisted
%% by `except' are removed from the candidates.
suitable_events(#{} = Notify) ->
    Known = lists:usort(events()),
    Candidates =
        case maps:get(verbose, Notify, undefined) of
            undefined -> Known -- [crashed, text];
            _ -> Known
        end,
    Allowed = select_whitelisted_events(maps:get(only, Notify, []), Candidates),
    Denied = select_blacklisted_events(maps:get(except, Notify, []), Candidates),
    if
        Allowed =:= [] -> Candidates -- Denied;
        true -> Allowed
    end.
%% Event names selected by the `only' filters; a filter without an `event'
%% key selects all candidates.
select_whitelisted_events(Filters, Candidates) ->
    Collected = [],
    merge_events(Filters, Collected, Candidates, white).
%% Event names excluded by the `except' filters; a filter without an `event'
%% key yields an empty blacklist.
select_blacklisted_events(Filters, Candidates) ->
    Collected = [],
    merge_events(Filters, Collected, Candidates, black).
%% Collect the event names mentioned by a list of filter maps into a sorted,
%% duplicate-free list of atoms. The first filter whose `event' is neither a
%% list nor a binary (or is missing) ends the walk: the result is then `All'
%% in `white' mode and [] in `black' mode, regardless of what was collected.
merge_events([], Collected, _All, _Mode) ->
    lists:usort(Collected);
merge_events([Filter | Rest], Collected, All, Mode) when is_map(Filter) ->
    case maps:find(event, Filter) of
        {ok, Names} when is_list(Names) ->
            Atoms = lists:map(fun(Name) -> binary_to_atom(Name, latin1) end, Names),
            merge_events(Rest, Atoms ++ Collected, All, Mode);
        {ok, Name} when is_binary(Name) ->
            merge_events(Rest, [binary_to_atom(Name, latin1) | Collected], All, Mode);
        _ ->
            merge_events_default(Mode, All)
    end.

%% Result for a filter that does not name events explicitly.
merge_events_default(white, All) -> All;
merge_events_default(black, _All) -> [].
%% Coerce an event name to an atom: binaries are converted (latin1),
%% atoms are returned unchanged.
to_a(Name) when is_binary(Name) ->
    binary_to_atom(Name, latin1);
to_a(Name) when is_atom(Name) ->
    Name.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment