Last active
November 24, 2016 14:05
-
-
Save vladdu/911a3ccccc6fa8b0aed08a93ec8fa37e to your computer and use it in GitHub Desktop.
Cancellable worker process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(cancellable_worker). | |
-behaviour(gen_server). | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). | |
%% ==================================================================== | |
%% API functions | |
%% ==================================================================== | |
-export([ | |
start/1, | |
start/3, | |
cancel/1, | |
cancel/2, | |
yield/1, | |
yield/2, | |
check/1 | |
]). | |
%% Implement a worker process that can be cancelled and then may return a | |
%% partial answer. | |
%% The function doing the actual work takes as argument a Reporter function to | |
%% use to report results: | |
%% - Reporter(partial, Value) for a partial result | |
%% - Reporter(final, Value) for the whole result (if partial results are not | |
%% possible); do not report this after any partial values | |
%% If partial results are sent, they are aggregated in a list, which is returned
%% (in the order the values were reported) by check/1, cancel/1,2 and yield/1,2.
%% Starts an unlinked, unregistered monitor server for WorkerFun.
%% WorkerFun must be a fun of arity 1; init/1 calls it with the reporter fun.
%% Returns whatever gen_server:start/3 returns ({ok, MonPid} on success).
start(WorkerFun) ->
    Options = [],
    gen_server:start(?MODULE, WorkerFun, Options).
%% Starts a worker that runs Module:Function(Args..., Reporter).
%% The worker fun handed to start/1 is always invoked with exactly one
%% argument (the reporter fun), so the wrapper must accept it; a fun of
%% arity 0 makes the worker die with badarity before doing any work.
%% The reporter is appended as the LAST argument of the applied call, so
%% Module:Function must have arity length(Args) + 1.
start(Module, Function, Args) ->
    start(fun(Reporter) -> apply(Module, Function, Args ++ [Reporter]) end).
%% check/1 asks the monitor for the answers reported so far; neither the
%% worker nor the monitor is stopped. It can return
%% - {partial, Values} : the worker is still running; Values is the list of all
%%                       values reported so far (in report order), or
%%                       'undefined' if nothing has been reported yet
%% - {final, Value}    : the worker reported a 'final' value, or the worker has
%%                       terminated (Value is then whatever was collected)
%% A 'final' reported after another 'final' is ignored; a 'final' reported
%% after 'partial's is appended to the list of collected values.
check(Server) when is_pid(Server) ->
    Request = check,
    gen_server:call(Server, Request).
%% cancel/1,2 kill the worker, stop the monitor and return {ok, Results} with
%% whatever had been collected up to that point.
%% cancel/1 uses the default gen_server:call/2 timeout; cancel/2 takes an
%% explicit Timeout that is passed on to gen_server:call/3.
cancel(Server) when is_pid(Server) ->
    Request = cancel,
    gen_server:call(Server, Request).
cancel(Server, Timeout) when is_pid(Server) ->
    Request = cancel,
    gen_server:call(Server, Request, Timeout).
%% yield/1,2 wait until the worker has finished and return {ok, Results}.
%% yield/1 uses the default gen_server:call/2 timeout; yield/2 takes an
%% explicit Timeout that is passed on to gen_server:call/3.
%% TODO don't return partial/final
yield(Server) when is_pid(Server) ->
    Request = yield,
    gen_server:call(Server, Request).
yield(Server, Timeout) when is_pid(Server) ->
    Request = yield,
    gen_server:call(Server, Request, Timeout).
%% ==================================================================== | |
%% Behavioural functions | |
%% ==================================================================== | |
%% Server state of the monitor process.
-record(state, {
    %% pid of the spawned worker (filled in by init/1)
    worker_pid = undefined,
    %% {partial | final, Value}; Value is 'undefined' until something is
    %% reported, and partial values are accumulated newest-first
    results = {partial, undefined},
    %% 'false', or the From of a caller blocked in yield
    yielding = false,
    %% true from init/1 until the worker's 'DOWN' message is handled
    worker_running = false
}).
%% gen_server callback. Spawns and monitors the worker process, handing it a
%% reporter fun that casts {partial, Value} / {final, Value} back to this
%% server. Any other tag passed to the reporter fails with function_clause
%% inside the worker.
init(WorkerFun) ->
    Server = self(),
    Reporter =
        fun(Kind, Value) when Kind =:= partial; Kind =:= final ->
            gen_server:cast(Server, {Kind, Value})
        end,
    {WorkerPid, _MonitorRef} = spawn_monitor(fun() -> WorkerFun(Reporter) end),
    {ok, #state{worker_pid = WorkerPid, worker_running = true}}.
%% gen_server callback serving the synchronous API.
%% - check  : replies with the collected results; tagged as stored while the
%%            worker runs, always tagged 'final' once it has terminated.
%% - cancel : kills the worker, replies {ok, Results} and stops the server.
%%            A caller blocked in yield is answered with the same results
%%            first; otherwise stopping the server would make its
%%            gen_server:call exit instead of returning.
%% - yield  : replies {ok, Results} and stops if the worker has already
%%            terminated; otherwise the reply is deferred until the worker's
%%            'DOWN' message arrives (see handle_info/2).
%% - anything else: replies {error, {unknown, Request}}.
handle_call(check, _From, State = #state{results = Results, worker_running = true}) ->
    {reply, adjust(Results), State};
handle_call(check, _From, State = #state{results = Results, worker_running = false}) ->
    {_, Reply} = adjust(Results),
    {reply, {final, Reply}, State};
handle_call(cancel, _From, State = #state{results = Results, worker_pid = Pid}) ->
    exit(Pid, kill),
    {_, Reply} = adjust(Results),
    NewState = reply_pending_yield(State, {ok, Reply}),
    {stop, normal, {ok, Reply}, NewState};
handle_call(yield, _From, State = #state{worker_running = false, results = Results}) ->
    {_, Reply} = adjust(Results),
    {stop, normal, {ok, Reply}, State};
handle_call(yield, From, State) ->
    {noreply, State#state{yielding = From}};
handle_call(Request, _From, State) ->
    {reply, {error, {unknown, Request}}, State}.

%% Answers a caller still blocked in yield, if there is one. A yield can only
%% be pending while the worker is running, so the worker_running check avoids
%% replying a second time to a caller that was already answered when the
%% worker went down.
reply_pending_yield(State = #state{worker_running = true, yielding = From}, Reply)
  when From =/= false ->
    gen_server:reply(From, Reply),
    State#state{yielding = false};
reply_pending_yield(State, _Reply) ->
    State.
%% gen_server callback. Folds a {partial, Value} or {final, Value} report from
%% the worker into the collected results. Any other cast leaves the state
%% untouched. The first clause is restricted to the two report shapes so the
%% fallback clause is actually reachable; previously the first clause matched
%% every message and the fallback was dead code.
handle_cast({Kind, _Value} = Report, State = #state{results = Results})
  when Kind =:= partial; Kind =:= final ->
    {noreply, State#state{results = merge_result(Report, Results)}};
handle_cast(_Msg, State) ->
    {noreply, State}.
%% gen_server callback. Handles the 'DOWN' message of the monitored worker.
%% - No yield pending: just record that the worker is gone; results stay
%%   available to check/cancel/yield.
%% - Yield pending: deliver {ok, Results} to the waiting caller and stop the
%%   server, exactly as a yield on an already finished worker does. Previously
%%   the server kept running with the stale From stored, and since it is
%%   started unlinked nothing would ever stop it.
%% Any other message is ignored.
handle_info({'DOWN', _Ref, process, Pid, _Reason},
            State = #state{worker_pid = Pid, yielding = false}) ->
    {noreply, State#state{worker_running = false}};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
            State = #state{worker_pid = Pid, yielding = From, results = Results}) ->
    {_, Reply} = adjust(Results),
    gen_server:reply(From, {ok, Reply}),
    {stop, normal, State#state{worker_running = false, yielding = false}};
handle_info(_Info, State) ->
    {noreply, State}.
%% gen_server callback; no cleanup is performed.
terminate(_Why, _FinalState) -> ok.
%% gen_server callback; the state is carried over unchanged on code upgrade.
code_change(_FromVsn, State, _UpgradeData) -> {ok, State}.
%% ==================================================================== | |
%% Internal functions | |
%% ==================================================================== | |
%% Puts accumulated results into report order. Partial values are collected
%% newest-first, so a list payload is reversed; any non-list payload
%% (a single value or 'undefined') is returned as is.
adjust({Kind, Values}) when is_list(Values) ->
    {Kind, lists:reverse(Values)};
adjust({_Kind, _Value} = Unchanged) ->
    Unchanged.
%% Folds one worker report into the results collected so far.
%% - first report: a 'final' value is stored bare, a 'partial' value starts a
%%   one-element list
%% - later reports while the results are still 'partial': the value is pushed
%%   onto the front of the list (newest first) and the tag becomes the tag of
%%   the report
%% - anything else (a report after 'final', or an unrecognised message) leaves
%%   the collected results unchanged
merge_result(Report, Collected) ->
    case {Report, Collected} of
        {{final, Value}, {partial, undefined}} ->
            {final, Value};
        {{partial, Value}, {partial, undefined}} ->
            {partial, [Value]};
        {{final, Value}, {partial, Earlier}} ->
            {final, [Value | Earlier]};
        {{partial, Value}, {partial, Earlier}} ->
            {partial, [Value | Earlier]};
        _Other ->
            Collected
    end.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(cancellable_worker_tests). | |
-include_lib("eunit/include/eunit.hrl"). | |
%% Starts a cancellable worker running Fun and applies Test to the monitor
%% pid. The outcome of Test is returned; an exception raised by Test is
%% converted to a term by `catch` rather than propagated.
run_test(Fun, Test) ->
    {ok, Monitor} = cancellable_worker:start(Fun),
    catch Test(Monitor).
%% EUnit test generator. Each entry starts a worker fun (first argument of
%% run_test/2) and runs a client fun against the monitor (second argument).
%% The short waits done with w/1 order the worker's reports relative to the
%% client's calls.
run_test_() ->
    [
        %% --- check/1 ---
        ?_assertMatch(
            {final, undefined},
            run_test(
                fun(_Report) -> ok end,
                fun(Worker) -> cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, [v2]},
            run_test(
                fun(Report) -> Report(partial, v2), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, [v3, v4]},
            run_test(
                fun(Report) -> Report(partial, v3), Report(partial, v4), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {partial, undefined},
            run_test(
                fun(Report) -> w(10), Report(partial, v5) end,
                fun(Worker) -> cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, v7},
            run_test(
                fun(Report) -> Report(final, v7), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, v8},
            run_test(
                fun(Report) -> Report(final, v8), Report(final, v9), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, v10},
            run_test(
                fun(Report) -> Report(final, v10), Report(partial, v11), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {final, [v12, v13]},
            run_test(
                fun(Report) -> Report(partial, v12), Report(final, v13), ok end,
                fun(Worker) -> w(10), cancellable_worker:check(Worker) end
            )
        ),
        ?_assertMatch(
            {{partial, [v12a]}, {partial, [v12a, v13a]}, {final, [v12a, v13a]}},
            run_test(
                fun(Report) ->
                    Report(partial, v12a),
                    w(20),
                    Report(partial, v13a),
                    w(20),
                    ok
                end,
                fun(Worker) ->
                    w(10),
                    First = cancellable_worker:check(Worker),
                    w(30),
                    Second = cancellable_worker:check(Worker),
                    w(50),
                    Third = cancellable_worker:check(Worker),
                    {First, Second, Third}
                end
            )
        ),
        %% --- cancel/1 ---
        ?_assertMatch(
            {ok, [v14]},
            run_test(
                fun(Report) -> Report(partial, v14), w(50), Report(partial, v15), ok end,
                fun(Worker) -> w(10), cancellable_worker:cancel(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v16, v17]},
            run_test(
                fun(Report) -> Report(partial, v16), w(10), Report(partial, v17), ok end,
                fun(Worker) -> w(30), cancellable_worker:cancel(Worker) end
            )
        ),
        %% --- check/1 after the worker crashed ---
        ?_assertMatch(
            {final, [v18]},
            run_test(
                fun(Report) ->
                    Report(partial, v18),
                    w(5),
                    crash:crash(),
                    w(5),
                    Report(partial, v19),
                    ok
                end,
                fun(Worker) -> w(20), cancellable_worker:check(Worker) end
            )
        ),
        %% --- yield/1 ---
        ?_assertMatch(
            {ok, undefined},
            run_test(
                fun(_Report) -> ok end,
                fun(Worker) -> cancellable_worker:yield(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v6]},
            run_test(
                fun(Report) -> Report(partial, v6) end,
                fun(Worker) -> w(10), cancellable_worker:yield(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v19, v20]},
            run_test(
                fun(Report) -> Report(partial, v19), Report(partial, v20) end,
                fun(Worker) -> w(10), cancellable_worker:yield(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v21]},
            run_test(
                fun(Report) -> w(10), Report(partial, v21) end,
                fun(Worker) -> cancellable_worker:yield(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v22, v23]},
            run_test(
                fun(Report) -> Report(partial, v22), Report(final, v23) end,
                fun(Worker) -> cancellable_worker:yield(Worker) end
            )
        ),
        ?_assertMatch(
            {ok, [v24]},
            run_test(
                fun(Report) ->
                    Report(partial, v24),
                    w(5),
                    crash:crash(),
                    Report(partial, v25),
                    ok
                end,
                fun(Worker) -> cancellable_worker:yield(Worker) end
            )
        ),
        ?_assert(true)
    ].
%% Blocks the calling process for Millis milliseconds and returns ok.
%% No messages are consumed: the receive has no clauses, only a timeout.
w(Millis) ->
    receive
    after Millis ->
        ok
    end.
@okeuday Thanks, I see your point, but I think my worker here is actually your controlling process. At least, that is the case in this latest version, where it does little more than gather partial results and check that the real worker has finished.
My use case is for a server that waits for commands from the UI and executes them by spawning one of these processes. The user or other commands might request cancellation of pending operations. The UI can retrieve partial results when the operation takes a long time, to keep the user happy.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To isolate potential instability of the worker processing, and to not lose partial results of an iterative algorithm, it is better to have a worker that passively receives details about the work it needs to do, from a controller process, with the worker's response stored before pursuing the next iteration of work. There are other reasons, like scalability, caching, memory use, start-up time, etc., to organize things this way, but the most important reasons were listed first. This is something that can be done with CloudI, as shown with the hexpi integration test (https://github.com/CloudI/CloudI/tree/develop/src/tests/hexpi) with the configuration sections https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L99-L196 , https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L458-L468 , https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L480-L487 (segments pasted below, excluding line comments):