Created
August 29, 2019 10:47
-
-
Save maxlapshin/7873727e3f93519cf4bb14287264fd3d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(events_sink). | |
| -export([start_link/2]). | |
| -export([init/1, handle_info/2, handle_call/3, terminate/2]). | |
| -export([overloaded/1]). | |
| -include("event_pt.hrl"). | |
| -define(RECHECK_INTERVAL, 200). | |
| -define(ALLOWED_DELAY, 60000). | |
%% @doc Starts an events sink process linked to the caller.
%%
%% `Mod' is the callback module that will receive the events; `Notify' is the
%% options map handed to `init/1' (it must contain at least the `name' key).
%% Returns whatever `gen_server:start_link/3' returns.
start_link(Mod, Notify) ->
    InitArgs = [Mod, Notify],
    gen_server:start_link(?MODULE, InitArgs, []).
%% @doc Tells whether the sink registered under `Name' looks stuck.
%%
%% The sink refreshes `rtt_at' in the `events_handlers' ETS table every time
%% it processes its own `round_trip' message. If that timestamp is older than
%% ?ALLOWED_DELAY milliseconds the sink is reported as overloaded.
%% A name that is not present in the table is never overloaded.
overloaded(Name) ->
    case ets:lookup(events_handlers, Name) of
        [] ->
            false;
        [#handler{rtt_at = LastRoundTripAt}] ->
            Age = now_ms() - LastRoundTripAt,
            Age > ?ALLOWED_DELAY
    end.
%% gen_server state of a single events sink.
-record(state, {
    %% Handler name; also the key of this sink's row in the events_handlers ETS table.
    name = undefined,
    %% Callback module that implements init/1, handle_events/2, handle_info/2.
    mod = undefined,
    %% The options map the sink was started with.
    args = undefined,
    %% Reference of the pending round_trip self-check timer.
    timer = undefined,
    %% `buffer' option; `false' means events are delivered one by one.
    buffer = undefined,
    %% `throttle_delay' option (multiplied by 1000 when the throttle timer is armed).
    throttle_delay = undefined,
    %% Reference of the pending throttle_stop timer, or undefined when not armed.
    throttle_timer = undefined,
    %% queue of events accumulated while throttling.
    throttle_buffer = undefined,
    %% Maximum number of events kept for re-sending; undefined disables re-sending.
    resend_notifications_limit = undefined,
    %% Delay in milliseconds before a re-send attempt.
    resend_notifications_timeout = undefined,
    %% List of events waiting to be re-sent (oldest first).
    resend_notifications_queue = undefined,
    %% Cached length of resend_notifications_queue.
    resend_notifications_queue_size = undefined,
    %% Reference of the pending resend_notifications_send timer.
    resend_notifications_timer = undefined,
    %% Pid of the monitored owner process and the monitor reference.
    owner = undefined,
    ref = undefined,
    %% Opaque state of the callback module.
    state = undefined
}).
%% @doc Current OS system time in milliseconds.
%%
%% Uses the `millisecond' time unit; the plural `milli_seconds' spelling is a
%% deprecated alias kept by OTP only for backwards compatibility.
now_ms() ->
    os:system_time(millisecond).
%% @doc gen_server init callback.
%%
%% Initialises the callback module, registers this sink in the
%% `events_handlers' ETS table, schedules the first `round_trip' self-check,
%% starts monitoring the optional owner process and applies the re-send
%% options found in `Notify'.
init([Mod, #{name := Name} = Notify]) ->
    {ok, ModState} = Mod:init(Notify),
    StartedAt = now_ms(),
    Handler = #handler{
        name = Name,
        pid = self(),
        rtt_at = StartedAt,
        started_at = StartedAt,
        verbose = maps:get(verbose, Notify, undefined),
        last_event_at = 0,
        rtt_delay = 0
    },
    ets:insert(events_handlers, Handler),
    %% Fire the first round-trip check immediately.
    RoundTripTimer = erlang:send_after(0, self(), {round_trip, StartedAt}),
    Initial = #state{
        name = Name,
        mod = Mod,
        state = ModState,
        timer = RoundTripTimer,
        buffer = maps:get(buffer, Notify, undefined),
        throttle_delay = maps:get(throttle_delay, Notify, undefined),
        args = Notify
    },
    Owner = maps:get(owner, Notify, undefined),
    WithOwner = monitor_owner(Owner, Initial),
    {ok, update_options(Notify, WithOwner)}.
%% @doc Applies the re-send related options from `Notify' to the state.
%%
%% Re-sending is enabled only when `resend_notifications_limit' is set and
%% buffering is not explicitly turned off (`buffer =/= false'):
%%   * enabled, no queue yet  - an empty queue is created;
%%   * enabled, queue exists  - only limit and timeout are refreshed;
%%   * disabled               - all re-send fields are reset to undefined.
%% `resend_notifications_timeout' defaults to 10 and is multiplied by 1000
%% before being stored.
update_options(Notify, #state{buffer = Buffer, resend_notifications_queue = OldQueue} = State) ->
    ResendLimit = maps:get(resend_notifications_limit, Notify, undefined),
    ResendTimeout = maps:get(resend_notifications_timeout, Notify, 10),
    Enabled = ResendLimit =/= undefined andalso Buffer =/= false,
    case {Enabled, OldQueue} of
        {false, _} ->
            State#state{
                resend_notifications_queue = undefined,
                resend_notifications_queue_size = undefined,
                resend_notifications_limit = undefined,
                resend_notifications_timeout = undefined
            };
        {true, undefined} ->
            State#state{
                resend_notifications_queue = [],
                resend_notifications_queue_size = 0,
                resend_notifications_limit = ResendLimit,
                resend_notifications_timeout = ResendTimeout * 1000
            };
        {true, _} ->
            %% Keep whatever is already queued, just refresh the limits.
            State#state{
                resend_notifications_limit = ResendLimit,
                resend_notifications_timeout = ResendTimeout * 1000
            }
    end.
%% @doc gen_server call callback; only the `sync' request is supported.
%%
%% If the callback module exports `sync/1' it is invoked with the module's
%% state and its result becomes the new module state; otherwise the state is
%% left as is. The caller always receives `ok'.
handle_call(sync, _From, #state{mod = Mod, state = ModState} = State) ->
    HasSync = erlang:function_exported(Mod, sync, 1),
    NewModState =
        if
            HasSync -> Mod:sync(ModState);
            true -> ModState
        end,
    {reply, ok, State#state{state = NewModState}}.
%% @doc gen_server callback for plain messages.
%%
%% Handled messages:
%%   * `{round_trip, SentAt}'       - periodic self-check, publishes the delay to ETS;
%%   * `{'$event', Evt}'            - an incoming event (throttled, unbuffered or batched);
%%   * `throttle_stop'              - throttle window elapsed, flush accumulated events;
%%   * `resend_notifications_send'  - retry timer fired, try to re-send queued events;
%%   * `{'DOWN', Ref, ...}'         - the monitored owner died, stop normally;
%%   * anything else                - forwarded to `Mod:handle_info/2'.

%% Self-check: measure how late the message arrived compared to the time it
%% was scheduled for and publish both the timestamp and the delay.
handle_info({round_trip, SentAt}, #state{timer = OldTimer, name = Name} = State) ->
    erlang:cancel_timer(OldTimer),
    Now = now_ms(),
    Delay = Now - SentAt,
    ets:update_element(events_handlers, Name, [{#handler.rtt_at, Now}, {#handler.rtt_delay, Delay}]),
    %% The next message carries the time it is expected to be delivered at.
    Timer = erlang:send_after(?RECHECK_INTERVAL, self(), {round_trip, Now + ?RECHECK_INTERVAL}),
    {noreply, State#state{timer = Timer}};

%% Throttling enabled: accumulate events (draining up to 99 more from the
%% mailbox) until the throttle timer fires.
handle_info({'$event', Evt}, #state{throttle_delay = ThrottleDelay} = State) when is_integer(ThrottleDelay) ->
    Events = [Evt | collect_events(99)],
    ThrottleBuffer = queue_add_list(Events, State#state.throttle_buffer),
    State1 = ensure_throttle_timer(State),
    {noreply, State1#state{throttle_buffer = ThrottleBuffer}};

%% Throttle window is over: deliver everything accumulated so far.
handle_info(throttle_stop, #state{} = State0) ->
    case State0#state.throttle_timer of
        undefined -> ok;
        _ -> erlang:cancel_timer(State0#state.throttle_timer)
    end,
    Events = collect_events(99),
    ThrottleBuffer = queue:to_list(queue_add_list(Events, State0#state.throttle_buffer)),
    State1 = State0#state{throttle_timer = undefined, throttle_buffer = undefined},
    case ThrottleBuffer of
        [] ->
            {noreply, State1};
        _ ->
            deliver_events(ThrottleBuffer, State1)
    end;

%% Retry timer fired. The stored timer reference is now stale and must be
%% cleared: deliver_events/2 arms a new retry timer only when the field is
%% undefined, so keeping the old reference would disable all further retries.
handle_info(resend_notifications_send, #state{resend_notifications_queue = Queue} = State0) ->
    State = State0#state{resend_notifications_timer = undefined},
    if
        Queue == undefined orelse Queue == [] ->
            {noreply, State};
        true ->
            deliver_events([], State)
    end;

%% Buffering explicitly disabled: deliver events one at a time.
handle_info({'$event', Evt}, #state{buffer = false} = State) ->
    deliver_events([Evt], State);

%% Default: deliver the event together with up to 99 more already waiting
%% in the mailbox.
handle_info({'$event', Evt}, #state{} = State) ->
    Events = [Evt | collect_events(99)],
    deliver_events(Events, State);

%% The owner process is gone, the sink is no longer needed.
handle_info({'DOWN', Ref, _, _, _}, #state{ref = Ref} = State) ->
    {stop, normal, State};

%% Everything else is the callback module's business.
handle_info(Msg, #state{mod = Mod, state = S0} = State) ->
    case Mod:handle_info(Msg, S0) of
        {ok, S1} ->
            {noreply, State#state{state = S1}};
        {stop, Reason, S1} ->
            {stop, Reason, State#state{state = S1}}
    end.
%% @doc Hands a batch of events to the callback module.
%%
%% When a re-send queue exists and is not empty, the new events are appended
%% to it and only the head of the combined list is delivered; the remainder
%% stays queued. The callback may answer:
%%   * `{ok, S}'           - batch accepted, the remainder becomes the new queue;
%%   * `{resend, S}'       - batch rejected; it is put back in front of the queue
%%                           (if re-sending is enabled) and a retry timer is armed;
%%   * `{stop, Reason, S}' - stop the sink.
%% Returns a gen_server `handle_info/2' result tuple.
deliver_events(Events, #state{mod = Mod, state = S0, name = Name,
                              resend_notifications_queue = Queue} = State) ->
    {Batch, Rest} = next_batch(Events, State),
    RestSize =
        case Rest of
            undefined -> undefined;
            _ -> length(Rest)
        end,
    case Mod:handle_events(Batch, S0) of
        {ok, S1} ->
            touch_handler_counters(Name, length(Events)),
            {noreply, State#state{
                state = S1,
                resend_notifications_queue = Rest,
                resend_notifications_queue_size = RestSize
            }};
        {resend, S1} when Queue == undefined ->
            %% Re-sending is disabled: the rejected batch is dropped.
            {noreply, State#state{state = S1}};
        {resend, S1} when Queue =/= undefined ->
            Requeued = Batch ++ Rest,
            RequeuedSize = length(Batch) + RestSize,
            touch_handler_counters(Name, length(Events)),
            Pending = State#state{
                resend_notifications_queue = Requeued,
                resend_notifications_queue_size = RequeuedSize,
                state = S1
            },
            Final =
                if
                    RequeuedSize > 0 ->
                        maybe_flush_resend_notifications_queue(arm_resend_timer(Pending));
                    true ->
                        Pending
                end,
            {noreply, Final};
        {stop, Reason, S1} ->
            {stop, Reason, State#state{state = S1}}
    end.

%% Picks what to deliver now and what to keep queued.
%% Without a (non-empty) re-send queue the events are delivered as they are.
next_batch(Events, #state{resend_notifications_queue = Queue}) when Queue == undefined; Queue == [] ->
    {Events, Queue};
next_batch(Events, #state{resend_notifications_queue = Queue,
                          resend_notifications_queue_size = QueueSize,
                          resend_notifications_limit = Limit}) ->
    %% Split the queue limit into 4 parts so that the backlog is not sent as
    %% one big batch (but never use a batch smaller than 100).
    BatchSize = erlang:min(erlang:max(Limit div 4, 100), QueueSize + length(Events)),
    lists:split(BatchSize, Queue ++ Events).

%% Adds `Count' to the handled counter of the sink and refreshes the
%% last_event_at timestamp in the events_handlers table.
touch_handler_counters(Name, Count) ->
    Now = now_ms(),
    ets:update_counter(events_handlers, Name,
                       [{#handler.handled, Count}, {#handler.last_event_at, Now, Now, Now}]).

%% Schedules a resend_notifications_send message unless a timer reference
%% is already stored in the state.
arm_resend_timer(#state{resend_notifications_timer = undefined,
                        resend_notifications_timeout = Timeout} = State) ->
    Timer = erlang:send_after(Timeout, self(), resend_notifications_send),
    State#state{resend_notifications_timer = Timer};
arm_resend_timer(#state{} = State) ->
    State.
%% @doc Trims the re-send queue down to `resend_notifications_limit'.
%%
%% When the queue holds more events than allowed, the surplus is dropped
%% from the head of the list and the cached size is set to the limit.
%% Otherwise the state is returned untouched.
maybe_flush_resend_notifications_queue(#state{} = State) ->
    #state{
        resend_notifications_queue_size = Size,
        resend_notifications_limit = Limit,
        resend_notifications_queue = Pending
    } = State,
    if
        Size - Limit > 0 ->
            State#state{
                resend_notifications_queue_size = Limit,
                resend_notifications_queue = lists:nthtail(Size - Limit, Pending)
            };
        true ->
            State
    end.
%% @doc Appends `Items' to the tail of a queue.
%%
%% The second argument is treated as an existing queue only when it is a
%% two-element tuple; anything else (e.g. `undefined') means "no queue yet"
%% and a fresh queue holding just `Items' is returned.
queue_add_list(Items, Existing) ->
    Incoming = queue:from_list(Items),
    case Existing of
        {_, _} -> queue:join(Existing, Incoming);
        _ -> Incoming
    end.
%% @doc Arms the throttle timer if it is not armed yet.
%%
%% The timer sends `throttle_stop' to this process after
%% `throttle_delay * 1000' milliseconds. An already stored timer is kept.
ensure_throttle_timer(#state{throttle_timer = Current} = State) ->
    case Current of
        undefined ->
            DelayMs = State#state.throttle_delay * 1000,
            State#state{throttle_timer = erlang:send_after(DelayMs, self(), throttle_stop)};
        _ ->
            State
    end.
%% @doc Drains up to `Limit' `{'$event', E}' messages from the mailbox.
%%
%% Never blocks: collection stops as soon as no matching message is waiting.
%% Events are returned in mailbox order; other messages are left untouched.
collect_events(Limit) ->
    collect_events(Limit, []).

%% Accumulator-based worker for collect_events/1.
collect_events(Remaining, Acc) when Remaining =< 0 ->
    lists:reverse(Acc);
collect_events(Remaining, Acc) ->
    receive
        {'$event', Event} -> collect_events(Remaining - 1, [Event | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.
%% @doc Starts monitoring the owner process of the sink, if one is given.
%%
%% The owner is passed as a binary: either a textual pid (`<<"<0.1.2>">>')
%% or the registered name of a process. `undefined' means there is no owner
%% and the state is returned unchanged.
%% NOTE(review): the registered-name branch creates an atom from the binary;
%% confirm the `owner' option never comes from untrusted input.
monitor_owner(undefined, #state{} = State) ->
    State;
monitor_owner(Pid, #state{} = State) ->
    Owner =
        case Pid of
            <<"<", _/binary>> ->
                list_to_pid(binary_to_list(Pid));
            <<_, _/binary>> ->
                whereis(binary_to_atom(Pid, latin1))
        end,
    Ref = erlang:monitor(process, Owner),
    State#state{owner = Owner, ref = Ref}.
%% @doc gen_server terminate callback; the sink has nothing to clean up.
terminate(_Reason, _State) ->
    ok.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment