Skip to content

Instantly share code, notes, and snippets.

@huangxiangdan
Last active December 16, 2015 13:28
Show Gist options
  • Save huangxiangdan/5441368 to your computer and use it in GitHub Desktop.
Save huangxiangdan/5441368 to your computer and use it in GitHub Desktop.
support R16A
--- mod_logdb.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb.erl 2010-05-12 16:22:34.000000000 +0300
@@ -0,0 +1,2088 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : Frontend for log user messages to db
+%%% Version : trunk
+%%% Id : $Id: mod_logdb.erl 1360 2009-07-30 06:00:14Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb).
+-author('[email protected]').
+
+-behaviour(gen_server).
+-behaviour(gen_mod).
+
+% supervisor
+-export([start_link/2]).
+% gen_mod
+-export([start/2,stop/1]).
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% hooks
+-export([send_packet/3, receive_packet/4, remove_user/2]).
+-export([get_local_identity/5,
+ get_local_features/5,
+ get_local_items/5,
+ adhoc_local_items/4,
+ adhoc_local_commands/4
+% get_sm_identity/5,
+% get_sm_features/5,
+% get_sm_items/5,
+% adhoc_sm_items/4,
+% adhoc_sm_commands/4]).
+ ]).
+% ejabberdctl
+-export([rebuild_stats/3,
+ copy_messages/1, copy_messages_ctl/3, copy_messages_int_tc/1]).
+%
+-export([get_vhost_stats/1, get_vhost_stats_at/2,
+ get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ sort_stats/1,
+ convert_timestamp/1, convert_timestamp_brief/1,
+ get_user_settings/2, set_user_settings/3,
+ user_messages_at_parse_query/4, user_messages_parse_query/3,
+ vhost_messages_parse_query/2, vhost_messages_at_parse_query/4,
+ list_to_bool/1, bool_to_list/1,
+ list_to_string/1, string_to_list/1,
+ get_module_settings/1, set_module_settings/2,
+ purge_old_records/2]).
+% webadmin hooks
+-export([webadmin_menu/3,
+ webadmin_user/4,
+ webadmin_page/3,
+ user_parse_query/5]).
+% webadmin queries
+-export([vhost_messages_stats/3,
+ vhost_messages_stats_at/4,
+ user_messages_stats/4,
+ user_messages_stats_at/5]).
+
+-include("mod_logdb.hrl").
+-include("ejabberd.hrl").
+-include("mod_roster.hrl").
+-include("jlib.hrl").
+-include("ejabberd_ctl.hrl").
+-include("adhoc.hrl").
+-include("web/ejabberd_web_admin.hrl").
+-include("web/ejabberd_http.hrl").
+
+-define(PROCNAME, ejabberd_mod_logdb).
+% gen_server call timeout
+-define(CALL_TIMEOUT, 10000).
+
+-record(state, {vhost, dbmod, backendPid, monref, purgeRef, pollRef, dbopts, dbs, dolog_default, ignore_jids, groupchat, purge_older_days, poll_users_settings, drop_messages_on_user_removal}).
+
+ets_settings_table(VHost) -> list_to_atom(lists:append("ets_logdb_settings_", VHost)).  % name of the per-vhost settings ETS table
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod/gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% gen_mod callback, invoked by ejabberd when the module is enabled for VHost.
+% Adds one permanent worker per vhost to ejabberd_sup; the worker process is
+% created through start_link/2.
+start(VHost, Opts) ->
+    ChildId = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    StartMFA = {?MODULE, start_link, [VHost, Opts]},
+    % old-style child spec: {Id, StartFunc, Restart, Shutdown, Type, Modules}
+    % (shutdown timeout is 1000 ms)
+    supervisor:start_child(
+      ejabberd_sup,
+      {ChildId, StartMFA, permanent, 1000, worker, [?MODULE]}).
+
+% supervisor entry point; posts 'start' so backend setup happens in handle_info/2
+start_link(VHost, Opts) ->
+    RegName = {local, gen_mod:get_module_proc(VHost, ?PROCNAME)},
+    {ok, Server} = Started = gen_server:start_link(RegName, ?MODULE, [VHost, Opts], []),
+    Server ! start,
+    Started.
+
+% gen_server callback. Resolves the storage backend configured for VHost and
+% builds the initial state; the backend itself is started later, when the
+% 'start' message sent by start_link/2 is handled.
+init([VHost, Opts]) ->
+    ?MYDEBUG("Starting mod_logdb", []),
+    process_flag(trap_exit, true),
+    Opt = fun(Key, Default) -> gen_mod:get_opt(Key, Opts, Default) end,
+    Backends = Opt(dbs, [{mnesia, []}]),
+    % which backend serves which vhost
+    {value, {_, DBName}} = lists:keysearch(VHost, 1, Opt(vhosts, [{VHost, mnesia}])),
+    % 10 is the default because of use in clustered environments
+    PollSecs = Opt(poll_users_settings, 10),
+    {value, {DBName, DBOpts}} = lists:keysearch(DBName, 1, Backends),
+    ?MYDEBUG("Starting mod_logdb for ~p with ~p backend", [VHost, DBName]),
+    % backend callback module is mod_logdb_<backend>
+    {ok, #state{vhost = VHost,
+                dbmod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(DBName)),
+                dbopts = DBOpts,
+                % dbs is kept so messages can be copied between backends
+                dbs = Backends,
+                dolog_default = Opt(dolog_default, true),
+                drop_messages_on_user_removal = Opt(drop_messages_on_user_removal, true),
+                ignore_jids = Opt(ignore_jids, []),
+                groupchat = Opt(groupchat, none),
+                purge_older_days = Opt(purge_older_days, never),
+                poll_users_settings = PollSecs}}.
+
+% Undo what handle_info(start, ...) registered for this vhost: every ejabberd
+% hook owned by this module is removed again.
+cleanup(#state{vhost=VHost} = _State) ->
+    ?MYDEBUG("Stopping ~s for ~p", [?MODULE, VHost]),
+
+    %ets:delete(ets_settings_table(VHost)),
+
+    % {Hook, Callback, Seq}; the adhoc_sm_* and disco_sm_* hooks are not in use
+    Hooks = [{remove_user, remove_user, 90},
+             {user_send_packet, send_packet, 90},
+             {user_receive_packet, receive_packet, 90},
+             {adhoc_local_commands, adhoc_local_commands, 110},
+             {adhoc_local_items, adhoc_local_items, 110},
+             {disco_local_identity, get_local_identity, 110},
+             {disco_local_features, get_local_features, 110},
+             {disco_local_items, get_local_items, 110},
+             {webadmin_menu_host, webadmin_menu, 70},
+             {webadmin_user, webadmin_user, 50},
+             {webadmin_page_host, webadmin_page, 50},
+             {webadmin_user_parse_query, user_parse_query, 50}],
+    lists:foreach(fun({Hook, Callback, Seq}) ->
+                      ejabberd_hooks:delete(Hook, VHost, ?MODULE, Callback, Seq)
+                  end, Hooks),
+    ?MYDEBUG("Removed hooks for ~p", [VHost]),
+
+    % ejabberdctl command unregistration is disabled; kept for reference:
+    %ejabberd_ctl:unregister_commands(VHost, [{"rebuild_stats", "rebuild mod_logdb module stats for vhost"}], ?MODULE, rebuild_stats),
+    %Supported_backends = lists:flatmap(fun({Backend, _Opts}) ->
+    % [atom_to_list(Backend), " "]
+    % end, State#state.dbs),
+    %ejabberd_ctl:unregister_commands(
+    % VHost,
+    % [{"copy_messages backend", "copy messages from backend to current backend. backends could be: " ++ Supported_backends }],
+    % ?MODULE, copy_messages_ctl),
+    ?MYDEBUG("Unregistered commands for ~p", [VHost]).
+
+% gen_mod callback: stop the per-vhost server and drop its child spec from
+% ejabberd_sup. Hook removal is left to terminate/2, which calls cleanup/1;
+% no explicit {cleanup} call is made here.
+stop(VHost) ->
+    ChildId = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    ok = supervisor:terminate_child(ejabberd_sup, ChildId),
+    ok = supervisor:delete_child(ejabberd_sup, ChildId).
+
+% gen_server callback for synchronous requests: backend queries and deletes,
+% per-user settings (cached in ETS) and runtime module settings.
+handle_call({cleanup}, _From, State) ->
+    cleanup(State),
+    ?MYDEBUG("Cleanup finished!!!!!", []),
+    {reply, ok, State};
+handle_call({get_dates}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_dates(VHost),
+    {reply, Reply, State};
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% ejabberd_web_admin callbacks
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+handle_call({delete_messages_by_user_at, PMsgs, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:delete_messages_by_user_at(VHost, PMsgs, Date),
+    {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:delete_all_messages_by_user_at(User, VHost, Date),
+    {reply, Reply, State};
+handle_call({delete_messages_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:delete_messages_at(VHost, Date),
+    {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_vhost_stats(VHost),
+    {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_vhost_stats_at(VHost, Date),
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_user_stats(User, VHost),
+    {reply, Reply, State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_user_messages_at(User, VHost, Date),
+    {reply, Reply, State};
+handle_call({get_user_settings, User}, _From, #state{dbmod=_DBMod, vhost=VHost}=State) ->
+    Reply = case ets:match_object(ets_settings_table(VHost),
+                                  #user_settings{owner_name=User, _='_'}) of
+                [Set] -> Set;
+                _ -> #user_settings{owner_name=User,  % nothing cached: module-wide defaults
+                                    dolog_default=State#state.dolog_default,
+                                    dolog_list=[],
+                                    donotlog_list=[]}
+            end,
+    {reply, Reply, State};
+% TODO: remove User ??
+handle_call({set_user_settings, User, GSet}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Set = GSet#user_settings{owner_name=User},
+    Reply =
+        case ets:match_object(ets_settings_table(VHost),
+                              #user_settings{owner_name=User, _='_'}) of
+            [Set] ->  % Set is already bound: matches only an identical cached record
+                ?MYDEBUG("Settings is equal", []),
+                ok;
+            _ ->
+                case DBMod:set_user_settings(User, VHost, Set) of
+                    error ->
+                        error;
+                    ok ->
+                        true = ets:insert(ets_settings_table(VHost), Set),
+                        ok
+                end
+        end,
+    {reply, Reply, State};
+handle_call({get_module_settings}, _From, State) ->
+    {reply, State, State};
+handle_call({set_module_settings, #state{purge_older_days=PurgeDays, poll_users_settings=PollSec} = Settings},
+            _From,
+            #state{purgeRef=PurgeRefOld, pollRef=PollRefOld,
+                   purge_older_days=PurgeDaysOld, poll_users_settings=PollSecOld} = State) ->
+    PurgeRef = if
+                   PurgeDays == never, PurgeDaysOld /= never ->
+                       {ok, cancel} = timer:cancel(PurgeRefOld),
+                       disabled;
+                   is_integer(PurgeDays), PurgeDaysOld == never ->
+                       set_purge_timer(PurgeDays);
+                   true ->
+                       PurgeRefOld
+               end,
+    % the poll timer is replaced whenever the interval changes; 0 disables it
+    PollRef = if
+                  PollSec == PollSecOld ->
+                      PollRefOld;
+                  PollSec == 0, PollSecOld /= 0 ->
+                      {ok, cancel} = timer:cancel(PollRefOld),
+                      disabled;
+                  is_integer(PollSec), PollSecOld == 0 ->
+                      set_poll_timer(PollSec);
+                  is_integer(PollSec), PollSecOld /= 0 ->
+                      {ok, cancel} = timer:cancel(PollRefOld),
+                      set_poll_timer(PollSec)
+              end,
+    % only the runtime-tunable fields are taken over from Settings
+    NewState = State#state{dolog_default=Settings#state.dolog_default,
+                           ignore_jids=Settings#state.ignore_jids,
+                           groupchat=Settings#state.groupchat,
+                           drop_messages_on_user_removal=Settings#state.drop_messages_on_user_removal,
+                           purge_older_days=PurgeDays,
+                           poll_users_settings=PollSec,
+                           purgeRef=PurgeRef,
+                           pollRef=PollRef},
+    {reply, ok, NewState};
+% unknown request: reply, so the caller is not left blocked until its call timeout
+handle_call(Msg, _From, State) ->
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+    {reply, {error, unknown_call}, State}.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% end ejabberd_web_admin callbacks
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+% gen_server callback for asynchronous requests (hook and ejabberdctl casts)
+handle_cast({addlog, Direction, Owner, Peer, Packet}, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    case filter(Owner, Peer, State) of
+        false ->
+            ok;
+        true ->
+            % packet_parse/5 throws 'ignore' for skipped groupchat packets;
+            % the old-style catch hands that back as a plain value
+            case catch packet_parse(Owner, Peer, Packet, Direction, State) of
+                ignore -> ok;
+                {'EXIT', Reason} -> ?ERROR_MSG("Failed to parse: ~p", [Reason]);
+                Msg -> DBMod:log_message(VHost, Msg)
+            end
+    end,
+    {noreply, State};
+handle_cast({remove_user, User},
+            #state{dbmod=DBMod, vhost=VHost, drop_messages_on_user_removal=Drop}=State) ->
+    case Drop of
+        false ->
+            ?INFO_MSG("Message removing is disabled. Keeping messages for ~s@~s", [User, VHost]);
+        true ->
+            DBMod:drop_user(User, VHost),
+            ?INFO_MSG("Launched ~s@~s removal", [User, VHost])
+    end,
+    {noreply, State};
+% ejabberdctl requests; copies are spawned so they run outside this server process
+handle_cast({rebuild_stats}, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    DBMod:rebuild_stats(VHost),
+    {noreply, State};
+handle_cast({copy_messages, From}, State) ->
+    spawn(?MODULE, copy_messages, [[State, From]]),
+    {noreply, State};
+handle_cast({copy_messages, From, Date}, State) ->
+    spawn(?MODULE, copy_messages, [[State, From, Date]]),
+    {noreply, State};
+handle_cast(Unknown, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Unknown, State]),
+    {noreply, State}.
+
+% Arm the 'scheduled_purging' tick unless purging is turned off ('never').
+% return: disabled | timer reference
+% The tick always fires every 24 hours; the day count itself is not used here.
+set_purge_timer(never) ->
+    disabled;
+set_purge_timer(Days) when is_integer(Days) ->
+    {ok, TRef} = timer:send_interval(timer:hours(24), scheduled_purging),
+    TRef.
+
+% Arm the periodic 'poll_users_settings' tick, which makes the server re-read
+% the per-user settings from the backend.
+% return: disabled | timer reference
+%   PollSec == 0 -> db polling disabled
+%   PollSec > 0  -> tick every PollSec seconds
+%   otherwise    -> tick every 10 seconds
+set_poll_timer(PollSec) when PollSec == 0 ->
+    disabled;
+set_poll_timer(PollSec) ->
+    % anything that is not a positive interval falls back to 10 seconds
+    Period = if PollSec > 0 -> PollSec; true -> 10 end,
+    {ok, TRef} = timer:send_interval(timer:seconds(Period), poll_users_settings),
+    TRef.
+
+% gen_server callback for plain messages.
+% 'start' is posted by start_link/2 once the server exists: start the backend, cache user settings, arm timers, add hooks.
+handle_info(start, #state{dbmod=DBMod, vhost=VHost}=State) ->
+ case DBMod:start(VHost, State#state.dbopts) of
+ {error,{already_started,_}} ->
+ ?MYDEBUG("backend module already started - trying to stop it", []),
+ DBMod:stop(VHost),
+ {stop, already_started, State};
+ {error, Reason} ->
+ timer:sleep(30000), % blocks this process for 30 s before it stops
+ ?ERROR_MSG("Failed to start: ~p", [Reason]),
+ {stop, db_connection_failed, State};
+ {ok, SPid} ->
+ ?INFO_MSG("~p connection established", [DBMod]),
+
+ MonRef = erlang:monitor(process, SPid), % a 'DOWN' for the backend stops this server (see below)
+
+ ets:new(ets_settings_table(VHost), [named_table,public,set,{keypos, #user_settings.owner_name}]),
+ {ok, DoLog} = DBMod:get_users_settings(VHost),
+ ets:insert(ets_settings_table(VHost), DoLog),
+
+ TrefPurge = set_purge_timer(State#state.purge_older_days),
+ TrefPoll = set_poll_timer(State#state.poll_users_settings),
+
+ ejabberd_hooks:add(remove_user, VHost, ?MODULE, remove_user, 90),
+ ejabberd_hooks:add(user_send_packet, VHost, ?MODULE, send_packet, 90),
+ ejabberd_hooks:add(user_receive_packet, VHost, ?MODULE, receive_packet, 90),
+
+ ejabberd_hooks:add(disco_local_items, VHost, ?MODULE, get_local_items, 110),
+ ejabberd_hooks:add(disco_local_features, VHost, ?MODULE, get_local_features, 110),
+ ejabberd_hooks:add(disco_local_identity, VHost, ?MODULE, get_local_identity, 110),
+ %ejabberd_hooks:add(disco_sm_items, VHost, ?MODULE, get_sm_items, 110),
+ %ejabberd_hooks:add(disco_sm_features, VHost, ?MODULE, get_sm_features, 110),
+ %ejabberd_hooks:add(disco_sm_identity, VHost, ?MODULE, get_sm_identity, 110),
+ ejabberd_hooks:add(adhoc_local_items, VHost, ?MODULE, adhoc_local_items, 110),
+ ejabberd_hooks:add(adhoc_local_commands, VHost, ?MODULE, adhoc_local_commands, 110),
+ %ejabberd_hooks:add(adhoc_sm_items, VHost, ?MODULE, adhoc_sm_items, 110),
+ %ejabberd_hooks:add(adhoc_sm_commands, VHost, ?MODULE, adhoc_sm_commands, 110),
+
+ ejabberd_hooks:add(webadmin_menu_host, VHost, ?MODULE, webadmin_menu, 70),
+ ejabberd_hooks:add(webadmin_user, VHost, ?MODULE, webadmin_user, 50),
+ ejabberd_hooks:add(webadmin_page_host, VHost, ?MODULE, webadmin_page, 50),
+ ejabberd_hooks:add(webadmin_user_parse_query, VHost, ?MODULE, user_parse_query, 50),
+
+ ?MYDEBUG("Added hooks for ~p", [VHost]),
+
+ %ejabberd_ctl:register_commands(
+ % VHost,
+ % [{"rebuild_stats", "rebuild mod_logdb module stats for vhost"}],
+ % ?MODULE, rebuild_stats),
+ %Supported_backends = lists:flatmap(fun({Backend, _Opts}) ->
+ % [atom_to_list(Backend), " "]
+ % end, State#state.dbs),
+ %ejabberd_ctl:register_commands(
+ % VHost,
+ % [{"copy_messages backend", "copy messages from backend to current backend. backends could be: " ++ Supported_backends }],
+ % ?MODULE, copy_messages_ctl),
+ ?MYDEBUG("Registered commands for ~p", [VHost]),
+
+ NewState=State#state{monref = MonRef, backendPid=SPid, purgeRef=TrefPurge, pollRef=TrefPoll},
+ {noreply, NewState};
+ Rez ->
+ ?ERROR_MSG("Rez=~p", [Rez]),
+ timer:sleep(30000),
+ {stop, db_connection_failed, State}
+ end;
+% tick armed by set_purge_timer/1: the purge runs in a separate process
+handle_info(scheduled_purging, #state{vhost=VHost, purge_older_days=Days} = State) ->
+ ?MYDEBUG("Starting scheduled purging of old records for ~p", [VHost]),
+ spawn(?MODULE, purge_old_records, [VHost, integer_to_list(Days)]), % purge_old_records/2 takes the day count as a string
+ {noreply, State};
+% tick armed by set_poll_timer/1: replace the cached user settings with what the backend reports
+handle_info(poll_users_settings, #state{dbmod=DBMod, vhost=VHost}=State) ->
+ {ok, DoLog} = DBMod:get_users_settings(VHost),
+ ?MYDEBUG("DoLog=~p", [DoLog]),
+ true = ets:delete_all_objects(ets_settings_table(VHost)),
+ ets:insert(ets_settings_table(VHost), DoLog),
+ {noreply, State};
+handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) ->
+ {stop, db_connection_dropped, State};
+handle_info({fetch_result, _, _}, State) ->
+ ?MYDEBUG("Got timed out mysql fetch result", []),
+ {noreply, State};
+handle_info(Info, State) ->
+ ?INFO_MSG("Got Info:~p, State:~p", [Info, State]),
+ {noreply, State}.
+
+% gen_server callback; db_connection_failed skips cleanup/1 because that reason is only produced before any hook is added
+terminate(db_connection_failed, _State) -> ok;
+terminate(db_connection_dropped, State) ->
+    ?MYDEBUG("Got terminate with db_connection_dropped", []),
+    cleanup(State),
+    ok;
+terminate(Why, #state{monref=undefined} = State) ->
+    ?MYDEBUG("Got terminate with undefined monref.~nReason: ~p", [Why]),
+    cleanup(State),
+    ok;
+terminate(Why, #state{dbmod=DBMod, vhost=VHost, monref=MonRef, backendPid=Backend} = State) ->
+    ?INFO_MSG("Reason: ~p", [Why]),
+    % stop the backend only while its process is still running
+    case erlang:is_process_alive(Backend) of
+        false -> ok;
+        true ->
+            erlang:demonitor(MonRef, [flush]),
+            DBMod:stop(VHost)
+    end,
+    cleanup(State),
+    ok.
+
+% gen_server callback: the state is carried over unchanged on code upgrade.
+code_change(_Vsn, Same, _Extra) -> {ok, Same}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ejabberd_hooks callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% TODO: change to/from to list as sql stores it as list
+% user_send_packet hook: queue the packet as direction 'to' on the sender's vhost logger
+send_packet(Owner, Peer, P) ->
+    Logger = gen_mod:get_module_proc(Owner#jid.lserver, ?PROCNAME),
+    gen_server:cast(Logger, {addlog, to, Owner, Peer, P}).
+
+% user_receive_packet hook: queue the packet as direction 'from' on the recipient's vhost logger
+receive_packet(_JID, Peer, Owner, P) ->
+    Logger = gen_mod:get_module_proc(Owner#jid.lserver, ?PROCNAME),
+    gen_server:cast(Logger, {addlog, from, Owner, Peer, P}).
+
+% remove_user hook: pass the stringprep-normalised user name to the vhost
+% logger, which decides whether the stored messages are dropped.
+remove_user(User, Server) ->
+    Logger = gen_mod:get_module_proc(jlib:nameprep(Server), ?PROCNAME),
+    gen_server:cast(Logger, {remove_user, jlib:nodeprep(User)}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ejabberdctl
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% ejabberdctl hook: "rebuild_stats" is handled here (asynchronously); any other command passes the accumulator on
+rebuild_stats(_Val, VHost, ["rebuild_stats"]) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats}),
+    {stop, ?STATUS_SUCCESS};
+rebuild_stats(Acc, _VHost, _OtherArgs) ->
+    Acc.
+
+% ejabberdctl hook for "copy_messages Backend [Date]": the copy itself is
+% requested asynchronously; any other command passes the accumulator on.
+copy_messages_ctl(_Val, VHost, ["copy_messages", Backend]) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {copy_messages, Backend}),
+    {stop, ?STATUS_SUCCESS};
+copy_messages_ctl(_Val, VHost, ["copy_messages", Backend, Date]) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {copy_messages, Backend, Date}),
+    {stop, ?STATUS_SUCCESS};
+copy_messages_ctl(Acc, _VHost, _OtherArgs) ->
+    Acc.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% misc operations
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+% Turn a raw packet into a #msg record for logging; called from handle_cast({addlog, ...}).
+% Returns 'ignore' when there is no "body" subtag; throws 'ignore' for groupchat packets the groupchat setting excludes.
+packet_parse(Owner, Peer, Packet, Direction, State) ->
+ case xml:get_subtag(Packet, "body") of
+ false ->
+ ignore;
+ Body_xml ->
+ Message_type =
+ case xml:get_tag_attr_s("type", Packet) of
+ [] -> "normal"; % no type attribute: treated as "normal"
+ MType -> MType
+ end,
+
+ case Message_type of
+ "groupchat" when State#state.groupchat == send, Direction == to ->
+ ok;
+ "groupchat" when State#state.groupchat == send, Direction == from ->
+ throw(ignore);
+ "groupchat" when State#state.groupchat == half ->
+ Rooms = ets:match(muc_online_room, '$1'),
+ Ni=lists:foldl(fun([{muc_online_room, {GName, GHost}, Pid}], Names) -> % collects Owner's occupant JIDs, one synchronous call per online room
+ case gen_fsm:sync_send_all_state_event(Pid, {get_jid_nick,Owner}) of
+ [] -> Names;
+ Nick ->
+ lists:append(Names, [jlib:jid_to_string({GName, GHost, Nick})])
+ end
+ end, [], Rooms),
+ case lists:member(jlib:jid_to_string(Peer), Ni) of
+ true when Direction == from -> % incoming packet whose sender is one of Owner's own room JIDs
+ throw(ignore);
+ _ ->
+ ok
+ end;
+ "groupchat" when State#state.groupchat == none ->
+ throw(ignore);
+ _ ->
+ ok
+ end,
+
+ Message_body = xml:get_tag_cdata(Body_xml),
+ Message_subject =
+ case xml:get_subtag(Packet, "subject") of
+ false ->
+ "";
+ Subject_xml ->
+ xml:get_tag_cdata(Subject_xml)
+ end,
+
+ OwnerName = stringprep:tolower(Owner#jid.user),
+ PName = stringprep:tolower(Peer#jid.user),
+ PServer = stringprep:tolower(Peer#jid.server),
+ PResource = Peer#jid.resource,
+
+ #msg{timestamp=get_timestamp(),
+ owner_name=OwnerName,
+ peer_name=PName,
+ peer_server=PServer,
+ peer_resource=PResource,
+ direction=Direction,
+ type=Message_type,
+ subject=Message_subject,
+ body=Message_body}
+ end.
+
+% Decide whether a packet between Owner and Peer is to be logged (true) or
+% skipped (false); called from handle_cast({addlog, ...}).
+filter(Owner, Peer, State) ->
+    OwnerHost = "@" ++ Owner#jid.lserver,
+    PeerHost = "@" ++ Peer#jid.lserver,
+    OwnerBare = Owner#jid.luser ++ OwnerHost,
+    PeerBare = Peer#jid.luser ++ PeerHost,
+    Ignored = State#state.ignore_jids,
+    Cached = ets:match_object(ets_settings_table(State#state.vhost),
+                              #user_settings{owner_name=Owner#jid.luser, _='_'}),
+    % explicit per-peer lists win over the user's default, which in turn wins
+    % over the module-wide default
+    Wanted =
+        case Cached of
+            [#user_settings{dolog_default=Default, dolog_list=Allow, donotlog_list=Deny}] ->
+                case {lists:member(PeerBare, Allow), lists:member(PeerBare, Deny)} of
+                    {true, _} -> true;
+                    {false, true} -> false;
+                    {false, false} when Default == true -> true;
+                    {false, false} when Default == false -> false;
+                    {false, false} -> State#state.dolog_default
+                end;
+            _ -> State#state.dolog_default
+        end,
+    % a bare JID or a whole "@server" present in ignore_jids is never logged
+    not lists:member(OwnerBare, Ignored)
+        andalso not lists:member(PeerBare, Ignored)
+        andalso not lists:member(OwnerHost, Ignored)
+        andalso not lists:member(PeerHost, Ignored)
+        andalso Wanted.
+
+% Delete every stored day of messages for VHost that is more than Days days old; Days is the count as a string, e.g. "30".
+purge_old_records(VHost, Days) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    Dates = ?MODULE:get_dates(VHost),
+    DateNow = calendar:datetime_to_gregorian_seconds({date(), {0,0,1}}),
+    DateDiff = list_to_integer(Days)*24*60*60,
+    ?MYDEBUG("Purging tables older than ~s days", [Days]),
+    lists:foreach(fun(Date) ->
+        % re:split/3 returns the parts themselves (no {ok, _} tuple), as binaries unless {return, list} is set
+        [Year, Month, Day] = re:split(Date, "[^0-9]+", [{return, list}]),
+        DateInSec = calendar:datetime_to_gregorian_seconds({{list_to_integer(Year), list_to_integer(Month), list_to_integer(Day)}, {0,0,1}}),
+        if
+            (DateNow - DateInSec) > DateDiff ->
+                gen_server:call(Proc, {delete_messages_at, Date});
+            true -> ?MYDEBUG("Skipping messages at ~p", [Date])
+        end
+    end, Dates).
+
+% Order [{"Y-M-D", Count}] newest date first; the dates are re-rendered by convert_timestamp_brief/1.
+sort_stats(Stats) ->
+    % Stats = [{"2003-4-15",1}, {"2006-8-18",1}, ... ]
+    CFun = fun({TableName, Count}) ->
+        % re:split/3 returns the parts themselves (no {ok, _} tuple), as binaries unless {return, list} is set
+        [Year, Month, Day] = re:split(TableName, "[^0-9]+", [{return, list}]),
+        { calendar:datetime_to_gregorian_seconds({{list_to_integer(Year), list_to_integer(Month), list_to_integer(Day)}, {0,0,1}}), Count }
+    end,
+    CStats = lists:map(CFun, Stats),  % [{63364377601,1}, {63360662401,1}, ... ]
+    % sort by date, newest first
+    SortedStats = lists:reverse(lists:keysort(1, CStats)),
+    % convert to [{"2007-12-9",1}, {"2007-10-27",1}, ... ] sorted list
+    [{mod_logdb:convert_timestamp_brief(TableSec), Count} || {TableSec, Count} <- SortedStats].
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% Date/Time operations
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Current time as seconds since 1970-01-01 UTC, rendered as a decimal string
+% with five fractional digits.
+get_timestamp() ->
+    {Mega, Secs, Micro} = now(),
+    lists:flatten(io_lib:format("~.5f", [Mega*1000000 + Secs + Micro/1000000])).
+
+% Render epoch seconds (a float, or its decimal string) as local time "Y-M-D H:M:S"; fields are not zero-padded.
+convert_timestamp(Seconds) when is_list(Seconds) ->
+    case string:to_float(Seconds ++ ".0") of  % ".0" lets fraction-less strings parse too
+        {Float, _Rest} when is_float(Float) -> convert_timestamp(Float);
+        _ -> erlang:error(badarg, [Seconds])
+    end;
+convert_timestamp(Seconds) when is_float(Seconds) ->
+    % 719528*86400 = Gregorian seconds at 1970-01-01 00:00:00
+    Universal = calendar:gregorian_seconds_to_datetime(trunc(Seconds + 719528*86400)),
+    {{Year, Month, Day}, {Hour, Minute, Sec}} = calendar:universal_time_to_local_time(Universal),
+    lists:concat([Year, "-", Month, "-", Day, " ", Hour, ":", Minute, ":", Sec]).
+
+% Render epoch seconds (a float, or its decimal string) as a local "Y-M-D" date; an integer is taken as Gregorian seconds.
+convert_timestamp_brief(Seconds) when is_list(Seconds) ->
+    % parsed as in convert_timestamp/1: the appended ".0" lets fraction-less strings parse, which list_to_float/1 rejects
+    case string:to_float(Seconds ++ ".0") of
+        {Float, _Rest} when is_float(Float) -> convert_timestamp_brief(Float);
+        _ -> erlang:error(badarg, [Seconds])
+    end;
+convert_timestamp_brief(Seconds) when is_float(Seconds) ->
+    UnivDT = calendar:gregorian_seconds_to_datetime(trunc(Seconds + 719528*86400)),
+    convert_timestamp_brief(calendar:datetime_to_gregorian_seconds(calendar:universal_time_to_local_time(UnivDT)));
+convert_timestamp_brief(Seconds) when is_integer(Seconds) ->
+    string:join([integer_to_list(N) || N <- tuple_to_list(element(1, calendar:gregorian_seconds_to_datetime(Seconds)))], "-").
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% DB operations (get)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Ask the vhost's logger process for the backend's vhost-wide statistics.
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats}, ?CALL_TIMEOUT).
+
+% Ask the vhost's logger process for the backend's vhost statistics of one Date.
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+
+% Ask the vhost's logger process for the backend's statistics of one User.
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_stats, User}, ?CALL_TIMEOUT).
+
+% Ask the vhost's logger process for the messages the backend holds for User on Date.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+
+% Ask the vhost's logger process for the dates the backend reports for VHost.
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_dates}, ?CALL_TIMEOUT).
+
+% Fetch User's logging settings from the vhost's logger process (cached record, or module defaults if none is cached).
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_settings, User}, ?CALL_TIMEOUT).
+
+% Store User's logging settings through the vhost's logger process; uses the default gen_server call timeout.
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {set_user_settings, User, Set}).
+
+% Return the logger process's current #state record for VHost; uses the default gen_server call timeout.
+get_module_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_module_settings}).
+
+% Apply runtime settings (a #state record) to the logger process of VHost; uses the default gen_server call timeout.
+set_module_settings(VHost, Settings) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {set_module_settings, Settings}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% Web admin callbacks (delete)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Web admin form handler: if Query carries "delete", drop the messages of Date
+% whose id - base64 of the term-encoded timestamp - is ticked as "selected".
+% Returns the logger's reply, or 'nothing' when no delete was requested.
+user_messages_at_parse_query(VHost, Date, Msgs, Query) ->
+    case lists:keysearch("delete", 1, Query) of
+        false ->
+            nothing;
+        {value, _} ->
+            MsgId = fun(M) -> jlib:encode_base64(binary_to_list(term_to_binary(M#msg.timestamp))) end,
+            Ticked = [M || M <- Msgs, lists:member({"selected", MsgId(M)}, Query)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            gen_server:call(Proc, {delete_messages_by_user_at, Ticked, Date}, ?CALL_TIMEOUT)
+    end.
+
+% Web admin form handler: if Query carries "delete", remove all of User's
+% messages on every date ticked as "selected". A date's checkbox id is the
+% base64 of term_to_binary(User ++ Date).
+% Returns 'error' if any delete replied 'error', otherwise 'nothing' (also
+% when no delete was requested).
+user_messages_parse_query(User, VHost, Query) ->
+    case lists:keysearch("delete", 1, Query) of
+        false ->
+            nothing;
+        {value, _} ->
+            DateId = fun(Date) ->
+                         jlib:encode_base64(binary_to_list(term_to_binary(User ++ Date)))
+                     end,
+            Ticked = [Date || Date <- get_dates(VHost),
+                              lists:member({"selected", DateId(Date)}, Query)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            % one synchronous delete per ticked date, in list order
+            Replies = [gen_server:call(Proc,
+                                       {delete_all_messages_by_user_at, User, Date},
+                                       ?CALL_TIMEOUT)
+                       || Date <- Ticked],
+            case lists:member(error, Replies) of
+                true -> error;
+                false -> nothing
+            end
+    end.
+
+% Web admin form handler: if Query carries "delete", remove the vhost's
+% messages on every date ticked as "selected". A date's checkbox id is the
+% base64 of term_to_binary(VHost ++ Date).
+% Returns 'error' if any delete replied 'error', otherwise 'nothing' (also
+% when no delete was requested).
+vhost_messages_parse_query(VHost, Query) ->
+    case lists:keysearch("delete", 1, Query) of
+        false ->
+            nothing;
+        {value, _} ->
+            DateId = fun(Date) ->
+                         jlib:encode_base64(binary_to_list(term_to_binary(VHost ++ Date)))
+                     end,
+            Ticked = [Date || Date <- get_dates(VHost),
+                              lists:member({"selected", DateId(Date)}, Query)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            % one synchronous delete per ticked date, in list order
+            Replies = [gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT)
+                       || Date <- Ticked],
+            case lists:member(error, Replies) of
+                true -> error;
+                false -> nothing
+            end
+    end.
+
+% Web admin form handler: if Query carries "delete", remove the messages of
+% Date for every user in Stats ([{User, Count}]) ticked as "selected". A user's
+% checkbox id is the base64 of term_to_binary(User ++ VHost).
+% Returns 'error' if any delete replied 'error', 'ok' otherwise, or 'nothing' when no delete was requested.
+vhost_messages_at_parse_query(VHost, Date, Stats, Query) ->
+    case lists:keysearch("delete", 1, Query) of
+        false ->
+            nothing;
+        {value, _} ->
+            IsTicked = fun({User, _Count}) ->
+                           Id = jlib:encode_base64(binary_to_list(term_to_binary(User ++ VHost))),
+                           lists:member({"selected", Id}, Query)
+                       end,
+            Ticked = [Entry || Entry <- Stats, IsTicked(Entry)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            Replies = [gen_server:call(Proc,
+                                       {delete_all_messages_by_user_at, User, Date},
+                                       ?CALL_TIMEOUT)
+                       || {User, _Count} <- Ticked],
+            case lists:member(error, Replies) of
+                true -> error;
+                false -> ok
+            end
+    end.
+
+% Copy messages from backend From (a string naming an entry of the dbs option) into this vhost's current backend.
+% Spawned from handle_cast/2: [State, From] copies every date the source reports, [State, From, Date] a single date.
+copy_messages([#state{vhost=VHost}=State, From]) ->
+    ?INFO_MSG("Going to copy messages from ~p for ~p", [From, VHost]),
+    {FromDBName, FromDBOpts} =
+        case lists:keysearch(list_to_atom(From), 1, State#state.dbs) of
+            {value, {FN, FO}} ->
+                {FN, FO};
+            false ->
+                ?ERROR_MSG("Failed to find record for ~p in dbs", [From]),
+                throw(error)
+        end,
+    % the source backend module is mod_logdb_<backend>
+    FromDBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(FromDBName)),
+    {ok, _FromPid} = FromDBMod:start(VHost, FromDBOpts),
+
+    Dates = FromDBMod:get_dates(VHost),
+    DatesLength = length(Dates),
+    % Acc is the 1-based position of Date; it is used for progress logging only
+    lists:foldl(fun(Date, Acc) ->
+        case copy_messages_int([FromDBMod, State#state.dbmod, VHost, Date]) of
+            ok ->
+                ?INFO_MSG("Copied messages at ~p (~p/~p)", [Date, Acc, DatesLength]);
+            Value ->
+                ?ERROR_MSG("Failed to copy messages at ~p (~p/~p): ~p", [Date, Acc, DatesLength, Value]),
+                FromDBMod:stop(VHost),
+                throw(error)
+        end,
+        Acc + 1
+    end, 1, Dates),
+    ?INFO_MSG("Copied messages from ~p", [From]),
+    FromDBMod:stop(VHost);
+copy_messages([#state{vhost=VHost}=State, From, Date]) ->
+    {value, {FromDBName, FromDBOpts}} = lists:keysearch(list_to_atom(From), 1, State#state.dbs),
+    FromDBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(FromDBName)),
+    {ok, _FromPid} = FromDBMod:start(VHost, FromDBOpts),
+    case catch copy_messages_int([FromDBMod, State#state.dbmod, VHost, Date]) of
+        {'EXIT', Reason} ->  % catch reports a crash as {'EXIT', Reason}, in upper case
+            ?ERROR_MSG("Failed to copy messages at ~p: ~p", [Date, Reason]);
+        ok ->
+            ?INFO_MSG("Copied messages at ~p", [Date]);
+        Value ->
+            ?ERROR_MSG("Failed to copy messages at ~p: ~p", [Date, Value])
+    end,
+    FromDBMod:stop(VHost).
+% Copy one date between two backends, timed with timer:tc/3; the named table mod_logdb_temp exists only during the copy.
+copy_messages_int([_FromDBMod, _ToDBMod, _VHost, Date] = Args) ->
+    ets:new(mod_logdb_temp, [named_table, set, public]),
+    {Micros, Result} = timer:tc(?MODULE, copy_messages_int_tc, [Args]),
+    ets:delete_all_objects(mod_logdb_temp),
+    ets:delete(mod_logdb_temp),
+    ?INFO_MSG("copy_messages at ~p elapsed ~p sec", [Date, Micros/1000000]),
+    Result.
+
+%% Copies the messages stored for VHost at Date from backend FromDBMod to
+%% backend ToDBMod, one user at a time, and returns ok.
+%%
+%% Per-date statistics are rebuilt on both sides first. Nothing is copied
+%% when the source has no statistics for the date or when both sides already
+%% report equal statistics. Otherwise one of three strategies is used:
+%%   * destination has no stats for the date: every message is logged as is;
+%%   * destination has stats: timestamps already stored for the user are
+%%     collected in the mod_logdb_temp ets table and only messages whose
+%%     timestamp is missing are logged;
+%%   * source is mod_logdb_mnesia_old: old-format message tuples are turned
+%%     into #msg{} records (one per side of the conversation that belongs to
+%%     VHost) before being logged.
+%%
+%% The last two strategies use the named ets table mod_logdb_temp, which must
+%% already exist when this function is called.
+copy_messages_int_tc([FromDBMod, ToDBMod, VHost, Date]) ->
+    ?INFO_MSG("Going to copy messages from ~p for ~p at ~p", [FromDBMod, VHost, Date]),
+
+    ok = FromDBMod:rebuild_stats_at(VHost, Date),
+    catch mod_logdb:rebuild_stats_at(VHost, Date),
+    {ok, FromStats} = FromDBMod:get_vhost_stats_at(VHost, Date),
+    ToStats = case mod_logdb:get_vhost_stats_at(VHost, Date) of
+                  {ok, Stats} -> Stats;
+                  {error, _} -> []
+              end,
+
+    FromStatsS = lists:keysort(1, FromStats),
+    ToStatsS = lists:keysort(1, ToStats),
+
+    StatsLength = length(FromStats),
+
+    CopyFun = if
+        % destination table is empty
+        FromDBMod /= mod_logdb_mnesia_old, ToStats == [] ->
+            fun({User, _Count}, Acc) ->
+                {ok, Msgs} = FromDBMod:get_user_messages_at(User, VHost, Date),
+                MAcc =
+                    lists:foldl(fun(Msg, MFAcc) ->
+                                    ok = ToDBMod:log_message(VHost, Msg),
+                                    MFAcc + 1
+                                end, 0, Msgs),
+                NewAcc = Acc + 1,
+                ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]),
+                %timer:sleep(100),
+                NewAcc
+            end;
+        % destination table is not empty
+        FromDBMod /= mod_logdb_mnesia_old, ToStats /= [] ->
+            fun({User, _Count}, Acc) ->
+                {ok, ToMsgs} = ToDBMod:get_user_messages_at(User, VHost, Date),
+                lists:foreach(fun(#msg{timestamp=Tst}) when length(Tst) == 16 ->
+                                      ets:insert(mod_logdb_temp, {Tst});
+                                 % mysql, pgsql removes final zeros after decimal point
+                                 (#msg{timestamp=Tst}) when length(Tst) < 16 ->
+                                      {F, _} = string:to_float(Tst++".0"),
+                                      [T] = io_lib:format("~.5f", [F]),
+                                      ets:insert(mod_logdb_temp, {T})
+                              end, ToMsgs),
+                {ok, Msgs} = FromDBMod:get_user_messages_at(User, VHost, Date),
+                MAcc =
+                    lists:foldl(fun(#msg{timestamp=ToTimestamp} = Msg, MFAcc) ->
+                                    case ets:member(mod_logdb_temp, ToTimestamp) of
+                                        false ->
+                                            ok = ToDBMod:log_message(VHost, Msg),
+                                            ets:insert(mod_logdb_temp, {ToTimestamp}),
+                                            MFAcc + 1;
+                                        true ->
+                                            MFAcc
+                                    end
+                                end, 0, Msgs),
+                NewAcc = Acc + 1,
+                ets:delete_all_objects(mod_logdb_temp),
+                ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]),
+                %timer:sleep(100),
+                NewAcc
+            end;
+        % copying from mod_logmnesia
+        true ->
+            fun({User, _Count}, Acc) ->
+                % Remember the timestamps already stored in the destination.
+                % The result is intentionally not bound: binding it to the
+                % name ToStats would be a match against the enclosing
+                % ToStats list and fail with badmatch.
+                case ToDBMod:get_user_messages_at(User, VHost, Date) of
+                    {ok, []} ->
+                        ok;
+                    {ok, ToMsgs} ->
+                        lists:foreach(fun(#msg{timestamp=Tst}) when length(Tst) == 16 ->
+                                              ets:insert(mod_logdb_temp, {Tst});
+                                         % mysql, pgsql removes final zeros after decimal point
+                                         % (same bound as the branch above, so a
+                                         % 15-character timestamp is handled too)
+                                         (#msg{timestamp=Tst}) when length(Tst) < 16 ->
+                                              {F, _} = string:to_float(Tst++".0"),
+                                              [T] = io_lib:format("~.5f", [F]),
+                                              ets:insert(mod_logdb_temp, {T})
+                                      end, ToMsgs);
+                    {error, _} ->
+                        ok
+                end,
+                {ok, Msgs} = FromDBMod:get_user_messages_at(User, VHost, Date),
+
+                MAcc =
+                    lists:foldl(
+                      fun({msg, TU, TS, TR, FU, FS, FR, Type, Subj, Body, Timest},
+                          MFAcc) ->
+                          [Timestamp] = if is_float(Timest) == true ->
+                                               io_lib:format("~.5f", [Timest]);
+                                           % early versions of mod_logmnesia
+                                           is_integer(Timest) == true ->
+                                               io_lib:format("~.5f", [Timest-719528*86400.0]);
+                                           true ->
+                                               ?ERROR_MSG("Incorrect timestamp ~p", [Timest]),
+                                               throw(error)
+                                        end,
+                          case ets:member(mod_logdb_temp, Timestamp) of
+                              false ->
+                                  if
+                                      % from
+                                      TS == VHost ->
+                                          TMsg = #msg{timestamp=Timestamp,
+                                                      owner_name=TU,
+                                                      peer_name=FU, peer_server=FS, peer_resource=FR,
+                                                      direction=from,
+                                                      type=Type,
+                                                      subject=Subj, body=Body},
+                                          ok = ToDBMod:log_message(VHost, TMsg);
+                                      true -> ok
+                                  end,
+                                  if
+                                      % to
+                                      FS == VHost ->
+                                          FMsg = #msg{timestamp=Timestamp,
+                                                      owner_name=FU,
+                                                      peer_name=TU, peer_server=TS, peer_resource=TR,
+                                                      direction=to,
+                                                      type=Type,
+                                                      subject=Subj, body=Body},
+                                          ok = ToDBMod:log_message(VHost, FMsg);
+                                      true -> ok
+                                  end,
+                                  ets:insert(mod_logdb_temp, {Timestamp}),
+                                  MFAcc + 1;
+                              true -> % not ets:member
+                                  MFAcc
+                          end % case
+                      end, 0, Msgs), % foldl
+                NewAcc = Acc + 1,
+                ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]),
+                %timer:sleep(100),
+                NewAcc
+            end % fun
+    end, % if FromDBMod /= mod_logdb_mnesia_old
+
+    if
+        FromStats == [] ->
+            ?INFO_MSG("No messages were found at ~p", [Date]);
+        FromStatsS == ToStatsS ->
+            ?INFO_MSG("Stats are equal at ~p", [Date]);
+        FromStatsS /= ToStatsS ->
+            lists:foldl(CopyFun, 0, FromStats),
+            ok = ToDBMod:rebuild_stats_at(VHost, Date)
+            %timer:sleep(1000)
+    end,
+
+    ok.
+
+%% Converts a textual flag to a boolean.
+%% Returns true for "t", "true", "y", "yes", "1"; false for "f", "false",
+%% "n", "no", "0"; and the atom error for anything else.
+list_to_bool(Text) ->
+    Truthy = lists:member(Text, ["t", "true", "y", "yes", "1"]),
+    Falsy = lists:member(Text, ["f", "false", "n", "no", "0"]),
+    case {Truthy, Falsy} of
+        {true, _} -> true;
+        {false, true} -> false;
+        {false, false} -> error
+    end.
+
+%% Renders a boolean as the upper-case string "TRUE" or "FALSE".
+%% Any other argument fails with function_clause.
+bool_to_list(Flag) when is_boolean(Flag) ->
+    string:to_upper(atom_to_list(Flag)).
+
+%% Joins a list of strings into one string, separating the elements with a
+%% newline (no trailing newline). An empty list yields "".
+%% Uses string:join/2 instead of appending a newline to every element and
+%% trimming the last one afterwards.
+list_to_string(Lines) when is_list(Lines) ->
+    string:join(Lines, "\n").
+
+%% Splits a newline-separated string into a list of strings.
+%% The atom null and the empty string both yield [].
+string_to_list(null) ->
+    [];
+string_to_list([]) ->
+    [];
+string_to_list(String) ->
+    % re:split/3 returns the parts directly (not wrapped in {ok, _}) and
+    % produces binaries unless {return, list} is requested.
+    re:split(String, "\n", [{return, list}]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ad-hoc (copy/pasted from mod_configure.erl)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% ITEMS_RESULT(Allow, LNode, Fallback)
+%% Expands to a case on Allow: deny yields Fallback; allow yields the
+%% {result, _} or {error, _} tuple returned by get_local_items/4 for LNode.
+%% The expansion refers to the variables LServer, To and Lang, so they must
+%% be bound where the macro is used, and it binds Res and Error there.
+-define(ITEMS_RESULT(Allow, LNode, Fallback),
+ case Allow of
+ deny ->
+ Fallback;
+ allow ->
+ case get_local_items(LServer, LNode,
+ jlib:jid_to_string(To), Lang) of
+ {result, Res} ->
+ {result, Res};
+ {error, Error} ->
+ {error, Error}
+ end
+ end).
+
+%% get_local_items(Acc, From, To, Node, Lang)
+%% Items hook taking the accumulated result Acc. Acc is returned untouched
+%% when mod_adhoc is not loaded for the target host.
+%%
+%% Empty node: the mod_logdb root items are appended to the items already in
+%% Acc when From passes the mod_logdb or the mod_logdb_admin ACL rule.
+%% Other nodes: mod_logdb nodes are answered according to the
+%% mod_logdb_admin rule (forbidden error on deny); unknown nodes return Acc.
+get_local_items(Acc, From, #jid{lserver = LServer} = To, "", Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false ->
+            Acc;
+        _ ->
+            Known = case Acc of
+                        {result, Its} -> Its;
+                        empty -> []
+                    end,
+            UserRule = acl:match_rule(LServer, mod_logdb, From),
+            AdminRule = acl:match_rule(LServer, mod_logdb_admin, From),
+            case (UserRule == allow) orelse (AdminRule == allow) of
+                true ->
+                    case get_local_items(LServer, [],
+                                         jlib:jid_to_string(To), Lang) of
+                        {result, Res} -> {result, Known ++ Res};
+                        {error, _Error} -> {result, Known}
+                    end;
+                false ->
+                    {result, Known}
+            end
+    end;
+get_local_items(Acc, From, #jid{lserver = LServer} = To, Node, Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false ->
+            Acc;
+        _ ->
+            LNode = string:tokens(Node, "/"),
+            AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+            case is_logdb_items_node(LNode) of
+                true ->
+                    ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN});
+                false ->
+                    Acc
+            end
+    end.
+
+%% True for the tokenised node paths that mod_logdb answers items queries for.
+is_logdb_items_node(["mod_logdb"]) -> true;
+is_logdb_items_node(["mod_logdb_users"]) -> true;
+is_logdb_items_node(["mod_logdb_users", _UserOrRange]) -> true;
+is_logdb_items_node(["mod_logdb_settings"]) -> true;
+is_logdb_items_node(_Other) -> false.
+
+%% NODE(Name, Node)
+%% Expands to an {xmlelement, "item", Attrs, []} tuple whose "name" attribute
+%% is Name translated to Lang and whose "node" attribute is Node.
+%% The expansion refers to the variables Server (used as the "jid"
+%% attribute) and Lang, which must be bound where the macro is used.
+-define(NODE(Name, Node),
+ {xmlelement, "item",
+ [{"jid", Server},
+ {"name", translate:translate(Lang, Name)},
+ {"node", Node}], []}).
+
+%% get_local_items(Host, LNode, Server, Lang) -> {result, Items} | {error, Error}
+%% Returns the child items of the tokenised node path LNode.
+%%   []                          the mod_logdb root node
+%%   ["mod_logdb"]               the users and settings nodes
+%%   ["mod_logdb_users"]         users of Host (see get_all_vh_users/3)
+%%   ["mod_logdb_users", "@K-L"] users K..L of the sorted user list of Host
+%%   user and settings nodes     no children
+%% Every failure is reported as {error, Error}, which is the only failure
+%% shape the callers of this function match on.
+get_local_items(_Host, [], Server, Lang) ->
+    {result,
+     [?NODE("Messages logging engine", "mod_logdb")]
+    };
+get_local_items(_Host, ["mod_logdb"], Server, Lang) ->
+    {result,
+     [?NODE("Messages logging engine users", "mod_logdb_users"),
+      ?NODE("Messages logging engine settings", "mod_logdb_settings")]
+    };
+get_local_items(Host, ["mod_logdb_users"], Server, Lang) ->
+    {result, get_all_vh_users(Host, Server, Lang)};
+get_local_items(Host, ["mod_logdb_users", [$@ | Diap]], Server, Lang) ->
+    % The "@K-L" range nodes are produced by get_all_vh_users/3 from the
+    % sorted users of Host, so the range has to be cut from that same list.
+    case catch ejabberd_auth:get_vh_registered_users(Host) of
+        {'EXIT', _Reason} ->
+            {error, ?ERR_INTERNAL_SERVER_ERROR};
+        Users ->
+            SUsers = lists:sort([{S, U} || {U, S} <- Users]),
+            case catch begin
+                           % re:split/3 returns the parts directly; ask for
+                           % strings so list_to_integer/1 can parse them
+                           [S1, S2] = re:split(Diap, "-", [{return, list}]),
+                           N1 = list_to_integer(S1),
+                           N2 = list_to_integer(S2),
+                           Sub = lists:sublist(SUsers, N1, N2 - N1 + 1),
+                           lists:map(fun({S, U}) ->
+                                         ?NODE(U ++ "@" ++ S, "mod_logdb_users/" ++ U ++ "@" ++ S)
+                                     end, Sub)
+                       end of
+                {'EXIT', _Reason} ->
+                    {error, ?ERR_NOT_ACCEPTABLE};
+                Res ->
+                    {result, Res}
+            end
+    end;
+get_local_items(_Host, ["mod_logdb_users", _User], _Server, _Lang) ->
+    {result, []};
+get_local_items(_Host, ["mod_logdb_settings"], _Server, _Lang) ->
+    {result, []};
+get_local_items(_Host, Item, _Server, _Lang) ->
+    ?MYDEBUG("asked for items in ~p", [Item]),
+    {error, ?ERR_ITEM_NOT_FOUND}.
+
+%% INFO_RESULT(Allow, Feats)
+%% Expands to a case on Allow: deny yields {error, ?ERR_FORBIDDEN}, allow
+%% yields {result, Feats}. Any other value of Allow fails with case_clause.
+-define(INFO_RESULT(Allow, Feats),
+ case Allow of
+ deny ->
+ {error, ?ERR_FORBIDDEN};
+ allow ->
+ {result, Feats}
+ end).
+
+%% get_local_features(Acc, From, To, Node, Lang)
+%% Features hook. Returns Acc untouched when mod_adhoc is not loaded for the
+%% target host or when Node is not a mod_logdb node; otherwise returns
+%% {result, Features} or {error, ?ERR_FORBIDDEN} depending on the
+%% mod_logdb / mod_logdb_admin ACL rules matched by From.
+get_local_features(Acc, From, #jid{lserver = LServer} = _To, Node, _Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false ->
+            Acc;
+        _ ->
+            LNode = string:tokens(Node, "/"),
+            AllowUser = acl:match_rule(LServer, mod_logdb, From),
+            AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+            features_for_node(LNode, AllowUser, AllowAdmin, Acc)
+    end.
+
+%% Per-node answer for get_local_features/5.
+%% The user settings node is open to either rule; every other mod_logdb
+%% node is decided by the admin rule alone.
+features_for_node(["mod_logdb"], AllowUser, AllowAdmin, _Acc)
+  when AllowUser == allow; AllowAdmin == allow ->
+    {result, [?NS_COMMANDS]};
+features_for_node(["mod_logdb"], _AllowUser, _AllowAdmin, _Acc) ->
+    {error, ?ERR_FORBIDDEN};
+features_for_node(["mod_logdb_users"], _AllowUser, AllowAdmin, _Acc) ->
+    ?INFO_RESULT(AllowAdmin, []);
+features_for_node(["mod_logdb_users", [$@ | _]], _AllowUser, AllowAdmin, _Acc) ->
+    ?INFO_RESULT(AllowAdmin, []);
+features_for_node(["mod_logdb_users", _User], _AllowUser, AllowAdmin, _Acc) ->
+    ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS]);
+features_for_node(["mod_logdb_settings"], _AllowUser, AllowAdmin, _Acc) ->
+    ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS]);
+features_for_node(_Other, _AllowUser, _AllowAdmin, Acc) ->
+    %?MYDEBUG("asked for ~p features: ~p", [LNode, Allow]),
+    Acc.
+
+%% INFO_IDENTITY(Category, Type, Name, Lang)
+%% Expands to a one-element list holding an {xmlelement, "identity", ...}
+%% tuple with the given category and type and with Name translated to Lang.
+-define(INFO_IDENTITY(Category, Type, Name, Lang),
+ [{xmlelement, "identity",
+ [{"category", Category},
+ {"type", Type},
+ {"name", translate:translate(Lang, Name)}], []}]).
+
+%% INFO_COMMAND(Name, Lang)
+%% INFO_IDENTITY with category "automation" and type "command-node".
+-define(INFO_COMMAND(Name, Lang),
+ ?INFO_IDENTITY("automation", "command-node", Name, Lang)).
+
+%% get_local_identity(Acc, From, To, Node, Lang)
+%% Identity hook. For mod_logdb command nodes returns a command-node
+%% identity whose name is translated to Lang; for every other node
+%% (including the "@K-L" user range nodes) returns Acc unchanged.
+get_local_identity(Acc, _From, _To, Node, Lang) ->
+    case identity_name_for_node(string:tokens(Node, "/")) of
+        {name, Name} ->
+            ?INFO_COMMAND(Name, Lang);
+        none ->
+            Acc
+    end.
+
+%% Maps a tokenised node path to {name, IdentityName}, or to none when the
+%% node has no identity of its own.
+identity_name_for_node(["mod_logdb"]) ->
+    {name, "Messages logging engine"};
+identity_name_for_node(["mod_logdb_users"]) ->
+    {name, "Messages logging engine users"};
+identity_name_for_node(["mod_logdb_users", [$@ | _]]) ->
+    none;
+identity_name_for_node(["mod_logdb_users", User]) ->
+    {name, User};
+identity_name_for_node(["mod_logdb_settings"]) ->
+    {name, "Messages logging engine settings"};
+identity_name_for_node(_Other) ->
+    none.
+
+%get_sm_items(Acc, From, To, Node, Lang) ->
+% ?MYDEBUG("get_sm_items Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]),
+% Acc.
+
+%get_sm_features(Acc, From, To, Node, Lang) ->
+% ?MYDEBUG("get_sm_features Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]),
+% Acc.
+
+%get_sm_identity(Acc, From, To, Node, Lang) ->
+% ?MYDEBUG("get_sm_identity Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]),
+% Acc.
+
+%% adhoc_local_items(Acc, From, To, Lang) -> {result, Items}
+%% Appends to the items already in Acc every mod_logdb node (found by
+%% walking the node tree from the root) for which get_local_features/5
+%% answers exactly {result, [?NS_COMMANDS]} for the requester From.
+adhoc_local_items(Acc, From, #jid{lserver = LServer, server = Server} = To,
+                  Lang) ->
+    Existing = case Acc of
+                   {result, Its} -> Its;
+                   empty -> []
+               end,
+    Candidates = recursively_get_local_items(LServer, "", Server, Lang),
+    Commands =
+        [Item || Item <- Candidates,
+                 get_local_features([], From, To,
+                                    xml:get_tag_attr_s("node", Item),
+                                    Lang) =:= {result, [?NS_COMMANDS]}],
+    {result, Existing ++ Commands}.
+
+%% recursively_get_local_items(LServer, Node, Server, Lang) -> [Item]
+%% Depth-first listing of all items below Node: each child is followed by
+%% its own descendants. The "mod_logdb_users" node is never expanded.
+%% Children whose "jid" attribute differs from Server, or whose "node"
+%% attribute is empty, are left out together with their subtree.
+recursively_get_local_items(_LServer, "mod_logdb_users", _Server, _Lang) ->
+    [];
+recursively_get_local_items(LServer, Node, Server, Lang) ->
+    Children = case get_local_items(LServer, string:tokens(Node, "/"),
+                                    Server, Lang) of
+                   {result, Res} -> Res;
+                   {error, _Error} -> []
+               end,
+    lists:flatmap(
+      fun(Child) ->
+              ChildJid = xml:get_tag_attr_s("jid", Child),
+              ChildNode = xml:get_tag_attr_s("node", Child),
+              case (ChildJid /= Server) or (ChildNode == "") of
+                  true ->
+                      [];
+                  false ->
+                      [Child | recursively_get_local_items(
+                                 LServer, ChildNode, Server, Lang)]
+              end
+      end, Children).
+
+%% COMMANDS_RESULT(Allow, From, To, Request)
+%% Expands to a case on Allow: deny yields {error, ?ERR_FORBIDDEN}, allow
+%% runs adhoc_local_commands/3 with From, To and Request.
+-define(COMMANDS_RESULT(Allow, From, To, Request),
+ case Allow of
+ deny ->
+ {error, ?ERR_FORBIDDEN};
+ allow ->
+ adhoc_local_commands(From, To, Request)
+ end).
+
+%% adhoc_local_commands(Acc, From, To, Request)
+%% Commands hook. Executes the request through adhoc_local_commands/3 when
+%% its node is a mod_logdb command node that From may use: the user
+%% settings node needs the mod_logdb or mod_logdb_admin rule, the per-user
+%% and module settings nodes need mod_logdb_admin. Otherwise returns Acc.
+adhoc_local_commands(Acc, From, #jid{lserver = LServer} = To,
+                     #adhoc_request{node = Node} = Request) ->
+    LNode = string:tokens(Node, "/"),
+    UserOk = acl:match_rule(LServer, mod_logdb, From) == allow,
+    AdminOk = acl:match_rule(LServer, mod_logdb_admin, From) == allow,
+    Permitted = case LNode of
+                    ["mod_logdb"] -> UserOk orelse AdminOk;
+                    ["mod_logdb_users", _User] -> AdminOk;
+                    ["mod_logdb_settings"] -> AdminOk;
+                    _ -> false
+                end,
+    case Permitted of
+        true -> adhoc_local_commands(From, To, Request);
+        false -> Acc
+    end.
+
+%% adhoc_local_commands(From, To, Request)
+%% Drives one step of an ad-hoc command:
+%%   * action "cancel": answers with status canceled;
+%%   * execute without submitted data: answers with the form for the node
+%%     (status executing);
+%%   * execute with submitted data: parses and stores the fields and
+%%     answers with status completed;
+%%   * anything else: {error, ?ERR_BAD_REQUEST}.
+adhoc_local_commands(From, #jid{lserver = LServer} = _To,
+                     #adhoc_request{lang = Lang,
+                                    node = Node,
+                                    sessionid = SessionID,
+                                    action = Action,
+                                    xdata = XData} = Request) ->
+    LNode = string:tokens(Node, "/"),
+    %% If the "action" attribute is not present, it is
+    %% understood as "execute". If there was no <actions/>
+    %% element in the first response (which there isn't in our
+    %% case), "execute" and "complete" are equivalent.
+    IsExecute = lists:member(Action, ["", "execute", "complete"]),
+    case {Action, IsExecute, XData} of
+        {"cancel", _, _} ->
+            %% User cancels request
+            adhoc:produce_response(
+              Request,
+              #adhoc_response{status = canceled});
+        {_, true, false} ->
+            %% User requests form
+            case get_form(LServer, LNode, From, Lang) of
+                {result, Form} ->
+                    adhoc:produce_response(
+                      Request,
+                      #adhoc_response{status = executing,
+                                      elements = Form});
+                {error, Error} ->
+                    {error, Error}
+            end;
+        {_, true, _} ->
+            %% User returns form.
+            case jlib:parse_xdata_submit(XData) of
+                invalid ->
+                    {error, ?ERR_BAD_REQUEST};
+                Fields ->
+                    case set_form(From, LServer, LNode, Lang, Fields) of
+                        {result, _Res} ->
+                            adhoc:produce_response(
+                              #adhoc_response{lang = Lang,
+                                              node = Node,
+                                              sessionid = SessionID,
+                                              status = completed});
+                        {error, Error} ->
+                            {error, Error}
+                    end
+            end;
+        _ ->
+            {error, ?ERR_BAD_REQUEST}
+    end.
+
+%% LISTLINE(Label, Value)
+%% Expands to an "option" element labelled Label that wraps a "value"
+%% element holding Value as character data.
+-define(LISTLINE(Label, Value),
+ {xmlelement, "option", [{"label", Label}],
+ [{xmlelement, "value", [], [{xmlcdata, Value}]}]}).
+%% DEFVAL(Value)
+%% Expands to a bare "value" element holding Value as character data.
+-define(DEFVAL(Value), {xmlelement, "value", [], [{xmlcdata, Value}]}).
+
+%% get_user_form(LUser, LServer, Lang) -> {result, [Form]}
+%% Builds the logging-preferences form of one user: an "x" element in the
+%% ?NS_XDATA namespace pre-filled from get_user_settings(LUser, LServer).
+%% Fields (by "var"): dolog_default (list-single, "true"/"false"),
+%% dolog_list and donotlog_list (text-multi, one entry per line).
+%% Title, instructions and labels are translated to Lang.
+get_user_form(LUser, LServer, Lang) ->
+ %From = jlib:jid_to_string(jlib:jid_remove_resource(Jid)),
+ #user_settings{dolog_default=DLD,
+ dolog_list=DLL,
+ donotlog_list=DNLL} = get_user_settings(LUser, LServer),
+ {result, [{xmlelement, "x", [{"xmlns", ?NS_XDATA}],
+ [{xmlelement, "title", [],
+ [{xmlcdata,
+ translate:translate(
+ Lang, "Messages logging engine settings")}]},
+ {xmlelement, "instructions", [],
+ [{xmlcdata,
+ translate:translate(
+ Lang, "Set logging preferences")++ ": " ++ LUser ++ "@" ++ LServer}]},
+ % default to log
+ {xmlelement, "field", [{"type", "list-single"},
+ {"label",
+ translate:translate(Lang, "Default")},
+ {"var", "dolog_default"}],
+ [?DEFVAL(atom_to_list(DLD)),
+ ?LISTLINE(translate:translate(Lang, "Log Messages"), "true"),
+ ?LISTLINE(translate:translate(Lang, "Do Not Log Messages"), "false")
+ ]},
+ % do log list
+ {xmlelement, "field", [{"type", "text-multi"},
+ {"label",
+ translate:translate(
+ Lang, "Log Messages")},
+ {"var", "dolog_list"}],
+ [{xmlelement, "value", [], [{xmlcdata, list_to_string(DLL)}]}]},
+ % do not log list
+ {xmlelement, "field", [{"type", "text-multi"},
+ {"label",
+ translate:translate(
+ Lang, "Do Not Log Messages")},
+ {"var", "donotlog_list"}],
+ [{xmlelement, "value", [], [{xmlcdata, list_to_string(DNLL)}]}]}
+ ]}]}.
+
+%% get_settings_form(Host, Lang) -> {result, [Form]}
+%% Builds the run-time module settings form: an "x" element in the
+%% ?NS_XDATA namespace pre-filled from mod_logdb:get_module_settings(Host).
+%% Fields (by "var"): backend, dbs, dolog_default,
+%% drop_messages_on_user_removal, groupchat, ignore_list, purge_older_days
+%% and poll_users_settings. Title, instructions and most labels are
+%% translated to Lang.
+get_settings_form(Host, Lang) ->
+ #state{dbmod=DBMod,
+ dbs=DBs,
+ dolog_default=DLD,
+ ignore_jids=IgnoreJids,
+ groupchat=GroupChat,
+ purge_older_days=PurgeDaysT,
+ drop_messages_on_user_removal=MRemoval,
+ poll_users_settings=PollTime} = mod_logdb:get_module_settings(Host),
+
+ % one selectable option per configured backend
+ Backends = lists:map(fun({Backend, _Opts}) ->
+ ?LISTLINE(atom_to_list(Backend), atom_to_list(Backend))
+ end, DBs),
+ % name of the active backend: the module name without the "<?MODULE>_" prefix
+ DB = lists:sublist(atom_to_list(DBMod), length(atom_to_list(?MODULE)) + 2, length(atom_to_list(DBMod))),
+ DBsL = lists:append([?DEFVAL(DB)], Backends),
+
+ % textual form of the purge setting; anything unexpected is shown as "unknown"
+ PurgeDays =
+ case PurgeDaysT of
+ never -> "never";
+ Num when is_integer(Num) -> integer_to_list(Num);
+ _ -> "unknown"
+ end,
+ {result, [{xmlelement, "x", [{"xmlns", ?NS_XDATA}],
+ [{xmlelement, "title", [],
+ [{xmlcdata,
+ translate:translate(
+ Lang, "Messages logging engine settings") ++ " (run-time)"}]},
+ {xmlelement, "instructions", [],
+ [{xmlcdata,
+ translate:translate(
+ Lang, "Set run-time settings")}]},
+ % backends
+ {xmlelement, "field", [{"type", "list-single"},
+ {"label",
+ translate:translate(Lang, "Backend")},
+ {"var", "backend"}],
+ DBsL},
+ % dbs
+ {xmlelement, "field", [{"type", "text-multi"},
+ {"label",
+ translate:translate(
+ Lang, "dbs")},
+ {"var", "dbs"}],
+ [{xmlelement, "value", [], [{xmlcdata, lists:flatten(io_lib:format("~p.",[DBs]))}]}]},
+ % default to log
+ {xmlelement, "field", [{"type", "list-single"},
+ {"label",
+ translate:translate(Lang, "Default")},
+ {"var", "dolog_default"}],
+ [?DEFVAL(atom_to_list(DLD)),
+ ?LISTLINE(translate:translate(Lang, "Log Messages"), "true"),
+ ?LISTLINE(translate:translate(Lang, "Do Not Log Messages"), "false")
+ ]},
+ % drop_messages_on_user_removal
+ {xmlelement, "field", [{"type", "list-single"},
+ {"label",
+ translate:translate(Lang, "Drop messages on user removal")},
+ {"var", "drop_messages_on_user_removal"}],
+ [?DEFVAL(atom_to_list(MRemoval)),
+ ?LISTLINE(translate:translate(Lang, "Drop"), "true"),
+ ?LISTLINE(translate:translate(Lang, "Do not drop"), "false")
+ ]},
+ % groupchat
+ {xmlelement, "field", [{"type", "list-single"},
+ {"label",
+ translate:translate(Lang, "Groupchat messages logging")},
+ {"var", "groupchat"}],
+ [?DEFVAL(atom_to_list(GroupChat)),
+ ?LISTLINE("all", "all"),
+ ?LISTLINE("none", "none"),
+ ?LISTLINE("send", "send"),
+ ?LISTLINE("half", "half")
+ ]},
+ % ignore_jids
+ {xmlelement, "field", [{"type", "text-multi"},
+ {"label",
+ translate:translate(
+ Lang, "Jids/Domains to ignore")},
+ {"var", "ignore_list"}],
+ [{xmlelement, "value", [], [{xmlcdata, list_to_string(IgnoreJids)}]}]},
+ % purge older days
+ {xmlelement, "field", [{"type", "text-single"},
+ {"label",
+ translate:translate(
+ Lang, "Purge messages older than (days)")},
+ {"var", "purge_older_days"}],
+ [{xmlelement, "value", [], [{xmlcdata, PurgeDays}]}]},
+ % poll users settings
+ {xmlelement, "field", [{"type", "text-single"},
+ {"label",
+ translate:translate(
+ Lang, "Poll users settings (seconds)")},
+ {"var", "poll_users_settings"}],
+ [{xmlelement, "value", [], [{xmlcdata, integer_to_list(PollTime)}]}]}
+ ]}]}.
+
+%% get_form(Host, LNode, JidFrom, Lang) -> {result, Form} | {error, Error}
+%% Picks the form for a command node: the requester's own preferences
+%% (["mod_logdb"]), the preferences of the user named in the node
+%% (["mod_logdb_users", User]) or the module settings
+%% (["mod_logdb_settings"]). Any other node is answered with
+%% ?ERR_SERVICE_UNAVAILABLE.
+get_form(_Host, ["mod_logdb"], #jid{luser = OwnUser, lserver = OwnServer} = _Jid, Lang) ->
+    get_user_form(OwnUser, OwnServer, Lang);
+get_form(_Host, ["mod_logdb_users", User], _JidFrom, Lang) ->
+    Target = jlib:string_to_jid(User),
+    #jid{luser = TargetUser, lserver = TargetServer} = Target,
+    get_user_form(TargetUser, TargetServer, Lang);
+get_form(Host, ["mod_logdb_settings"], _JidFrom, Lang) ->
+    get_settings_form(Host, Lang);
+get_form(_Host, Command, _, _Lang) ->
+    ?MYDEBUG("asked for form ~p", [Command]),
+    {error, ?ERR_SERVICE_UNAVAILABLE}.
+
+%% check_log_list(Entries) -> ok
+%% Validates a list of jid strings: every entry must contain "@" and be
+%% accepted by jlib:string_to_jid/1. Throws the atom error at the first
+%% entry that fails either check.
+check_log_list(Entries) ->
+    lists:foreach(fun check_log_entry/1, Entries).
+
+%% Validates a single entry of check_log_list/1.
+check_log_entry(Entry) ->
+    lists:member($@, Entry) orelse throw(error),
+    % this check for Entry to be valid jid
+    case jlib:string_to_jid(Entry) of
+        error -> throw(error);
+        _ -> ok
+    end.
+
+%% check_ignore_list(Entries) -> ok
+%% Validates the module-wide ignore list. Every entry must contain "@" and
+%% be either a jid accepted by jlib:string_to_jid/1 or a domain written as
+%% "@domain.org" whose domain part is accepted by jlib:nameprep/1.
+%% Throws the atom error at the first invalid entry.
+check_ignore_list([Head | Tail]) ->
+    case lists:member($@, Head) of
+        true -> ok;
+        false -> throw(error)
+    end,
+    % this check for Head to be valid jid
+    case jlib:string_to_jid(Head) of
+        error ->
+            % this check for Head to be valid domain "@domain.org"
+            case Head of
+                [$@ | Domain] ->
+                    % TODO: this allows spaces and special characters in Head. May be change to nodeprep?
+                    case jlib:nameprep(Domain) of
+                        error -> throw(error);
+                        % keep applying the ignore-list rules to the rest:
+                        % check_log_list/1 would reject later "@domain" entries
+                        _ -> check_ignore_list(Tail)
+                    end;
+                _ -> throw(error)
+            end;
+        _ ->
+            check_ignore_list(Tail)
+    end;
+check_ignore_list([]) ->
+    ok.
+
+%% parse_users_settings(XData) -> #user_settings{}
+%% Builds a #user_settings{} record from the submitted form fields
+%% dolog_default ("true" | "false"), dolog_list and donotlog_list.
+%% Throws bad_request when a field is missing or holds an invalid value.
+parse_users_settings(XData) ->
+    DoLogDefault =
+        case lists:keysearch("dolog_default", 1, XData) of
+            {value, {_, [String]}} when String == "true"; String == "false" ->
+                list_to_bool(String);
+            _ ->
+                throw(bad_request)
+        end,
+    DoLogList = parse_jid_list_field("dolog_list", XData),
+    DoNotLogList = parse_jid_list_field("donotlog_list", XData),
+    #user_settings{dolog_default=DoLogDefault,
+                   dolog_list=DoLogList,
+                   donotlog_list=DoNotLogList}.
+
+%% Reads the multi-line field Var holding jids. A single empty line means
+%% an empty list; otherwise the lines must pass check_log_list/1.
+parse_jid_list_field(Var, XData) ->
+    case lists:keysearch(Var, 1, XData) of
+        false ->
+            throw(bad_request);
+        {value, {_, [[]]}} ->
+            [];
+        {value, {_, Jids}} ->
+            case catch check_log_list(Jids) of
+                error -> throw(bad_request);
+                ok -> Jids
+            end
+    end.
+
+%% parse_module_settings(XData) -> #state{}
+%% Builds a #state{} record carrying the settings submitted through the
+%% module settings form: dolog_default, drop_messages_on_user_removal,
+%% groupchat, ignore_list, purge_older_days and poll_users_settings.
+%% Throws bad_request when a field is missing or holds an invalid value.
+parse_module_settings(XData) ->
+    DoLogDefault = module_setting_flag("dolog_default", XData),
+    DropOnRemoval = module_setting_flag("drop_messages_on_user_removal", XData),
+    GroupChat =
+        case lists:keysearch("groupchat", 1, XData) of
+            {value, {_, [Mode]}} when Mode == "none";
+                                      Mode == "all";
+                                      Mode == "send";
+                                      Mode == "half" ->
+                list_to_atom(Mode);
+            _ ->
+                throw(bad_request)
+        end,
+    Ignore =
+        case lists:keysearch("ignore_list", 1, XData) of
+            {value, {_, List}} ->
+                case catch check_ignore_list(List) of
+                    ok -> List;
+                    error -> throw(bad_request)
+                end;
+            _ ->
+                throw(bad_request)
+        end,
+    Purge =
+        case lists:keysearch("purge_older_days", 1, XData) of
+            {value, {_, ["never"]}} -> never;
+            {value, {_, [Days]}} -> module_setting_to_integer(Days);
+            _ -> throw(bad_request)
+        end,
+    Poll =
+        case lists:keysearch("poll_users_settings", 1, XData) of
+            {value, {_, [Seconds]}} -> module_setting_to_integer(Seconds);
+            _ -> throw(bad_request)
+        end,
+    #state{dolog_default=DoLogDefault,
+           groupchat=GroupChat,
+           ignore_jids=Ignore,
+           purge_older_days=Purge,
+           drop_messages_on_user_removal=DropOnRemoval,
+           poll_users_settings=Poll}.
+
+%% Reads the single-valued field Var, which must be "true" or "false".
+module_setting_flag(Var, XData) ->
+    case lists:keysearch(Var, 1, XData) of
+        {value, {_, [Str]}} when Str == "true"; Str == "false" ->
+            list_to_bool(Str);
+        _ ->
+            throw(bad_request)
+    end.
+
+%% Parses an integer setting, throwing bad_request when Str is not one.
+module_setting_to_integer(Str) ->
+    case catch list_to_integer(Str) of
+        {'EXIT', {badarg, _}} -> throw(bad_request);
+        Int -> Int
+    end.
+
+%% set_form(From, Host, LNode, Lang, XData) -> {result, []} | {error, Error}
+%% Stores the fields submitted for a command node: the requester's own
+%% preferences (["mod_logdb"]), the preferences of the user named in the
+%% node (["mod_logdb_users", User]) or the module settings
+%% (["mod_logdb_settings"]). Invalid fields give ?ERR_BAD_REQUEST, a failed
+%% store gives ?ERR_INTERNAL_SERVER_ERROR and any other node gives
+%% ?ERR_SERVICE_UNAVAILABLE.
+set_form(From, _Host, ["mod_logdb"], _Lang, XData) ->
+    #jid{luser=LUser, lserver=LServer} = From,
+    save_user_settings(LUser, LServer, XData);
+set_form(_From, _Host, ["mod_logdb_users", User], _Lang, XData) ->
+    #jid{luser=LUser, lserver=LServer} = jlib:string_to_jid(User),
+    save_user_settings(LUser, LServer, XData);
+set_form(_From, Host, ["mod_logdb_settings"], _Lang, XData) ->
+    case catch parse_module_settings(XData) of
+        bad_request ->
+            {error, ?ERR_BAD_REQUEST};
+        Settings ->
+            case mod_logdb:set_module_settings(Host, Settings) of
+                ok -> {result, []};
+                error -> {error, ?ERR_INTERNAL_SERVER_ERROR}
+            end
+    end;
+set_form(From, _Host, Node, _Lang, XData) ->
+    User = jlib:jid_to_string(jlib:jid_remove_resource(From)),
+    ?MYDEBUG("set form for ~p at ~p XData=~p", [User, Node, XData]),
+    {error, ?ERR_SERVICE_UNAVAILABLE}.
+
+%% Parses the submitted user preferences and stores them for LUser@LServer.
+save_user_settings(LUser, LServer, XData) ->
+    case catch parse_users_settings(XData) of
+        bad_request ->
+            {error, ?ERR_BAD_REQUEST};
+        UserSettings ->
+            case mod_logdb:set_user_settings(LUser, LServer, UserSettings) of
+                ok -> {result, []};
+                error -> {error, ?ERR_INTERNAL_SERVER_ERROR}
+            end
+    end.
+
+%adhoc_sm_items(Acc, From, To, Request) ->
+% ?MYDEBUG("adhoc_sm_items Acc=~p From=~p To=~p Request=~p", [Acc, From, To, Request]),
+% Acc.
+
+%adhoc_sm_commands(Acc, From, To, Request) ->
+% ?MYDEBUG("adhoc_sm_commands Acc=~p From=~p To=~p Request=~p", [Acc, From, To, Request]),
+% Acc.
+
+%% get_all_vh_users(Host, Server, Lang) -> [Item]
+%% Lists the registered users of Host as items, sorted by server and then
+%% by user name. Up to 100 users are listed one item each; larger lists are
+%% split into range items named "first -- last" whose node is
+%% "mod_logdb_users/@K-L", K and L being positions in the sorted list.
+%% Returns [] when the user lookup crashes.
+get_all_vh_users(Host, Server, Lang) ->
+    case catch ejabberd_auth:get_vh_registered_users(Host) of
+        {'EXIT', _Reason} ->
+            [];
+        Users ->
+            Sorted = lists:sort([{S, U} || {U, S} <- Users]),
+            Total = length(Sorted),
+            if
+                Total =< 100 ->
+                    [?NODE(U ++ "@" ++ S, "mod_logdb_users/" ++ U ++ "@" ++ S)
+                     || {S, U} <- Sorted];
+                true ->
+                    ChunkCount = trunc(math:sqrt(Total * 0.618)) + 1,
+                    ChunkSize = trunc(Total / ChunkCount) + 1,
+                    [begin
+                         Last = First + ChunkSize - 1,
+                         Range = "@" ++ integer_to_list(First) ++
+                                 "-" ++ integer_to_list(Last),
+                         {FirstServer, FirstUser} = lists:nth(First, Sorted),
+                         % the final chunk may be shorter than ChunkSize
+                         {LastServer, LastUser} =
+                             if Last < Total -> lists:nth(Last, Sorted);
+                                true -> lists:last(Sorted)
+                             end,
+                         Title = FirstUser ++ "@" ++ FirstServer ++
+                                 " -- " ++
+                                 LastUser ++ "@" ++ LastServer,
+                         ?NODE(Title, "mod_logdb_users/" ++ Range)
+                     end || First <- lists:seq(1, Total, ChunkSize)]
+            end
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% webadmin hooks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% webadmin_menu(Acc, Host, Lang)
+%% Puts the "messages" entry, labelled ?T("Users Messages"), in front of the
+%% menu entries accumulated so far.
+webadmin_menu(Acc, _Host, Lang) ->
+    MenuEntry = {"messages", ?T("Users Messages")},
+    [MenuEntry | Acc].
+
+%% webadmin_user(Acc, User, Server, Lang)
+%% Appends to Acc an "h3" element holding a link to the user's messages and
+%% a submit button that flips the user's default logging choice: "dolog"
+%% when logging is off by default, "donotlog" when it is on.
+webadmin_user(Acc, User, Server, Lang) ->
+    Settings = get_user_settings(User, Server),
+    DoLog = Settings#user_settings.dolog_default,
+    Toggle =
+        if
+            DoLog =:= false ->
+                ?INPUTT("submit", "dolog", "Log Messages");
+            DoLog =:= true ->
+                ?INPUTT("submit", "donotlog", "Do Not Log Messages");
+            true ->
+                []
+        end,
+    Acc ++ [?XE("h3", [?ACT("messages/", "Messages"), ?C(" "), Toggle])].
+
+%% webadmin_page(Acc, Host, Request)
+%% Serves the mod_logdb pages, answering {stop, Page} for the paths
+%%   messages, messages/Date           (only when Host is a list)
+%%   user/U/messages, user/U/messages/Date
+%% Any other request returns Acc unchanged.
+webadmin_page(_, Host,
+              #request{path = ["messages"], q = Query, lang = Lang})
+  when is_list(Host) ->
+    {stop, vhost_messages_stats(Host, Query, Lang)};
+webadmin_page(_, Host,
+              #request{path = ["messages", Date], q = Query, lang = Lang})
+  when is_list(Host) ->
+    {stop, vhost_messages_stats_at(Host, Query, Lang, Date)};
+webadmin_page(_, Host,
+              #request{path = ["user", U, "messages"], q = Query, lang = Lang}) ->
+    {stop, user_messages_stats(U, Host, Query, Lang)};
+webadmin_page(_, Host,
+              #request{path = ["user", U, "messages", Date], q = Query, lang = Lang}) ->
+    {stop, mod_logdb:user_messages_stats_at(U, Host, Query, Lang, Date)};
+webadmin_page(Acc, _, _) ->
+    Acc.
+
+%% user_parse_query(Acc, Action, User, Server, Query)
+%% Handles the "dolog" and "donotlog" actions by switching the user's
+%% default logging choice on or off and answering {stop, ok}. Any other
+%% action returns Acc unchanged.
+user_parse_query(_, "dolog", User, Server, _Query) ->
+    switch_default_logging(User, Server, true);
+user_parse_query(_, "donotlog", User, Server, _Query) ->
+    switch_default_logging(User, Server, false);
+user_parse_query(Acc, _Action, _User, _Server, _Query) ->
+    Acc.
+
+%% Stores DoLog as the dolog_default of User@Server, keeping every other
+%% setting as it is.
+switch_default_logging(User, Server, DoLog) ->
+    Current = get_user_settings(User, Server),
+    % TODO: check returned value
+    set_user_settings(User, Server, Current#user_settings{dolog_default=DoLog}),
+    {stop, ok}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% webadmin funcs
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% vhost_messages_stats(Server, Query, Lang) -> [Element]
+%% Builds the page listing, per date, the number of messages logged for
+%% Server. Query is first handed to vhost_messages_parse_query/2; its
+%% outcome (ok | error | nothing) selects the status line shown above the
+%% table. The table has one row per {Date, Count} with a checkbox, and is
+%% followed by a "Delete Selected" submit button.
+vhost_messages_stats(Server, Query, Lang) ->
+ % a crash while handling the query is logged and reported as "Bad format"
+ Res = case catch vhost_messages_parse_query(Server, Query) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p", [Reason]),
+ error;
+ VResult -> VResult
+ end,
+ {Time, Value} = timer:tc(mod_logdb, get_vhost_stats, [Server]),
+ ?INFO_MSG("get_vhost_stats(~p) elapsed ~p sec", [Server, Time/1000000]),
+ %case get_vhost_stats(Server) of
+ case Value of
+ % NOTE(review): the call above is not wrapped in catch, so this branch
+ % only matches if get_vhost_stats itself returns an 'EXIT' tuple
+ {'EXIT', CReason} ->
+ ?ERROR_MSG("Failed to get_vhost_stats: ~p", [CReason]),
+ [?XC("h1", ?T("Error occupied while fetching list"))];
+ {error, GReason} ->
+ ?ERROR_MSG("Failed to get_vhost_stats: ~p", [GReason]),
+ [?XC("h1", ?T("Error occupied while fetching list"))];
+ {ok, []} ->
+ [?XC("h1", ?T("No logged messages for ") ++ Server)];
+ {ok, Dates} ->
+ % one table row per date; the checkbox value encodes Server++Date
+ Fun = fun({Date, Count}) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(Server++Date))),
+ ?XE("tr",
+ [?XE("td", [?INPUT("checkbox", "selected", ID)]),
+ ?XE("td", [?AC(Date, Date)]),
+ ?XC("td", integer_to_list(Count))
+ ])
+ end,
+ [?XC("h1", ?T("Logged messages for ") ++ Server)] ++
+ case Res of
+ ok -> [?CT("Submitted"), ?P];
+ error -> [?CT("Bad format"), ?P];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?X("td"),
+ ?XCT("td", "Date"),
+ ?XCT("td", "Count")
+ ])]),
+ ?XE("tbody",
+ lists:map(Fun, Dates)
+ )]),
+ ?BR,
+ ?INPUTT("submit", "delete", "Delete Selected")
+ ])]
+ end.
+
+%% vhost_messages_stats_at(Server, Query, Lang, Date) -> [Element]
+%% Builds the page listing, per user, the number of messages logged for
+%% Server at Date. When there are users to show, Query is handed to
+%% vhost_messages_at_parse_query/4 and its outcome (ok | error | nothing)
+%% selects the status line shown above the table. Each row links to the
+%% user's messages at Date; a "Delete Selected" button follows the table.
+vhost_messages_stats_at(Server, Query, Lang, Date) ->
+ {Time, Value} = timer:tc(mod_logdb, get_vhost_stats_at, [Server, Date]),
+ ?INFO_MSG("get_vhost_stats_at(~p,~p) elapsed ~p sec", [Server, Date, Time/1000000]),
+ %case get_vhost_stats_at(Server, Date) of
+ case Value of
+ % NOTE(review): the call above is not wrapped in catch, so this branch
+ % only matches if get_vhost_stats_at itself returns an 'EXIT' tuple
+ {'EXIT', CReason} ->
+ ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [CReason]),
+ [?XC("h1", ?T("Error occupied while fetching list"))];
+ {error, GReason} ->
+ ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [GReason]),
+ [?XC("h1", ?T("Error occupied while fetching list"))];
+ {ok, []} ->
+ [?XC("h1", ?T("No logged messages for ") ++ Server ++ ?T(" at ") ++ Date)];
+ {ok, Users} ->
+ % a crash while handling the query is logged and reported as "Bad format"
+ Res = case catch vhost_messages_at_parse_query(Server, Date, Users, Query) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p", [Reason]),
+ error;
+ VResult -> VResult
+ end,
+ % one table row per user; the checkbox value encodes User++Server
+ Fun = fun({User, Count}) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(User++Server))),
+ ?XE("tr",
+ [?XE("td", [?INPUT("checkbox", "selected", ID)]),
+ ?XE("td", [?AC("../user/"++User++"/messages/"++Date, User)]),
+ ?XC("td", integer_to_list(Count))
+ ])
+ end,
+ [?XC("h1", ?T("Logged messages for ") ++ Server ++ ?T(" at ") ++ Date)] ++
+ case Res of
+ ok -> [?CT("Submitted"), ?P];
+ error -> [?CT("Bad format"), ?P];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?X("td"),
+ ?XCT("td", "User"),
+ ?XCT("td", "Count")
+ ])]),
+ ?XE("tbody",
+ lists:map(Fun, Users)
+ )]),
+ ?BR,
+ ?INPUTT("submit", "delete", "Delete Selected")
+ ])]
+ end.
+
+%% user_messages_stats(User, Server, Query, Lang) -> [Element]
+%% Builds the page listing, per date, the number of messages logged for
+%% User@Server. Query is first handed to user_messages_parse_query/3; its
+%% outcome (ok | error | nothing) selects the status line shown above the
+%% table. The table has one row per {Date, Count} with a checkbox, and is
+%% followed by a "Delete Selected" submit button.
+user_messages_stats(User, Server, Query, Lang) ->
+ Jid = jlib:jid_to_string({User, Server, ""}),
+
+ % a crash while handling the query is logged and reported as "Bad format"
+ Res = case catch user_messages_parse_query(User, Server, Query) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p", [Reason]),
+ error;
+ VResult -> VResult
+ end,
+
+ {Time, Value} = timer:tc(mod_logdb, get_user_stats, [User, Server]),
+ ?INFO_MSG("get_user_stats(~p,~p) elapsed ~p sec", [User, Server, Time/1000000]),
+
+ case Value of
+ % NOTE(review): the call above is not wrapped in catch, so this branch
+ % only matches if get_user_stats itself returns an 'EXIT' tuple
+ {'EXIT', CReason} ->
+ ?ERROR_MSG("Failed to get_user_stats: ~p", [CReason]),
+ [?XC("h1", ?T("Error occupied while fetching days"))];
+ {error, GReason} ->
+ ?ERROR_MSG("Failed to get_user_stats: ~p", [GReason]),
+ [?XC("h1", ?T("Error occupied while fetching days"))];
+ {ok, []} ->
+ [?XC("h1", ?T("No logged messages for ") ++ Jid)];
+ {ok, Dates} ->
+ % one table row per date; the checkbox value encodes User++Date
+ Fun = fun({Date, Count}) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(User++Date))),
+ ?XE("tr",
+ [?XE("td", [?INPUT("checkbox", "selected", ID)]),
+ ?XE("td", [?AC(Date, Date)]),
+ ?XC("td", integer_to_list(Count))
+ ])
+ %[?AC(Date, Date ++ " (" ++ integer_to_list(Count) ++ ")"), ?BR]
+ end,
+ [?XC("h1", ?T("Logged messages for ") ++ Jid)] ++
+ case Res of
+ ok -> [?CT("Submitted"), ?P];
+ error -> [?CT("Bad format"), ?P];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?X("td"),
+ ?XCT("td", "Date"),
+ ?XCT("td", "Count")
+ ])]),
+ ?XE("tbody",
+ lists:map(Fun, Dates)
+ )]),
+ ?BR,
+ ?INPUTT("submit", "delete", "Delete Selected")
+ ])]
+ end.
+
+%% search_user_nick(User, List) -> Nick | nothing
+%% Looks User up in a list of {Jid, Nick} pairs. Returns the stored nick,
+%% or the atom nothing when the user is absent or the nick is empty.
+search_user_nick(User, List) ->
+    case lists:keyfind(User, 1, List) of
+        {User, []} -> nothing;
+        {User, Nick} -> Nick;
+        false -> nothing
+    end.
+
+%% user_messages_stats_at(User, Server, Query, Lang, Date) -> [Element]
+%% Builds the page showing the messages logged for User@Server at Date.
+%% The page holds one form with two tables:
+%%   * the distinct peers of the day, each with a checkbox, followed by a
+%%     "Filter Selected" button;
+%%   * the messages themselves (restricted to the checked peers when the
+%%     query carries a "filter" entry), followed by a "Delete Selected"
+%%     button.
+%% Query is handed to user_messages_at_parse_query/4 and its outcome
+%% (ok | error | nothing) selects the status line shown above the form.
+user_messages_stats_at(User, Server, Query, Lang, Date) ->
+ Jid = jlib:jid_to_string({User, Server, ""}),
+
+ {Time, Value} = timer:tc(mod_logdb, get_user_messages_at, [User, Server, Date]),
+ ?INFO_MSG("get_user_messages_at(~p,~p,~p) elapsed ~p sec", [User, Server, Date, Time/1000000]),
+ case Value of
+ % NOTE(review): the call above is not wrapped in catch, so this branch
+ % only matches if get_user_messages_at itself returns an 'EXIT' tuple
+ {'EXIT', CReason} ->
+ ?ERROR_MSG("Failed to get_user_messages_at: ~p", [CReason]),
+ [?XC("h1", ?T("Error occupied while fetching messages"))];
+ {error, GReason} ->
+ ?ERROR_MSG("Failed to get_user_messages_at: ~p", [GReason]),
+ [?XC("h1", ?T("Error occupied while fetching messages"))];
+ {ok, []} ->
+ [?XC("h1", ?T("No logged messages for ") ++ Jid ++ ?T(" at ") ++ Date)];
+ {ok, User_messages} ->
+ % a crash while handling the query is logged and reported as "Bad format"
+ Res = case catch user_messages_at_parse_query(Server,
+ Date,
+ User_messages,
+ Query) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p", [Reason]),
+ error;
+ VResult -> VResult
+ end,
+
+ % roster of the user as {JidString, Name} pairs, used to show nicks
+ UR = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
+ UserRoster =
+ lists:map(fun(Item) ->
+ {jlib:jid_to_string(Item#roster.jid), Item#roster.name}
+ end, UR),
+
+ % distinct "name@server" peers appearing in the messages
+ UniqUsers = lists:foldl(fun(#msg{peer_name=PName, peer_server=PServer}, List) ->
+ ToAdd = PName++"@"++PServer,
+ case lists:member(ToAdd, List) of
+ true -> List;
+ false -> lists:append([ToAdd], List)
+ end
+ end, [], User_messages),
+
+ % Users to filter (sublist of UniqUsers)
+ CheckedUsers = case lists:keysearch("filter", 1, Query) of
+ {value, _} ->
+ lists:filter(fun(UFUser) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(UFUser))),
+ lists:member({"selected", ID}, Query)
+ end, UniqUsers);
+ false -> []
+ end,
+
+ % UniqUsers in html (noone selected -> everyone selected)
+ Users = lists:map(fun(UHUser) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(UHUser))),
+ Input = case lists:member(UHUser, CheckedUsers) of
+ true -> [?INPUTC("checkbox", "selected", ID)];
+ false when CheckedUsers == [] -> [?INPUTC("checkbox", "selected", ID)];
+ false -> [?INPUT("checkbox", "selected", ID)]
+ end,
+ Nick =
+ case search_user_nick(UHUser, UserRoster) of
+ nothing -> "";
+ N -> " ("++ N ++")"
+ end,
+ ?XE("tr",
+ [?XE("td", Input),
+ ?XC("td", UHUser++Nick)])
+ end, lists:sort(UniqUsers)),
+ % Messages to show (based on Users)
+ User_messages_filtered = case CheckedUsers of
+ [] -> User_messages;
+ _ -> lists:filter(fun(#msg{peer_name=PName, peer_server=PServer}) ->
+ lists:member(PName++"@"++PServer, CheckedUsers)
+ end, User_messages)
+ end,
+
+ % renders one message as a table row; the checkbox value encodes the timestamp
+ Msgs_Fun = fun(#msg{timestamp=Timestamp,
+ subject=Subject,
+ direction=Direction,
+ peer_name=PName, peer_server=PServer, peer_resource=PRes,
+ type=Type,
+ body=Body}) ->
+ TextRaw = case Subject of
+ "" -> Body;
+ _ -> [?T("Subject"),": ",Subject,"<br>", Body]
+ end,
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(Timestamp))),
+ % replace \n with <br>
+ Text = lists:map(fun(10) -> "<br>";
+ (A) -> A
+ end, TextRaw),
+ Resource = case PRes of
+ [] -> [];
+ undefined -> [];
+ R -> "/" ++ R
+ end,
+ % roster nick if there is one, otherwise a jid-based name
+ UserNick =
+ case search_user_nick(PName++"@"++PServer, UserRoster) of
+ nothing when PServer == Server ->
+ PName;
+ nothing when Type == "groupchat", Direction == from ->
+ PName++"@"++PServer++Resource;
+ nothing ->
+ PName++"@"++PServer;
+ N -> N
+ end,
+ ?XE("tr",
+ [?XE("td", [?INPUT("checkbox", "selected", ID)]),
+ ?XC("td", convert_timestamp(Timestamp)),
+ ?XC("td", atom_to_list(Direction)++": "++UserNick),
+ ?XC("td", Text)])
+ end,
+ % Filtered user messages in html
+ Msgs = lists:map(Msgs_Fun, lists:sort(User_messages_filtered)),
+
+ [?XC("h1", ?T("Logged messages for ") ++ Jid ++ ?T(" at ") ++ Date)] ++
+ case Res of
+ ok -> [?CT("Submitted"), ?P];
+ error -> [?CT("Bad format"), ?P];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ [?XE("table",
+ [?XE("thead",
+ [?X("td"),
+ ?XCT("td", "User")
+ ]
+ ),
+ ?XE("tbody",
+ Users
+ )]),
+ ?INPUTT("submit", "filter", "Filter Selected")
+ ] ++
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?X("td"),
+ ?XCT("td", "Date, Time"),
+ ?XCT("td", "Direction: Jid"),
+ ?XCT("td", "Body")
+ ])]),
+ ?XE("tbody",
+ Msgs
+ )]),
+ ?INPUTT("submit", "delete", "Delete Selected"),
+ ?BR
+ ]
+ )]
+ end.
--- mod_logdb.hrl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb.hrl 2009-02-05 20:12:58.000000000 +0200
@@ -0,0 +1,35 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb.hrl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose :
+%%% Version : trunk
+%%% Id : $Id: mod_logdb.hrl 1273 2009-02-05 18:12:57Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-define(logdb_debug, true). % merely being defined enables ?MYDEBUG; delete this line to disable it
+
+-ifdef(logdb_debug).
+-define(MYDEBUG(Format, Args), io:format("D(~p:~p:~p) : "++Format++"~n",
+ [calendar:local_time(),?MODULE,?LINE]++Args)). % prefixes local time, module and line
+-else.
+-define(MYDEBUG(_F,_A),[]). % debug disabled: expands to an empty list, arguments are dropped
+-endif.
+
+-record(msg, {timestamp, % mod_logdb:convert_timestamp_brief/1 of it selects the per-date table
+ owner_name, % local user the entry is logged (and counted) for
+ peer_name, peer_server, peer_resource, % JID parts of the other party
+ direction, % atom; compared against 'from' when rendering
+ type, subject, % type is a string, e.g. "groupchat"
+ body}).
+
+-record(user_settings, {owner_name, % user the settings belong to
+ dolog_default, % boolean (the MySQL backend converts it with bool_to_list/1)
+ dolog_list=[], % presumably peers to always log - confirm in mod_logdb
+ donotlog_list=[]}). % presumably peers never to log - confirm in mod_logdb
+
+-define(INPUTC(Type, Name, Value), % an "input" element with the given attributes plus checked="true"
+ ?XA("input", [{"type", Type},
+ {"name", Name},
+ {"value", Value},
+ {"checked", "true"}])).
--- mod_logdb_mnesia.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb_mnesia.erl 2009-02-05 20:12:58.000000000 +0200
@@ -0,0 +1,546 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_mnesia.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : mnesia backend for mod_logdb
+%%% Version : trunk
+%%% Id : $Id: mod_logdb_mnesia.erl 1273 2009-02-05 18:12:57Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mnesia).
+-author('[email protected]').
+
+-include("mod_logdb.hrl").
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+ rebuild_stats/1,
+ rebuild_stats_at/2,
+ delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+ get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ get_users_settings/1, get_user_settings/2, set_user_settings/3,
+ drop_user/2]).
+
+-define(PROCNAME, mod_logdb_mnesia).
+-define(CALL_TIMEOUT, 10000).
+
+-record(state, {vhost}).
+
+-record(stats, {user, at, count}). % number of logged messages (count) of one user for one date (at)
+
+prefix() -> % every mnesia table name built below starts with this
+ "logdb_".
+
+suffix(VHost) -> % ...and ends with the vhost it belongs to
+ "_" ++ VHost.
+
+stats_table(VHost) -> % atom logdb_stats_<VHost>
+ list_to_atom(prefix() ++ "stats" ++ suffix(VHost)).
+
+table_name(VHost, Date) -> % atom logdb_messages_<Date>_<VHost>: one message table per Date
+ list_to_atom(prefix() ++ "messages_" ++ Date ++ suffix(VHost)). % NOTE(review): atoms are never collected - confirm Date is trusted
+
+settings_table(VHost) -> % atom logdb_settings_<VHost>
+ list_to_atom(prefix() ++ "settings" ++ suffix(VHost)).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+start(VHost, Opts) -> % starts the backend server for VHost, registered locally under a per-vhost name
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). % start/4, not start_link/4: not linked to the caller
+
+stop(VHost) -> % asks the per-vhost server to stop and waits for its 'ok' reply
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+init([VHost, _Opts]) -> % starts only while mnesia is running; makes sure the stats and settings tables exist
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok = create_stats_table(VHost), % the helper returns ok | error, so an error fails this match
+ ok = create_settings_table(VHost),
+ {ok, #state{vhost=VHost}};
+ no ->
+ ?ERROR_MSG("Mnesia not running", []),
+ {stop, db_connection_failed};
+ Status -> % any other is_running value (neither yes nor no)
+ ?ERROR_MSG("Mnesia status: ~p", [Status]),
+ {stop, db_connection_failed}
+ end.
+
+handle_call({log_message, Msg}, _From, #state{vhost=VHost}=State) -> % one clause per request sent by the gen_logdb wrappers
+ {reply, log_message_int(VHost, Msg), State};
+handle_call({rebuild_stats}, _From, #state{vhost=VHost}=State) ->
+ {atomic, ok} = delete_nonexistent_stats(VHost), % an aborted cleanup fails this match
+ Reply =
+ lists:foreach(fun(Date) ->
+ rebuild_stats_at_int(VHost, Date)
+ end, get_dates_int(VHost)), % foreach returns ok, so per-date errors are not reported
+ {reply, Reply, State};
+handle_call({rebuild_stats_at, Date}, _From, #state{vhost=VHost}=State) ->
+ Reply = rebuild_stats_at_int(VHost, Date),
+ {reply, Reply, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{vhost=VHost}=State) ->
+ Table = table_name(VHost, Date),
+ Fun = fun() ->
+ lists:foreach(
+ fun(Msg) ->
+ mnesia:write_lock_table(stats_table(VHost)),
+ mnesia:write_lock_table(Table),
+ mnesia:delete_object(Table, Msg, write)
+ end, Msgs)
+ end,
+ DRez = case mnesia:transaction(Fun) of
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to delete_messages_by_user_at at ~p for ~p: ~p", [Date, VHost, Reason]),
+ error;
+ _ ->
+ ok
+ end,
+ Reply = % stats are rebuilt even if the delete failed; either failure yields error
+ case rebuild_stats_at_int(VHost, Date) of
+ error ->
+ error;
+ ok ->
+ DRez
+ end,
+ {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{vhost=VHost}=State) ->
+ {reply, delete_all_messages_by_user_at_int(User, VHost, Date), State};
+handle_call({delete_messages_at, Date}, _From, #state{vhost=VHost}=State) ->
+ Reply = % drop the whole table of Date, then its stats rows
+ case mnesia:delete_table(table_name(VHost, Date)) of
+ {atomic, ok} ->
+ delete_stats_by_vhost_at_int(VHost, Date);
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to delete_messages_at for ~p at ~p: ~p", [VHost, Date, Reason]), % one ~p per argument
+ error
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{vhost=VHost}=State) ->
+ Fun = fun(#stats{at=Date, count=Count}, Stats) -> % sums the counts of all users per date
+ case lists:keysearch(Date, 1, Stats) of
+ false ->
+ lists:append(Stats, [{Date, Count}]);
+ {value, {_, TempCount}} ->
+ lists:keyreplace(Date, 1, Stats, {Date, TempCount+Count})
+ end
+ end,
+ Reply =
+ case mnesia:transaction(fun() ->
+ mnesia:foldl(Fun, [], stats_table(VHost))
+ end) of
+ {atomic, Result} -> {ok, mod_logdb:sort_stats(Result)};
+ {aborted, Reason} -> {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{vhost=VHost}=State) ->
+ Fun = fun() ->
+ Pat = #stats{user='$1', at=Date, count='$2'},
+ mnesia:select(stats_table(VHost), [{Pat, [], [['$1', '$2']]}])
+ end,
+ Reply =
+ case mnesia:transaction(Fun) of
+ {atomic, Result} -> % [{User, Count}], highest count first
+ {ok, lists:reverse(lists:keysort(2, [{User, Count} || [User, Count] <- Result]))};
+ {aborted, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{vhost=VHost}=State) ->
+ {reply, get_user_stats_int(User, VHost), State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{vhost=VHost}=State) ->
+ Reply =
+ case mnesia:transaction(fun() ->
+ Pat = #msg{owner_name=User, _='_'},
+ mnesia:select(table_name(VHost, Date),
+ [{Pat, [], ['$_']}])
+ end) of
+ {atomic, Result} -> {ok, Result};
+ {aborted, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_dates}, _From, #state{vhost=VHost}=State) ->
+ {reply, get_dates_int(VHost), State};
+handle_call({get_users_settings}, _From, #state{vhost=VHost}=State) ->
+ Reply = mnesia:dirty_match_object(settings_table(VHost), #user_settings{_='_'}),
+ {reply, {ok, Reply}, State};
+handle_call({get_user_settings, User}, _From, #state{vhost=VHost}=State) ->
+ Reply = % NOTE(review): bare record or [] here, while the MySQL backend replies {ok, _} - confirm what the caller expects
+ case mnesia:dirty_match_object(settings_table(VHost), #user_settings{owner_name=User, _='_'}) of
+ [] -> [];
+ [Setting] ->
+ Setting
+ end,
+ {reply, Reply, State};
+handle_call({set_user_settings, _User, Set}, _From, #state{vhost=VHost}=State) ->
+ ?MYDEBUG("~p~n~p", [settings_table(VHost), Set]),
+ Reply = mnesia:dirty_write(settings_table(VHost), Set),
+ ?MYDEBUG("~p", [Reply]),
+ {reply, Reply, State};
+handle_call({drop_user, User}, _From, #state{vhost=VHost}=State) ->
+ {ok, Dates} = get_user_stats_int(User, VHost), % an {error, _} result fails this match
+ MDResult = lists:map(fun({Date, _}) ->
+ delete_all_messages_by_user_at_int(User, VHost, Date)
+ end, Dates),
+ SDResult = delete_user_settings_int(User, VHost),
+ Reply = % ok only if every per-date delete and the settings delete returned ok
+ case lists:all(fun(Result) when Result == ok ->
+ true;
+ (Result) when Result == error ->
+ false
+ end, lists:append(MDResult, [SDResult])) of
+ true ->
+ ok;
+ false ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({stop}, _From, State) ->
+ {stop, normal, ok, State};
+handle_call(Msg, _From, State) -> % unknown request: only logged
+ ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+ {noreply, State}. % no reply is sent, so the caller waits until its call times out
+
+handle_cast(Msg, State) -> % this backend defines no casts: anything received is logged and ignored
+ ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+ {noreply, State}.
+
+handle_info(Info, State) -> % stray messages are logged and dropped
+ ?INFO_MSG("Got Info:~p, State:~p", [Info, State]),
+ {noreply, State}.
+
+terminate(_Reason, _State) -> % nothing to release on shutdown
+ ok.
+
+code_change(_OldVsn, State, _Extra) -> % state is kept unchanged across code upgrades
+ {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+log_message(VHost, Msg) -> % every wrapper below forwards one request to the per-vhost server and waits up to ?CALL_TIMEOUT ms
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT).
+rebuild_stats(VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {rebuild_stats}, ?CALL_TIMEOUT).
+rebuild_stats_at(VHost, Date) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+delete_messages_by_user_at(VHost, Msgs, Date) -> % note: VHost comes first here, unlike the User-first wrappers
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+delete_all_messages_by_user_at(User, VHost, Date) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+delete_messages_at(VHost, Date) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT).
+get_vhost_stats(VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT).
+get_vhost_stats_at(VHost, Date) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+get_user_stats(User, VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT).
+get_user_messages_at(User, VHost, Date) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+get_dates(VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT).
+get_user_settings(User, VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT).
+get_users_settings(VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {get_users_settings}, ?CALL_TIMEOUT).
+set_user_settings(User, VHost, Set) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+drop_user(User, VHost) ->
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {drop_user, User}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+log_message_int(VHost, #msg{timestamp=Timestamp}=Msg) -> % writes Msg into its date's table; returns ok | error
+ Date = mod_logdb:convert_timestamp_brief(Timestamp),
+
+ ATable = table_name(VHost, Date),
+ Fun = fun() ->
+ mnesia:write_lock_table(ATable),
+ mnesia:write(ATable, Msg, write)
+ end,
+ % log message, then increment the stats of the message owner
+ case mnesia:transaction(Fun) of
+ % if table does not exists - create it and try to log message again
+ {aborted,{no_exists, _Table}} ->
+ case create_msg_table(VHost, Date) of
+ {aborted, CReason} ->
+ ?ERROR_MSG("Failed to log message: ~p", [CReason]),
+ error;
+ {atomic, ok} ->
+ ?MYDEBUG("Created msg table for ~p at ~p", [VHost, Date]),
+ log_message_int(VHost, Msg) % retry now that the table exists
+ end;
+ {aborted, TReason} ->
+ ?ERROR_MSG("Failed to log message: ~p", [TReason]),
+ error;
+ {atomic, _} ->
+ ?MYDEBUG("Logged ok for ~p, peer: ~p", [Msg#msg.owner_name++"@"++VHost,
+ Msg#msg.peer_name++"@"++Msg#msg.peer_server]),
+ increment_user_stats(Msg#msg.owner_name, VHost, Date) % its ok | error is the result
+ end.
+
+increment_user_stats(Owner, VHost, Date) -> % adds 1 to Owner's counter for Date; returns ok | error
+ Fun = fun() ->
+ Pat = #stats{user=Owner, at=Date, count='$1'},
+ mnesia:write_lock_table(stats_table(VHost)),
+ case mnesia:select(stats_table(VHost), [{Pat, [], ['$_']}]) of
+ [] -> % first message of Owner at Date
+ mnesia:write(stats_table(VHost),
+ #stats{user=Owner,
+ at=Date,
+ count=1},
+ write);
+ [Stats] -> % the table is created as a bag, so the old row is deleted before the new one is written
+ mnesia:delete_object(stats_table(VHost),
+ #stats{user=Owner,
+ at=Date,
+ count=Stats#stats.count},
+ write),
+ New = Stats#stats{count = Stats#stats.count+1},
+ if
+ New#stats.count > 0 -> mnesia:write(stats_table(VHost),
+ New,
+ write);
+ true -> ok
+ end
+ end
+ end,
+ case mnesia:transaction(Fun) of
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to update stats for ~s@~s: ~p", [Owner, VHost, Reason]),
+ error;
+ {atomic, _} ->
+ ?MYDEBUG("Updated stats for ~s@~s", [Owner, VHost]),
+ ok
+ end.
+
+get_dates_int(VHost) -> % dates (strings) of all message tables whose name ends with VHost
+ Tables = mnesia:system_info(tables),
+ lists:foldl(fun(ATable, Dates) ->
+ Table = atom_to_list(ATable),
+ % re:run/2 returns {match, Captured} | nomatch, not the old regexp 3-tuple
+ case re:run(Table, VHost++"$") of
+ {match, _} ->
+ case re:run(Table,"_[0-9]+-[0-9]+-[0-9]+_") of
+ % Pos is a 0-based offset; +2 and -2 strip the "_" on both sides
+ {match, [{Pos, Len}|_]} ->
+ lists:append(Dates, [lists:sublist(Table,Pos+2,Len-2)]);
+ nomatch -> Dates
+ end;
+ nomatch -> Dates
+ end
+ end, [], Tables).
+
+rebuild_stats_at_int(VHost, Date) -> % recounts the per-user stats of Date from its message table; ok | error
+ Table = table_name(VHost, Date),
+ STable = stats_table(VHost),
+ CFun = fun(Msg, Stats) -> % accumulates [{Owner, Count}] over the messages
+ Owner = Msg#msg.owner_name,
+ case lists:keysearch(Owner, 1, Stats) of
+ {value, {_, Count}} ->
+ lists:keyreplace(Owner, 1, Stats, {Owner, Count + 1});
+ false ->
+ lists:append(Stats, [{Owner, 1}])
+ end
+ end,
+ DFun = fun(#stats{at=SDate} = Stat, _Acc)
+ when SDate == Date ->
+ mnesia:delete_object(stats_table(VHost), Stat, write);
+ (_Stat, _Acc) -> ok
+ end,
+ % TODO: Maybe unregister hooks ?
+ case mnesia:transaction(fun() ->
+ mnesia:write_lock_table(Table),
+ mnesia:write_lock_table(STable),
+ % Calc stats for VHost at Date
+ AStats = mnesia:foldl(CFun, [], Table),
+ % Delete all stats for VHost at Date, also when no message is left (no stale counters)
+ mnesia:foldl(DFun, [], STable),
+ case AStats of
+ [] -> empty;
+ _ -> % Write new calc'ed stats
+ lists:foreach(fun({Owner, Count}) ->
+ WStat = #stats{user=Owner, at=Date, count=Count},
+ mnesia:write(stats_table(VHost), WStat, write)
+ end, AStats),
+ ok
+ end
+ end) of
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to rebuild_stats_at for ~p at ~p: ~p", [VHost, Date, Reason]),
+ error;
+ {atomic, ok} ->
+ ok;
+ {atomic, empty} -> % no messages left for Date: drop its now useless table
+ {atomic,ok} = mnesia:delete_table(Table),
+ ?MYDEBUG("Dropped table at ~p", [Date]),
+ ok
+ end.
+
+delete_nonexistent_stats(VHost) -> % removes stats rows whose date has no message table; returns the transaction result
+ Dates = get_dates_int(VHost),
+ mnesia:transaction(fun() ->
+ mnesia:foldl(fun(#stats{at=Date} = Stat, _Acc) ->
+ case lists:member(Date, Dates) of % delete_object/1 would address a table named 'stats', so name the table
+ false -> mnesia:delete_object(stats_table(VHost), Stat, write);
+ true -> ok
+ end
+ end, ok, stats_table(VHost))
+ end).
+
+delete_stats_by_vhost_at_int(VHost, Date) -> % deletes every stats row of Date in one transaction
+ StatsDelete = fun(#stats{at=SDate} = Stat, _Acc)
+ when SDate == Date ->
+ mnesia:delete_object(stats_table(VHost), Stat, write),
+ ok;
+ (_Msg, _Acc) -> ok
+ end,
+ case mnesia:transaction(fun() ->
+ mnesia:write_lock_table(stats_table(VHost)),
+ mnesia:foldl(StatsDelete, ok, stats_table(VHost))
+ end) of
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to update stats at ~p for ~p: ~p", [Date, VHost, Reason]),
+ rebuild_stats_at_int(VHost, Date); % fallback: recount instead; its ok | error is returned
+ _ ->
+ ?INFO_MSG("Updated stats at ~p for ~p", [Date, VHost]),
+ ok
+ end.
+
+get_user_stats_int(User, VHost) -> % {ok, Stats} with Stats = sort_stats([{Date, Count}]), or {error, Reason}
+ MatchSpec = [{#stats{user=User, at='$1', count='$2'}, [], [['$1', '$2']]}],
+ Select = fun() -> mnesia:select(stats_table(VHost), MatchSpec) end,
+ case mnesia:transaction(Select) of
+ {aborted, Why} ->
+ {error, Why};
+ {atomic, Rows} ->
+ PerDate = [{Day, N} || [Day, N] <- Rows],
+ {ok, mod_logdb:sort_stats(PerDate)}
+ end.
+
+delete_all_messages_by_user_at_int(User, VHost, Date) -> % deletes User's messages of Date, then recounts; ok | error
+ Table = table_name(VHost, Date),
+ MsgDelete = fun(#msg{owner_name=Owner} = Msg, _Acc)
+ when Owner == User ->
+ mnesia:delete_object(Table, Msg, write),
+ ok;
+ (_Msg, _Acc) -> ok
+ end,
+ DRez = case mnesia:transaction(fun() ->
+ mnesia:foldl(MsgDelete, ok, Table)
+ end) of
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to delete_all_messages_by_user_at for ~p@~p at ~p: ~p", [User, VHost, Date, Reason]),
+ error;
+ _ ->
+ ok
+ end,
+ case rebuild_stats_at_int(VHost, Date) of % runs even when the delete failed
+ error ->
+ error;
+ ok ->
+ DRez
+ end.
+
+delete_user_settings_int(User, VHost) -> % removes User's settings row, if there is one (dirty operations, no transaction)
+ STable = settings_table(VHost),
+ case mnesia:dirty_match_object(STable, #user_settings{owner_name=User, _='_'}) of
+ [] ->
+ ok;
+ [UserSettings] -> % create_settings_table/1 makes this a 'set' table, so at most one row per owner is expected
+ mnesia:dirty_delete_object(STable, UserSettings)
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+create_stats_table(VHost) -> % creates the vhost's stats table if missing; returns ok | error
+ SName = stats_table(VHost),
+ case mnesia:create_table(SName,
+ [{disc_only_copies, [node()]},
+ {type, bag},
+ {attributes, record_info(fields, stats)},
+ {record_name, stats}
+ ]) of
+ {atomic, ok} -> % freshly created: fill it from the message tables that already exist
+ ?MYDEBUG("Created stats table for ~p", [VHost]),
+ lists:foreach(fun(Date) ->
+ rebuild_stats_at_int(VHost, Date)
+ end, get_dates_int(VHost)),
+ ok;
+ {aborted, {already_exists, _}} ->
+ ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+ ok;
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to create stats table: ~p", [Reason]),
+ error
+ end.
+
+create_settings_table(VHost) -> % creates the vhost's settings table if missing; returns ok | error
+ SName = settings_table(VHost),
+ case mnesia:create_table(SName,
+ [{disc_copies, [node()]},
+ {type, set}, % keyed by the first record field, owner_name
+ {attributes, record_info(fields, user_settings)},
+ {record_name, user_settings}
+ ]) of
+ {atomic, ok} ->
+ ?MYDEBUG("Created settings table for ~p", [VHost]),
+ ok;
+ {aborted, {already_exists, _}} ->
+ ?MYDEBUG("Settings table for ~p already exists", [VHost]),
+ ok;
+ {aborted, Reason} ->
+ ?ERROR_MSG("Failed to create settings table: ~p", [Reason]),
+ error
+ end.
+
+create_msg_table(VHost, Date) -> % hands back mnesia:create_table/2's own result, unlike the ok | error helpers
+ Name = table_name(VHost, Date),
+ Options = [{disc_only_copies, [node()]},
+ {type, bag},
+ {attributes, record_info(fields, msg)},
+ {record_name, msg}],
+ mnesia:create_table(Name, Options).
--- mod_logdb_mysql.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb_mysql.erl 2009-07-30 09:00:14.000000000 +0300
@@ -0,0 +1,1053 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_mysql.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : MySQL backend for mod_logdb
+%%% Version : trunk
+%%% Id : $Id: mod_logdb_mysql.erl 1360 2009-07-30 06:00:14Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mysql).
+-author('[email protected]').
+
+-include("mod_logdb.hrl").
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+ rebuild_stats/1,
+ rebuild_stats_at/2,
+ delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+ get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ get_users_settings/1, get_user_settings/2, set_user_settings/3,
+ drop_user/2]).
+
+% gen_server call timeout
+-define(CALL_TIMEOUT, 30000).
+-define(MYSQL_TIMEOUT, 60000).
+-define(INDEX_SIZE, integer_to_list(170)).
+-define(PROCNAME, mod_logdb_mysql).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+ list_to_string/1, string_to_list/1,
+ convert_timestamp_brief/1]).
+
+-record(state, {dbref, vhost, server, port, db, user, password}).
+
+% replace "." with "_"
+escape_vhost(VHost) -> lists:map(fun($.) -> $_; % dots become underscores
+ (Char) -> Char % everything else is kept
+ end, VHost).
+prefix() -> % opening backtick + common prefix of every SQL table name built below
+ "`logdb_".
+
+suffix(VHost) -> % "_" + vhost with dots replaced + closing backtick
+ "_" ++ escape_vhost(VHost) ++ "`".
+
+messages_table(VHost, Date) -> % one messages table per Date; Date is inserted as given, unescaped
+ prefix() ++ "messages_" ++ Date ++ suffix(VHost).
+
+stats_table(VHost) ->
+ prefix() ++ "stats" ++ suffix(VHost).
+
+temp_table(VHost) ->
+ prefix() ++ "temp" ++ suffix(VHost).
+
+settings_table(VHost) ->
+ prefix() ++ "settings" ++ suffix(VHost).
+
+users_table(VHost) -> % joined on user_id by the queries in handle_call/3
+ prefix() ++ "users" ++ suffix(VHost).
+servers_table(VHost) -> % joined on server_id
+ prefix() ++ "servers" ++ suffix(VHost).
+resources_table(VHost) -> % joined on resource_id
+ prefix() ++ "resources" ++ suffix(VHost).
+
+ets_users_table(VHost) -> list_to_atom("logdb_users_" ++ VHost). % names of the per-vhost ETS tables (cleared via clear_ets_tables)
+ets_servers_table(VHost) -> list_to_atom("logdb_servers_" ++ VHost). % VHost is used as is here, dots are not replaced
+ets_resources_table(VHost) -> list_to_atom("logdb_resources_" ++ VHost).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+start(VHost, Opts) -> % starts the MySQL backend server for VHost, registered locally under a per-vhost name
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). % start/4, not start_link/4: not linked to the caller
+
+stop(VHost) -> % asks the per-vhost server to stop and waits for its 'ok' reply
+ Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+ gen_server:call(Proc, {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+init([VHost, Opts]) -> % reads the connection options, connects, and makes sure the backend's tables exist
+ crypto:start(),
+
+ Server = gen_mod:get_opt(server, Opts, "localhost"),
+ Port = gen_mod:get_opt(port, Opts, 3306),
+ DB = gen_mod:get_opt(db, Opts, "logdb"),
+ User = gen_mod:get_opt(user, Opts, "root"),
+ Password = gen_mod:get_opt(password, Opts, ""),
+
+ St = #state{vhost=VHost,
+ server=Server, port=Port, db=DB,
+ user=User, password=Password}, % kept in the state so drop_user can open its own connection
+
+ case open_mysql_connection(St) of
+ {ok, DBRef} ->
+ State = St#state{dbref=DBRef},
+ ok = create_stats_table(State), % any result other than ok fails the match
+ ok = create_settings_table(State),
+ ok = create_users_table(State),
+ % clear ets cache every ...
+ timer:send_interval(timer:hours(12), clear_ets_tables), % ...12 hours, handled in handle_info/2
+ ok = create_servers_table(State),
+ ok = create_resources_table(State),
+ erlang:monitor(process, DBRef), % a 'DOWN' for the connection stops this server (see handle_info/2)
+ {ok, State};
+ {error, Reason} ->
+ ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]),
+ {stop, db_connection_failed}
+ end.
+
+open_mysql_connection(#state{server=Server, port=Port, db=DB, % returns mysql_conn:start/6's result; callers match {ok, DBRef} | {error, Reason}
+ user=DBUser, password=Password} = _State) ->
+ LogFun = fun(debug, _Format, _Argument) -> % driver debug output is discarded
+ %?MYDEBUG(Format, Argument);
+ ok;
+ (error, Format, Argument) ->
+ ?ERROR_MSG(Format, Argument);
+ (Level, Format, Argument) -> % any other level goes to ?MYDEBUG
+ ?MYDEBUG("MySQL (~p)~n", [Level]),
+ ?MYDEBUG(Format, Argument)
+ end,
+ ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]), % the password is not logged
+ mysql_conn:start(Server, Port, DBUser, Password, DB, LogFun).
+
+close_mysql_connection(DBRef) -> % stops the connection process opened by open_mysql_connection/1
+ ?MYDEBUG("Closing ~p mysql connection", [DBRef]),
+ mysql_conn:stop(DBRef).
+
+handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> % one clause per request sent by the gen_logdb wrappers
+ Date = convert_timestamp_brief(Msg#msg.timestamp),
+
+ Table = messages_table(VHost, Date),
+ Owner_id = get_user_id(DBRef, VHost, Msg#msg.owner_name),
+ Peer_name_id = get_user_id(DBRef, VHost, Msg#msg.peer_name),
+ Peer_server_id = get_server_id(DBRef, VHost, Msg#msg.peer_server),
+ Peer_resource_id = get_resource_id(DBRef, VHost, Msg#msg.peer_resource),
+
+ Query = ["INSERT INTO ",Table," ",
+ "(owner_id,",
+ "peer_name_id,",
+ "peer_server_id,",
+ "peer_resource_id,",
+ "direction,",
+ "type,",
+ "subject,",
+ "body,",
+ "timestamp) ",
+ "VALUES ",
+ "('", Owner_id, "',",
+ "'", Peer_name_id, "',",
+ "'", Peer_server_id, "',",
+ "'", Peer_resource_id, "',",
+ "'", atom_to_list(Msg#msg.direction), "',",
+ "'", Msg#msg.type, "',",
+ "'", ejabberd_odbc:escape(Msg#msg.subject), "',",
+ "'", ejabberd_odbc:escape(Msg#msg.body), "',",
+ "'", Msg#msg.timestamp, "');"],
+
+ Reply =
+ case sql_query_internal_silent(DBRef, Query) of
+ {updated, _} ->
+ ?MYDEBUG("Logged ok for ~p, peer: ~p", [Msg#msg.owner_name++"@"++VHost,
+ Msg#msg.peer_name++"@"++Msg#msg.peer_server]),
+ increment_user_stats(DBRef, Msg#msg.owner_name, Owner_id, VHost, Peer_name_id, Peer_server_id, Date);
+ {error, Reason} ->
+ case re:run(Reason, "#42S02") of % re:run/2 returns {match, Captured} | nomatch
+ % Table doesn't exist
+ {match, _} ->
+ case create_msg_table(DBRef, VHost, Date) of
+ error ->
+ error;
+ ok -> % table created: repeat the insert once
+ {updated, _} = sql_query_internal(DBRef, Query),
+ increment_user_stats(DBRef, Msg#msg.owner_name, Owner_id, VHost, Peer_name_id, Peer_server_id, Date)
+ end;
+ _ ->
+ ?ERROR_MSG("Failed to log message: ~p", [Reason]),
+ error
+ end
+ end,
+ {reply, Reply, State};
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Reply = rebuild_stats_at_int(DBRef, VHost, Date),
+ {reply, Reply, State};
+handle_call({delete_messages_by_user_at, [], _Date}, _From, State) -> % nothing selected: avoids building an empty IN () list
+ {reply, error, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) ->
+ ["\"",Timestamp,"\"",","]
+ end, Msgs),
+
+ Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), % drops the trailing "," and closes the list
+
+ Query = ["DELETE FROM ",messages_table(VHost, Date)," ",
+ "WHERE timestamp IN (", Temp1],
+
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {updated, Aff} ->
+ ?MYDEBUG("Aff=~p", [Aff]),
+ rebuild_stats_at_int(DBRef, VHost, Date);
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ ok = delete_all_messages_by_user_at_int(DBRef, User, VHost, Date), % a non-ok result fails the match
+ ok = delete_stats_by_user_at_int(DBRef, User, VHost, Date),
+ {reply, ok, State};
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Reply = % drop the whole table of Date, then its stats rows
+ case sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]) of
+ {updated, _} ->
+ Query = ["DELETE FROM ",stats_table(VHost)," "
+ "WHERE at=\"",Date,"\";"],
+ case sql_query_internal(DBRef, Query) of
+ {updated, _} ->
+ ok;
+ {error, _} ->
+ error
+ end;
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT at, sum(count) ",
+ "FROM ",SName," ",
+ "GROUP BY at ",
+ "ORDER BY DATE(at) DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} -> % rows are lists with one string per selected column
+ {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]};
+ {error, Reason} ->
+ % TODO: Duplicate error message ?
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT username, sum(count) AS allcount ",
+ "FROM ",SName," ",
+ "JOIN ",users_table(VHost)," ON owner_id=user_id "
+ "WHERE at=\"",Date,"\" "
+ "GROUP BY username ",
+ "ORDER BY allcount DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} -> % re-sorted here by count, highest first
+ {ok, lists:reverse(
+ lists:keysort(2,
+ [ {User, list_to_integer(Count)} || [User, Count] <- Result]))};
+ {error, Reason} ->
+ % TODO:
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ {reply, get_user_stats_int(DBRef, User, VHost), State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ TName = messages_table(VHost, Date),
+ UName = users_table(VHost),
+ SName = servers_table(VHost),
+ RName = resources_table(VHost),
+ Query = ["SELECT users.username,",
+ "servers.server,",
+ "resources.resource,",
+ "messages.direction,"
+ "messages.type,"
+ "messages.subject,"
+ "messages.body,"
+ "messages.timestamp "
+ "FROM ",TName," AS messages "
+ "JOIN ",UName," AS users ON peer_name_id=user_id ",
+ "JOIN ",SName," AS servers ON peer_server_id=server_id ",
+ "JOIN ",RName," AS resources ON peer_resource_id=resource_id ",
+ "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\" ",
+ "ORDER BY timestamp ASC;"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ Fun = fun([Peer_name, Peer_server, Peer_resource, % same order as the SELECT list
+ Direction,
+ Type,
+ Subject, Body,
+ Timestamp]) ->
+ #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource,
+ direction=list_to_atom(Direction),
+ type=Type,
+ subject=Subject, body=Body,
+ timestamp=Timestamp} % owner_name is left undefined in the returned records
+ end,
+ {ok, lists:map(Fun, Result)};
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT at ",
+ "FROM ",SName," ",
+ "GROUP BY at ",
+ "ORDER BY DATE(at) DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} -> % bare list of dates on success, {error, Reason} otherwise
+ [ Date || [Date] <- Result ];
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+ "FROM ",settings_table(VHost)," ",
+ "JOIN ",users_table(VHost)," ON user_id=owner_id;"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) ->
+ #user_settings{owner_name=Owner,
+ dolog_default=list_to_bool(DoLogDef),
+ dolog_list=string_to_list(DoLogL),
+ donotlog_list=string_to_list(DoNotLogL)
+ }
+ end, Result)};
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ",
+ "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\";"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, []} ->
+ {ok, []};
+ {data, [[DoLogDef, DoLogL, DoNotLogL]]} -> % the query selects exactly these three columns
+ {ok, #user_settings{owner_name=User, % the owner is the user that was asked for
+ dolog_default=list_to_bool(DoLogDef),
+ dolog_list=string_to_list(DoLogL),
+ donotlog_list=string_to_list(DoNotLogL)}};
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+ dolog_list=DoLogL,
+ donotlog_list=DoNotLogL}},
+ _From, #state{dbref=DBRef, vhost=VHost} = State) ->
+ User_id = get_user_id(DBRef, VHost, User),
+
+ Query = ["UPDATE ",settings_table(VHost)," ",
+ "SET dolog_default=",bool_to_list(DoLogDef),", ",
+ "dolog_list='",list_to_string(DoLogL),"', ",
+ "donotlog_list='",list_to_string(DoNotLogL),"' ",
+ "WHERE owner_id=\"",User_id,"\";"],
+
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {updated, 0} -> % no row was changed: try to insert one
+ IQuery = ["INSERT INTO ",settings_table(VHost)," ",
+ "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+ "VALUES ",
+ "('",User_id,"', ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+ case sql_query_internal_silent(DBRef, IQuery) of
+ {updated, _} ->
+ ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, Reason} ->
+ case re:run(Reason, "#23000") of % re:run/2 returns {match, Captured} | nomatch
+ % Already exists
+ {match, _} ->
+ ok;
+ _ ->
+ ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]),
+ error
+ end
+ end;
+ {updated, 1} ->
+ ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({stop}, _From, #state{vhost=VHost}=State) ->
+ ets:delete(ets_users_table(VHost)),
+ ets:delete(ets_servers_table(VHost)), % NOTE(review): the resources ETS table is not deleted here - confirm intended
+ ?MYDEBUG("Stoping mysql backend for ~p", [VHost]),
+ {stop, normal, ok, State};
+handle_call(Msg, _From, State) -> % unknown request: only logged
+ ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+ {noreply, State}. % no reply is sent, so the caller waits until its call times out
+
+handle_cast({rebuild_stats}, State) -> % asynchronous requests: the caller gets no result back
+ rebuild_all_stats_int(State),
+ {noreply, State};
+handle_cast({drop_user, User}, #state{vhost=VHost} = State) ->
+ Fun = fun() -> % runs in its own process with its own connection, so the server is not blocked
+ {ok, DBRef} = open_mysql_connection(State),
+ {ok, Dates} = get_user_stats_int(DBRef, User, VHost), % NOTE(review): if this match fails the connection is not closed here
+ MDResult = lists:map(fun({Date, _}) ->
+ delete_all_messages_by_user_at_int(DBRef, User, VHost, Date)
+ end, Dates),
+ StDResult = delete_all_stats_by_user_int(DBRef, User, VHost),
+ SDResult = delete_user_settings_int(DBRef, User, VHost),
+ case lists:all(fun(Result) when Result == ok ->
+ true;
+ (Result) when Result == error ->
+ false
+ end, lists:append([MDResult, [StDResult], [SDResult]])) of
+ true ->
+ ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+ false ->
+ ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+ end,
+ close_mysql_connection(DBRef)
+ end,
+ spawn(Fun), % plain spawn: neither linked nor monitored
+ {noreply, State};
+handle_cast(Msg, State) -> % anything else is logged and ignored
+ ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+ {noreply, State}.
+
+handle_info(clear_ets_tables, State) -> % empty the users and resources ETS caches
+    VHost = State#state.vhost,
+    lists:foreach(fun ets:delete_all_objects/1, [ets_users_table(VHost), ets_resources_table(VHost)]),
+    {noreply, State};
+handle_info({'DOWN', _Ref, process, _Pid, _Why}, State) -> % a monitored process went down
+    {stop, connection_dropped, State};
+handle_info(Other, State) -> % anything else is only logged
+    ?INFO_MSG("Got Info:~p, State:~p", [Other, State]),
+    {noreply, State}.
+
+%% Close the MySQL connection held in the state when the server terminates.
+terminate(_Why, #state{dbref=Conn}) ->
+    close_mysql_connection(Conn), ok.
+
+%% Code upgrade hook: the state is carried over unchanged.
+code_change(_FromVsn, State, _Extra) -> {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Send {log_message, Msg} to the VHost backend process and wait for its reply.
+log_message(VHost, Msg) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {log_message, Msg}, ?CALL_TIMEOUT).
+%% Fire-and-forget request to the VHost backend process to rebuild all stats.
+rebuild_stats(VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats}).
+%% Send {rebuild_stats_at, Date} to the VHost backend process and wait for its reply.
+rebuild_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+%% Send {delete_messages_by_user_at, Msgs, Date} to the VHost backend process and wait for its reply.
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+%% Send {delete_all_messages_by_user_at, User, Date} to the VHost backend process and wait for its reply.
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+%% Send {delete_messages_at, Date} to the VHost backend process and wait for its reply.
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_messages_at, Date}, ?CALL_TIMEOUT).
+%% Send {get_vhost_stats} to the VHost backend process and wait for its reply.
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats}, ?CALL_TIMEOUT).
+%% Send {get_vhost_stats_at, Date} to the VHost backend process and wait for its reply.
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+%% Send {get_user_stats, User} to the VHost backend process and wait for its reply.
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_stats, User}, ?CALL_TIMEOUT).
+%% Send {get_user_messages_at, User, Date} to the VHost backend process and wait for its reply.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+%% Send {get_dates} to the VHost backend process and wait for its reply.
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_dates}, ?CALL_TIMEOUT).
+%% Send {get_users_settings} to the VHost backend process and wait for its reply.
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_users_settings}, ?CALL_TIMEOUT).
+%% Send {get_user_settings, User} to the VHost backend process and wait for its reply.
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_settings, User}, ?CALL_TIMEOUT).
+%% Send {set_user_settings, User, Set} to the VHost backend process and wait for its reply.
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+%% Fire-and-forget request to the VHost backend process to remove User's data.
+drop_user(User, VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Count one more message for the owner/peer pair on Date in the stats table.
+%% Tries an UPDATE first and INSERTs a fresh row when nothing was updated; ok | error.
+increment_user_stats(DBRef, User_name, User_id, VHost, PNameID, PServerID, Date) ->
+    Stats = stats_table(VHost),
+    Bump = ["UPDATE ",Stats," SET count=count+1 ",
+            "WHERE owner_id=\"",User_id,"\" AND peer_name_id=\"",PNameID,"\" AND peer_server_id=\"",PServerID,"\" AND at=\"",Date,"\";"],
+
+    case sql_query_internal(DBRef, Bump) of
+        {updated, 0} ->
+            % no row exists yet for this owner/peer/date
+            Create = ["INSERT INTO ",Stats," (owner_id, peer_name_id, peer_server_id, at, count) VALUES ",
+                      "('",User_id,"', '",PNameID,"', '",PServerID,"', '",Date,"', '1');"],
+            case sql_query_internal(DBRef, Create) of
+                {error, _} ->
+                    error;
+                {updated, _} ->
+                    ?MYDEBUG("New stats for ~s@~s at ~s", [User_name, VHost, Date]),
+                    ok
+            end;
+        {updated, _} ->
+            ?MYDEBUG("Updated stats for ~s@~s at ~s", [User_name, VHost, Date]),
+            ok;
+        {error, _} ->
+            error
+    end.
+
+%% Dates that have a messages table for VHost, extracted from the names listed by SHOW TABLES.
+get_dates_int(DBRef, VHost) ->
+    Reg = lists:sublist(prefix(),2,length(prefix())) ++ ".*" ++ escape_vhost(VHost), % prefix() minus its first char
+    case sql_query_internal(DBRef, ["SHOW TABLES"]) of
+        {data, Tables} ->
+            lists:foldl(fun([Table], Dates) ->
+                case re:run(Table, Reg) of
+                    {match, [{0, _} | _]} -> % re:run gives {match, [{Offset, Len}|_]}; offset 0 = name starts with Reg
+                        ?MYDEBUG("matched ~p against ~p", [Table, Reg]),
+                        case re:run(Table, "[0-9]+-[0-9]+-[0-9]+", [{capture, first, list}]) of
+                            {match, [Date]} ->
+                                lists:append(Dates, [Date]);
+                            nomatch ->
+                                Dates
+                        end;
+                    _ ->
+                        Dates
+                end
+            end, [], Tables);
+        {error, _} ->
+            []
+    end.
+
+%% Spawn a worker (own connection) that prunes stale stats and rebuilds stats for every date.
+rebuild_all_stats_int(#state{vhost=VHost}=State) ->
+    spawn(fun() ->
+        {ok, Conn} = open_mysql_connection(State),
+        ok = delete_nonexistent_stats(Conn, VHost),
+        Failed = fun(Day) ->
+            case catch rebuild_stats_at_int(Conn, VHost, Day) of
+                ok -> false;
+                error -> true; {'EXIT', _} -> true
+            end
+        end,
+        case lists:filter(Failed, get_dates_int(Conn, VHost)) of
+            [] -> ok;
+            FTables ->
+                ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]),
+                error
+        end,
+        close_mysql_connection(Conn)
+    end).
+
+rebuild_stats_at_int(DBRef, VHost, Date) -> % recompute the stats rows for Date; returns ok | error
+    TempTable = temp_table(VHost),
+    Fun = fun() ->
+        Table = messages_table(VHost, Date),
+        STable = stats_table(VHost),
+        % DQuery removes the existing stats rows for Date (executed further down)
+        DQuery = [ "DELETE FROM ",STable," ",
+                   "WHERE at='",Date,"';"],
+        % aggregate the day's messages into the temp table first
+        ok = create_temp_table(DBRef, TempTable),
+        {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]),
+        SQuery = ["INSERT INTO ",TempTable," ",
+                  "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                  "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ",
+                  "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"],
+        case sql_query_internal(DBRef, SQuery) of
+            {updated, 0} -> % nothing was aggregated
+                Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                case Count of
+                    {data, [["0"]]} -> % the messages table is empty: drop it and its stats
+                        {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]),
+                        {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE;"]),
+                        {updated, _} = sql_query_internal(DBRef, DQuery),
+                        ok;
+                    _ ->
+                        ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                        error
+                end;
+            {updated, _} -> % replace the old stats rows with the aggregated ones
+                {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]),
+                {updated, _} = sql_query_internal(DBRef, DQuery),
+                SQuery1 = ["INSERT INTO ",STable," ",
+                           "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                           "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                           "FROM ",TempTable,";"],
+                case sql_query_internal(DBRef, SQuery1) of
+                    {updated, _} -> ok;
+                    {error, _} -> error
+                end;
+            {error, _} -> error
+        end
+    end,
+    % Keep the outcome: the cleanup below runs either way, but callers match on ok | error.
+    Res = case catch apply(Fun, []) of
+        ok ->
+            ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]),
+            ok;
+        error ->
+            error;
+        {'EXIT', Reason} ->
+            ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+            error
+    end,
+    sql_query_internal(DBRef, ["UNLOCK TABLES;"]),
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Res.
+
+
+%% Remove stats rows whose date has no messages table any more.
+%% Returns ok when there are no dates at all (nothing is deleted then), otherwise
+%% ok | error according to the DELETE.
+delete_nonexistent_stats(DBRef, VHost) ->
+    case get_dates_int(DBRef, VHost) of
+        [] ->
+            ok;
+        Dates ->
+            % Emit ,"date" for each date and drop the leading comma, which leaves
+            % "d1","d2",...,"dn"
+            Quoted = tl(lists:flatmap(fun(D) -> [",", "\"", D, "\""] end, Dates)),
+            % close the IN (...) list and the statement
+            InList = lists:append([Quoted, ");"]),
+
+            Query = ["DELETE FROM ",stats_table(VHost)," ",
+                     "WHERE at NOT IN (", InList],
+            case sql_query_internal(DBRef, Query) of
+                {error, _} ->
+                    error;
+                {updated, _} ->
+                    ok
+            end
+    end.
+
+%% Message totals per day for User, newest day first.
+%% Returns {ok, [{Date, Count}]} with integer counts, or {error, Reason}.
+get_user_stats_int(DBRef, User, VHost) ->
+    Stats = stats_table(VHost),
+    OwnerId = get_user_id(DBRef, VHost, User),
+    Query = ["SELECT at, sum(count) as allcount FROM ",Stats," ",
+             "WHERE owner_id=\"",OwnerId,"\" ",
+             "GROUP BY at ORDER BY DATE(at) DESC;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} = Failure ->
+            Failure;
+        {data, Rows} ->
+            {ok, [ {Day, list_to_integer(N)} || [Day, N] <- Rows ]}
+    end.
+
+%% Delete all of User's messages from the messages table of Date; ok | error.
+delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) ->
+    Query = ["DELETE FROM ",messages_table(VHost, Date)," WHERE owner_id=",
+             "(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+%% Delete every stats row owned by User; ok | error.
+delete_all_stats_by_user_int(DBRef, User, VHost) ->
+    Query = ["DELETE FROM ",stats_table(VHost)," WHERE owner_id=",
+             "(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]), ok
+    end.
+
+%% Delete User's stats rows for Date only; ok | error.
+delete_stats_by_user_at_int(DBRef, User, VHost, Date) ->
+    Query = ["DELETE FROM ",stats_table(VHost)," WHERE owner_id=",
+             "(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\") ",
+             "AND at=\"",Date,"\";"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]), ok
+    end.
+
+%% Delete User's settings row; a failure is logged. Returns ok | error.
+delete_user_settings_int(DBRef, User, VHost) ->
+    Query = ["DELETE FROM ",settings_table(VHost)," WHERE owner_id=",
+             "(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Query) of
+        {error, Why} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Why]),
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]), ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Create the scratch table that rebuild_stats_at_int/3 aggregates into, unless it
+%% already exists. Returns ok | error.
+create_temp_table(DBRef, Name) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",Name," (",
+             "owner_id MEDIUMINT UNSIGNED, peer_name_id MEDIUMINT UNSIGNED, ",
+             "peer_server_id MEDIUMINT UNSIGNED, at VARCHAR(11), count INT(11) ",
+             ") ENGINE=MyISAM CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ok
+    end.
+
+%% Create the stats table (then rebuild all stats); on SQLSTATE 42S01 (already exists) check its columns instead. ok | error.
+create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["CREATE TABLE IF NOT EXISTS ",SName," (",
+             "owner_id MEDIUMINT UNSIGNED, ",
+             "peer_name_id MEDIUMINT UNSIGNED, ",
+             "peer_server_id MEDIUMINT UNSIGNED, ",
+             "at varchar(20), ",
+             "count int(11), ",
+             "INDEX(owner_id, peer_name_id, peer_server_id), ",
+             "INDEX(at)"
+             ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal_silent(DBRef, Query) of
+        {updated, _} ->
+            ?INFO_MSG("Created stats table for ~p", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+        {error, Reason} ->
+            case re:run(Reason, "#42S01") of % re:run/2 returns {match, Captured} | nomatch
+                {match, _} ->
+                    ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+                    CheckQuery = ["SHOW COLUMNS FROM ",SName," LIKE 'peer_%_id';"],
+                    case sql_query_internal(DBRef, CheckQuery) of
+                        {data, [_, _]} -> % exactly two peer_*_id columns are expected
+                            ?MYDEBUG("Stats table structure is ok", []),
+                            ok;
+                        _ ->
+                            ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                            case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of
+                                {updated, _} ->
+                                    ?INFO_MSG("Successfully dropped ~p", [SName]);
+                                _ ->
+                                    ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName])
+                            end,
+                            error
+                    end;
+                nomatch ->
+                    ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]),
+                    error
+            end
+    end.
+
+%% Create the per-vhost settings table unless it already exists.
+%% Returns ok | error.
+create_settings_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",settings_table(VHost)," (",
+             "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ",
+             "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ",
+             "dolog_list TEXT, ",
+             "donotlog_list TEXT ",
+             ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~p", [VHost]),
+            ok
+    end.
+
+%% Create the users table (unless it exists) and the named ETS table used as the
+%% user-id cache; the cache starts out empty. Returns ok | error.
+create_users_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",users_table(VHost)," (",
+             "username TEXT NOT NULL, ",
+             "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+             "UNIQUE INDEX(username(",?INDEX_SIZE,")) ",
+             ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created users table for ~p", [VHost]),
+            ets:new(ets_users_table(VHost), [named_table, set, public]),
+            %update_users_from_db(DBRef, VHost),
+            ok
+    end.
+
+%% Create the servers table (unless it exists), then the named ETS cache, which is
+%% filled from the db right away. Returns ok | error.
+create_servers_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",servers_table(VHost)," (",
+             "server TEXT NOT NULL, ",
+             "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+             "UNIQUE INDEX(server(",?INDEX_SIZE,")) ",
+             ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created servers table for ~p", [VHost]),
+            ets:new(ets_servers_table(VHost), [named_table, set, public]),
+            update_servers_from_db(DBRef, VHost),
+            ok
+    end.
+
+%% Create the resources table (unless it exists) and the named ETS table used as
+%% the resource-id cache. Returns ok | error.
+create_resources_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",resources_table(VHost)," (",
+             "resource TEXT NOT NULL, ",
+             "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+             "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ",
+             ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created resources table for ~p", [VHost]),
+            ets:new(ets_resources_table(VHost), [named_table, set, public]),
+            ok
+    end.
+
+%% Create the messages table for one day (Date) of VHost unless it already exists.
+%% Returns ok | error.
+create_msg_table(DBRef, VHost, Date) ->
+    Query = ["CREATE TABLE IF NOT EXISTS ",messages_table(VHost, Date)," (",
+             "owner_id MEDIUMINT UNSIGNED, ",
+             "peer_name_id MEDIUMINT UNSIGNED, ",
+             "peer_server_id MEDIUMINT UNSIGNED, ",
+             "peer_resource_id MEDIUMINT(8) UNSIGNED, ",
+             "direction ENUM('to', 'from'), ",
+             "type ENUM('chat','error','groupchat','headline','normal') NOT NULL, ",
+             "subject TEXT, ",
+             "body TEXT, ",
+             "timestamp DOUBLE, ",
+             "INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id), ",
+             "FULLTEXT (body) "
+             ") ENGINE=MyISAM CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created msg table for ~p at ~p", [VHost, Date]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internal ets cache (users, servers, resources)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+update_servers_from_db(DBRef, VHost) -> % replace the {Server, Id} ETS cache with the table contents
+    ?INFO_MSG("Reading servers from db for ~p", [VHost]),
+    {data, Rows} = sql_query_internal(DBRef, ["SELECT server, server_id FROM ",servers_table(VHost),";"]),
+    Cache = ets_servers_table(VHost),
+    true = ets:delete_all_objects(Cache),
+    true = ets:insert(Cache, [ {Name, Id} || [Name, Id] <- Rows ]).
+
+%update_users_from_db(DBRef, VHost) ->
+% ?INFO_MSG("Reading users from db for ~p", [VHost]),
+% SQuery = ["SELECT username, user_id FROM ",users_table(VHost),";"],
+% {data, Result} = sql_query_internal(DBRef, SQuery),
+% true = ets:delete_all_objects(ets_users_table(VHost)),
+% true = ets:insert(ets_users_table(VHost), [ {Username, User_id} || [Username, User_id] <- Result]).
+
+%get_user_name(DBRef, VHost, User_id) ->
+% case ets:match(ets_users_table(VHost), {'$1', User_id}) of
+% [[User]] -> User;
+% % this can be in clustered environment
+% [] ->
+% %update_users_from_db(DBRef, VHost),
+% SQuery = ["SELECT username FROM ",users_table(VHost)," ",
+% "WHERE user_id=\"",User_id,"\";"],
+% {data, [[Name]]} = sql_query_internal(DBRef, SQuery),
+% % cache {user, id} pair
+% ets:insert(ets_users_table(VHost), {Name, User_id}),
+% Name
+% end.
+
+%get_server_name(DBRef, VHost, Server_id) ->
+% case ets:match(ets_servers_table(VHost), {'$1', Server_id}) of
+% [[Server]] -> Server;
+ % this can be in clustered environment
+% [] ->
+% update_servers_from_db(DBRef, VHost),
+% [[Server1]] = ets:match(ets_servers_table(VHost), {'$1', Server_id}),
+% Server1
+% end.
+
+%% Look User up in the users table.
+%% Returns {ok, []} when there is no such user, otherwise {ok, Id} after caching
+%% the {User, Id} pair in ETS.
+get_user_id_from_db(DBRef, VHost, User) ->
+    Query = ["SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\";"],
+    case sql_query_internal(DBRef, Query) of
+        {data, [[Id]]} ->
+            ets:insert(ets_users_table(VHost), {User, Id}),
+            {ok, Id};
+        {data, []} ->
+            {ok, []}
+    end.
+%% Id of User: from the ETS cache, else from the db, inserting the user when it is not there yet.
+get_user_id(DBRef, VHost, User) ->
+    case ets:match(ets_users_table(VHost), {User, '$1'}) of
+        [] ->
+            % Look at db
+            case get_user_id_from_db(DBRef, VHost, User) of
+                % no such user in db
+                {ok, []} ->
+                    IQuery = ["INSERT INTO ",users_table(VHost)," ",
+                              "SET username=\"",User,"\";"],
+                    case sql_query_internal_silent(DBRef, IQuery) of
+                        {updated, _} ->
+                            {ok, NewId} = get_user_id_from_db(DBRef, VHost, User),
+                            NewId;
+                        {error, Reason} ->
+                            % this can be in clustered environment
+                            {match, _} = re:run(Reason, "#23000"), % only a duplicate key is tolerated
+                            ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                            {ok, ClID} = get_user_id_from_db(DBRef, VHost, User),
+                            ClID
+                    end;
+                {ok, DBId} ->
+                    DBId
+            end;
+        [[EtsId]] -> EtsId
+    end.
+
+%% Id of Server: from the ETS cache, else inserted into the servers table and then cached.
+get_server_id(DBRef, VHost, Server) ->
+    case ets:match(ets_servers_table(VHost), {Server, '$1'}) of
+        [] ->
+            IQuery = ["INSERT INTO ",servers_table(VHost)," ", "SET server=\"",Server,"\";"],
+            case sql_query_internal_silent(DBRef, IQuery) of
+                {updated, _} ->
+                    SQuery = ["SELECT server_id FROM ",servers_table(VHost)," ",
+                              "WHERE server=\"",Server,"\";"],
+                    {data, [[Id]]} = sql_query_internal(DBRef, SQuery),
+                    ets:insert(ets_servers_table(VHost), {Server, Id}),
+                    Id;
+                {error, Reason} ->
+                    % this can be in clustered environment
+                    {match, _} = re:run(Reason, "#23000"), % only a duplicate key is tolerated
+                    ?ERROR_MSG("Duplicate key name for ~p", [Server]),
+                    update_servers_from_db(DBRef, VHost),
+                    [[Id1]] = ets:match(ets_servers_table(VHost), {Server, '$1'}),
+                    Id1
+            end;
+        [[Id]] -> Id
+    end.
+
+%% Look Resource up in the resources table (the value goes through ejabberd_odbc:escape/1).
+%% {ok, []} when absent; otherwise {ok, Id} after caching {Resource, Id} in ETS.
+get_resource_id_from_db(DBRef, VHost, Resource) ->
+    Query = ["SELECT resource_id FROM ",resources_table(VHost)," ",
+             "WHERE resource=\"",ejabberd_odbc:escape(Resource),"\";"],
+    case sql_query_internal(DBRef, Query) of
+        {data, [[Id]]} ->
+            ets:insert(ets_resources_table(VHost), {Resource, Id}),
+            {ok, Id};
+        {data, []} ->
+            {ok, []}
+    end.
+%% Id of Resource: from the ETS cache, else from the db, inserting the resource when it is not there yet.
+get_resource_id(DBRef, VHost, Resource) ->
+    case ets:match(ets_resources_table(VHost), {Resource, '$1'}) of
+        [] ->
+            % Look at db
+            case get_resource_id_from_db(DBRef, VHost, Resource) of
+                % no such resource in db
+                {ok, []} ->
+                    IQuery = ["INSERT INTO ",resources_table(VHost)," ",
+                              "SET resource=\"",ejabberd_odbc:escape(Resource),"\";"],
+                    case sql_query_internal_silent(DBRef, IQuery) of
+                        {updated, _} ->
+                            {ok, NewId} = get_resource_id_from_db(DBRef, VHost, Resource),
+                            NewId;
+                        {error, Reason} ->
+                            % this can be in clustered environment
+                            {match, _} = re:run(Reason, "#23000"), % only a duplicate key is tolerated
+                            ?ERROR_MSG("Duplicate key name for ~p", [Resource]),
+                            {ok, ClID} = get_resource_id_from_db(DBRef, VHost, Resource),
+                            ClID
+                    end;
+                {ok, DBId} ->
+                    DBId
+            end;
+        [[EtsId]] -> EtsId
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+sql_query_internal(DBRef, Query) -> % like sql_query_internal_silent/2, but errors are logged
+    Result = sql_query_internal_silent(DBRef, Query),
+    case Result of
+        {error, Why} -> ?ERROR_MSG("~p while ~p", [Why, lists:append(Query)]);
+        _ -> ok
+    end,
+    Result.
+
+%% Run Query on DBRef without logging failures; the driver reply goes through get_result/1.
+sql_query_internal_silent(DBRef, Query) -> ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]),
+    get_result(mysql_conn:fetch(DBRef, Query, self(), ?MYSQL_TIMEOUT)).
+
+%% Turn a mysql_conn:fetch/4 reply into {updated, AffectedRows} | {data, Rows} | {error, Reason}.
+get_result({error, "query timed out"} = Timeout) ->
+    Timeout;
+get_result({error, MySQLRes}) ->
+    {error, mysql:get_result_reason(MySQLRes)};
+get_result({updated, MySQLRes}) ->
+    {updated, mysql:get_result_affected_rows(MySQLRes)};
+get_result({data, MySQLRes}) ->
+    {data, mysql:get_result_rows(MySQLRes)}.
--- mod_logdb_mysql5.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb_mysql5.erl 2009-07-30 09:00:14.000000000 +0300
@@ -0,0 +1,979 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_mysql5.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : MySQL 5 backend for mod_logdb
+%%% Version : trunk
+%%% Id : $Id: mod_logdb_mysql5.erl 1360 2009-07-30 06:00:14Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mysql5).
+-author('[email protected]').
+
+-include("mod_logdb.hrl").
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+ rebuild_stats/1,
+ rebuild_stats_at/2,
+ delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+ get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ get_users_settings/1, get_user_settings/2, set_user_settings/3,
+ drop_user/2]).
+
+% gen_server call timeout
+-define(CALL_TIMEOUT, 30000).
+-define(MYSQL_TIMEOUT, 60000).
+-define(INDEX_SIZE, integer_to_list(170)).
+-define(PROCNAME, mod_logdb_mysql5).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+ list_to_string/1, string_to_list/1,
+ convert_timestamp_brief/1]).
+
+-record(state, {dbref, vhost, server, port, db, user, password}).
+
+% replace "." with "_"
+escape_vhost(VHost) ->
+    Swap = fun($.) -> $_; (Ch) -> Ch end,
+    [Swap(Ch) || Ch <- VHost].
+%% Opening backquote plus the "logdb_" prefix shared by all table names.
+prefix() -> [$` | "logdb_"].
+
+%% "_" plus the escaped vhost plus the closing backquote.
+suffix(VHost) -> lists:append(["_", escape_vhost(VHost), "`"]).
+
+%% Backquoted name of the messages table of VHost for Date.
+messages_table(VHost, Date) -> lists:append([prefix(), "messages_", Date, suffix(VHost)]).
+
+% TODO: this needs to be redone to unify view name in stored procedure and in delete_messages_at/2
+% Name of the per-day view: the messages table name, re-quoted with a "v_" prefix.
+view_table(VHost, Date) ->
+    Quoted = messages_table(VHost, Date),
+    "`v_" ++ lists:sublist(Quoted, 2, length(Quoted) - 2) ++ "`".
+
+%% Backquoted name of the stats table of VHost.
+stats_table(VHost) -> lists:append([prefix(), "stats", suffix(VHost)]).
+
+%% Backquoted name of the temporary (scratch) table of VHost.
+temp_table(VHost) -> lists:append([prefix(), "temp", suffix(VHost)]).
+
+%% Backquoted name of the settings table of VHost.
+settings_table(VHost) -> lists:append([prefix(), "settings", suffix(VHost)]).
+
+%% Backquoted name of the users table of VHost.
+users_table(VHost) -> lists:append([prefix(), "users", suffix(VHost)]).
+%% Backquoted name of the servers table of VHost.
+servers_table(VHost) -> lists:append([prefix(), "servers", suffix(VHost)]).
+%% Backquoted name of the resources table of VHost.
+resources_table(VHost) -> lists:append([prefix(), "resources", suffix(VHost)]).
+
+%% Backquoted per-vhost "logmessage" name; it is the target of the CALL issued when a message is logged.
+logmessage_name(VHost) -> lists:append([prefix(), "logmessage", suffix(VHost)]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Start the backend gen_server for VHost, registered locally under its per-vhost process name.
+start(VHost, Opts) ->
+    gen_server:start({local, gen_mod:get_module_proc(VHost, ?PROCNAME)}, ?MODULE, [VHost, Opts], []).
+
+%% Send {stop} to the VHost backend process and wait for its reply.
+stop(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Open the MySQL connection described by Opts (defaults: localhost:3306, db "logdb",
+%% user "root", empty password), run the create_* setup steps and monitor the connection.
+%% Returns {stop, db_connection_failed} when the connection cannot be opened.
+init([VHost, Opts]) ->
+    crypto:start(),
+
+    % option lookup with a fallback value
+    Opt = fun(Key, Default) -> gen_mod:get_opt(Key, Opts, Default) end,
+    St = #state{vhost=VHost,
+                server=Opt(server, "localhost"),
+                port=Opt(port, 3306),
+                db=Opt(db, "logdb"),
+                user=Opt(user, "root"),
+                password=Opt(password, "")},
+
+    case open_mysql_connection(St) of
+        {error, Reason} ->
+            ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]),
+            {stop, db_connection_failed};
+        {ok, DBRef} ->
+            State = St#state{dbref=DBRef},
+            Setup = [fun create_internals/1, fun create_stats_table/1, fun create_settings_table/1,
+                     fun create_users_table/1, fun create_servers_table/1, fun create_resources_table/1],
+            % every step must answer ok, otherwise init crashes with badmatch
+            lists:foreach(fun(Step) -> ok = Step(State) end, Setup),
+            erlang:monitor(process, DBRef),
+            {ok, State}
+    end.
+
+%% Open a MySQL connection from the fields of the state record; returns what mysql_conn:start/7 returns.
+open_mysql_connection(#state{server=Server, port=Port, db=DB, user=DBUser, password=Password}) ->
+    % Driver log callback: debug output is dropped, errors go through ?ERROR_MSG,
+    % every other level is traced through ?MYDEBUG.
+    LogFun = fun(debug, _Format, _Argument) ->
+                     %?MYDEBUG(Format, Argument);
+                     ok;
+                (error, Format, Argument) -> ?ERROR_MSG(Format, Argument);
+                (Level, Format, Argument) ->
+                     ?MYDEBUG("MySQL (~p)~n", [Level]), ?MYDEBUG(Format, Argument)
+             end,
+    ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]),
+    mysql_conn:start(Server, Port, DBUser, Password, DB, [65536, 131072], LogFun).
+
+%% Stop the MySQL connection process DBRef (traced through ?MYDEBUG first).
+close_mysql_connection(Conn) ->
+    ?MYDEBUG("Closing ~p mysql connection", [Conn]), mysql_conn:stop(Conn).
+
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Reply = rebuild_stats_at_int(DBRef, VHost, Date),
+ {reply, Reply, State};
+handle_call({delete_messages_by_user_at, [], _Date}, _From, State) ->
+ {reply, error, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) ->
+ ["\"",Timestamp,"\"",","]
+ end, Msgs),
+
+ Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+
+ Query = ["DELETE FROM ",messages_table(VHost, Date)," ",
+ "WHERE timestamp IN (", Temp1],
+
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {updated, Aff} ->
+ ?MYDEBUG("Aff=~p", [Aff]),
+ rebuild_stats_at_int(DBRef, VHost, Date);
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ ok = delete_all_messages_by_user_at_int(DBRef, User, VHost, Date),
+ ok = delete_stats_by_user_at_int(DBRef, User, VHost, Date),
+ {reply, ok, State};
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Fun = fun() ->
+ {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]),
+ TQuery = ["DELETE FROM ",stats_table(VHost)," "
+ "WHERE at=\"",Date,"\";"],
+ {updated, _} = sql_query_internal(DBRef, TQuery),
+ VQuery = ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"],
+ {updated, _} = sql_query_internal(DBRef, VQuery),
+ ok
+ end,
+ Reply =
+ case catch apply(Fun, []) of
+ ok ->
+ ok;
+ {'EXIT', _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT at, sum(count) ",
+ "FROM ",SName," ",
+ "GROUP BY at ",
+ "ORDER BY DATE(at) DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]};
+ {error, Reason} ->
+ % TODO: Duplicate error message ?
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT username, sum(count) as allcount ",
+ "FROM ",SName," ",
+ "JOIN ",users_table(VHost)," ON owner_id=user_id "
+ "WHERE at=\"",Date,"\" ",
+ "GROUP BY username ",
+ "ORDER BY allcount DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ {ok, [ {User, list_to_integer(Count)} || [User, Count] <- Result ]};
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ {reply, get_user_stats_int(DBRef, User, VHost), State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Query = ["SELECT peer_name,",
+ "peer_server,",
+ "peer_resource,",
+ "direction,"
+ "type,"
+ "subject,"
+ "body,"
+ "timestamp "
+ "FROM ",view_table(VHost, Date)," "
+ "WHERE owner_name=\"",User,"\";"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ Fun = fun([Peer_name, Peer_server, Peer_resource,
+ Direction,
+ Type,
+ Subject, Body,
+ Timestamp]) ->
+ #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource,
+ direction=list_to_atom(Direction),
+ type=Type,
+ subject=Subject, body=Body,
+ timestamp=Timestamp}
+ end,
+ {ok, lists:map(Fun, Result)};
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ SName = stats_table(VHost),
+ Query = ["SELECT at ",
+ "FROM ",SName," ",
+ "GROUP BY at ",
+ "ORDER BY DATE(at) DESC;"
+ ],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ [ Date || [Date] <- Result ];
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ {reply, Reply, State};
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+ "FROM ",settings_table(VHost)," ",
+ "JOIN ",users_table(VHost)," ON user_id=owner_id;"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, Result} ->
+ {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) ->
+ #user_settings{owner_name=Owner,
+ dolog_default=list_to_bool(DoLogDef),
+ dolog_list=string_to_list(DoLogL),
+ donotlog_list=string_to_list(DoNotLogL)
+ }
+ end, Result)};
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+ Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ",
+ "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {data, []} ->
+ {ok, []};
+ {data, [[Owner, DoLogDef, DoLogL, DoNotLogL]]} ->
+ {ok, #user_settings{owner_name=Owner,
+ dolog_default=list_to_bool(DoLogDef),
+ dolog_list=string_to_list(DoLogL),
+ donotlog_list=string_to_list(DoNotLogL)}};
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+ dolog_list=DoLogL,
+ donotlog_list=DoNotLogL}},
+ _From, #state{dbref=DBRef, vhost=VHost} = State) ->
+ User_id = get_user_id(DBRef, VHost, User),
+ Query = ["UPDATE ",settings_table(VHost)," ",
+ "SET dolog_default=",bool_to_list(DoLogDef),", ",
+ "dolog_list='",list_to_string(DoLogL),"', ",
+ "donotlog_list='",list_to_string(DoNotLogL),"' ",
+ "WHERE owner_id=",User_id,";"],
+
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {updated, 0} ->
+ IQuery = ["INSERT INTO ",settings_table(VHost)," ",
+ "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+ "VALUES ",
+ "(",User_id,",",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+ case sql_query_internal_silent(DBRef, IQuery) of
+ {updated, _} ->
+ ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, Reason} ->
+ case re:run(Reason, "#23000") of
+ % Already exists
+ {match, _, _} ->
+ ok;
+ _ ->
+ ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]),
+ error
+ end
+ end;
+ {updated, 1} ->
+ ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({stop}, _From, #state{vhost=Host}=Srv) -> % synchronous stop request
+    ?MYDEBUG("Stoping mysql5 backend for ~p", [Host]),
+    {stop, normal, ok, Srv};
+handle_call(Request, _From, Srv) -> % unknown request: it is logged and no reply is sent
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Request, Srv]),
+    {noreply, Srv}.
+
+% log_message: store one message by calling the logmessage stored procedure of the vhost.
+% The query is issued from a spawned process; its outcome is not reported to anyone.
+handle_cast({log_message, Msg}, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Fun = fun() ->
+            Date = convert_timestamp_brief(Msg#msg.timestamp),
+            TableName = messages_table(VHost, Date),
+            % JID parts are escaped like the other stanza fields: they are quoted into the SQL text
+            Query = [ "CALL ",logmessage_name(VHost)," "
+                      "('", TableName, "',",
+                      "'", Date, "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.owner_name), "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.peer_name), "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.peer_server), "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.peer_resource), "',",
+                      "'", atom_to_list(Msg#msg.direction), "',",
+                      "'", Msg#msg.type, "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.subject), "',",
+                      "'", ejabberd_odbc:escape(Msg#msg.body), "',",
+                      "'", Msg#msg.timestamp, "');"],
+            case sql_query_internal(DBRef, Query) of
+                {updated, _} ->
+                    ?MYDEBUG("Logged ok for ~p, peer: ~p", [Msg#msg.owner_name++"@"++VHost,
+                                                            Msg#msg.peer_name++"@"++Msg#msg.peer_server]),
+                    ok;
+                {error, _Reason} -> error
+            end
+          end,
+    spawn(Fun),
+    {noreply, State};
+handle_cast({rebuild_stats}, Srv) -> % start a rebuild of all statistics, then carry on
+    rebuild_all_stats_int(Srv),
+    {noreply, Srv};
+% drop_user: in a spawned process with its own connection, delete the messages, stats and settings of User.
+handle_cast({drop_user, User}, #state{vhost=VHost} = State) ->
+    Worker = fun() ->
+            {ok, Conn} = open_mysql_connection(State),
+            {ok, Dates} = get_user_stats_int(Conn, User, VHost),
+            DropAt = fun({Date, _}) ->
+                         delete_all_messages_by_user_at_int(Conn, User, VHost, Date)
+                     end,
+            MsgResults = lists:map(DropAt, Dates),
+            StatsResult = delete_all_stats_by_user_int(Conn, User, VHost),
+            SettingsResult = delete_user_settings_int(Conn, User, VHost),
+            % the removal counts as done only when every single step answered ok
+            Succeeded = fun(ok) -> true; (error) -> false end,
+            case lists:all(Succeeded, MsgResults ++ [StatsResult, SettingsResult]) of
+                true ->
+                    ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+                false ->
+                    ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+            end,
+            close_mysql_connection(Conn)
+          end,
+    spawn(Worker),
+    {noreply, State};
+handle_cast(Other, Srv) -> % any other cast is only logged
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Other, Srv]),
+    {noreply, Srv}.
+
+handle_info({'DOWN', _Ref, process, _Pid, _Why}, Srv) -> % a 'DOWN' monitor message stops the server
+    {stop, connection_dropped, Srv};
+handle_info(Other, Srv) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Other, Srv]),
+    {noreply, Srv}.
+
+terminate(_Why, #state{dbref=Conn}) -> % close the connection kept in the state
+    close_mysql_connection(Conn),
+    ok.
+
+code_change(_OldVsn, Srv, _Extra) -> % the state is kept unchanged across code upgrades
+    {ok, Srv}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Cast {log_message, Msg} to the backend process of VHost; no answer is awaited.
+log_message(VHost, Msg) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {log_message, Msg}).
+% Cast {rebuild_stats} to the backend process of VHost; no answer is awaited.
+rebuild_stats(VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats}).
+% Call the backend process of VHost with {rebuild_stats_at, Date}; waits at most ?CALL_TIMEOUT.
+rebuild_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {delete_messages_by_user_at, Msgs, Date}; waits at most ?CALL_TIMEOUT.
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {delete_all_messages_by_user_at, User, Date}; waits at most ?CALL_TIMEOUT.
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {delete_messages_at, Date}; waits at most ?CALL_TIMEOUT.
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {delete_messages_at, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_vhost_stats}; waits at most ?CALL_TIMEOUT.
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_vhost_stats_at, Date}; waits at most ?CALL_TIMEOUT.
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_user_stats, User}; waits at most ?CALL_TIMEOUT.
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_stats, User}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_user_messages_at, User, Date}; waits at most ?CALL_TIMEOUT.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_dates}; waits at most ?CALL_TIMEOUT.
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_dates}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_users_settings}; waits at most ?CALL_TIMEOUT.
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_users_settings}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {get_user_settings, User}; waits at most ?CALL_TIMEOUT.
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {get_user_settings, User}, ?CALL_TIMEOUT).
+% Call the backend process of VHost with {set_user_settings, User, Set}; waits at most ?CALL_TIMEOUT.
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+% Cast {drop_user, User} to the backend process of VHost; no answer is awaited.
+drop_user(User, VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% get_dates_int: the dates found in the names of the tables of VHost (SHOW TABLES order); [] if the query fails.
+get_dates_int(DBRef, VHost) ->
+    case sql_query_internal(DBRef, ["SHOW TABLES"]) of
+        {data, Tables} ->
+            % a table of this vhost starts with the prefix (less its first character) and names the vhost
+            Reg = lists:sublist(prefix(),2,length(prefix())) ++ ".*" ++ escape_vhost(VHost),
+            Collect = fun([Table], Dates) ->
+                % re:run/2 gives 0-based {Start, Length} pairs, so 0 means "matches from the first character"
+                case re:run(Table, Reg) of
+                    {match, [{0, _} | _]} ->
+                        case re:run(Table, "[0-9]+-[0-9]+-[0-9]+", [{capture, first, list}]) of
+                            {match, [Date]} -> [Date | Dates];
+                            nomatch -> Dates
+                        end;
+                    _ -> Dates
+                end
+            end,
+            lists:reverse(lists:foldl(Collect, [], Tables));
+        {error, _} -> []
+    end.
+
+% rebuild_all_stats_int: spawn a process that, over its own connection, rebuilds the stats of every date.
+rebuild_all_stats_int(#state{vhost=VHost}=State) ->
+    Worker = fun() ->
+        {ok, Conn} = open_mysql_connection(State),
+        ok = delete_nonexistent_stats(Conn, VHost),
+        HasFailed = fun(Date) ->
+            case catch rebuild_stats_at_int(Conn, VHost, Date) of
+                ok -> false;
+                error -> true;
+                {'EXIT', _} -> true
+            end
+        end,
+        case lists:filter(HasFailed, get_dates_int(Conn, VHost)) of
+            [] -> ok;
+            Failed -> ?ERROR_MSG("Failed to rebuild stats for ~p dates", [Failed]), error
+        end,
+        close_mysql_connection(Conn)
+    end,
+    spawn(Worker).
+
+% rebuild_stats_at_int: recompute the statistics rows of Date from that day's messages table.
+% Returns ok or error; UNLOCK TABLES and the drop of the temporary table are issued in both cases.
+rebuild_stats_at_int(DBRef, VHost, Date) ->
+    TempTable = temp_table(VHost),
+    Fun = fun() ->
+        Table = messages_table(VHost, Date),
+        STable = stats_table(VHost),
+        DQuery = [ "DELETE FROM ",STable," ",
+                   "WHERE at='",Date,"';"],
+        ok = create_temp_table(DBRef, TempTable),
+        {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]),
+        SQuery = ["INSERT INTO ",TempTable," ",
+                  "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                  "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ",
+                  "FROM ",Table," WHERE ext is NULL GROUP BY owner_id,peer_name_id,peer_server_id;"],
+        case sql_query_internal(DBRef, SQuery) of
+            {updated, 0} ->
+                Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                case Count of
+                    {data, [["0"]]} ->
+                        {updated, _} = sql_query_internal(DBRef, ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"]),
+                        {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]),
+                        {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE;"]),
+                        {updated, _} = sql_query_internal(DBRef, DQuery),
+                        ok;
+                    _ ->
+                        ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                        error
+                end;
+            {updated, _} ->
+                {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]),
+                {updated, _} = sql_query_internal(DBRef, DQuery),
+                SQuery1 = ["INSERT INTO ",STable," ",
+                           "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                           "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                           "FROM ",TempTable,";"],
+                case sql_query_internal(DBRef, SQuery1) of
+                    {updated, _} -> ok;
+                    {error, _} -> error
+                end;
+            {error, _} -> error
+        end
+    end,
+    % Keep the outcome: the cleanup below must not hide a failed rebuild from the caller.
+    Result = case catch apply(Fun, []) of
+        ok ->
+            ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]),
+            ok;
+        error ->
+            error;
+        {'EXIT', Reason} ->
+            ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+            error
+    end,
+    sql_query_internal(DBRef, ["UNLOCK TABLES;"]),
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Result.
+
+% delete_nonexistent_stats: drop the statistics rows whose date is not returned by get_dates_int/2.
+% Nothing is deleted (and ok is returned) when get_dates_int/2 yields no date at all.
+delete_nonexistent_stats(DBRef, VHost) ->
+    KnownDates = get_dates_int(DBRef, VHost),
+    StatsTable = stats_table(VHost),
+    % every date becomes four pieces: quote, date, quote, comma
+    Pieces = lists:flatmap(fun(Date) -> ["\"",Date,"\"",","] end, KnownDates),
+    case Pieces of
+        [] ->
+            ok;
+        _ ->
+            % replace last "," with ");"
+            WithoutComma = lists:sublist(Pieces, length(Pieces)-1),
+            InList = lists:append([WithoutComma, ");"]),
+            Sql = ["DELETE FROM ",StatsTable," ",
+                   "WHERE at NOT IN (", InList],
+            case sql_query_internal(DBRef, Sql) of
+                {updated, _} ->
+                    ok;
+                {error, _} -> error
+            end
+    end.
+
+% get_user_stats_int: {ok, [{Date, Count}]} for User, ordered by date descending, or {error, Reason}.
+get_user_stats_int(DBRef, User, VHost) ->
+    StatsTable = stats_table(VHost),
+    UsersTable = users_table(VHost),
+    Sql = ["SELECT stats.at, sum(stats.count) ",
+           "FROM ",UsersTable," AS users ",
+           "JOIN ",StatsTable," AS stats ON owner_id=user_id "
+           "WHERE users.username=\"",User,"\" ",
+           "GROUP BY stats.at "
+           "ORDER BY DATE(stats.at) DESC;"
+          ],
+    case sql_query_internal(DBRef, Sql) of
+        {data, Rows} ->
+            {ok, [ {Date, list_to_integer(Sum)} || [Date, Sum] <- Rows ]};
+        {error, _} = Failure -> Failure
+    end.
+
+% Delete every message owned by User from the messages table of Date; returns ok or error.
+delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) ->
+    Sql = ["DELETE FROM ",messages_table(VHost, Date)," ",
+           "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    Outcome = sql_query_internal(DBRef, Sql),
+    case Outcome of
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]), ok;
+        {error, _} -> error
+    end.
+
+% Delete every statistics row owned by User; returns ok or error.
+delete_all_stats_by_user_int(DBRef, User, VHost) ->
+    Sql = ["DELETE FROM ",stats_table(VHost)," ",
+           "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]), ok
+    end.
+
+% Delete the statistics rows owned by User for the single date Date; returns ok or error.
+delete_stats_by_user_at_int(DBRef, User, VHost, Date) ->
+    Sql = ["DELETE FROM ",stats_table(VHost)," ",
+           "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\") ",
+           "AND at=\"",Date,"\";"],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]), ok
+    end.
+
+% Delete the settings row owned by User; returns ok or error (a failure is logged with its reason).
+delete_user_settings_int(DBRef, User, VHost) ->
+    Sql = ["DELETE FROM ",settings_table(VHost)," ",
+           "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Sql) of
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]), ok;
+        {error, Why} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Why]),
+            error
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Create the table Name (unless it exists) with the five statistics columns; returns ok or error.
+create_temp_table(DBRef, Name) ->
+    Sql = ["CREATE TABLE IF NOT EXISTS ",Name," (",
+           "owner_id MEDIUMINT UNSIGNED, ",
+           "peer_name_id MEDIUMINT UNSIGNED, ",
+           "peer_server_id MEDIUMINT UNSIGNED, ",
+           "at VARCHAR(11), ",
+           "count INT(11) ",
+           ") ENGINE=MyISAM CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _Reason} -> error;
+        {updated, _} -> ok
+    end.
+
+% create_stats_table: create the vhost's statistics table (then rebuild all stats) or validate an existing one.
+create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["CREATE TABLE IF NOT EXISTS ",SName," (",
+             "owner_id MEDIUMINT UNSIGNED, ",
+             "peer_name_id MEDIUMINT UNSIGNED, ",
+             "peer_server_id MEDIUMINT UNSIGNED, ",
+             "at VARCHAR(11), ",
+             "count INT(11), ",
+             "ext INTEGER DEFAULT NULL, "
+             "INDEX ext_i (ext), "
+             "INDEX(owner_id,peer_name_id,peer_server_id), ",
+             "INDEX(at) ",
+             ") ENGINE=MyISAM CHARACTER SET utf8;"],
+    case sql_query_internal_silent(DBRef, Query) of
+        {updated, _} ->
+            ?MYDEBUG("Created stats table for ~p", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+        {error, Reason} ->
+            % re:run/2 answers {match, Captured} or nomatch
+            case re:run(Reason, "#42S01") of
+                {match, _} ->
+                    ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+                    CheckQuery = ["SHOW COLUMNS FROM ",SName," LIKE 'peer_%_id';"],
+                    case sql_query_internal(DBRef, CheckQuery) of
+                        {data, [_, _]} ->
+                            ?MYDEBUG("Stats table structure is ok", []),
+                            ok;
+                        _ ->
+                            ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                            case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of
+                                {updated, _} -> ?INFO_MSG("Successfully dropped ~p", [SName]);
+                                _ ->
+                                    ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName])
+                            end,
+                            error
+                    end;
+                _ ->
+                    ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]),
+                    error
+            end
+    end.
+
+% Create the per-vhost settings table unless it exists; returns ok or error.
+create_settings_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = settings_table(VHost),
+    Sql = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+           "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ",
+           "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ",
+           "dolog_list TEXT, ",
+           "donotlog_list TEXT ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"
+          ],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the per-vhost users table (username <-> user_id) unless it exists; returns ok or error.
+create_users_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = users_table(VHost),
+    Sql = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+           "username TEXT NOT NULL, ",
+           "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+           "UNIQUE INDEX(username(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"
+          ],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?MYDEBUG("Created users table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the per-vhost servers table (server <-> server_id) unless it exists; returns ok or error.
+create_servers_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = servers_table(VHost),
+    Sql = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+           "server TEXT NOT NULL, ",
+           "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+           "UNIQUE INDEX(server(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"
+          ],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?MYDEBUG("Created servers table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the per-vhost resources table (resource <-> resource_id) unless it exists; returns ok or error.
+create_resources_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = resources_table(VHost),
+    Sql = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+           "resource TEXT NOT NULL, ",
+           "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+           "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"
+          ],
+    case sql_query_internal(DBRef, Sql) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?MYDEBUG("Created resources table for ~p", [VHost]),
+            ok
+    end.
+
+% (Re)create the logmessage stored procedure of the vhost: drop any old one, then define it anew.
+create_internals(#state{dbref=DBRef, vhost=VHost}) ->
+    sql_query_internal(DBRef, ["DROP PROCEDURE IF EXISTS ",logmessage_name(VHost),";"]),
+    Definition = get_logmessage(VHost),
+    case sql_query_internal(DBRef, [Definition]) of
+        {error, _} -> error;
+        {updated, _} ->
+            ?MYDEBUG("Created logmessage for ~p", [VHost]), ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Like sql_query_internal_silent/2, but a failed query is also written to the error log.
+sql_query_internal(DBRef, Query) ->
+    case sql_query_internal_silent(DBRef, Query) of
+        {error, Why} = Failure ->
+            ?ERROR_MSG("~p while ~p", [Why, lists:append(Query)]), Failure;
+        Success -> Success
+    end.
+
+sql_query_internal_silent(Conn, Sql) -> % run Sql on Conn and convert the driver's answer with get_result/1
+    ?MYDEBUG("DOING: \"~s\"", [lists:append(Sql)]),
+    get_result(mysql_conn:fetch(Conn, Sql, self(), ?MYSQL_TIMEOUT)).
+
+% get_result: turn what mysql_conn:fetch/4 returned into {updated, N} | {data, Rows} | {error, Reason}.
+get_result({updated, Answer}) ->
+    {updated, mysql:get_result_affected_rows(Answer)};
+get_result({data, Answer}) ->
+    {data, mysql:get_result_rows(Answer)};
+get_result({error, "query timed out"} = TimedOut) ->
+    TimedOut;
+get_result({error, Answer}) ->
+    {error, mysql:get_result_reason(Answer)}.
+
+% get_user_id: id of User in the users table of VHost; the row is inserted first when it is missing.
+get_user_id(DBRef, VHost, User) ->
+    SQuery = ["SELECT user_id FROM ",users_table(VHost)," ",
+              "WHERE username=\"",User,"\";"],
+    case sql_query_internal(DBRef, SQuery) of
+        {data, []} ->
+            IQuery = ["INSERT INTO ",users_table(VHost)," ",
+                      "SET username=\"",User,"\";"],
+            case sql_query_internal_silent(DBRef, IQuery) of
+                {updated, _} ->
+                    {data, [[DBIdNew]]} = sql_query_internal(DBRef, SQuery),
+                    DBIdNew;
+                {error, Reason} ->
+                    % this can be in clustered environment; re:run/2 answers {match, Captured} | nomatch
+                    {match, _} = re:run(Reason, "#23000"),
+                    ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                    {data, [[ClID]]} = sql_query_internal(DBRef, SQuery),
+                    ClID
+            end;
+        {data, [[DBId]]} -> DBId
+    end.
+
+get_logmessage(VHost) -> % formats the CREATE PROCEDURE statement of this vhost's logmessage procedure
+ UName = users_table(VHost), % the table names below fill the ~s slots of the SQL text
+ SName = servers_table(VHost),
+ RName = resources_table(VHost),
+ StName = stats_table(VHost),
+ io_lib:format("
+CREATE PROCEDURE ~s(tablename TEXT, atdate TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(10), msubject TEXT, mbody TEXT, mtimestamp DOUBLE)
+BEGIN
+ DECLARE ownerID MEDIUMINT UNSIGNED;
+ DECLARE peer_nameID MEDIUMINT UNSIGNED;
+ DECLARE peer_serverID MEDIUMINT UNSIGNED;
+ DECLARE peer_resourceID MEDIUMINT UNSIGNED;
+ DECLARE Vmtype VARCHAR(10);
+ DECLARE Vmtimestamp DOUBLE;
+ DECLARE Vmdirection VARCHAR(4);
+ DECLARE Vmbody TEXT;
+ DECLARE Vmsubject TEXT;
+ DECLARE iq TEXT;
+ DECLARE cq TEXT;
+ DECLARE viewname TEXT;
+ DECLARE notable INT;
+ DECLARE CONTINUE HANDLER FOR SQLSTATE '42S02' SET @notable = 1;
+
+ SET @notable = 0;
+ SET @ownerID = NULL;
+ SET @peer_nameID = NULL;
+ SET @peer_serverID = NULL;
+ SET @peer_resourceID = NULL;
+
+ SET @Vmtype = mtype;
+ SET @Vmtimestamp = mtimestamp;
+ SET @Vmdirection = mdirection;
+ SET @Vmbody = mbody;
+ SET @Vmsubject = msubject;
+
+ SELECT user_id INTO @ownerID FROM ~s WHERE username=owner;
+ IF @ownerID IS NULL THEN
+ INSERT INTO ~s SET username=owner;
+ SET @ownerID = LAST_INSERT_ID();
+ END IF;
+
+ SELECT user_id INTO @peer_nameID FROM ~s WHERE username=peer_name;
+ IF @peer_nameID IS NULL THEN
+ INSERT INTO ~s SET username=peer_name;
+ SET @peer_nameID = LAST_INSERT_ID();
+ END IF;
+
+ SELECT server_id INTO @peer_serverID FROM ~s WHERE server=peer_server;
+ IF @peer_serverID IS NULL THEN
+ INSERT INTO ~s SET server=peer_server;
+ SET @peer_serverID = LAST_INSERT_ID();
+ END IF;
+
+ SELECT resource_id INTO @peer_resourceID FROM ~s WHERE resource=peer_resource;
+ IF @peer_resourceID IS NULL THEN
+ INSERT INTO ~s SET resource=peer_resource;
+ SET @peer_resourceID = LAST_INSERT_ID();
+ END IF;
+
+ SET @iq = CONCAT(\"INSERT INTO \",tablename,\" (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (@ownerID,@peer_nameID,@peer_serverID,@peer_resourceID,@Vmdirection,@Vmtype,@Vmsubject,@Vmbody,@Vmtimestamp);\");
+ PREPARE insertmsg FROM @iq;
+
+ IF @notable = 1 THEN
+ SET @cq = CONCAT(\"CREATE TABLE IF NOT EXISTS \",tablename,\" (
+ owner_id MEDIUMINT UNSIGNED NOT NULL,
+ peer_name_id MEDIUMINT UNSIGNED NOT NULL,
+ peer_server_id MEDIUMINT UNSIGNED NOT NULL,
+ peer_resource_id MEDIUMINT(8) UNSIGNED NOT NULL,
+ direction ENUM('to', 'from') NOT NULL,
+ type ENUM('chat','error','groupchat','headline','normal') NOT NULL,
+ subject TEXT,
+ body TEXT,
+ timestamp DOUBLE NOT NULL,
+ ext INTEGER DEFAULT NULL,
+ INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id),
+ INDEX ext_i (ext),
+ FULLTEXT (body)
+ ) ENGINE=MyISAM
+ PACK_KEYS=1
+ CHARACTER SET utf8;\");
+ PREPARE createtable FROM @cq;
+ EXECUTE createtable;
+ DEALLOCATE PREPARE createtable;
+
+ SET @viewname = CONCAT(\"`v_\", TRIM(BOTH '`' FROM tablename), \"`\");
+ SET @cq = CONCAT(\"CREATE OR REPLACE VIEW \",@viewname,\" AS
+ SELECT owner.username AS owner_name,
+ peer.username AS peer_name,
+ servers.server AS peer_server,
+ resources.resource AS peer_resource,
+ messages.direction,
+ messages.type,
+ messages.subject,
+ messages.body,
+ messages.timestamp
+ FROM
+ ~s owner,
+ ~s peer,
+ ~s servers,
+ ~s resources,
+ \", tablename,\" messages
+ WHERE
+ owner.user_id=messages.owner_id and
+ peer.user_id=messages.peer_name_id and
+ servers.server_id=messages.peer_server_id and
+ resources.resource_id=messages.peer_resource_id
+ ORDER BY messages.timestamp;\");
+ PREPARE createview FROM @cq;
+ EXECUTE createview;
+ DEALLOCATE PREPARE createview;
+
+ SET @notable = 0;
+ PREPARE insertmsg FROM @iq;
+ EXECUTE insertmsg;
+ ELSEIF @notable = 0 THEN
+ EXECUTE insertmsg;
+ END IF;
+
+ DEALLOCATE PREPARE insertmsg;
+
+ IF @notable = 0 THEN
+ UPDATE ~s SET count=count+1 WHERE owner_id=@ownerID AND peer_name_id=@peer_nameID AND peer_server_id=@peer_serverID AND at=atdate;
+ IF ROW_COUNT() = 0 THEN
+ INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (@ownerID, @peer_nameID, @peer_serverID, atdate, 1);
+ END IF;
+ END IF;
+END;", [logmessage_name(VHost),UName,UName,UName,UName,SName,SName,RName,RName,UName,UName,SName,RName,StName,StName]).
--- mod_logdb_pgsql.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb_pgsql.erl 2009-07-30 09:49:10.000000000 +0300
@@ -0,0 +1,1104 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_pgsql.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : PostgreSQL backend for mod_logdb
+%%% Version : trunk
+%%% Id : $Id: mod_logdb_pgsql.erl 1360 2009-07-30 06:00:14Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_pgsql).
+-author('[email protected]').
+
+-include("mod_logdb.hrl").
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+ rebuild_stats/1,
+ rebuild_stats_at/2,
+ delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+ get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ get_users_settings/1, get_user_settings/2, set_user_settings/3,
+ drop_user/2]).
+
+-export([view_table/3]).
+
+% gen_server call timeout
+-define(CALL_TIMEOUT, 30000).
+-define(PGSQL_TIMEOUT, 60000).
+-define(PROCNAME, mod_logdb_pgsql).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+ list_to_string/1, string_to_list/1,
+ convert_timestamp_brief/1]).
+
+-record(state, {dbref, vhost, server, port, db, user, password, schema}).
+
+% Make a vhost usable inside a table name: every "." becomes "_".
+escape_vhost(VHost) ->
+    lists:map(fun($.) -> $_; (Other) -> Other end,
+              VHost).
+
+prefix(Schema) -> % opening part of a quoted, schema-qualified table name
+    lists:append([Schema, ".\"", "logdb_"]).
+
+suffix(VHost) -> % closing part of a quoted table name: "_", the escaped vhost and a double quote
+    lists:append(["_", escape_vhost(VHost), "\""]).
+
+messages_table(VHost, Schema, Date) -> % name of the messages table of one date
+    lists:append([prefix(Schema), "messages_", Date, suffix(VHost)]).
+
+view_table(VHost, Schema, Date) -> % the messages table name with "v_" put in front of the bare name
+    Full = messages_table(VHost, Schema, Date),
+    Bare = lists:sublist(Full, length(Schema) + 3, length(Full) - length(Schema) - 3),
+    Schema ++ ".\"v_" ++ Bare ++ "\"".
+
+stats_table(VHost, Schema) -> % name of the statistics table of the vhost
+    lists:append([prefix(Schema), "stats", suffix(VHost)]).
+
+temp_table(VHost, Schema) -> % name of the temporary table of the vhost
+    lists:append([prefix(Schema), "temp", suffix(VHost)]).
+
+settings_table(VHost, Schema) -> % name of the settings table of the vhost
+    lists:append([prefix(Schema), "settings", suffix(VHost)]).
+
+users_table(VHost, Schema) -> % name of the users table of the vhost
+    lists:append([prefix(Schema), "users", suffix(VHost)]).
+servers_table(VHost, Schema) -> % name of the servers table of the vhost
+    lists:append([prefix(Schema), "servers", suffix(VHost)]).
+resources_table(VHost, Schema) -> % name of the resources table of the vhost
+    lists:append([prefix(Schema), "resources", suffix(VHost)]).
+
+logmessage_name(VHost, Schema) -> % name used for the logmessage routine of the vhost
+    lists:append([prefix(Schema), "logmessage", suffix(VHost)]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Start the backend gen_server of VHost under the local name given by gen_mod:get_module_proc/2.
+start(VHost, Opts) ->
+    gen_server:start({local, gen_mod:get_module_proc(VHost, ?PROCNAME)}, ?MODULE, [VHost, Opts], []).
+
+% Send {stop} to the backend gen_server of VHost and wait for its answer (at most ?CALL_TIMEOUT).
+stop(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% init/1: read the connection options (with their defaults), open the PostgreSQL connection,
+% run the create_* setup steps and monitor the connection.
+init([VHost, Opts]) ->
+    St = #state{vhost    = VHost,
+                server   = gen_mod:get_opt(server, Opts, "localhost"),
+                db       = gen_mod:get_opt(db, Opts, "ejabberd_logdb"),
+                user     = gen_mod:get_opt(user, Opts, "root"),
+                port     = gen_mod:get_opt(port, Opts, 5432),
+                password = gen_mod:get_opt(password, Opts, ""),
+                schema   = gen_mod:get_opt(schema, Opts, "public")},
+
+    ?MYDEBUG("Starting pgsql backend for ~p", [VHost]),
+
+    % every setup step below must answer ok, otherwise init crashes on the failed match
+    case open_pgsql_connection(St) of
+        {ok, DBRef} ->
+            State = St#state{dbref=DBRef},
+            ok = create_internals(State),
+            ok = create_stats_table(State),
+            ok = create_settings_table(State),
+            ok = create_users_table(State),
+            ok = create_servers_table(State),
+            ok = create_resources_table(State),
+            % monitor the connection process
+            erlang:monitor(process, DBRef),
+            {ok, State};
+        % this does not work
+        {error, Reason} ->
+            ?ERROR_MSG("PgSQL connection failed: ~p~n", [Reason]),
+            {stop, db_connection_failed};
+        % and this too, because pgsql_conn does exit() which can not be caught
+        {'EXIT', Rez} ->
+            ?ERROR_MSG("Rez: ~p~n", [Rez]),
+            {stop, db_connection_failed}
+    end.
+
+% Connect to PostgreSQL with the settings kept in the state and set the search path to the schema.
+open_pgsql_connection(#state{server=Host, port=Port, db=Database, schema=Schema, user=Login, password=Secret}) ->
+    ?INFO_MSG("Opening pgsql connection ~s@~s:~p/~s", [Login, Host, Port, Database]),
+    {ok, Conn} = Connected = pgsql:connect(Host, Database, Login, Secret, Port),
+    {updated, _} = sql_query_internal(Conn, ["SET SEARCH_PATH TO ",Schema,";"]),
+    Connected.
+
+close_pgsql_connection(Conn) -> % hand the connection to pgsql:terminate/1
+    ?MYDEBUG("Closing ~p pgsql connection", [Conn]),
+    pgsql:terminate(Conn).
+
+% log_message: store one message through the logmessage stored function; the reply is always ok.
+handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Date = convert_timestamp_brief(Msg#msg.timestamp),
+    TableName = messages_table(VHost, Schema, Date),
+    ViewName = view_table(VHost, Schema, Date),
+    % JID parts are escaped like the other stanza fields: they are quoted into the SQL text
+    Query = [ "SELECT ", logmessage_name(VHost, Schema)," "
+              "('", TableName, "',",
+              "'", ViewName, "',",
+              "'", Date, "',",
+              "'", ejabberd_odbc:escape(Msg#msg.owner_name), "',",
+              "'", ejabberd_odbc:escape(Msg#msg.peer_name), "',",
+              "'", ejabberd_odbc:escape(Msg#msg.peer_server), "',",
+              "'", ejabberd_odbc:escape(Msg#msg.peer_resource), "',",
+              "'", atom_to_list(Msg#msg.direction), "',",
+              "'", Msg#msg.type, "',",
+              "'", ejabberd_odbc:escape(Msg#msg.subject), "',",
+              "'", ejabberd_odbc:escape(Msg#msg.body), "',",
+              "'", Msg#msg.timestamp, "');"],
+    case sql_query_internal_silent(DBRef, Query) of
+        % TODO: change this
+        {data, [{"0"}]} ->
+            ?MYDEBUG("Logged ok for ~p, peer: ~p", [Msg#msg.owner_name++"@"++VHost,
+                                                    Msg#msg.peer_name++"@"++Msg#msg.peer_server]),
+            ok;
+        {error, _Reason} ->
+            error
+    end,
+    {reply, ok, State};
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=Conn, vhost=VHost, schema=Schema}=Srv) ->
+    % the answer of rebuild_stats_at_int/4 is passed back to the caller as it is
+    {reply, rebuild_stats_at_int(Conn, VHost, Schema, Date), Srv};
+handle_call({delete_messages_by_user_at, [], _Date}, _From, Srv) -> % an empty message list is answered with error
+    {reply, error, Srv};
+% delete_messages_by_user_at: delete the rows whose timestamp is one of Msgs', then rebuild the stats of Date.
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Quote = fun(#msg{timestamp=Timestamp}) -> ["'",Timestamp,"'",","] end,
+    Pieces = lists:flatmap(Quote, Msgs),
+
+    % the trailing comma gives way to the closing ");"
+    InList = lists:append([lists:sublist(Pieces, length(Pieces)-1), ");"]),
+    Sql = ["DELETE FROM ",messages_table(VHost, Schema, Date)," ",
+           "WHERE timestamp IN (", InList],
+
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {updated, _} ->
+                rebuild_stats_at_int(DBRef, VHost, Schema, Date);
+            {error, _} ->
+                error
+        end,
+    {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=Conn, vhost=VHost, schema=Schema}=Srv) ->
+    ok = delete_all_messages_by_user_at_int(Conn, Schema, User, VHost, Date), % both steps must answer ok
+    ok = delete_stats_by_user_at_int(Conn, Schema, User, VHost, Date),
+    {reply, ok, Srv};
+% delete_messages_at: drop the view and the messages table of Date, then delete the stats rows of that date.
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]),
+    DropTable = ["DROP TABLE ",messages_table(VHost, Schema, Date)," CASCADE;"],
+    Reply =
+        case sql_query_internal(DBRef, DropTable) of
+            {error, _} -> error;
+            {updated, _} ->
+                % the statistics of the date go away together with its messages
+                DeleteStats = ["DELETE FROM ",stats_table(VHost, Schema)," "
+                               "WHERE at='",Date,"';"],
+                case sql_query_internal(DBRef, DeleteStats) of
+                    {updated, _} -> ok;
+                    {error, _} -> error
+                end
+        end,
+    {reply, Reply, State};
+% get_vhost_stats: reply {ok, [{Date, Count}]} summed per date, ordered by date descending, or {error, Reason}.
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    StatsTable = stats_table(VHost, Schema),
+    Sql = ["SELECT at, sum(count) ",
+           "FROM ",StatsTable," ",
+           "GROUP BY at ",
+           "ORDER BY DATE(at) DESC;"
+          ],
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {data, Recs} ->
+                {ok, [ {Date, list_to_integer(Count)} || {Date, Count} <- Recs]};
+            % TODO: Duplicate error message ?
+            {error, _} = Failure -> Failure
+        end,
+    {reply, Reply, State};
+% get_vhost_stats_at: reply {ok, [{User, Count}]} for Date, highest count first, or {error, Reason}.
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    StatsTable = stats_table(VHost, Schema),
+    Sql = ["SELECT username, sum(count) AS allcount ",
+           "FROM ",StatsTable," ",
+           "JOIN ",users_table(VHost, Schema)," ON owner_id=user_id ",
+           "WHERE at='",Date,"' ",
+           "GROUP BY username ",
+           "ORDER BY allcount DESC;"
+          ],
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {data, Recs} ->
+                Counted = lists:map(fun({User, Count}) ->
+                                        {User, list_to_integer(Count)}
+                                    end, Recs),
+                {ok, lists:reverse(lists:keysort(2, Counted))};
+            % TODO:
+            {error, _} = Failure -> Failure
+        end,
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbref=Conn, vhost=VHost, schema=Schema}=Srv) ->
+    {reply, get_user_stats_int(Conn, Schema, User, VHost), Srv}; % delegates to get_user_stats_int/4
+% get_user_messages_at: reply {ok, [#msg{}]} read from the view of Date for owner User, or {error, Reason}.
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Sql = ["SELECT peer_name,",
+           "peer_server,",
+           "peer_resource,",
+           "direction,"
+           "type,"
+           "subject,"
+           "body,"
+           "timestamp "
+           "FROM ",view_table(VHost, Schema, Date)," "
+           "WHERE owner_name='",User,"';"],
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {data, Recs} ->
+                % NOTE(review): list_to_atom/1 on a stored value; presumably only to/from are stored - confirm
+                ToMsg = fun({PeerName, PeerServer, PeerResource,
+                             Direction, Type,
+                             Subject, Body,
+                             Timestamp}) ->
+                            #msg{peer_name=PeerName, peer_server=PeerServer, peer_resource=PeerResource,
+                                 direction=list_to_atom(Direction),
+                                 type=Type,
+                                 subject=Subject, body=Body,
+                                 timestamp=Timestamp}
+                        end,
+                {ok, lists:map(ToMsg, Recs)};
+            {error, _} = Failure -> Failure
+        end,
+    {reply, Reply, State};
+% get_dates: reply the list of dates present in the stats table (descending), or {error, Reason}.
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    StatsTable = stats_table(VHost, Schema),
+    Sql = ["SELECT at ",
+           "FROM ",StatsTable," ",
+           "GROUP BY at ",
+           "ORDER BY at DESC;"
+          ],
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {data, Rows} ->
+                [ Date || {Date} <- Rows ];
+            {error, _} = Failure -> Failure
+        end,
+    {reply, Reply, State};
+% get_users_settings: reply {ok, [#user_settings{}]} for every stored row, or {error, Reason}.
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Sql = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+           "FROM ",settings_table(VHost, Schema)," ",
+           "JOIN ",users_table(VHost, Schema)," ON user_id=owner_id;"],
+    Reply =
+        case sql_query_internal(DBRef, Sql) of
+            {data, Recs} ->
+                {ok, [#user_settings{owner_name=Owner,
+                                     dolog_default=list_to_bool(DoLogDef),
+                                     dolog_list=string_to_list(DoLogL),
+                                     donotlog_list=string_to_list(DoNotLogL)}
+                      || {Owner, DoLogDef, DoLogL, DoNotLogL} <- Recs]};
+            {error, _} = Failure -> Failure
+        end,
+    {reply, Reply, State};
+% get_user_settings: reply {ok, #user_settings{}} for User, {ok, []} when nothing is stored, or error.
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Sql = ["SELECT dolog_default,dolog_list,donotlog_list ",
+           "FROM ",settings_table(VHost, Schema)," ",
+           "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"],
+    Reply =
+        case sql_query_internal_silent(DBRef, Sql) of
+            {data, []} -> {ok, []};
+            {data, [{Default, LogList, NoLogList}]} ->
+                {ok, #user_settings{owner_name=User,
+                                    dolog_default=list_to_bool(Default),
+                                    dolog_list=string_to_list(LogList),
+                                    donotlog_list=string_to_list(NoLogList)}};
+            {error, Why} ->
+                ?ERROR_MSG("Failed to get_user_settings for ~p@~p: ~p", [User, VHost, Why]),
+                error
+        end,
+    {reply, Reply, State};
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+ dolog_list=DoLogL,
+ donotlog_list=DoNotLogL}},
+ _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+ User_id = get_user_id(DBRef, VHost, Schema, User),
+ Query = ["UPDATE ",settings_table(VHost, Schema)," ",
+ "SET dolog_default=",bool_to_list(DoLogDef),", ",
+ "dolog_list='",list_to_string(DoLogL),"', ",
+ "donotlog_list='",list_to_string(DoNotLogL),"' ",
+ "WHERE owner_id=",User_id,";"],
+
+ Reply =
+ case sql_query_internal(DBRef, Query) of
+ {updated, 0} ->
+ IQuery = ["INSERT INTO ",settings_table(VHost, Schema)," ",
+ "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+ "VALUES ",
+ "(",User_id,", ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+ case sql_query_internal(DBRef, IQuery) of
+ {updated, 1} ->
+ ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, _} ->
+ error
+ end;
+ {updated, 1} ->
+ ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+ ok;
+ {error, _} ->
+ error
+ end,
+ {reply, Reply, State};
+handle_call({stop}, _From, State) ->
+ ?MYDEBUG("Stoping pgsql backend for ~p", [State#state.vhost]),
+ {stop, normal, ok, State};
+handle_call(Msg, _From, State) ->
+ ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+ {noreply, State}.
+
+
+% Asynchronous requests.
+%   {rebuild_stats}   - hands the state to rebuild_all_stats_int/1
+%   {drop_user, User} - spawns a worker that removes the user's messages,
+%                       stats and settings over a connection of its own
+%   anything else     - logged and ignored
+handle_cast({rebuild_stats}, State) ->
+    rebuild_all_stats_int(State),
+    {noreply, State};
+handle_cast({drop_user, User}, #state{vhost=VHost, schema=Schema}=State) ->
+    % every delete step reports either ok or error
+    Succeeded = fun(ok) -> true;
+                   (error) -> false
+                end,
+    Worker =
+        fun() ->
+            {ok, Conn} = open_pgsql_connection(State),
+            {ok, Dates} = get_user_stats_int(Conn, Schema, User, VHost),
+            MsgResults = lists:map(fun({Date, _}) ->
+                                       delete_all_messages_by_user_at_int(Conn, Schema, User, VHost, Date)
+                                   end, Dates),
+            StatsResult = delete_all_stats_by_user_int(Conn, Schema, User, VHost),
+            SettingsResult = delete_user_settings_int(Conn, Schema, User, VHost),
+            case lists:all(Succeeded, MsgResults ++ [StatsResult, SettingsResult]) of
+                true ->
+                    ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+                false ->
+                    ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+            end,
+            close_pgsql_connection(Conn)
+        end,
+    spawn(Worker),
+    {noreply, State};
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+    {noreply, State}.
+
+% Raw messages sent to the backend process.
+% A 'DOWN' message for a process stops the server with reason
+% connection_dropped; any other message is only logged.
+handle_info({'DOWN', _Ref, process, _DownPid, _Why}, State) ->
+    {stop, connection_dropped, State};
+handle_info(Other, State) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Other, State]),
+    {noreply, State}.
+
+% Closes the database connection held in the state when the server stops.
+terminate(_Reason, #state{dbref=Conn}) ->
+    close_pgsql_connection(Conn),
+    ok.
+
+% Code upgrade hook: the state is carried over unchanged.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Every function below forwards its request to the gen_server registered for
+% VHost under gen_mod:get_module_proc(VHost, ?PROCNAME).
+% rebuild_stats/1 and drop_user/2 are casts (fire and forget); all the others
+% are calls bounded by ?CALL_TIMEOUT.
+log_message(VHost, Msg) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {log_message, Msg}, ?CALL_TIMEOUT).
+rebuild_stats(VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats}).
+rebuild_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_at, Date}, ?CALL_TIMEOUT).
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats}, ?CALL_TIMEOUT).
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_stats, User}, ?CALL_TIMEOUT).
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_dates}, ?CALL_TIMEOUT).
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_users_settings}, ?CALL_TIMEOUT).
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_settings, User}, ?CALL_TIMEOUT).
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+drop_user(User, VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Returns the dates (strings of the form digits-digits-digits) found in the
+% names of the visible tables whose name contains the escaped VHost.
+% The tables are listed from pg_catalog; the date is the first substring of a
+% table name matching [0-9]+-[0-9]+-[0-9]+. Tables without such a substring
+% are skipped. Returns [] when the catalog query fails.
+get_dates_int(DBRef, VHost) ->
+    Query = ["SELECT n.nspname as \"Schema\",
+ c.relname as \"Name\",
+ CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' END as \"Type\",
+ r.rolname as \"Owner\"
+ FROM pg_catalog.pg_class c
+ JOIN pg_catalog.pg_roles r ON r.oid = c.relowner
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ WHERE c.relkind IN ('r','')
+ AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
+ AND c.relname ~ '^(.*",escape_vhost(VHost),".*)$'
+ AND pg_catalog.pg_table_is_visible(c.oid)
+ ORDER BY 1,2;"],
+    case sql_query_internal(DBRef, Query) of
+        {data, Recs} ->
+            % re:run/3 answers {match, [Captured]} (not the {match, Start, Len}
+            % triple of the removed regexp module); asking for the first
+            % capture as a list hands back the date text directly.
+            % foldr + cons keeps the dates in query order without appending.
+            lists:foldr(fun({_Schema, Table, _Type, _Owner}, Dates) ->
+                            case re:run(Table, "[0-9]+-[0-9]+-[0-9]+",
+                                        [{capture, first, list}]) of
+                                {match, [Date]} ->
+                                    [Date | Dates];
+                                nomatch ->
+                                    Dates
+                            end
+                        end, [], Recs);
+        {error, _} ->
+            []
+    end.
+
+% Spawns a process that rebuilds the stats of every date known for VHost over
+% a connection of its own and returns the pid of that process.
+% The worker first removes stats rows of dates that no longer exist, then
+% rebuilds each date; dates whose rebuild returned error or crashed are
+% reported in a single error log line.
+rebuild_all_stats_int(#state{vhost=VHost, schema=Schema}=State) ->
+    Worker =
+        fun() ->
+            {ok, Conn} = open_pgsql_connection(State),
+            ok = delete_nonexistent_stats(Conn, Schema, VHost),
+            % true when rebuilding the given date did not succeed
+            HasFailed = fun(Date) ->
+                            case catch rebuild_stats_at_int(Conn, VHost, Schema, Date) of
+                                ok -> false;
+                                error -> true;
+                                {'EXIT', _} -> true
+                            end
+                        end,
+            Dates = get_dates_int(Conn, VHost),
+            case [Date || Date <- Dates, HasFailed(Date)] of
+                [] ->
+                    ok;
+                FTables ->
+                    ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]),
+                    error
+            end,
+            close_pgsql_connection(Conn)
+        end,
+    spawn(Worker).
+
+% Recomputes the stats rows of VHost for one Date inside a transaction.
+% Per-peer message counts are first gathered into a temporary table and then
+% swapped into the stats table. When the messages table turns out to be
+% empty, its view and the table itself are dropped and the stats rows for the
+% date are removed.
+% Returns ok when the transaction committed and error when it was aborted.
+% The temporary table is dropped afterwards in both cases.
+rebuild_stats_at_int(DBRef, VHost, Schema, Date) ->
+    TempTable = temp_table(VHost, Schema),
+    Fun =
+        fun() ->
+            Table = messages_table(VHost, Schema, Date),
+            STable = stats_table(VHost, Schema),
+
+            DQuery = [ "DELETE FROM ",STable," ",
+                       "WHERE at='",Date,"';"],
+
+            ok = create_temp_table(DBRef, VHost, Schema),
+            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," IN ACCESS EXCLUSIVE MODE;"]),
+            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]),
+            SQuery = ["INSERT INTO ",TempTable," ",
+                      "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                      "SELECT owner_id,peer_name_id,peer_server_id,'",Date,"'",",count(*) ",
+                      "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"],
+            case sql_query_internal(DBRef, SQuery) of
+                {updated, 0} ->
+                    % nothing was aggregated: only acceptable when the
+                    % messages table really is empty
+                    Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                    case Count of
+                        {data, [{"0"}]} ->
+                            {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]),
+                            {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table," CASCADE;"]),
+                            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]),
+                            {updated, _} = sql_query_internal(DBRef, DQuery),
+                            ok;
+                        _ ->
+                            ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]),
+                    {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]),
+                    {updated, _} = sql_query_internal(DBRef, DQuery),
+                    SQuery1 = ["INSERT INTO ",STable," ",
+                               "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                               "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                               "FROM ",TempTable,";"],
+                    case sql_query_internal(DBRef, SQuery1) of
+                        {updated, _} -> ok;
+                        {error, _} -> error
+                    end;
+                {error, _} -> error
+            end
+        end, % fun
+
+    % Keep the outcome: it used to be thrown away, so this function always
+    % answered ok and callers could never see a failed rebuild.
+    Result =
+        case sql_transaction_internal(DBRef, Fun) of
+            {atomic, _} ->
+                ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]),
+                ok;
+            {aborted, Reason} ->
+                ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+                error
+        end,
+    % cleanup; its result does not influence the return value
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Result.
+
+% Deletes the stats rows whose date is not among the dates returned by
+% get_dates_int/2. Does nothing (and returns ok) when no dates are known.
+% Returns ok | error.
+delete_nonexistent_stats(DBRef, Schema, VHost) ->
+    Dates = get_dates_int(DBRef, VHost),
+    STable = stats_table(VHost, Schema),
+    case Dates of
+        [] ->
+            ok;
+        _ ->
+            % each date contributes  , ' Date '  ; dropping the leading
+            % comma leaves a comma separated list of quoted dates
+            Quoted = tl(lists:append([[",", "'", Date, "'"] || Date <- Dates])),
+            InList = Quoted ++ ");",
+            Query = ["DELETE FROM ",STable," ",
+                     "WHERE at NOT IN (", InList],
+            case sql_query_internal(DBRef, Query) of
+                {error, _} ->
+                    error;
+                {updated, _} ->
+                    ok
+            end
+    end.
+
+% Returns {ok, [{Date, Count}]} with the summed message count of User per
+% date (Count converted to an integer), ordered by date descending in the
+% query, or {error, Reason} when the query fails.
+get_user_stats_int(DBRef, Schema, User, VHost) ->
+    StatsTable = stats_table(VHost, Schema),
+    UsersTable = users_table(VHost, Schema),
+    Query = ["SELECT stats.at, sum(stats.count) ",
+             "FROM ",UsersTable," AS users ",
+             "JOIN ",StatsTable," AS stats ON owner_id=user_id "
+             "WHERE users.username='",User,"' ",
+             "GROUP BY stats.at "
+             "ORDER BY DATE(at) DESC;"
+            ],
+    case sql_query_internal(DBRef, Query) of
+        {error, _} = Failure ->
+            Failure;
+        {data, Rows} ->
+            {ok, [ {At, list_to_integer(Sum)} || {At, Sum} <- Rows ]}
+    end.
+
+% Deletes every message owned by User from the messages table of Date.
+% Returns ok | error.
+delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date) ->
+    MessagesTable = messages_table(VHost, Schema, Date),
+    UsersTable = users_table(VHost, Schema),
+    DQuery = ["DELETE FROM ",MessagesTable," ",
+              "WHERE owner_id=(SELECT user_id FROM ",UsersTable," WHERE username='",User,"');"],
+    case sql_query_internal(DBRef, DQuery) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Deletes every stats row owned by User. Returns ok | error.
+delete_all_stats_by_user_int(DBRef, Schema, User, VHost) ->
+    StatsTable = stats_table(VHost, Schema),
+    UsersTable = users_table(VHost, Schema),
+    SQuery = ["DELETE FROM ",StatsTable," ",
+              "WHERE owner_id=(SELECT user_id FROM ",UsersTable," WHERE username='",User,"');"],
+    case sql_query_internal(DBRef, SQuery) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]),
+            ok
+    end.
+
+% Deletes the stats rows owned by User for a single Date. Returns ok | error.
+delete_stats_by_user_at_int(DBRef, Schema, User, VHost, Date) ->
+    StatsTable = stats_table(VHost, Schema),
+    UsersTable = users_table(VHost, Schema),
+    SQuery = ["DELETE FROM ",StatsTable," ",
+              "WHERE owner_id=(SELECT user_id FROM ",UsersTable," WHERE username='",User,"') ",
+              "AND at='",Date,"';"],
+    case sql_query_internal(DBRef, SQuery) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Deletes the settings row of User, logging the outcome. Returns ok | error.
+delete_user_settings_int(DBRef, Schema, User, VHost) ->
+    SettingsTable = settings_table(VHost, Schema),
+    UsersTable = users_table(VHost, Schema),
+    Query = ["DELETE FROM ",SettingsTable," ",
+             "WHERE owner_id=(SELECT user_id FROM ",UsersTable," WHERE username='",User,"');"],
+    case sql_query_internal(DBRef, Query) of
+        {error, Reason} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Reason]),
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Creates the temporary stats table for VHost if it does not exist yet.
+% Returns ok | error.
+create_temp_table(DBRef, VHost, Schema) ->
+    TempTable = temp_table(VHost, Schema),
+    Query = ["CREATE TABLE IF NOT EXISTS ",TempTable," (",
+             "owner_id INTEGER, ",
+             "peer_name_id INTEGER, ",
+             "peer_server_id INTEGER, ",
+             "at VARCHAR(20), ",
+             "count INTEGER ",
+             ");"
+            ],
+    case sql_query_internal(DBRef, Query) of
+        {error, _Reason} -> error;
+        {updated, _} -> ok
+    end.
+
+% Makes sure the stats table of VHost exists and has the expected columns.
+%   - table created now: its two indexes are created in the same transaction
+%     and a full stats rebuild is started; returns ok
+%   - table already there (error code 42P07): the table must have exactly two
+%     columns named peer_*_id; if so returns ok, otherwise the table is
+%     dropped and error is returned so the module can be restarted
+%   - transaction aborted: returns error
+% NOTE(review): with CREATE TABLE IF NOT EXISTS the server presumably reports
+% success instead of 42P07 for an existing table, in which case the CREATE
+% INDEX statements below would fail and abort the transaction - confirm
+% against the targeted PostgreSQL version.
+create_stats_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    SName = stats_table(VHost, Schema),
+
+    Fun =
+        fun() ->
+            Query = ["CREATE TABLE IF NOT EXISTS ",SName," (",
+                     "owner_id INTEGER, ",
+                     "peer_name_id INTEGER, ",
+                     "peer_server_id INTEGER, ",
+                     "at VARCHAR(20), ",
+                     "count integer",
+                     ");"
+                    ],
+            case sql_query_internal_silent(DBRef, Query) of
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_search_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (owner_id, peer_name_id, peer_server_id);"]),
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_at_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (at);"]),
+                    created;
+                {error, Reason} ->
+                    case lists:keysearch(code, 1, Reason) of
+                        {value, {code, "42P07"}} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]),
+                            error
+                    end
+            end
+        end,
+    case sql_transaction_internal(DBRef, Fun) of
+        {atomic, created} ->
+            ?MYDEBUG("Created stats table for ~p", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+            % re:run/3 answers {match, [Captured]}; the capture is the
+            % double-quoted part of SName, from which the quotes are stripped
+            {match, [Quoted]} = re:run(SName, "\".*\"", [{capture, first, list}]),
+            QTable = lists:sublist(Quoted, 2, length(Quoted) - 2),
+            OIDQuery = ["SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relname='",QTable,"' AND pg_catalog.pg_table_is_visible(c.oid);"],
+            {data,[{OID}]} = sql_query_internal(DBRef, OIDQuery),
+            CheckQuery = ["SELECT a.attname FROM pg_catalog.pg_attribute a WHERE a.attrelid = '",OID,"' AND a.attnum > 0 AND NOT a.attisdropped AND a.attname ~ '^peer_.*_id$';"],
+            case sql_query_internal(DBRef, CheckQuery) of
+                {data, [_, _]} ->
+                    ?MYDEBUG("Stats table structure is ok", []),
+                    ok;
+                _ ->
+                    ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                    case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of
+                        {updated, _} ->
+                            ?INFO_MSG("Successfully dropped ~p", [SName]);
+                        _ ->
+                            ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName])
+                    end,
+                    error
+            end;
+        % sql_transaction_internal/2 reports failure as {aborted, _}
+        {aborted, _} -> error
+    end.
+
+% Creates the per-user settings table of VHost.
+% Returns ok when the table was created or the server answered with error
+% code 42P07 (already exists); any other failure is logged and gives error.
+create_settings_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    SettingsTable = settings_table(VHost, Schema),
+    Query = ["CREATE TABLE IF NOT EXISTS ",SettingsTable," (",
+             "owner_id INTEGER PRIMARY KEY, ",
+             "dolog_default BOOLEAN, ",
+             "dolog_list TEXT DEFAULT '', ",
+             "donotlog_list TEXT DEFAULT ''",
+             ");"
+            ],
+    case sql_query_internal_silent(DBRef, Query) of
+        {error, Reason} ->
+            case lists:keyfind(code, 1, Reason) of
+                {code, "42P07"} ->
+                    ?MYDEBUG("Settings table for ~p already exists", [VHost]),
+                    ok;
+                _ ->
+                    ?ERROR_MSG("Failed to create settings table for ~p: ~p", [VHost, Reason]),
+                    error
+            end;
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~p", [VHost]),
+            ok
+    end.
+
+% Creates the users table (username / user_id) of VHost and its username
+% index within one transaction.
+% Returns ok when the table was created or already exists (error code
+% 42P07), error when the transaction was aborted.
+create_users_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    UsersTable = users_table(VHost, Schema),
+
+    Create =
+        fun() ->
+            Query = ["CREATE TABLE IF NOT EXISTS ",UsersTable," (",
+                     "username TEXT UNIQUE, ",
+                     "user_id SERIAL PRIMARY KEY",
+                     ");"
+                    ],
+            case sql_query_internal_silent(DBRef, Query) of
+                {error, Reason} ->
+                    case lists:keyfind(code, 1, Reason) of
+                        {code, "42P07"} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create users table for ~p: ~p", [VHost, Reason]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"username_i_",Schema,"_",escape_vhost(VHost),"\" ON ",UsersTable," (username);"]),
+                    created
+            end
+        end,
+    case sql_transaction_internal(DBRef, Create) of
+        {aborted, _} ->
+            error;
+        {atomic, created} ->
+            ?MYDEBUG("Created users table for ~p", [VHost]),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Users table for ~p already exists", [VHost]),
+            ok
+    end.
+
+% Creates the servers table (server / server_id) of VHost and its server
+% index within one transaction.
+% Returns ok when the table was created or already exists (error code
+% 42P07), error when the transaction was aborted.
+create_servers_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    ServersTable = servers_table(VHost, Schema),
+    Create =
+        fun() ->
+            Query = ["CREATE TABLE IF NOT EXISTS ",ServersTable," (",
+                     "server TEXT UNIQUE, ",
+                     "server_id SERIAL PRIMARY KEY",
+                     ");"
+                    ],
+            case sql_query_internal_silent(DBRef, Query) of
+                {error, Reason} ->
+                    case lists:keyfind(code, 1, Reason) of
+                        {code, "42P07"} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create servers table for ~p: ~p", [VHost, Reason]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"server_i_",Schema,"_",escape_vhost(VHost),"\" ON ",ServersTable," (server);"]),
+                    created
+            end
+        end,
+    case sql_transaction_internal(DBRef, Create) of
+        {aborted, _} ->
+            error;
+        {atomic, created} ->
+            ?MYDEBUG("Created servers table for ~p", [VHost]),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Servers table for ~p already exists", [VHost]),
+            ok
+    end.
+
+% Creates the resources table (resource / resource_id) of VHost and its
+% resource index within one transaction.
+% Returns ok when the table was created or already exists (error code
+% 42P07), error when the transaction was aborted.
+create_resources_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    RName = resources_table(VHost, Schema),
+    Fun =
+        fun() ->
+            Query = ["CREATE TABLE IF NOT EXISTS ",RName," (",
+                     "resource TEXT UNIQUE, ",
+                     "resource_id SERIAL PRIMARY KEY",
+                     ");"
+                    ],
+            case sql_query_internal_silent(DBRef, Query) of
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"resource_i_",Schema,"_",escape_vhost(VHost),"\" ON ",RName," (resource);"]),
+                    created;
+                {error, Reason} ->
+                    case lists:keysearch(code, 1, Reason) of
+                        {value, {code, "42P07"}} ->
+                            exists;
+                        _ ->
+                            % the message used to name the users table (copy/paste slip)
+                            ?ERROR_MSG("Failed to create resources table for ~p: ~p", [VHost, Reason]),
+                            error
+                    end
+            end
+        end,
+    case sql_transaction_internal(DBRef, Fun) of
+        {atomic, created} ->
+            ?MYDEBUG("Created resources table for ~p", [VHost]),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Resources table for ~p already exists", [VHost]),
+            ok;
+        {aborted, _} -> error
+    end.
+
+% (Re)installs the logmessage stored function of VHost.
+% A function with the 11-argument signature is dropped first (result
+% ignored), then the definition produced by get_logmessage/2 is executed.
+% Returns ok | error; error code 42704 additionally logs a hint that the
+% plpgsql language has to be installed.
+create_internals(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    DropQuery = ["DROP FUNCTION IF EXISTS ",logmessage_name(VHost,Schema)," (tbname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION);"],
+    sql_query_internal(DBRef, DropQuery),
+    case sql_query_internal(DBRef, [get_logmessage(VHost, Schema)]) of
+        {error, Reason} ->
+            case lists:keyfind(code, 1, Reason) of
+                {code, "42704"} ->
+                    ?ERROR_MSG("plpgsql language must be installed into database '~s'. Use CREATE LANGUAGE...", [State#state.db]),
+                    error;
+                _ ->
+                    error
+            end;
+        {updated, _} ->
+            ?MYDEBUG("Created logmessage for ~p", [VHost]),
+            ok
+    end.
+
+% Returns the user_id of User, inserting a users row first when there is
+% none. A failed insert is only tolerated when its error code is 23505
+% (duplicate key); the id is then read again.
+get_user_id(DBRef, VHost, Schema, User) ->
+    SQuery = ["SELECT user_id FROM ",users_table(VHost, Schema)," ",
+              "WHERE username='",User,"';"],
+    case sql_query_internal(DBRef, SQuery) of
+        {data, [{ExistingId}]} ->
+            ExistingId;
+        {data, []} ->
+            IQuery = ["INSERT INTO ",users_table(VHost, Schema)," ",
+                      "VALUES ('",User,"');"],
+            case sql_query_internal_silent(DBRef, IQuery) of
+                {error, Reason} ->
+                    % this can be in clustered environment
+                    {value, {code, "23505"}} = lists:keysearch(code, 1, Reason),
+                    ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                    {data, [{ClusterId}]} = sql_query_internal(DBRef, SQuery),
+                    ClusterId;
+                {updated, _} ->
+                    {data, [{NewId}]} = sql_query_internal(DBRef, SQuery),
+                    NewId
+            end
+    end.
+
+% Builds, with io_lib:format/2, the text of a CREATE OR REPLACE FUNCTION
+% statement that defines the plpgsql function named by
+% logmessage_name(VHost,Schema).
+% The ~s placeholders receive, in this order: the function name; the users,
+% servers and resources table names for the id lookups; Schema and the
+% escaped vhost for the per-date index name; the users (twice), servers and
+% resources table names for the view; the stats table name (twice).
+% The generated function:
+%  - looks up the ids of owner, peer_name, peer_server and peer_resource,
+%    inserting a row and taking lastval() when one is not found;
+%  - inserts the message into the table passed as tbname; when that raises
+%    undefined_table it creates the table, its search index and the view
+%    passed as vname, then repeats the insert;
+%  - increments the matching stats row, or inserts it with count 1 when no
+%    row was updated;
+%  - returns 0.
+get_logmessage(VHost,Schema) ->
+ UName = users_table(VHost,Schema),
+ SName = servers_table(VHost,Schema),
+ RName = resources_table(VHost,Schema),
+ StName = stats_table(VHost,Schema),
+ io_lib:format("CREATE OR REPLACE FUNCTION ~s (tbname TEXT, vname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION) RETURNS INTEGER AS $$
+DECLARE
+ ownerID INTEGER;
+ peer_nameID INTEGER;
+ peer_serverID INTEGER;
+ peer_resourceID INTEGER;
+ tablename ALIAS for $1;
+ viewname ALIAS for $2;
+ atdate ALIAS for $3;
+BEGIN
+ SELECT INTO ownerID user_id FROM ~s WHERE username = owner;
+ IF NOT FOUND THEN
+ INSERT INTO ~s (username) VALUES (owner);
+ ownerID := lastval();
+ END IF;
+
+ SELECT INTO peer_nameID user_id FROM ~s WHERE username = peer_name;
+ IF NOT FOUND THEN
+ INSERT INTO ~s (username) VALUES (peer_name);
+ peer_nameID := lastval();
+ END IF;
+
+ SELECT INTO peer_serverID server_id FROM ~s WHERE server = peer_server;
+ IF NOT FOUND THEN
+ INSERT INTO ~s (server) VALUES (peer_server);
+ peer_serverID := lastval();
+ END IF;
+
+ SELECT INTO peer_resourceID resource_id FROM ~s WHERE resource = peer_resource;
+ IF NOT FOUND THEN
+ INSERT INTO ~s (resource) VALUES (peer_resource);
+ peer_resourceID := lastval();
+ END IF;
+
+ BEGIN
+ EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')';
+ EXCEPTION WHEN undefined_table THEN
+ EXECUTE 'CREATE TABLE IF NOT EXISTS ' || tablename || ' (' ||
+ 'owner_id INTEGER, ' ||
+ 'peer_name_id INTEGER, ' ||
+ 'peer_server_id INTEGER, ' ||
+ 'peer_resource_id INTEGER, ' ||
+ 'direction VARCHAR(4) CHECK (direction IN (''to'',''from'')), ' ||
+ 'type VARCHAR(9) CHECK (type IN (''chat'',''error'',''groupchat'',''headline'',''normal'')), ' ||
+ 'subject TEXT, ' ||
+ 'body TEXT, ' ||
+ 'timestamp DOUBLE PRECISION)';
+ EXECUTE 'CREATE INDEX \"search_i_' || '~s' || '_' || atdate || '_' || '~s' || '\"' || ' ON ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id)';
+
+ EXECUTE 'CREATE OR REPLACE VIEW ' || viewname || ' AS ' ||
+ 'SELECT owner.username AS owner_name, ' ||
+ 'peer.username AS peer_name, ' ||
+ 'servers.server AS peer_server, ' ||
+ 'resources.resource AS peer_resource, ' ||
+ 'messages.direction, ' ||
+ 'messages.type, ' ||
+ 'messages.subject, ' ||
+ 'messages.body, ' ||
+ 'messages.timestamp ' ||
+ 'FROM ' ||
+ '~s owner, ' ||
+ '~s peer, ' ||
+ '~s servers, ' ||
+ '~s resources, ' ||
+ tablename || ' messages ' ||
+ 'WHERE ' ||
+ 'owner.user_id=messages.owner_id and ' ||
+ 'peer.user_id=messages.peer_name_id and ' ||
+ 'servers.server_id=messages.peer_server_id and ' ||
+ 'resources.resource_id=messages.peer_resource_id ' ||
+ 'ORDER BY messages.timestamp';
+
+ EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')';
+ END;
+
+ UPDATE ~s SET count=count+1 where at=atdate and owner_id=ownerID and peer_name_id=peer_nameID and peer_server_id=peer_serverID;
+ IF NOT FOUND THEN
+ INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (ownerID, peer_nameID, peer_serverID, atdate, 1);
+ END IF;
+ RETURN 0;
+END;
+$$ LANGUAGE plpgsql;
+", [logmessage_name(VHost,Schema),UName,UName,UName,UName,SName,SName,RName,RName,Schema,escape_vhost(VHost),UName,UName,SName,RName,StName,StName]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% like do_transaction/2 in mysql_conn.erl (changeset by Yariv Sadan <[email protected]>)
+% Runs Fun between BEGIN and COMMIT on DBRef.
+% Returns {atomic, Result} when Fun finished and the commit succeeded (a
+% result that already is {atomic, _} is returned untouched).
+% Returns {aborted, Info} when BEGIN failed, when Fun returned error or
+% {error, _} or raised, or when COMMIT failed; in all but the first case a
+% ROLLBACK is issued.
+sql_transaction_internal(DBRef, Fun) ->
+    % wrap a plain result, leave an already tagged one alone
+    Tag = fun({atomic, _} = Tagged) -> Tagged;
+             (Plain) -> {atomic, Plain}
+          end,
+    case sql_query_internal(DBRef, ["BEGIN;"]) of
+        {error, _} ->
+            {aborted, {begin_error}};
+        {updated, _} ->
+            Outcome = (catch Fun()),
+            case Outcome of
+                error ->
+                    rollback_internal(DBRef, Outcome);
+                {error, _} ->
+                    rollback_internal(DBRef, Outcome);
+                {'EXIT', _} ->
+                    rollback_internal(DBRef, Outcome);
+                _ ->
+                    case sql_query_internal(DBRef, ["COMMIT;"]) of
+                        {error, _} -> rollback_internal(DBRef, {commit_error});
+                        {updated, _} -> Tag(Outcome)
+                    end
+            end
+    end.
+
+% like rollback/2 in mysql_conn.erl (changeset by Yariv Sadan <[email protected]>)
+% Issues ROLLBACK and returns {aborted, {Reason, {rollback_result, Result}}}
+% where Result is whatever the ROLLBACK query returned.
+rollback_internal(DBRef, Reason) ->
+    RollbackResult = sql_query_internal(DBRef, ["ROLLBACK;"]),
+    {aborted, {Reason, {rollback_result, RollbackResult}}}.
+
+% Same as sql_query_internal_silent/2, but every failure is written to the
+% error log, and an unrecognised driver answer is reduced to
+% {error, undefined}.
+sql_query_internal(DBRef, Query) ->
+    Outcome = sql_query_internal_silent(DBRef, Query),
+    case Outcome of
+        {error, undefined, Rez} ->
+            ?ERROR_MSG("Got undefined result: ~p while ~p", [Rez, lists:append(Query)]),
+            {error, undefined};
+        {error, Error} ->
+            ?ERROR_MSG("Failed: ~p while ~p", [Error, lists:append(Query)]),
+            Outcome;
+        _ ->
+            Outcome
+    end.
+
+% Sends Query through pgsql:squery/2 and converts the answer with
+% get_result/1. Nothing is logged here apart from the debug trace.
+sql_query_internal_silent(DBRef, Query) ->
+    ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]),
+    % TODO: use pquery?
+    Raw = pgsql:squery(DBRef, Query),
+    get_result(Raw).
+
+% Converts the value returned by pgsql:squery/2:
+%   - a SELECT answer becomes {data, Rows}, each row a tuple whose binary,
+%     integer, float and boolean values are turned into strings
+%   - {error, Error} is passed through as {error, Error}
+%   - "INSERT Oid N", "DELETE N" and "UPDATE N" become {updated, N}
+%   - the other recognised command tags become {updated, 1}
+%   - anything else becomes {error, undefined, Rez}
+get_result({ok, [{"SELECT", _Rows, Recs}]}) ->
+    ToString = fun(Elem) when is_binary(Elem) ->
+                       binary_to_list(Elem);
+                  (Elem) when is_list(Elem) ->
+                       Elem;
+                  (Elem) when is_integer(Elem) ->
+                       integer_to_list(Elem);
+                  (Elem) when is_float(Elem) ->
+                       float_to_list(Elem);
+                  (Elem) when is_boolean(Elem) ->
+                       atom_to_list(Elem);
+                  (Elem) ->
+                       % unexpected value: report it and keep it as it is
+                       ?ERROR_MSG("Unknown element type ~p", [Elem]),
+                       Elem
+               end,
+    {data, [list_to_tuple(lists:map(ToString, Rec)) || Rec <- Recs]};
+get_result({ok, [{error, Error}]}) ->
+    {error, Error};
+get_result({ok, ["INSERT " ++ OIDN]}) ->
+    [_OID, N] = string:tokens(OIDN, " "),
+    {updated, list_to_integer(N)};
+get_result({ok, ["DELETE " ++ N]}) ->
+    {updated, list_to_integer(N)};
+get_result({ok, ["UPDATE " ++ N]}) ->
+    {updated, list_to_integer(N)};
+get_result({ok, ["CREATE TABLE"]}) -> {updated, 1};
+get_result({ok, ["DROP TABLE"]}) -> {updated, 1};
+get_result({ok, ["ALTER TABLE"]}) -> {updated, 1};
+get_result({ok, ["DROP VIEW"]}) -> {updated, 1};
+get_result({ok, ["DROP FUNCTION"]}) -> {updated, 1};
+get_result({ok, ["CREATE INDEX"]}) -> {updated, 1};
+get_result({ok, ["CREATE FUNCTION"]}) -> {updated, 1};
+get_result({ok, ["BEGIN"]}) -> {updated, 1};
+get_result({ok, ["LOCK TABLE"]}) -> {updated, 1};
+get_result({ok, ["ROLLBACK"]}) -> {updated, 1};
+get_result({ok, ["COMMIT"]}) -> {updated, 1};
+get_result({ok, ["SET"]}) -> {updated, 1};
+get_result(Rez) ->
+    {error, undefined, Rez}.
+
--- mod_logdb_mnesia_old.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ mod_logdb_mnesia_old.erl 2009-02-05 20:12:58.000000000 +0200
@@ -0,0 +1,258 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_mnesia_old.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : mod_logmnesia backend for mod_logdb (should be used only for copy_tables functionality)
+%%% Version : trunk
+%%% Id : $Id: mod_logdb_mnesia_old.erl 1273 2009-02-05 18:12:57Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mnesia_old).
+-author('[email protected]').
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+-behaviour(gen_logdb).
+
+-export([start/2, stop/1,
+ log_message/2,
+ rebuild_stats/1,
+ rebuild_stats_at/2,
+ rebuild_stats_at1/2,
+ delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+ get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+ get_dates/1,
+ get_users_settings/1, get_user_settings/2, set_user_settings/3,
+ drop_user/2]).
+
+-record(stats, {user, server, table, count}).
+-record(msg, {to_user, to_server, to_resource, from_user, from_server, from_resource, id, type, subject, body, timestamp}).
+
+% Common name prefix of the per-date message tables.
+tables_prefix() ->
+    "messages_".
+% Name of the stats table.
+% It must not start with tables_prefix(), i.e.
+% lists:prefix(tables_prefix(), atom_to_list(stats_table())) must be false.
+stats_table() ->
+    'messages-stats'.
+% table name as atom from Date
+-define(ATABLE(Date), list_to_atom(tables_prefix() ++ Date)).
+-define(LTABLE(Date), tables_prefix() ++ Date).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Starts the backend: requires a running mnesia and makes sure the stats
+% table exists. Returns {ok, ok}, or error when mnesia is not running.
+start(_Opts, _VHost) ->
+    Running = mnesia:system_info(is_running),
+    if
+        Running =:= yes ->
+            ok = create_stats_table(),
+            {ok, ok};
+        Running =:= no ->
+            ?ERROR_MSG("Mnesia not running", []),
+            error;
+        true ->
+            ?ERROR_MSG("Mnesia status: ~p", [Running]),
+            error
+    end.
+
+% Nothing to release for this backend.
+stop(_VHost) -> ok.
+
+% This backend never stores messages: always error.
+log_message(_VHost, _Msg) -> error.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks (maintenance)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% No-op for this backend: always ok.
+rebuild_stats(_VHost) -> ok.
+
+% Rebuilds the stats of VHost for Date via rebuild_stats_at1/2, logs the
+% elapsed time in seconds and returns the result of that call.
+rebuild_stats_at(VHost, Date) ->
+    {Micros, Result} = timer:tc(?MODULE, rebuild_stats_at1, [VHost, ?LTABLE(Date)]),
+    ?INFO_MSG("rebuild_stats_at ~p elapsed ~p sec: ~p~n", [Date, Micros/1000000, Result]),
+    Result.
+% Recounts, in one mnesia transaction, the messages stored in Table (a table
+% name given as a string) per local user of VHost and replaces the stats
+% records of that table and vhost with the new counts.
+% A message counts for its recipient when to_server is VHost and for its
+% sender when from_server is VHost. Returns ok | error.
+rebuild_stats_at1(VHost, Table) ->
+    % add one to Who's counter, appending {Who, 1} when Who is new
+    Bump = fun(Who, Stats) ->
+               case lists:keysearch(Who, 1, Stats) of
+                   {value, {Key, Count}} ->
+                       lists:keyreplace(Who, 1, Stats, {Key, Count + 1});
+                   false ->
+                       Stats ++ [{Who, 1}]
+               end
+           end,
+    % fold step over the messages: count recipient first, then sender
+    Count = fun(Msg, Stats) ->
+                To = Msg#msg.to_user ++ "@" ++ Msg#msg.to_server,
+                StatsTo = case Msg#msg.to_server == VHost of
+                              true -> Bump(To, Stats);
+                              false -> Stats
+                          end,
+                From = Msg#msg.from_user ++ "@" ++ Msg#msg.from_server,
+                case Msg#msg.from_server == VHost of
+                    true -> Bump(From, StatsTo);
+                    false -> StatsTo
+                end
+            end,
+    % fold step over the stats: delete the records of this table and vhost
+    Purge = fun(#stats{table=STable, server=Server} = Stat, _Acc)
+                  when STable == Table, Server == VHost ->
+                    mnesia:delete_object(stats_table(), Stat, write);
+               (_Stat, _Acc) ->
+                    ok
+            end,
+    Rebuild =
+        fun() ->
+            mnesia:write_lock_table(list_to_atom(Table)),
+            mnesia:write_lock_table(stats_table()),
+            % Calc stats for VHost at Date
+            Fresh = mnesia:foldl(Count, [], list_to_atom(Table)),
+            % Delete all stats for VHost at Date
+            mnesia:foldl(Purge, [], stats_table()),
+            % Write new calc'ed stats
+            lists:foreach(fun({Who, N}) ->
+                              Jid = jlib:string_to_jid(Who),
+                              Record = #stats{user=Jid#jid.user, server=VHost, table=Table, count=N},
+                              mnesia:write(stats_table(), Record, write)
+                          end, Fresh)
+        end,
+    case mnesia:transaction(Rebuild) of
+        {atomic, _} ->
+            ok;
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to rebuild_stats_at for ~p at ~p: ~p", [VHost, Table, Reason]),
+            error
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks (delete)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Per-user deletion is not provided by this backend: always error.
+delete_messages_by_user_at(_VHost, _Msgs, _Date) -> error.
+
+delete_all_messages_by_user_at(_User, _VHost, _Date) -> error.
+
+% Deletes from the message table of Date every message sent to or from
+% VHost, in one mnesia transaction. Returns ok | error.
+delete_messages_at(VHost, Date) ->
+    Table = ?ATABLE(Date),
+
+    % fold step: drop the message when either end is on VHost
+    Purge = fun(#msg{to_server=ToServer, from_server=FromServer}=Msg, _Acc)
+                  when ToServer == VHost; FromServer == VHost ->
+                    mnesia:delete_object(Table, Msg, write);
+               (_Msg, _Acc) ->
+                    ok
+            end,
+
+    case mnesia:transaction(fun() -> mnesia:foldl(Purge, [], Table) end) of
+        {atomic, _} ->
+            ok;
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to delete_messages_at for ~p at ~p: ~p", [VHost, Date, Reason]),
+            error
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks (get)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Not provided by this backend: always an error tuple.
+get_vhost_stats(_VHost) -> {error, "does not emplemented"}.
+
+% Returns {ok, [{User, Count}]} for the stats records of VHost at Date,
+% sorted by Count in descending order, or {error, Reason} when the mnesia
+% transaction is aborted.
+get_vhost_stats_at(VHost, Date) ->
+    Select =
+        fun() ->
+            Pattern = #stats{user='$1', server=VHost, table=tables_prefix()++Date, count = '$2'},
+            mnesia:select(stats_table(), [{Pattern, [], [['$1', '$2']]}])
+        end,
+    case mnesia:transaction(Select) of
+        {aborted, Reason} ->
+            {error, Reason};
+        {atomic, Rows} ->
+            Pairs = lists:map(fun([User, Count]) -> {User, Count} end, Rows),
+            Ascending = lists:keysort(2, Pairs),
+            {ok, lists:reverse(Ascending)}
+    end.
+
+% Not provided by this backend: always an error tuple.
+get_user_stats(_User, _VHost) -> {error, "does not emplemented"}.
+
+% Returns {ok, Msgs} with the messages of Date that were sent to or from
+% User@VHost, or {error, Reason} when the mnesia transaction is aborted.
+% Each stored record is turned into an 11-element msg tuple that leaves out
+% the id field; a subject stored as "None" is replaced by "".
+get_user_messages_at(User, VHost, Date) ->
+    Table_name = tables_prefix() ++ Date,
+    Select =
+        fun() ->
+            Pat_to = #msg{to_user=User, to_server=VHost, _='_'},
+            Pat_from = #msg{from_user=User, from_server=VHost, _='_'},
+            mnesia:select(list_to_atom(Table_name),
+                          [{Pat_to, [], ['$_']},
+                           {Pat_from, [], ['$_']}])
+        end,
+    CleanSubject = fun("None") -> "";
+                      (Other) -> Other
+                   end,
+    Convert = fun(#msg{to_user=ToUser, to_server=ToServer, to_resource=ToRes,
+                       from_user=FromUser, from_server=FromServer, from_resource=FromRes,
+                       type=Type,
+                       subject=Subj,
+                       body=Body, timestamp=Timestamp}) ->
+                  {msg, ToUser, ToServer, ToRes, FromUser, FromServer, FromRes,
+                   Type, CleanSubject(Subj), Body, Timestamp}
+              end,
+    case mnesia:transaction(Select) of
+        {aborted, Reason} ->
+            {error, Reason};
+        {atomic, Result} ->
+            {ok, lists:map(Convert, Result)}
+    end.
+
+% Lists the dates for which a message table exists: every mnesia table whose
+% name starts with tables_prefix() contributes the rest of its name.
+get_dates(_VHost) ->
+    Prefix = tables_prefix(),
+    PrefixLen = length(Prefix),
+    [lists:nthtail(PrefixLen, Name)
+     || Table <- mnesia:system_info(tables),
+        Name <- [atom_to_list(Table)],
+        lists:prefix(Prefix, Name)].
+
+% Settings and user removal are not handled by this backend: the getters
+% answer with an empty result, the others with ok.
+get_users_settings(_VHost) -> {ok, []}.
+get_user_settings(_User, _VHost) -> {ok, []}.
+set_user_settings(_User, _VHost, _Set) -> ok.
+drop_user(_User, _VHost) -> ok.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internal
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% called from db_logon/2
+% Creates the stats table as a disc_only_copies bag on the local node.
+% Returns ok when it was created or already exists, error otherwise.
+create_stats_table() ->
+    Options = [{disc_only_copies, [node()]},
+               {type, bag},
+               {attributes, record_info(fields, stats)},
+               {record_name, stats}
+              ],
+    case mnesia:create_table(stats_table(), Options) of
+        {aborted, {already_exists, _}} ->
+            ok;
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to create stats table: ~p", [Reason]),
+            error;
+        {atomic, ok} ->
+            ?INFO_MSG("Created stats table", []),
+            ok
+    end.
--- gen_logdb.erl.orig 2010-05-12 17:21:38.000000000 +0300
+++ gen_logdb.erl 2009-07-22 16:43:26.000000000 +0300
@@ -0,0 +1,164 @@
+%%%----------------------------------------------------------------------
+%%% File : gen_logdb.erl
+%%% Author : Oleg Palij (mailto,xmpp:[email protected])
+%%% Purpose : Describes generic behaviour for mod_logdb backends.
+%%% Version : trunk
+%%% Id : $Id: gen_logdb.erl 1273 2009-02-05 18:12:57Z malik $
+%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(gen_logdb).
+-author('[email protected]').
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) -> % {Name, Arity} of every callback a mod_logdb backend must export
+ [
+ % called from handle_info(start, _)
+ % it should log on to the database and return a reference to the started instance
+ % start(VHost, Opts) -> {ok, SPid} | error
+ % Opts - list of options used to connect to the db
+ % Types: Opts = list() -> [] |
+ % [{user, "logdb"},
+ % {pass, "1234"},
+ % {db, "logdb"}] | ...
+ % VHost = list() -> "jabber.example.org"
+ {start, 2},
+
+ % called from cleanup/1
+ % it should log off from the database and clean up
+ % stop(VHost)
+ % Types: VHost = list() -> "jabber.example.org"
+ {stop, 1},
+
+ % called from handle_call({addlog, _}, _, _)
+ % it should log the message to the database
+ % log_message(VHost, Msg) -> ok | error
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % Msg = record() -> #msg
+ {log_message, 2},
+
+ % called from ejabberdctl rebuild_stats
+ % it should rebuild the stats table (if used) for vhost
+ % rebuild_stats(VHost)
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ {rebuild_stats, 1},
+
+ % it should rebuild the stats table (if used) for vhost at Date
+ % rebuild_stats_at(VHost, Date)
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % Date = list() -> "2007-02-12"
+ {rebuild_stats_at, 2},
+
+ % called from user_messages_at_parse_query/4
+ % it should delete the selected user messages at date
+ % delete_messages_by_user_at(VHost, Msgs, Date) -> ok | error
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % Msgs = list() -> [ #msg1, #msg2, ... ]
+ % Date = list() -> "2007-02-12"
+ {delete_messages_by_user_at, 3},
+
+ % called from user_messages_parse_query/3 | vhost_messages_at_parse_query/4
+ % it should delete all user messages at date
+ % delete_all_messages_by_user_at(User, VHost, Date) -> ok | error
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ % Date = list() -> "2007-02-12"
+ {delete_all_messages_by_user_at, 3},
+
+ % called from vhost_messages_parse_query/2
+ % it should delete messages for vhost at date and update stats
+ % delete_messages_at(VHost, Date) -> ok | error
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % Date = list() -> "2007-02-12"
+ {delete_messages_at, 2},
+
+ % called from ejabberd_web_admin:vhost_messages_stats/3
+ % it should return a sorted list of message counts by date for vhost
+ % get_vhost_stats(VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ... ]} |
+ % {error, Reason}
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % DateN = list() -> "2007-02-12"
+ % Msgs_countN = number() -> 241
+ {get_vhost_stats, 1},
+
+ % called from ejabberd_web_admin:vhost_messages_stats_at/4
+ % it should return a sorted list of message counts by user at date for vhost
+ % get_vhost_stats_at(VHost, Date) -> {ok, [{User1, Msgs_count1}, {User2, Msgs_count2}, ... ]} |
+ % {error, Reason}
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % Date = list() -> "2007-02-12"
+ % UserN = list() -> "admin"
+ % Msgs_countN = number() -> 241
+ {get_vhost_stats_at, 2},
+
+ % called from ejabberd_web_admin:user_messages_stats/4
+ % it should return a sorted list of message counts by date for user at vhost
+ % get_user_stats(User, VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ...]} |
+ % {error, Reason}
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ % DateN = list() -> "2007-02-12"
+ % Msgs_countN = number() -> 241
+ {get_user_stats, 2},
+
+ % called from ejabberd_web_admin:user_messages_stats_at/5
+ % it should return all user messages at date
+ % get_user_messages_at(User, VHost, Date) -> {ok, Msgs} | {error, Reason}
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ % Date = list() -> "2007-02-12"
+ % Msgs = list() -> [ #msg1, #msg2, ... ]
+ {get_user_messages_at, 3},
+
+ % called from many places
+ % it should return the list of dates for vhost
+ % get_dates(VHost) -> [Date1, Date2, ... ]
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ % DateN = list() -> "2007-02-12"
+ {get_dates, 1},
+
+ % called from start
+ % it should return the list of users settings stored for VHost in the db
+ % get_users_settings(VHost) -> [#user_settings1, #user_settings2, ... ] | error
+ % Types:
+ % VHost = list() -> "jabber.example.org"
+ {get_users_settings, 1},
+
+ % called from many places
+ % it should return the settings of User at VHost from the db
+ % get_user_settings(User, VHost) -> error | {ok, #user_settings}
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ {get_user_settings, 2},
+
+ % called from web admin
+ % it should set the settings of User at VHost
+ % set_user_settings(User, VHost, #user_settings) -> ok | error
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ {set_user_settings, 3},
+
+ % called from remove_user (ejabberd hook)
+ % it should remove the user's messages and settings at VHost
+ % drop_user(User, VHost) -> ok | error
+ % Types:
+ % User = list() -> "admin"
+ % VHost = list() -> "jabber.example.org"
+ {drop_user, 2}
+ ];
+behaviour_info(_) -> % any other request is answered with undefined
+ undefined.
--- mod_muc/mod_muc_room-2.1.3.erl 2010-05-12 16:27:38.000000000 +0300
+++ mod_muc/mod_muc_room.erl 2010-05-12 16:32:11.000000000 +0300
@@ -726,6 +726,12 @@
{reply, {ok, NSD#state.config}, StateName, NSD};
handle_sync_event({change_state, NewStateData}, _From, StateName, _StateData) ->
{reply, {ok, NewStateData}, StateName, NewStateData};
+handle_sync_event({get_jid_nick, Jid}, _From, StateName, StateData) ->
+ R = case ?DICT:find(jlib:jid_tolower(Jid), StateData#state.users) of
+ error -> [];
+ {ok, {user, _, Nick, _, _}} -> Nick
+ end,
+ {reply, R, StateName, StateData};
handle_sync_event(_Event, _From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.
--- ./mod_roster-2.1.3.erl 2010-05-12 16:35:05.000000000 +0300
+++ mod_roster.erl 2010-05-12 17:21:03.000000000 +0300
@@ -61,7 +61,7 @@
-include("mod_roster.hrl").
-include("web/ejabberd_http.hrl").
-include("web/ejabberd_web_admin.hrl").
-
+-include("mod_logdb.hrl").
start(Host, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
@@ -1334,6 +1334,14 @@
Res = user_roster_parse_query(User, Server, Items1, Query),
Items = get_roster(LUser, LServer),
SItems = lists:sort(Items),
+
+ Settings = case gen_mod:is_loaded(Server, mod_logdb) of
+ true ->
+ mod_logdb:get_user_settings(User, Server);
+ false ->
+ []
+ end,
+
FItems =
case SItems of
[] ->
@@ -1381,7 +1381,33 @@
[?INPUTT("submit",
"remove" ++
ejabberd_web_admin:term_to_id(R#roster.jid),
- "Remove")])])
+ "Remove")]),
+ case gen_mod:is_loaded(Server, mod_logdb) of
+ true ->
+ Peer = jlib:jid_to_string(R#roster.jid),
+ A = lists:member(Peer, Settings#user_settings.dolog_list),
+ B = lists:member(Peer, Settings#user_settings.donotlog_list),
+ {Name, Value} =
+ if
+ A ->
+ {"donotlog", "Do Not Log Messages"};
+ B ->
+ {"dolog", "Log Messages"};
+ Settings#user_settings.dolog_default == true ->
+ {"donotlog", "Do Not Log Messages"};
+ Settings#user_settings.dolog_default == false ->
+ {"dolog", "Log Messages"}
+ end,
+
+ ?XAE("td", [{"class", "valign"}],
+ [?INPUTT("submit",
+ Name ++
+ ejabberd_web_admin:term_to_id(R#roster.jid),
+ Value)]);
+ false ->
+ ?X([])
+ end
+ ])
end, SItems))])]
end,
[?XC("h1", ?T("Roster of ") ++ us_to_list(US))] ++
@@ -1481,11 +1481,42 @@
{"subscription", "remove"}],
[]}]}}),
throw(submitted);
- false ->
- ok
- end
-
- end
+ false ->
+ case lists:keysearch(
+ "donotlog" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+ {value, _} ->
+ Peer = jlib:jid_to_string(JID),
+ Settings = mod_logdb:get_user_settings(User, Server),
+ DNLL = case lists:member(Peer, Settings#user_settings.donotlog_list) of
+ false -> lists:append(Settings#user_settings.donotlog_list, [Peer]);
+ true -> Settings#user_settings.donotlog_list
+ end,
+ DLL = lists:delete(jlib:jid_to_string(JID), Settings#user_settings.dolog_list),
+ Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL},
+ % TODO: check returned value
+ ok = mod_logdb:set_user_settings(User, Server, Sett),
+ throw(nothing);
+ false ->
+ case lists:keysearch(
+ "dolog" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+ {value, _} ->
+ Peer = jlib:jid_to_string(JID),
+ Settings = mod_logdb:get_user_settings(User, Server),
+ DLL = case lists:member(Peer, Settings#user_settings.dolog_list) of
+ false -> lists:append(Settings#user_settings.dolog_list, [Peer]);
+ true -> Settings#user_settings.dolog_list
+ end,
+ DNLL = lists:delete(jlib:jid_to_string(JID), Settings#user_settings.donotlog_list),
+ Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL},
+ % TODO: check returned value
+ ok = mod_logdb:set_user_settings(User, Server, Sett),
+ throw(nothing);
+ false ->
+ ok
+ end % dolog
+ end % donotlog
+ end % remove
+ end % validate
end, Items),
nothing.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment