Last active
August 29, 2015 14:16
-
-
Save allyourcode/4dc34fc9b8463fe4ae04 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(assassin). | |
| -export([linked/2]). | |
| % Monitors a gen_server2 for responsiveness by periodically sending | |
| % is_healthy requests, and expecting a response in an amount of time | |
| % specified by user of this module. | |
| % | |
| % Like supervisor, N out of M failures are allowed (where failure | |
| % means response took too long). | |
| % Public for test purposes only. | |
| -export([count_failures/1, push_pop/3, kill/2, soft_kill/2]). | |
| -define(SOFT_KILL_DEFAULT_TIMEOUT, 5000). | |
% Starts a watchdog ("assassin") process for Target, linked to the caller, and
% returns its pid.
%
% Cfg must be a map containing poll_period, response_time and
% failure_tolerance => {Max, OutOf}; any other shape fails with function_clause.
% The new process starts polling with an empty result history.
linked(Target,
       #{poll_period := _,
         response_time := _,
         failure_tolerance := {_, _}} = Cfg) ->
    EmptyHistory = queue:new(),
    spawn_link(fun() -> run(Target, Cfg, EmptyHistory) end).
% Polling loop of the assassin process.
%
% Each iteration sends is_healthy to Target through gen_server2:sync/3 and
% waits up to ResponseTime milliseconds for the reply. The outcome (success or
% failure) is pushed onto History, which push_pop/3 caps at OutOf entries.
% As soon as the window holds more than Max failures, Target is killed and this
% process exits with reason normal. Otherwise the loop sleeps for whatever is
% left of PollPeriod (milliseconds) and polls again.
run(Target,
    #{poll_period := PollPeriod,
      response_time := ResponseTime,
      failure_tolerance := {Max, OutOf}} = Cfg,
    History) ->
    Start = os:timestamp(),
    % Ping.
    NewHistory =
        try gen_server2:sync(Target, is_healthy, ResponseTime) of
            ok -> push_pop(History, success, OutOf)
        catch
            throw:timeout ->
                MoreFail = push_pop(History, failure, OutOf),
                case count_failures(MoreFail) > Max of
                    true ->
                        kill(Target, #{}),
                        exit(normal);
                    false ->
                        MoreFail
                end
        end,
    % now_diff/2 yields microseconds; convert to milliseconds.
    Elapsed = timer:now_diff(os:timestamp(), Start) div 1000,
    % A poll can take longer than PollPeriod (e.g. when ResponseTime is close to
    % or above PollPeriod). A negative timeout would crash timer:sleep/1 and
    % take the watchdog down with it, so never sleep for less than 0 ms.
    timer:sleep(max(0, PollPeriod - Elapsed)),
    run(Target, Cfg, NewHistory).
% Returns how many entries of the History queue are the atom failure.
%
% Every entry must be success or failure; anything else raises case_clause.
count_failures(History) ->
    Weights = [case Outcome of
                   success -> 0;
                   failure -> 1
               end || Outcome <- queue:to_list(History)],
    lists:sum(Weights).
% Appends Element to the rear of queue Q, first dropping the front item when Q
% already holds MaxLength (or more) items. At most one item is dropped per call.
push_pop(Q, Element, MaxLength) ->
    Trimmed =
        case queue:len(Q) < MaxLength of
            true ->
                Q;
            false ->
                % queue:out/1 returns a 2-tuple for both empty and non-empty
                % queues; the second element is the remaining queue.
                {_Front, Rest} = queue:out(Q),
                Rest
        end,
    queue:in(Element, Trimmed).
% Stops Target, politely if possible.
%
% First attempts soft_kill/2, waiting soft_kill_timeout milliseconds (taken from
% the Opt map, defaulting to ?SOFT_KILL_DEFAULT_TIMEOUT). If that fails, Target
% is sent an untrappable kill signal.
%
% Returns soft when the soft kill succeeded, hard otherwise.
kill(Target, #{} = Opt) ->
    Grace = maps:get(soft_kill_timeout, Opt, ?SOFT_KILL_DEFAULT_TIMEOUT),
    case soft_kill(Target, Grace) of
        false ->
            % No mercy!
            exit(Target, kill),
            hard;
        true ->
            soft
    end.
% Sends a stop request to Target and waits up to Timeout milliseconds for any
% reply.
%
% Returns true if a reply arrived in time (whatever its value), false if the
% wait timed out. Other exceptions propagate to the caller.
soft_kill(Target, Timeout) ->
    % gen_server2:sync/3 is async/2 followed by awaiting the reply.
    try gen_server2:sync(Target, stop, Timeout) of
        _Ack -> true
    catch
        throw:timeout -> false
    end.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(assassin_tests). | |
| -include_lib("eunit/include/eunit.hrl"). | |
% Exercises both outcomes of assassin:soft_kill/2: a process that never answers
% the stop request (false) and one that does (true).
soft_kill_test() ->
    Timeout = 100,
    % freeze/0 ignores the stop request, so the soft kill must time out.
    P1 = spawn_link(fun freeze/0),
    ?assertNot(assassin:soft_kill(P1, Timeout)),
    % exit(P1, normal) would be ignored because P1 does not trap exits, leaving
    % it blocked forever. Unlink first so the kill does not propagate back to
    % this test process, then really terminate it.
    unlink(P1),
    exit(P1, kill),
    % let_it_go/0 replies to stop and then returns, so it needs no clean-up.
    P2 = spawn_link(fun let_it_go/0),
    ?assert(assassin:soft_kill(P2, Timeout)).
% Process body that never answers requests: blocks until the bare atom herp
% arrives (then evaluates to derp). All other messages stay in the mailbox.
freeze() ->
    receive
        Msg when Msg =:= herp ->
            derp
    end.
% Process body that cooperates with a stop request: waits for {stop, Fun},
% calls Fun(ok) to acknowledge, and returns (ending the process when used as a
% spawn body).
let_it_go() ->
    receive
        {stop, Acknowledge} ->
            Acknowledge(ok)
    end.
% Spawns Fun, kills it with assassin:kill/2 (100 ms soft-kill timeout) and
% checks both the reported kill style (soft | hard) and the exit reason seen
% through a monitor.
kill_and_assert(Fun, ExpectedKillStyle, ExpectedDownReason) ->
    % spawn_monitor/1 sets up the monitor atomically with the spawn, so the
    % 'DOWN' reason is always the real exit reason (never noproc).
    {P, Mon} = spawn_monitor(Fun),
    ?assertNotEqual(undefined, process_info(P)),
    ?assertEqual(ExpectedKillStyle,
                 assassin:kill(P, #{soft_kill_timeout => 100})),
    receive
        {'DOWN', Mon, process, P, Reason} ->
            ?assertEqual(ExpectedDownReason, Reason)
    after 5000 ->
        % The process survived: drop the monitor, clean up, and fail the test.
        demonitor(Mon, [flush]),
        exit(P, kill),
        ?assert(false)
    end,
    % Expected value first, so a failure report labels the values correctly.
    ?assertEqual(undefined, process_info(P)).
% assassin:kill/2 must fall back to a hard kill for an unresponsive process and
% report a soft kill for a cooperative one.
kill_test() ->
    Scenarios = [{fun freeze/0, hard, killed},
                 {fun let_it_go/0, soft, normal}],
    lists:foreach(fun({Body, KillStyle, DownReason}) ->
                          kill_and_assert(Body, KillStyle, DownReason)
                  end,
                  Scenarios).
% Pushes 1..4 through a queue capped at two entries and checks the contents
% after every push.
push_pop_test() ->
    Steps = [{1, [1]}, {2, [1,2]}, {3, [2,3]}, {4, [3,4]}],
    Check = fun({Pushed, Contents}, Q0) ->
                    Q1 = assassin:push_pop(Q0, Pushed, 2),
                    ?assertEqual(Contents, queue:to_list(Q1)),
                    Q1
            end,
    lists:foldl(Check, queue:new(), Steps).
% Table-driven check of assassin:count_failures/1: {ExpectedCount, Outcomes}.
count_failures_test() ->
    Cases = [{0, []},
             {0, [success]},
             {2, [success, failure, failure, success]},
             {0, [success, success, success, success]}],
    lists:foreach(fun({Expected, Outcomes}) ->
                          Q = queue:from_list(Outcomes),
                          ?assertEqual(Expected, assassin:count_failures(Q))
                  end,
                  Cases).
% A victim whose scripted failures stay within the tolerance ({1, 3}: at most 1
% failure in any 3 consecutive polls) must not be killed by the assassin.
linked_safe_test() ->
    Self = self(),
    Done = make_ref(),
    Notify = fun() -> Self ! Done end,
    Victim = victim:linked([false, true, true, false, true], Notify),
    Cfg = #{poll_period => 50,
            response_time => 5,
            failure_tolerance => {1, 3}},
    Assassin = assassin:linked(Victim, Cfg),
    % exit(Pid, normal) is ignored by processes that do not trap exits, so it
    % cannot be used for clean-up. Both helpers are linked to this process, so
    % unlink before killing to keep the kill from propagating back here.
    Reap = fun(Pid) -> unlink(Pid), exit(Pid, kill) end,
    receive
        Done ->
            Reap(Assassin),
            % Still alive, still alive. Sample liveness before cleaning up.
            VictimInfo = process_info(Victim),
            Reap(Victim),
            ?assertNotEqual(undefined, VictimInfo)
    after 2500 ->
        Reap(Assassin),
        Reap(Victim),
        ?assert(false)
    end.
% A victim that exceeds the failure tolerance (2 failures within 3 polls, with
% at most 1 allowed) must be killed, after which the assassin exits normally.
linked_exit_test() ->
    Victim = victim:linked([false, true, false, true],
                           fun() -> error(not_supposed_to_happend) end),
    % Monitor the victim up front so its death can be awaited instead of
    % sampled: the assassin may exit before the victim has finished exiting.
    VictimMonitor = monitor(process, Victim),
    Cfg = #{poll_period => 50,
            response_time => 5,
            failure_tolerance => {1, 3}},
    Assassin = assassin:linked(Victim, Cfg),
    Monitor = monitor(process, Assassin),
    % Both helpers are linked to this process; unlink before killing so the
    % kill does not propagate back here.
    Reap = fun(Pid) -> unlink(Pid), exit(Pid, kill) end,
    receive
        {'DOWN', Monitor, process, Assassin, Reason} ->
            ?assertEqual(normal, Reason),
            receive
                {'DOWN', VictimMonitor, process, Victim, _VictimReason} ->
                    ok
            after 1000 ->
                Reap(Victim),
                ?assert(false)
            end,
            ?assertEqual(undefined, process_info(Victim))
    after 2500 ->
        % The assassin never finished its job. Clean up for real, then fail.
        % exit(Assassin, normal) would be ignored, and exit(Victim) is exit/1,
        % which would terminate this test process instead of the victim. The
        % former "assassin is still alive" check was racy and is implied by the
        % unconditional failure below.
        demonitor(Monitor, [flush]),
        demonitor(VictimMonitor, [flush]),
        Reap(Assassin),
        Reap(Victim),
        ?assert(false)
    end.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(gen_server2). | |
| % Similar to venerable gen_server, but simpler. One thing that this is missing | |
| % is code_change. Not really sure how gen_server uses that, but similar | |
| % functionality should be added to this. | |
| % | |
| % This combines gen_server's various handle_* functions into one: handle. | |
| % | |
| % Implementor does not need to supply terminate. Instead, the same thing should | |
| % be accomplished by handling stop message. In that case, normal termination can | |
| % be signaled by throw(terminate_normally). | |
| % | |
| % If handle does not implement stop, a default implementation is provided, which | |
| % simply returns, ending the process normally. | |
| % | |
| % There is also a default implementation for handle is_healthy, which simply | |
| % replies ok. | |
| % | |
| % Synchronous and asynchronous calls are supported. In addition, there are "raw" | |
| % async calls, which are performed like so: | |
| % MyServer ! {Request, fun(Response) -> herp_derp() end} | |
| % One way to use raw async is to send off a whole bunch of calls at once, and | |
| % handle the responses as they arrive. However, there is a better way when you | |
| % want to send the same request to many servers: broadcast(Servers, Request) | |
| -export([behaviour_info/1]). | |
| -export([linked/2, unlinked/2]). | |
| -export([async/2, sync/3, sync/2, broadcast/2]). | |
| -define(DEFAULT_TIMEOUT, 5000). | |
% Behaviour declaration: modules implementing gen_server2 must export init/1
% and handle/3. Any other query yields undefined.
behaviour_info(Query) ->
    case Query of
        callbacks -> [{init, 1}, {handle, 3}];
        _Other -> undefined
    end.
% Starts a server for callback module Module, linked to the caller, and returns
% its pid. Module:init(Init) is evaluated in the calling process, before the
% server process is spawned; its result is the server's initial state.
linked(Module, Init) ->
    InitialState = Module:init(Init),
    Serve = fun() -> loop(Module, InitialState) end,
    spawn_link(Serve).
% Same as linked/2 but without linking the server to the caller.
% Module:init(Init) is evaluated in the calling process, before the server
% process is spawned; its result is the server's initial state.
unlinked(Module, Init) ->
    InitialState = Module:init(Init),
    Serve = fun() -> loop(Module, InitialState) end,
    spawn(Serve).
% Server main loop.
%
% Waits for a {Request, Reply} message, where Reply is a fun taking the
% response, and dispatches it to Module:handle(Request, Reply, State). The
% value returned by handle/3 becomes the next state.
%
% The loop ends (and the process exits normally) when:
%   * handle/3 throws terminate_normally (or normal_termination, the spelling
%     used in this module's header comment), or
%   * a stop request is not handled by Module, in which case ok is replied.
%
% Messages that are not 2-tuples are left in the mailbox.
loop(Module, State) ->
    receive
        {Request, Reply} ->
            try Module:handle(Request, Reply, State) of
                NewState ->
                    loop(Module, NewState)
            catch
                % Provide default implementations for stop and is_healthy.
                %
                % NOTE: detecting a missing clause via error:function_clause
                % also catches function_clause errors raised deeper inside
                % handle/3 while it processes stop or is_healthy. Everything
                % else is re-raised with its original stack trace.
                %
                % The stack trace is bound in the catch pattern (OTP 21+);
                % erlang:get_stacktrace/0 is deprecated and no longer available
                % in current OTP releases.
                error:function_clause:StackTrace ->
                    case Request of
                        stop ->
                            Reply(ok),
                            ok;
                        is_healthy ->
                            Reply(ok),
                            loop(Module, State);
                        _ ->
                            erlang:raise(error, function_clause, StackTrace)
                    end;
                throw:terminate_normally ->
                    ok;
                % Documented spelling; accepted so that implementors following
                % the header comment also terminate normally.
                throw:normal_termination ->
                    ok
            end
    end.
% Sends Request to a single Server without waiting. Returns a fun(Timeout) that
% blocks for the response (see broadcast/2).
async(Server, Request) ->
    Recipients = [Server],
    broadcast(Recipients, Request).
% Sends Request to Server and waits up to ?DEFAULT_TIMEOUT milliseconds for the
% response. Throws timeout if none arrives in time.
sync(Server, Request) ->
    Await = async(Server, Request),
    Await(?DEFAULT_TIMEOUT).
% Sends Request to Server and waits up to Timeout milliseconds for the
% response, which is returned. Throws timeout if none arrives in time.
sync(Server, Request, Timeout) ->
    (async(Server, Request))(Timeout).
% Sends {Request, Reply} to every process in Servers, where Reply is a fun that
% delivers its argument back to the calling process, tagged with a reference
% unique to this broadcast.
%
% Returns a fun(Timeout). Each call to it returns one response to this
% broadcast, or throws timeout after Timeout milliseconds.
broadcast(Servers, Request) ->
    Caller = self(),
    Tag = make_ref(),
    % TODO: Prevent this if timeout has occurred. Otherwise, mailbox of Caller
    % will get spammed.
    Reply = fun(Response) -> Caller ! {Tag, Response} end,
    Envelope = {Request, Reply},
    Deliver = fun(Server) -> Server ! Envelope end,
    lists:foreach(Deliver, Servers),
    % TODO: How to garbage collect messages that are delivered but never
    % received?
    fun(Timeout) -> await_response(Tag, Timeout) end.
% Blocks until a {RequestId, Response} message is in the caller's mailbox and
% returns Response. Throws timeout if none arrives within Timeout milliseconds.
await_response(RequestId, Timeout) ->
    receive
        {Tag, Payload} when Tag =:= RequestId ->
            Payload
    after Timeout ->
        throw(timeout)
    end.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(victim). | |
| -behavior(gen_server2). | |
| -export([init/1, handle/3]). | |
| -export([linked/2]). | |
| % Mock gen_server2 that is used by assassin_tests. is_healthy responses are | |
| % scripted; History is a sequence of boolean values indicating whether to | |
| % respond to is_healthy requests. OnExhausted is called as soon as History is | |
| % emptied. | |
% Starts a scripted victim server linked to the caller and returns its pid.
% History is the list of booleans saying whether to answer each successive
% is_healthy request; OnExhausted is a zero-arity fun called once the list has
% been used up.
linked(History, OnExhausted) ->
    Script = {History, OnExhausted},
    gen_server2:linked(?MODULE, Script).
% gen_server2 callback: the init argument is used unchanged as the initial
% server state.
init(Script) ->
    Script.
% gen_server2 callback for the scripted victim.
%
% State is {History, OnExhausted}. For each is_healthy request the head of
% History decides whether to answer (true -> Reply(ok), false -> stay silent).
% OnExhausted() is called right after the last scripted entry is consumed.
% Returns the state with the consumed entry removed.
%
% An is_healthy request arriving after the script is exhausted throws asplode.
% (This clause previously matched a bare [] state, which never occurs because
% the state is always a 2-tuple, so such requests fell through to gen_server2's
% default "healthy" reply.)
handle(is_healthy, Reply, {[CallReply | NextHistory], OnExhausted}) ->
    case CallReply of
        true -> Reply(ok);
        false -> ok
    end,
    case NextHistory of
        [] ->
            OnExhausted();
        _Whatever ->
            ok
    end,
    {NextHistory, OnExhausted};
handle(is_healthy, _Reply, {[], _OnExhausted}) ->
    throw(asplode).
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hmm. I just realized that I can generalize this by replacing Target with Fun: instead of assuming that Target knows how to respond to a specific message, just call Fun! One way to use this is to have the Fun send request and receive response.