Last active
August 29, 2015 14:19
-
-
Save everilae/e8c52cab6df838e7fe2a to your computer and use it in GitHub Desktop.
Learning Erlang, simple task queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(mq). | |
-export([ | |
start/0, disp_status/1, get_status/1, add_task/2, add_simple_worker/2, | |
add_worker/2, send_ack/1, echo_worker/0 | |
]). | |
-spec start() -> pid().
%% @doc Start a new, empty queue in its own process and return that
%% process's id. The process is spawned without a link to the caller.
start() ->
    QueuePid = spawn(fun loop/0),
    QueuePid.
-spec disp_status(M :: pid()) -> ok.
%% @doc Ask queue `M' to print its status to standard output. Only the
%% `status' request message is sent here; this call returns `ok' right
%% away without waiting for anything to be printed.
disp_status(M) ->
    erlang:send(M, status),
    ok.
%% A task is an arbitrary term; the queue never inspects it.
-type task() :: any().
%% A worker body as handed to add_worker/2: a fun that loops forever.
-type worker() :: fun(() -> none()).
-spec get_status(M :: pid()) -> {ok, {[task()], [pid()]}} | {error, timeout}.
%% @doc Synchronously fetch the status of queue `M'.
%%
%% Returns `{ok, {Tasks, Workers}}' where `Tasks' is the list of pending
%% tasks and `Workers' is the list of idle worker pids (the queue stores
%% the pids of spawned workers, not the funs they were created from).
%% Returns `{error, timeout}' if no reply arrives within one second.
%%
%% The request is tagged with a fresh reference so that only the matching
%% reply is consumed. Note that a reply arriving after the timeout is not
%% removed and stays in the caller's mailbox.
get_status(M) ->
    Ref = make_ref(),
    M ! {status, self(), Ref},
    receive
        {M, Ref, Status} ->
            {ok, Status}
    after 1000 ->
        {error, timeout}
    end.
-spec add_task(M :: pid(), T :: task()) -> ok.
%% @doc Submit task `T' to queue `M'. The task is sent as an asynchronous
%% `{task, T}' message, so `ok' is returned without waiting for the queue
%% to accept or dispatch it.
add_task(M, T) ->
    erlang:send(M, {task, T}),
    ok.
-spec add_worker(M :: pid(), F :: worker()) -> ok.
%% @doc Spawn a new process running `F' and register its pid with queue
%% `M' as an available worker. The worker process is spawned without a
%% link.
add_worker(M, F) ->
    WorkerPid = spawn(F),
    M ! {worker, WorkerPid},
    ok.
-spec add_simple_worker(M :: pid(), F :: fun((task()) -> any())) -> ok.
%% @doc Spawn a new worker that wraps `F' and register it with queue `M'.
%%
%% The spawned process waits for `{M, Task}' messages, applies `F' to the
%% task, acknowledges completion to `M' via send_ack/1 and then waits for
%% the next task. `F' must return normally for the acknowledgement to be
%% sent; its return value is ignored.
add_simple_worker(M, F) ->
    Serve = fun Next() ->
        receive
            {M, Task} ->
                F(Task),
                send_ack(M),
                Next()
        end
    end,
    M ! {worker, spawn(Serve)},
    ok.
-spec send_ack(M :: pid()) -> ok.
%% @doc Acknowledge to queue `M' that the calling process has finished its
%% task. A worker must call this after every task; otherwise the queue
%% never puts it back among the available workers.
send_ack(M) ->
    Self = self(),
    erlang:send(M, {ack, Self}),
    ok.
-spec echo_worker() -> none().
%% @doc Example worker body: for every `{M, Task}' message received, print
%% the worker's own pid and the task, acknowledge to `M', and wait for the
%% next message. Never returns.
echo_worker() ->
    receive
        {M, Task} ->
            io:format("~w: ~w~n", [self(), Task]),
            send_ack(M),
            echo_worker()
    end.
-spec dispatch(T :: task(), W :: pid()) -> ok.
%% @doc Hand task `T' to the worker process `W'. The message carries the
%% sender's pid so the worker knows where to send its acknowledgement.
dispatch(T, W) ->
    W ! {self(), T},
    ok.
-spec wait(Tasks :: queue:queue(task()), Workers :: [pid()]) -> none().
%% @doc Block until the next message arrives, update the queue state
%% accordingly and continue with loop/2. Never returns.
%%
%% Handled messages:
%%   `{task, T}'        - append `T' to the pending tasks;
%%   `{worker, W}'      - register the new worker pid `W' as available;
%%   `{ack, W}'         - worker `W' finished its task and is available again;
%%   `status'           - print pending tasks and idle workers;
%%   `{status, P, Ref}' - reply to `P' with pending tasks and idle workers.
%%
%% Every branch ends in a tail call to loop/2, so handling a message does
%% not leave a stack frame behind.
wait(Tasks, Workers) ->
    receive
        {task, T} ->
            loop(queue:in(T, Tasks), Workers);
        {worker, W} ->
            loop(Tasks, [W | Workers]);
        {ack, W} ->
            loop(Tasks, [W | Workers]);
        status ->
            io:format("Tasks: ~w, Workers: ~w~n", [queue:to_list(Tasks), Workers]),
            loop(Tasks, Workers);
        {status, P, Ref} ->
            P ! {self(), Ref, {queue:to_list(Tasks), Workers}},
            loop(Tasks, Workers);
        _Unexpected ->
            %% Nothing else in this process would ever match such a message,
            %% so drop it instead of letting the mailbox grow.
            loop(Tasks, Workers)
    end.
-spec loop() -> none().
% Entry point of the queue process: start with no pending tasks and no
% workers, then wait for messages.
loop() ->
    NoTasks = queue:new(),
    NoWorkers = [],
    wait(NoTasks, NoWorkers).
-spec loop(Tasks :: queue:queue(task()), Workers :: [pid()]) -> none().
% Hand pending tasks to idle workers until one of the two runs out, then
% go back to waiting for messages. Never returns.
%
% No idle workers: any pending tasks have to stay queued.
loop(Tasks, [] = Workers) ->
    wait(Tasks, Workers);
% At least one idle worker: give it the oldest pending task, if any. The
% worker is removed from the idle list until it acknowledges.
loop(Tasks, [Worker | IdleWorkers] = Workers) ->
    case queue:out(Tasks) of
        {empty, _} ->
            wait(Tasks, Workers);
        {{value, T}, NewTasks} ->
            dispatch(T, Worker),
            loop(NewTasks, IdleWorkers)
    end.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment