Created
July 24, 2012 00:44
-
-
Save w495/3167212 to your computer and use it in GitHub Desktop.
Old one-module postgresql connection pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-compile(export_all). | |
-module(pgConPool). | |
-behaviour(gen_server). | |
%% -------------------------------------------------------------------- | |
%% Include files | |
%% -------------------------------------------------------------------- | |
-include("../include/common.hrl"). | |
-include_lib("epgsql/include/pgsql.hrl"). | |
%% ---------------------------------------------------------------------------- | |
%% Defines | |
%% ---------------------------------------------------------------------------- | |
-record(state, {connections, cPool, tasks, reconnectionTime, necessary, dbHost, dbName, dbUser, dbPassword}). | |
-define(RECONNECT_TIMEOUT, 5000). | |
%% ---------------------------------------------------------------------------- | |
%% External exports | |
%% ---------------------------------------------------------------------------- | |
-export([start_link/4, withConnection/2, withTransaction/2]). | |
%% -------------------------------------------------------------------- | |
%% gen_server callbacks | |
%% -------------------------------------------------------------------- | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
%% ==================================================================== | |
%% External functions | |
%% ==================================================================== | |
start_link(Username, Password, DbName, DbHost)-> | |
Name = utils:to_atom(?FMT("pgconpool-~p", [DbName])), | |
flog:info(?FMT("NAME: ~p", [Name])), | |
Ret = gen_server:start_link({local, Name}, ?MODULE, [Username, Password, DbName, DbHost], []), | |
Ret. | |
withConnection(Fun, DbName) -> | |
Name = utils:to_atom(?FMT("pgconpool-~p", [DbName])), | |
case whereis(Name) of | |
Pid when is_pid(Pid) -> | |
gen_server:cast(Pid, {con_request, self()}), | |
receive | |
{pgconnection, Con} -> | |
try | |
Res = Fun(Con), | |
gen_server:cast(Pid, {con_free, Con}), | |
Res | |
catch | |
E:R -> | |
%%io:format("WITHCONNECTION ERROR (~p): ~p - ~p~n", [Con, E, R]), | |
pgsql:squery(Con, "rollback"), | |
%catch(pgsql:close(Con)), % ?? может его не убивать? тестовый вариант - отпускаю соединение в пул после ошибки. | |
gen_server:cast(Pid, {con_free, Con}), % ВНИМАНИЕ! может порождать ошибку | |
{pgcp_error, {E, R}} | |
end | |
end; | |
_ -> | |
{error, {not_started, DbName}} | |
end. | |
withTransaction(Fun, DbName) -> | |
withConnection( | |
fun(Con) -> | |
pgsql:squery(Con, "begin"), | |
Ret = Fun(Con), | |
pgsql:squery(Con, "commit"), | |
Ret | |
end, DbName). | |
mkCast(Param, DbName) -> | |
Name = utils:to_atom(?FMT("pgconpool-~p", [DbName])), | |
case whereis(Name) of | |
Pid when is_pid(Pid) -> | |
gen_server:cast(Pid, Param); | |
_ -> {error, {not_started, DbName}} | |
end. | |
%% ==================================================================== | |
%% Server functions | |
%% ==================================================================== | |
%% -------------------------------------------------------------------- | |
%% Function: init/1 | |
%% Description: Initiates the server | |
%% Returns: {ok, State} | | |
%% {ok, State, Timeout} | | |
%% ignore | | |
%% {stop, Reason} | |
%% -------------------------------------------------------------------- | |
init([Username, Password, DbName, DbHost]) -> | |
process_flag(trap_exit, true), | |
% Username = config:get(db_user, "cff"), | |
% DbName = config:get(db_name, "test1"), | |
% DbHost= config:get(db_host, "localhost"), | |
{ok, #state{ | |
connections=[], cPool = [], necessary=10, reconnectionTime=getNow(), tasks=[], | |
dbHost=DbHost, dbUser=Username, dbPassword=Password, dbName=DbName | |
}, 0}. | |
%% -------------------------------------------------------------------- | |
%% Function: handle_call/3 | |
%% Description: Handling call messages | |
%% Returns: {reply, Reply, State} | | |
%% {reply, Reply, State, Timeout} | | |
%% {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, Reply, State} | (terminate/2 is called) | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
handle_call(Request, _From, State) -> | |
flog:info(?FMT("~p ~p~n", [?MODULE,{unexpected_call,Request}])), | |
{NState, Timeout} = checkReconnection(State), | |
Reply = {error, unexpected_call}, | |
{reply, Reply, NState, Timeout}. | |
%% -------------------------------------------------------------------- | |
%% Function: handle_cast/2 | |
%% Description: Handling cast messages | |
%% Returns: {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
handle_cast({con_request, Pid}, State=#state{connections=Cons, tasks=Tasks}) -> | |
{RestTasks, RestCons} = execTasks(Tasks++[Pid], Cons), | |
{NState, Timeout} = checkReconnection(State#state{connections=RestCons, tasks=RestTasks}), | |
{noreply, NState, Timeout}; | |
handle_cast({con_free, Con}, State=#state{tasks=Tasks, connections=Cons, necessary=N, cPool=CPool}) -> | |
%%%handle_cast({con_free, Con}, State=#state{tasks=Tasks, connections=Cons}) -> | |
% flog:info(?FMT("confree ~p, tasks: ~p cons:~p pool:~p, necess: ~p~n", [Con, length(Tasks), length(Cons), length(CPool), N])), | |
{RestTasks, RestCons} = execTasks(Tasks, [Con|Cons]), | |
{NState, Timeout} = checkReconnection(State#state{connections=RestCons, tasks=RestTasks}), | |
{noreply, NState, Timeout}; | |
handle_cast(Msg, State) -> | |
flog:info(?FMT("~p ~p~n", [?MODULE,{unexpected_cast, Msg}])), | |
{NState, Timeout} = checkReconnection(State), | |
{noreply, NState, Timeout}. | |
%% -------------------------------------------------------------------- | |
%% Function: handle_info/2 | |
%% Description: Handling all non call/cast messages | |
%% Returns: {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
handle_info(timeout, State=#state{tasks=Tasks, necessary=N}) -> | |
flog:info(?FMT("RECONNECT TIMER ~p ... tasks: ~p~n", [N, length(Tasks)])), | |
{NState, Timeout} = checkReconnection(State), | |
{noreply, NState, Timeout}; | |
handle_info({'EXIT', Con, Reason}, State=#state{connections=Cons, cPool=CPool, necessary=N, tasks=Tasks}) -> | |
flog:info(?FMT("Postgres connection ~p closed. Reason: ~p tasks: ~p cons:~p pool:~p, necess: ~p~n", | |
[Con, Reason, length(Tasks), length(Cons), length(CPool), N])), | |
case lists:member(Con, CPool) of | |
true -> | |
flog:info(?FMT("so.. reconnect ~p~n",[Con])), | |
NewNecessary = N+1; | |
false -> | |
flog:info(?FMT("illegal con ~p in ~p~n",[Con, CPool])), | |
NewNecessary = N | |
end, | |
StateStage1 = State#state{necessary=NewNecessary, cPool=CPool--[Con], connections=Cons--[Con]}, % 100% no Con in pools | |
{NState, Timeout} = checkReconnection(StateStage1), | |
{noreply, NState, Timeout}; | |
handle_info({Ref, {ok, Con}}, State) when is_pid(Con), is_reference(Ref) -> | |
flog:info(?FMT("MB Connection timeout result income: ~p.~n", [Con])), | |
catch(pgsql:close(Con)), | |
{NState, Timeout} = checkReconnection(State), | |
{noreply, NState, Timeout}; | |
handle_info(Info, State) -> | |
flog:info(?FMT("~p ~p~n", [?MODULE, {unexpected_info, Info}])), | |
{NState, Timeout} = checkReconnection(State), | |
{noreply, NState, Timeout}. | |
%% -------------------------------------------------------------------- | |
%% Function: terminate/2 | |
%% Description: Shutdown the server | |
%% Returns: any (ignored by gen_server) | |
%%% -------------------------------------------------------------------- | |
terminate(Reason, #state{connections=Cons}) -> | |
closeConnections(Cons), | |
global:unregister_name(?MODULE), | |
flog:info(?FMT("~p ~p~n", [?MODULE, {terminate, Reason}])), | |
ok. | |
%% -------------------------------------------------------------------- | |
%% Func: code_change/3 | |
%% Purpose: Convert process state when code is changed | |
%% Returns: {ok, NewState} | |
%% -------------------------------------------------------------------- | |
code_change(_OldVsn, State, _Extra) -> | |
{ok, State}. | |
%% -------------------------------------------------------------------- | |
%% Local | |
%% -------------------------------------------------------------------- | |
getNow() -> | |
{_M,S,Mi} = now(), | |
S*1000000+Mi. | |
closeConnections([C|T]) -> | |
pgsql:close(C), | |
closeConnections(T); | |
closeConnections([]) -> | |
done. | |
initConnectionPool(0, Ret, _DbHost, _Username, _Password, _DbName) -> | |
Ret; | |
initConnectionPool(N, Ret, DbHost, Username, Password, DbName) -> | |
case catch(pgsql:connect(DbHost, Username, Password, [{database, DbName}, {timeout, 3600000}])) of | |
{ok, Con}-> | |
flog:info(?FMT("new connection ~p~n",[Con])), | |
initConnectionPool(N-1, [Con|Ret], DbHost, Username, Password, DbName); | |
{Error, Reason} -> | |
flog:info(?FMT("init connection error ~p : ~p~n",[Error, Reason])), | |
Ret | |
end. | |
initConnectionPool(N, DbHost, Username, Password, DbName) -> | |
initConnectionPool(N, [], DbHost, Username, Password, DbName). | |
execTasks(Tasks, Cons=[Con|RC]) -> | |
{Pid, Tasks2} = getNextTask(Tasks), | |
if | |
Pid =:= none -> | |
{[], Cons}; | |
true -> | |
Pid ! {pgconnection, Con}, | |
execTasks(Tasks2, RC) | |
end; | |
execTasks(RT, RC) -> | |
{RT, RC}. | |
checkReconnection(State=#state{necessary=0}) -> | |
{State#state{reconnectionTime=infinity}, infinity}; | |
checkReconnection(State=#state{reconnectionTime=infinity}) -> | |
{State#state{reconnectionTime=getNow() + ?RECONNECT_TIMEOUT*1000}, ?RECONNECT_TIMEOUT}; | |
checkReconnection(State=#state{reconnectionTime=RT, necessary=N, connections=Cons, cPool=CPool, tasks=Tasks, | |
dbHost=DbHost, dbUser=Username, dbName=DbName, dbPassword=Password}) -> | |
Now = getNow(), | |
if | |
Now < RT -> | |
NState = State, | |
Timeout = trunc((RT - Now)/1000); | |
true -> | |
NewCons = initConnectionPool(N, DbHost, Username, Password, DbName), | |
{RestTasks, RestCons} = execTasks(Tasks, Cons++NewCons), | |
flog:info(?FMT("Reconnecting: necessary - ~p, new - ~p, rest: ~p~n", [N, length(NewCons), length(RestCons)])), | |
NState = State#state{necessary=N-length(NewCons), reconnectionTime=infinity, connections=RestCons, tasks=RestTasks, | |
cPool=CPool++NewCons}, | |
Timeout = infinity | |
end, | |
{NState, Timeout}. | |
getNextTask([Pid|T]) -> | |
case lists:member(Pid, processes()) of | |
true -> | |
{Pid, T}; | |
false -> | |
flog:info(?FMT("DROP INVALID PID ~p~n", [Pid])), | |
getNextTask(T) | |
end; | |
getNextTask([]) -> | |
{none, []}. | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment