-
-
Save puzza007/545955 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%
%% Simple worker pool for gen_servers that are only driven through gen_server:call.
%%
-module(gen_server_call_pool). | |
-behaviour(gen_server). | |
%% -------------------------------------------------------------------- | |
%% External exports | |
-export([start_link/3, stats/1]). | |
%% gen_server callbacks | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). | |
%% State of the pool server itself.
-record(state, {
    ready,  % pids of workers that are idle and can take a request
    busy,   % pids of workers that are currently serving a request
    work,   % queue (FIFO) of #job{} records waiting for a free worker
    name,   % name the pool is registered under; proxies cast results to it
    m,      % module   \
    f,      % function  } used as erlang:apply(M, F, A) to start one worker
    a       % args     /
}).
%% A request that is parked until a worker becomes available.
-record(job, {
    request,  % the request term exactly as received by handle_call/3
    from,     % the gen_server `From' handle used to reply to the caller
    dob       % timestamp taken when the job was put on the work queue
}).
%% @doc Starts the pool and registers it locally as `Name'.
%% `{M,F,A}' is the worker start function; it is applied `Num' times by
%% init/1 and each application must return `{ok, Pid}'.
%% Returns whatever gen_server:start_link/4 returns.
-spec start_link(atom(), {module(), atom(), [term()]}, non_neg_integer()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(Name, {_M, _F, _A} = StartMFA, Num) ->
    InitArgs = [Name, StartMFA, Num],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
%% @doc Asks the pool registered as `Name' for its current statistics.
%% The reply is the proplist built by the `{?MODULE, stats}' clause of
%% handle_call/3 (num_busy, num_ready, jobs_queued, current_wait).
-spec stats(atom()) -> [{atom(), number()}].
stats(Name) ->
    StatsRequest = {?MODULE, stats},
    gen_server:call(Name, StatsRequest).
%% -------------------------------------------------------------------- | |
%% @doc gen_server init callback.
%% Enables trap_exit so exit signals reach handle_info/2 as
%% `{'EXIT', Pid, Reason}' messages, then starts `Num' workers by applying
%% `M:F(A)' once per worker. Any start result other than `{ok, Pid}' fails the
%% match and crashes init. All started workers begin in the `ready' list.
%% NOTE(review): exit notifications only arrive if M:F/A links the worker to
%% the calling process - confirm against the worker's start function.
init([Name, {M, F, A}, Num]) ->
    process_flag(trap_exit, true),
    Workers = [
        begin
            {ok, Pid} = erlang:apply(M, F, A),
            Pid
        end
     || _ <- lists:seq(1, Num)
    ],
    io:format("Started ~B db workers: ~p", [length(Workers), Workers]),
    InitialState = #state{
        ready = Workers,
        busy = [],
        work = queue:new(),
        name = Name,
        m = M,
        f = F,
        a = A
    },
    {ok, InitialState}.
%% @doc gen_server call handler: answers pool statistics itself and hands every
%% other request to a pooled worker.
%%
%% Clause 1, `{?MODULE, stats}': replies immediately with a proplist holding
%% the number of busy and ready workers, the number of queued jobs and
%% `current_wait', the age in milliseconds of the oldest queued job (0 when
%% nothing is queued).
%%
%% Clause 2, at least one worker is ready: takes the first ready worker, moves
%% it to `busy' and spawns a proxy process that performs the call on the
%% worker and casts `{send_reply, Worker, From, Reply}' back to the pool. The
%% pool returns `noreply'; the caller is answered later from handle_cast/2.
%%
%% Clause 3, no worker is ready: wraps the request in a #job{} stamped with the
%% current time and appends it to the work queue.
handle_call({?MODULE, stats}, _From, State = #state{ready = Ready, busy = Busy, work = Work}) ->
    %% queue:peek/1 returns `empty | {value, Item}'. Matching `{value, _}'
    %% against queue:out/1 (which returns `{{value, Item}, Q}') never succeeds
    %% and crashed the pool with case_clause whenever a job was queued.
    CurrentWait =
        case queue:peek(Work) of
            empty ->
                0;
            {value, #job{dob = Dob}} ->
                timer:now_diff(os:timestamp(), Dob) / 1000
        end,
    Stats = [
        {num_busy, length(Busy)},
        {num_ready, length(Ready)},
        {jobs_queued, queue:len(Work)},
        {current_wait, CurrentWait}
    ],
    {reply, Stats, State};
%% job arrives when a worker is available immediately
handle_call(Request, From, State = #state{ready = [Worker | NewReady], busy = Busy, name = Name}) ->
    Proxy = fun() ->
        %% `infinity': with the default 5 s timeout a slow worker made the
        %% proxy exit, so no send_reply was ever cast and the worker stayed in
        %% `busy' forever. The original caller still has its own timeout on the
        %% call to the pool, and gen_server:call/3 exits if the worker dies.
        Reply = gen_server:call(Worker, Request, infinity),
        %% this sends the reply to 'From' and puts the worker back in the ready list:
        gen_server:cast(Name, {send_reply, Worker, From, Reply})
    end,
    %% Deliberately not linked: the pool traps exits, and an exit signal from a
    %% proxy would reach the pool's 'EXIT' handler, which is meant for workers.
    spawn(Proxy),
    {noreply, State#state{ready = NewReady, busy = [Worker | Busy]}};
%% job arrives, no available workers, add to queue
handle_call(Request, From, State = #state{ready = [], busy = Busy, work = Work}) ->
    %% os:timestamp/0 yields the same {MegaSecs, Secs, MicroSecs} tuple as the
    %% deprecated erlang:now/0, so timer:now_diff/2 keeps working on it.
    Job = #job{request = Request, from = From, dob = os:timestamp()},
    NewWork = queue:in(Job, Work),
    io:format("Job queue size: ~p, num workers: ~B", [queue:len(NewWork), length(Busy)]),
    {noreply, State#state{work = NewWork}}.
%% @doc gen_server cast handler for `{send_reply, Worker, To, Reply}', which a
%% proxy process casts once `Worker' has answered a request.
%%
%% Forwards `Reply' to the waiting caller `To', moves `Worker' from `busy' back
%% to `ready' and, if a job is waiting in the work queue, dispatches the oldest
%% one by re-entering handle_call/3 (whose `{noreply, State}' result is also a
%% valid return value for this callback).
handle_cast({send_reply, Worker, To, Reply}, State = #state{ready = Ready, busy = Busy, work = Work}) ->
    gen_server:reply(To, Reply),
    %% Only hand the worker back if the pool still tracks it as busy. If its
    %% 'EXIT' message was processed before this cast, the pid has already been
    %% removed and replaced; re-adding it would put a dead pid into `ready'.
    NewReady =
        case lists:member(Worker, Busy) of
            true -> [Worker | Ready];
            false -> Ready
        end,
    NewState = State#state{ready = NewReady, busy = lists:delete(Worker, Busy)},
    case {NewReady, queue:out(Work)} of
        {[_ | _], {{value, #job{request = JobRequest, from = JobFrom, dob = Dob}}, NewWork}} ->
            %% os:timestamp/0 replaces the deprecated erlang:now/0; both use
            %% the tuple format timer:now_diff/2 expects.
            T = timer:now_diff(os:timestamp(), Dob) / 1000,
            io:format("Queued job started, wait time: ~p ms", [T]),
            %% A worker is ready, so this lands in the dispatching clause of
            %% handle_call/3 rather than re-queueing the job.
            handle_call(JobRequest, JobFrom, NewState#state{work = NewWork});
        _NothingToDispatch ->
            %% Either the queue is empty or no worker is ready.
            {noreply, NewState}
    end.
%% @doc gen_server info handler.
%%
%% `{'EXIT', Pid, Reason}' from a pool worker: removes `Pid' from the `ready'
%% and `busy' lists, starts a replacement with `M:F(A)' (a result other than
%% `{ok, Pid}' crashes the pool) and, if jobs are queued, dispatches the oldest
%% one so queued callers do not have to wait for unrelated traffic.
%% 'EXIT' messages from pids the pool does not track are ignored, so the pool
%% does not grow. Any other message is logged and dropped.
%%
%% NOTE(review): the caller whose request was in flight on the crashed worker
%% is not answered here; the pool keeps no record of it.
handle_info({'EXIT', Pid, Reason}, State = #state{ready = Ready, busy = Busy, m = M, f = F, a = A}) ->
    case lists:member(Pid, Ready) orelse lists:member(Pid, Busy) of
        false ->
            %% Not one of our workers: nothing to replace.
            {noreply, State};
        true ->
            io:format("WORKER CRASH: ~p ~p ready:~p busy:~p", [Pid, Reason, Ready, Busy]),
            R = lists:delete(Pid, Ready),
            B = lists:delete(Pid, Busy),
            %% spawn a new worker to replace the crashed one
            {ok, W} = erlang:apply(M, F, A),
            io:format("Added new worker to pool to replace crashed: ~p", [W]),
            NewState = State#state{ready = [W | R], busy = B},
            io:format("New ready:~p busy:~p", [NewState#state.ready, NewState#state.busy]),
            %% The fresh worker is idle; give it the oldest queued job, if any.
            %% `ready' is non-empty here, so handle_call/3 dispatches the job
            %% instead of queueing it again.
            case queue:out(NewState#state.work) of
                {empty, _} ->
                    {noreply, NewState};
                {{value, #job{request = JobRequest, from = JobFrom}}, NewWork} ->
                    handle_call(JobRequest, JobFrom, NewState#state{work = NewWork})
            end
    end;
handle_info(Other, State) ->
    %% Drain unexpected messages instead of crashing with function_clause.
    io:format("Ignoring unexpected message: ~p~n", [Other]),
    {noreply, State}.
%% @doc gen_server terminate callback. Performs no cleanup and returns `ok'.
%% Both arguments are unused, hence the underscore prefixes (this avoids
%% "variable is unused" compiler warnings).
terminate(_Reason, _State) ->
    ok.
%% @doc gen_server code_change callback. The state needs no conversion between
%% versions, so it is returned unchanged. `_OldVsn' and `_Extra' are unused,
%% hence the underscore prefixes (this avoids compiler warnings).
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment