Created
August 29, 2019 10:47
-
-
Save maxlapshin/7873727e3f93519cf4bb14287264fd3d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(events_sink). | |
-export([start_link/2]). | |
-export([init/1, handle_info/2, handle_call/3, terminate/2]). | |
-export([overloaded/1]). | |
-include("event_pt.hrl"). | |
-define(RECHECK_INTERVAL, 200). | |
-define(ALLOWED_DELAY, 60000). | |
start_link(Mod, Notify) -> | |
gen_server:start_link(?MODULE, [Mod, Notify], []). | |
overloaded(Name) -> | |
case ets:lookup(events_handlers, Name) of | |
[#handler{rtt_at = LastSeenAt}] -> | |
now_ms() - LastSeenAt > ?ALLOWED_DELAY; | |
[] -> | |
false | |
end. | |
-record(state, { | |
name, | |
mod, | |
args, | |
timer, | |
buffer, | |
throttle_delay, | |
throttle_timer, | |
throttle_buffer, | |
resend_notifications_limit, | |
resend_notifications_timeout, | |
resend_notifications_queue, | |
resend_notifications_queue_size, | |
resend_notifications_timer, | |
owner, | |
ref, | |
state | |
}). | |
now_ms() -> | |
os:system_time(milli_seconds). | |
init([Mod, #{name := Name} = Notify]) -> | |
{ok, State} = Mod:init(Notify), | |
Now = now_ms(), | |
ets:insert(events_handlers, #handler{name = Name, pid = self(), rtt_at = Now, started_at = Now, | |
verbose = maps:get(verbose, Notify, undefined), last_event_at = 0, rtt_delay = 0}), | |
State1 = #state{ | |
name = Name, | |
mod = Mod, | |
state = State, | |
timer = erlang:send_after(0, self(), {round_trip, Now}), | |
buffer = maps:get(buffer, Notify, undefined), | |
throttle_delay = maps:get(throttle_delay, Notify, undefined), | |
args = Notify | |
}, | |
State2 = monitor_owner(maps:get(owner,Notify,undefined), State1), | |
State3 = update_options(Notify, State2), | |
{ok, State3}. | |
update_options(Notify, #state{buffer = Buffer, resend_notifications_queue = OldQueue} = State) -> | |
ResendLimit = maps:get(resend_notifications_limit, Notify, undefined), | |
ResendTimeout = maps:get(resend_notifications_timeout, Notify, 10), | |
State1 = if | |
ResendLimit =/= undefined andalso Buffer =/= false andalso OldQueue == undefined -> | |
State#state{ | |
resend_notifications_queue = [], | |
resend_notifications_queue_size = 0, | |
resend_notifications_limit = ResendLimit, | |
resend_notifications_timeout = ResendTimeout*1000 | |
}; | |
ResendLimit =/= undefined andalso Buffer =/= false andalso OldQueue =/= undefined -> | |
State#state{ | |
resend_notifications_limit = ResendLimit, | |
resend_notifications_timeout = ResendTimeout*1000 | |
}; | |
ResendLimit == undefined orelse Buffer == false -> | |
State#state{ | |
resend_notifications_queue = undefined, | |
resend_notifications_queue_size = undefined, | |
resend_notifications_limit = undefined, | |
resend_notifications_timeout = undefined | |
} | |
end, | |
State1. | |
handle_call(sync, _, #state{mod = Mod, state = S0} = State) -> | |
S1 = case erlang:function_exported(Mod,sync,1) of | |
true -> Mod:sync(S0); | |
false -> S0 | |
end, | |
{reply, ok, State#state{state = S1}}. | |
handle_info({round_trip, SentAt}, #state{timer = OldTimer, name = Name} = State) -> | |
erlang:cancel_timer(OldTimer), | |
Now = now_ms(), | |
Delay = Now - SentAt, | |
ets:update_element(events_handlers, Name, [{#handler.rtt_at, Now}, {#handler.rtt_delay, Delay}]), | |
Timer = erlang:send_after(?RECHECK_INTERVAL, self(), {round_trip, Now + ?RECHECK_INTERVAL}), | |
{noreply, State#state{timer = Timer}}; | |
handle_info({'$event', Evt}, #state{throttle_delay = ThrottleDelay} = State) when is_integer(ThrottleDelay) -> | |
Events = [Evt|collect_events(99)], | |
ThrottleBuffer = queue_add_list(Events, State#state.throttle_buffer), | |
State1 = ensure_throttle_timer(State), | |
{noreply, State1#state{throttle_buffer = ThrottleBuffer}}; | |
handle_info(throttle_stop, #state{} = State0) -> | |
case State0#state.throttle_timer of | |
undefined -> ok; | |
_ -> erlang:cancel_timer(State0#state.throttle_timer) | |
end, | |
Events = collect_events(99), | |
ThrottleBuffer = queue:to_list(queue_add_list(Events, State0#state.throttle_buffer)), | |
State1 = State0#state{throttle_timer = undefined, throttle_buffer = undefined}, | |
case ThrottleBuffer of | |
[] -> | |
{noreply, State1}; | |
_ -> | |
Reply = deliver_events(ThrottleBuffer, State1), | |
Reply | |
end; | |
handle_info(resend_notifications_send, #state{resend_notifications_queue = Queue} = State) -> | |
if | |
Queue == undefined orelse Queue == [] -> | |
{noreply, State}; | |
true -> | |
Reply = deliver_events([], State), | |
Reply | |
end; | |
handle_info({'$event', Evt}, #state{buffer = false} = State) -> | |
Reply = deliver_events([Evt], State), | |
Reply; | |
handle_info({'$event', Evt}, #state{} = State) -> | |
Events = [Evt|collect_events(99)], | |
Reply = deliver_events(Events, State), | |
Reply; | |
handle_info({'DOWN', Ref, _, _, _}, #state{ref = Ref} = State) -> | |
{stop, normal, State}; | |
handle_info(Msg, #state{mod = Mod, state = S0} = State) -> | |
case Mod:handle_info(Msg, S0) of | |
{ok, S1} -> | |
{noreply, State#state{state = S1}}; | |
{stop, Reason, S1} -> | |
{stop, Reason, State#state{state = S1}} | |
end. | |
deliver_events(Events, #state{mod = Mod, state = S0, resend_notifications_queue = Queue, resend_notifications_limit = Cnt, name = Name} = State) -> | |
{Events1, Queue1} = if | |
Queue == undefined orelse Queue == [] -> | |
{Events, Queue}; | |
true -> | |
% Распиливаем queue_size на 4 части, что бы не одним большим батчем слать | |
lists:split(lists:min([lists:max([Cnt div 4,100]),State#state.resend_notifications_queue_size+length(Events)]), Queue ++ Events) | |
end, | |
Size1 = if | |
Queue1 == undefined -> undefined; | |
true -> length(Queue1) | |
end, | |
% [_|_] = Events1, | |
Reply = case Mod:handle_events(Events1, S0) of | |
{ok, S1} -> | |
Now = now_ms(), | |
ets:update_counter(events_handlers, Name, [{#handler.handled, length(Events)}, {#handler.last_event_at, Now, Now, Now}]), | |
{noreply, State#state{state = S1, resend_notifications_queue = Queue1, resend_notifications_queue_size = Size1}}; | |
{resend, S1} when Queue == undefined -> | |
{noreply, State#state{state = S1}}; | |
{resend, S1} when Queue =/= undefined -> | |
Queue2 = Events1++Queue1, | |
Size2 = length(Events1) + Size1, | |
Now = now_ms(), | |
ets:update_counter(events_handlers, Name, [{#handler.handled, length(Events)}, {#handler.last_event_at, Now, Now, Now}]), | |
State1 = State#state{resend_notifications_queue = Queue2, resend_notifications_queue_size = Size2, state = S1}, | |
State2 = if | |
Size2 - 0 > 0 -> | |
Timer = case State#state.resend_notifications_timer of | |
undefined -> erlang:send_after(State1#state.resend_notifications_timeout, self(), resend_notifications_send); | |
RT0 -> RT0 | |
end, | |
maybe_flush_resend_notifications_queue(State1#state{resend_notifications_timer = Timer}); | |
true -> | |
State1 | |
end, | |
{noreply, State2}; | |
{stop, Reason, S1} -> | |
{stop, Reason, State#state{state = S1}} | |
end, | |
Reply. | |
maybe_flush_resend_notifications_queue(#state{resend_notifications_queue_size = S, resend_notifications_limit = L, resend_notifications_queue = Q} = State) when | |
S - L > 0 -> | |
Q1 = lists:nthtail(S - L, Q), | |
State#state{resend_notifications_queue_size = L, resend_notifications_queue = Q1}; | |
maybe_flush_resend_notifications_queue(#state{} = State) -> | |
State. | |
queue_add_list(Items, {_,_}=Queue) -> queue:join(Queue, queue:from_list(Items)); | |
queue_add_list(Items, _) -> queue:from_list(Items). | |
ensure_throttle_timer(#state{throttle_timer = undefined, throttle_delay = PushAfter} = State) -> | |
State#state{throttle_timer = erlang:send_after(PushAfter*1000, self(), throttle_stop)}; | |
ensure_throttle_timer(#state{} = State) -> | |
State. | |
collect_events(Limit) when Limit =< 0 -> []; | |
collect_events(Limit) -> | |
receive | |
{'$event', E} -> [E|collect_events(Limit - 1)] | |
after | |
0 -> [] | |
end. | |
monitor_owner(undefined, #state{} = State) -> | |
State; | |
monitor_owner(Pid, #state{} = State) -> | |
case Pid of | |
<<"<",_/binary>> -> | |
Owner = list_to_pid(binary_to_list(Pid)), | |
Ref = erlang:monitor(process, Owner), | |
State#state{owner = Owner, ref = Ref}; | |
<<_,_/binary>> -> | |
Owner = whereis(binary_to_atom(Pid,latin1)), | |
Ref = erlang:monitor(process, Owner), | |
State#state{owner = Owner, ref = Ref} | |
end. | |
terminate(_,_) -> ok. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment