@vladdu · Last active November 24, 2016
Cancellable worker process
-module(cancellable_worker).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% ====================================================================
%% API functions
%% ====================================================================
-export([
    start/1,
    start/3,
    cancel/1,
    cancel/2,
    yield/1,
    yield/2,
    check/1
]).
%% Implements a worker process that can be cancelled and that may then return
%% a partial answer.
%% The function doing the actual work takes as argument a Reporter function to
%% use for reporting results:
%% - Reporter(partial, Value) for a partial result
%% - Reporter(final, Value) for the whole result. A 'final' reported after
%%   partial values is appended as the last element; anything reported after
%%   a 'final' is ignored (see merge_result/2).
%% If partial results are sent, they are aggregated in a list, which is
%% returned in the order they were reported.
start(WorkerFun) ->
    gen_server:start(?MODULE, WorkerFun, []).

%% The Reporter is appended as the last argument of the Module:Function call.
start(Module, Function, Args) ->
    start(fun(Report) -> apply(Module, Function, Args ++ [Report]) end).
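%% Usage sketch (values illustrative): the worker fun receives the Reporter
%% and the aggregated results come back in report order.
%%   {ok, W} = cancellable_worker:start(fun(Report) ->
%%                                          Report(partial, step1),
%%                                          Report(partial, step2)
%%                                      end),
%%   {ok, [step1, step2]} = cancellable_worker:yield(W).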
%% check/1 checks whether there are any answers from the worker yet. It returns
%% - {partial, Values} : the list of all values reported so far, while the
%%   worker is still running
%% - {final, Value} : the final result, once the worker has finished
%% Extra reports arriving after a 'final' are ignored rather than flagged as
%% errors (see merge_result/2).
check(MonPid) when is_pid(MonPid) ->
    gen_server:call(MonPid, check).

%% Cancels the worker and returns the current results.
cancel(MonPid) when is_pid(MonPid) ->
    gen_server:call(MonPid, cancel).

cancel(MonPid, Timeout) when is_pid(MonPid) ->
    gen_server:call(MonPid, cancel, Timeout).
%% Waits until the worker has finished and returns the final result.
%% TODO don't return partial/final
yield(MonPid) when is_pid(MonPid) ->
    gen_server:call(MonPid, yield).

yield(MonPid, Timeout) when is_pid(MonPid) ->
    gen_server:call(MonPid, yield, Timeout).
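%% Polling sketch (values illustrative): callers typically alternate check/1
%% with their own work, then pick up the final result:
%%   {partial, [a]}  = cancellable_worker:check(W),
%%   ... do something else ...
%%   {final, [a, b]} = cancellable_worker:check(W).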
%% ====================================================================
%% Behavioural functions
%% ====================================================================
-record(state, {
    worker_pid,
    results = {partial, undefined},
    yielding = false,
    worker_running = false
}).
init(WorkerFun) ->
    Monitor = self(),
    %% The Reporter closure runs in the worker process and casts the results
    %% back to this monitor process.
    Report = fun(partial, V) -> gen_server:cast(Monitor, {partial, V});
                (final, V) -> gen_server:cast(Monitor, {final, V})
             end,
    {WrkPid, _Ref} = spawn_monitor(fun() ->
                                       WorkerFun(Report)
                                   end),
    {ok, #state{worker_pid=WrkPid, worker_running=true}}.
handle_call(check, _From, State=#state{results=Results, worker_running=true}) ->
    Reply = adjust(Results),
    {reply, Reply, State};
handle_call(check, _From, State=#state{results=Results, worker_running=false}) ->
    {_, Reply} = adjust(Results),
    {reply, {final, Reply}, State};
handle_call(cancel, _From, State=#state{results=Results, worker_pid=Pid}) ->
    exit(Pid, kill),
    {_, Reply} = adjust(Results),
    {stop, normal, {ok, Reply}, State};
handle_call(yield, _From, State=#state{worker_running=false, results=Results}) ->
    {_, Reply} = adjust(Results),
    {stop, normal, {ok, Reply}, State};
handle_call(yield, From, State) ->
    {noreply, State#state{yielding=From}};
handle_call(Request, _From, State) ->
    Reply = {error, {unknown, Request}},
    {reply, Reply, State}.
handle_cast({Kind, _}=V, State=#state{results=Results})
  when Kind =:= partial; Kind =:= final ->
    NewResults = merge_result(V, Results),
    {noreply, State#state{results=NewResults}};
handle_cast(_Msg, State) ->
    {noreply, State}.
handle_info({'DOWN', _, process, Pid, _Reason},
            State=#state{worker_pid=Pid,
                         yielding=From,
                         results=Results}) when From /= false ->
    {_, Reply} = adjust(Results),
    gen_server:reply(From, {ok, Reply}),
    {noreply, State#state{worker_running=false}};
handle_info({'DOWN', _, process, Pid, _Reason}, State=#state{worker_pid=Pid}) ->
    {noreply, State#state{worker_running=false}};
handle_info(_Info, State) ->
    {noreply, State}.
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% Results are accumulated in reverse order; put them back in report order.
adjust({K, Results}=Arg) ->
    if is_list(Results) ->
           {K, lists:reverse(Results)};
       true ->
           Arg
    end.
merge_result({final, V}, {partial, undefined}) ->
    {final, V};
merge_result({partial, V}, {partial, undefined}) ->
    {partial, [V]};
%% a 'final' after partial values becomes the last element of the list
merge_result({final, V}, {partial, R}) ->
    {final, [V|R]};
merge_result({partial, V}, {partial, R}) ->
    {partial, [V|R]};
%% anything reported after a 'final' is ignored
merge_result(_V, R) ->
    R.
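%% Examples (these follow directly from the clauses above):
%%   merge_result({partial, b}, {partial, [a]})    -> {partial, [b, a]}
%%   merge_result({final, c},   {partial, [b, a]}) -> {final, [c, b, a]}
%%   adjust({final, [c, b, a]})                    -> {final, [a, b, c]}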
-module(cancellable_worker_tests).

-include_lib("eunit/include/eunit.hrl").

%% Starts a worker running Fun and applies Test to the monitor pid,
%% catching any crash so a failing case does not abort the generator.
run_test(Fun, Test) ->
    {ok, Worker} = cancellable_worker:start(Fun),
    (catch Test(Worker)).
run_test_() ->
    [
     ?_assertMatch({final, undefined},
                   run_test(fun(_M) -> ok end,
                            fun(W) -> cancellable_worker:check(W) end)),
     ?_assertMatch({final, [v2]},
                   run_test(fun(M) -> M(partial, v2), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({final, [v3, v4]},
                   run_test(fun(M) -> M(partial, v3), M(partial, v4), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({partial, undefined},
                   run_test(fun(M) -> w(10), M(partial, v5) end,
                            fun(W) -> cancellable_worker:check(W) end)),
     ?_assertMatch({final, v7},
                   run_test(fun(M) -> M(final, v7), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({final, v8},
                   run_test(fun(M) -> M(final, v8), M(final, v9), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({final, v10},
                   run_test(fun(M) -> M(final, v10), M(partial, v11), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({final, [v12, v13]},
                   run_test(fun(M) -> M(partial, v12), M(final, v13), ok end,
                            fun(W) -> w(10), cancellable_worker:check(W) end)),
     ?_assertMatch({{partial, [v12a]}, {partial, [v12a, v13a]}, {final, [v12a, v13a]}},
                   run_test(fun(M) -> M(partial, v12a), w(20), M(partial, v13a), w(20), ok end,
                            fun(W) -> w(10), A = cancellable_worker:check(W),
                                      w(30), B = cancellable_worker:check(W),
                                      w(50), C = cancellable_worker:check(W),
                                      {A, B, C}
                            end)),
     ?_assertMatch({ok, [v14]},
                   run_test(fun(M) -> M(partial, v14), w(50), M(partial, v15), ok end,
                            fun(W) -> w(10), cancellable_worker:cancel(W) end)),
     ?_assertMatch({ok, [v16, v17]},
                   run_test(fun(M) -> M(partial, v16), w(10), M(partial, v17), ok end,
                            fun(W) -> w(30), cancellable_worker:cancel(W) end)),
     %% crash:crash/0 is an undefined function, used here to crash the worker
     %% mid-run; the results gathered before the crash are kept
     ?_assertMatch({final, [v18]},
                   run_test(fun(M) -> M(partial, v18), w(5), crash:crash(), w(5), M(partial, v19), ok end,
                            fun(W) -> w(20), cancellable_worker:check(W) end)),
     ?_assertMatch({ok, undefined},
                   run_test(fun(_M) -> ok end,
                            fun(W) -> cancellable_worker:yield(W) end)),
     ?_assertMatch({ok, [v6]},
                   run_test(fun(M) -> M(partial, v6) end,
                            fun(W) -> w(10), cancellable_worker:yield(W) end)),
     ?_assertMatch({ok, [v19, v20]},
                   run_test(fun(M) -> M(partial, v19), M(partial, v20) end,
                            fun(W) -> w(10), cancellable_worker:yield(W) end)),
     ?_assertMatch({ok, [v21]},
                   run_test(fun(M) -> w(10), M(partial, v21) end,
                            fun(W) -> cancellable_worker:yield(W) end)),
     ?_assertMatch({ok, [v22, v23]},
                   run_test(fun(M) -> M(partial, v22), M(final, v23) end,
                            fun(W) -> cancellable_worker:yield(W) end)),
     ?_assertMatch({ok, [v24]},
                   run_test(fun(M) -> M(partial, v24), w(5), crash:crash(), M(partial, v25), ok end,
                            fun(W) -> cancellable_worker:yield(W) end)),
     ?_assert(true)
    ].
%% Sleep for N milliseconds.
w(N) ->
    receive after N -> ok end.
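%% Run the suite with: eunit:test(cancellable_worker_tests).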
vladdu commented Nov 18, 2016

I need this in a project, but it may be of more general interest. In that case I will make a proper release of it, with tests and documentation.

I would be happy to receive comments, especially about (more or less glaring) bugs and (hints of) overengineering. Also, please let me know if implementations of this already exist, as I couldn't find any.

okeuday commented Nov 18, 2016

To isolate potential instability in the worker processing, and to avoid losing the partial results of an iterative algorithm, it is better to have a worker that passively receives the details of the work it needs to do from a controller process, with the worker's response stored before pursuing the next iteration of work. There are other reasons to organize things this way, like scalability, caching, memory use, and start-up time, but the most important ones were listed first. This can be done with CloudI, as shown by the hexpi integration test (https://github.com/CloudI/CloudI/tree/develop/src/tests/hexpi) with the configuration sections https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L99-L196, https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L458-L468, and https://github.com/CloudI/CloudI/blob/develop/src/cloudi_tests.conf.in#L480-L487 (segments pasted below, excluding line comments):

...
% worker
{external,
    "/tests/", "tests/hexpi/priv/hexpi", "",
    [{"LD_LIBRARY_PATH", "api/c/lib/"},
     {"DYLD_LIBRARY_PATH", "api/c/lib/"}],
    none, tcp, default, 5000, 5000, 5000,
    undefined, undefined, 1, 1.5, 5, 300,
    [{request_timeout_adjustment, true},
     {nice, 15}]},
...
% storage
{internal,
    "/tests/http_req/",
    cloudi_service_filesystem,
    [{directory, "tests/http_req/public_html/"},
     {write_append, ["/tests/http_req/hexpi.txt"]},
    ...
     ],
    immediate_closest,
    5000, 5000, 5000, undefined, undefined, 1, 5, 300, []},
...
% controller
[{prefix, "/tests/"},
 {module, cloudi_service_map_reduce},
 {args, [{map_reduce, cloudi_service_hexpi}, % map-reduce module
         {map_reduce_args, [1, 65536]},  % index start, index end
         {concurrency, 1.5}]},
 {timeout_init, 20000},
 {dest_list_deny, [api]},
 {options, [{request_timeout_adjustment, true}]}],
...

vladdu commented Nov 24, 2016

@okeuday Thanks, I see your point, but I think my worker here is actually your controlling process. At least in this latest version, where it does little more than gather partial results and check whether the real worker has finished.

My use case is for a server that waits for commands from the UI and executes them by spawning one of these processes. The user or other commands might request cancellation of pending operations. The UI can retrieve partial results when the operation takes a long time, to keep the user happy.
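
A minimal sketch of such a server's clauses, with hypothetical names (execute/2 stands in for the actual command dispatch, and the state is just a list of worker pids):

handle_call({run, Cmd}, _From, Workers) ->
    %% spawn a cancellable worker per UI command
    {ok, W} = cancellable_worker:start(fun(Report) -> execute(Cmd, Report) end),
    {reply, {started, W}, [W | Workers]};
handle_call({progress, W}, _From, Workers) ->
    %% the UI polls for partial results during long operations
    {reply, cancellable_worker:check(W), Workers};
handle_call({cancel, W}, _From, Workers) ->
    %% cancelling returns whatever partial results were gathered
    {reply, cancellable_worker:cancel(W), lists:delete(W, Workers)}.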
