Skip to content

Instantly share code, notes, and snippets.

@angrycub
Last active September 21, 2024 19:19
Show Gist options
  • Save angrycub/58f7c390e80c0b1aeaf8810306800632 to your computer and use it in GitHub Desktop.
Save angrycub/58f7c390e80c0b1aeaf8810306800632 to your computer and use it in GitHub Desktop.
%% -------------------------------------------------------------------------
%%
%% leveldb_stat_collector: collect statistics from leveldb internal counters
%% to file
%%
%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the
%% License. You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% NOTE: You will need to enable the leveldb performance counters by
%% creating the file `/etc/riak/perf_counters` and restarting the
%% node.
%%
%% -------------------------------------------------------------------------
-module(leveldb_stat_collector).
%% export_all keeps helpers such as print_stats/2 and get_ref_to_leveldb/0,1,2
%% callable from a shell. NOTE(review): presumably intentional for an
%% operator-run diagnostic tool; the explicit exports below list the real API.
-compile(export_all).
-behaviour(gen_server).

%% Public API.
-export([start/0, start/2, start/3, stop/0]).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
%% Entry point of the per-sample process spawned from handle_info/2.
-export([collect_stats/3]).

%% Default interval between samples, in milliseconds.
-define(SAMPLE_RATE, timer:minutes(1)).
%% Accumulated wait time (ms) between "still waiting for riak_kv" notices.
%% Should be a multiple of ?WAIT_POLL_INTERVAL; otherwise the notice only
%% fires at common multiples of the two values.
-define(WAIT_PRINT_INTERVAL, timer:minutes(1)).
%% Delay (ms) between riak_kv availability checks.
-define(WAIT_POLL_INTERVAL, 100).

%% Server state. Fields without a default start as `undefined', so their
%% types include it.
-record(?MODULE, {
          %% NOTE(review): nothing in this module sets this to true.
          started = false :: boolean(),
          %% Milliseconds between samples.
          sample_rate :: non_neg_integer() | undefined,
          logfile_name :: file:name_all() | undefined,
          %% Device returned by file:open/2 for the CSV log.
          logfile :: file:io_device() | undefined,
          %% Counter names, sampled as "leveldb." ++ Name.
          stat_list :: [string()] | undefined,
          %% Currently armed timer (or a placeholder ref from init/1).
          tref :: reference() | undefined,
          %% Sum of poll intervals spent waiting for riak_kv, in ms.
          kv_elapsed_wait = 0 :: non_neg_integer(),
          %% eleveldb handle, or undefined until one is found. NIF resource
          %% handles are not reference() on every OTP release, hence term().
          leveldb_reference :: term()
         }).
%% @doc Start the collector with the default log file, the default list of
%% leveldb counters and the default ?SAMPLE_RATE interval.
start() ->
    DefaultLog = "/var/log/riak/leveldb_perf.log",
    DefaultStats = ["DBIterNew", "DBIterDelete", "Debug[3]", "Debug[4]", "Debug[0]"],
    start(DefaultLog, DefaultStats, ?SAMPLE_RATE).
%% @doc Start the collector for StatList, appending to LogfileName, and
%% sample at the default ?SAMPLE_RATE interval.
start(LogfileName, StatList) ->
    DefaultRate = ?SAMPLE_RATE,
    start(LogfileName, StatList, DefaultRate).
%% @doc Start an unsupervised collector, registered locally under the module
%% name, that samples StatList every SampleRate ms into LogfileName.
%% Returns whatever gen_server:start/4 returns.
start(LogfileName, StatList, SampleRate) ->
    InitArgs = [LogfileName, StatList, SampleRate],
    gen_server:start({local, ?MODULE}, ?MODULE, InitArgs, []).
%% @doc Ask the registered collector to stop. The request is asynchronous
%% (a cast), so this always returns ok.
stop() ->
    Server = ?MODULE,
    gen_server:cast(Server, stop).
%% @doc gen_server init. Logs the configuration, opens LogfileName in append
%% mode, writes the CSV header row, and queues a `timeout' message to self.
%% handle_info/2 then begins polling for riak_kv.
init([LogfileName, StatList, SampleRate]) ->
    error_logger:info_msg(
      "Starting leveldb_stat_collector: Interval:~p ms, Destination: ~s, Stats: ~p",
      [SampleRate, LogfileName, StatList]),
    %% A failed open crashes init with badmatch, so gen_server:start/4
    %% returns an error to the caller.
    {ok, Fd} = file:open(LogfileName, [append]),
    write_header_row(StatList, Fd),
    self() ! timeout,
    %% Placeholder so erlang:cancel_timer/1 always has a reference to act on
    %% before any real timer has been armed.
    Placeholder = make_ref(),
    {ok, #?MODULE{sample_rate = SampleRate,
                  logfile_name = LogfileName,
                  logfile = Fd,
                  stat_list = StatList,
                  tref = Placeholder}}.
%% @doc `status' replies {ok, State} with the full server state. Any other
%% request is answered with `ok' and otherwise ignored.
handle_call(Request, _Caller, State) ->
    Reply = case Request of
                status -> {ok, State};
                _Other -> ok
            end,
    {reply, Reply, State}.
%% @doc `stop' logs, cancels the pending timer, closes the log file and
%% terminates the server with reason `normal'. Other casts are ignored.
handle_cast(stop, State) ->
    error_logger:info_msg("Stopping leveldb_stat_collector", []),
    PendingTimer = State#?MODULE.tref,
    erlang:cancel_timer(PendingTimer),
    LogFd = State#?MODULE.logfile,
    file:close(LogFd),
    {stop, normal, State};
handle_cast(_Ignored, State) ->
    {noreply, State}.
%% @doc Message driver for the collector.
%%
%% `check' with no leveldb handle yet: look one up via get_ref_to_leveldb/0.
%%   If found, store it and re-send `check' immediately to start sampling.
%%   If none is found (the inspected vnode's backend is not eleveldb), retry
%%   after sample_rate ms. Re-sending `check' at once would spin in a tight
%%   loop calling sys:get_status/1 on the vnode.
%% `check' with a handle: spawn a one-shot process that samples the counters
%%   and appends a CSV line, then re-arm the sampling timer.
%% `timeout': poll for riak_kv via check_for_kv/1 unless `started' is true.
%% Anything else is ignored.
handle_info(check, State=#?MODULE{leveldb_reference=undefined}) ->
    case get_ref_to_leveldb() of
        undefined ->
            SampleRate = State#?MODULE.sample_rate,
            error_logger:warning_msg(
              "~p: no eleveldb reference found; retrying in ~p ms",
              [?MODULE, SampleRate]),
            erlang:cancel_timer(State#?MODULE.tref),
            RetryTRef = erlang:send_after(SampleRate, self(), check),
            {noreply, State#?MODULE{tref=RetryTRef}};
        Ref ->
            self() ! check,
            {noreply, State#?MODULE{leveldb_reference=Ref}}
    end;
handle_info(check, State) ->
    #?MODULE{leveldb_reference=Ref, stat_list=Stats, logfile=Fd,
             sample_rate=SampleRate, tref=OldTRef} = State,
    %% Unlinked spawn: a crash while sampling does not propagate to this server.
    erlang:spawn(?MODULE, collect_stats, [Ref, Stats, Fd]),
    erlang:cancel_timer(OldTRef),
    TRef = erlang:send_after(SampleRate, self(), check),
    {noreply, State#?MODULE{tref=TRef}};
handle_info(timeout, State=#?MODULE{started=true}) ->
    {noreply, State};
handle_info(timeout, State) ->
    {noreply, check_for_kv(State)};
handle_info(_Info, State) ->
    {noreply, State}.
%% @doc No cleanup is done here. The log file is closed only by the
%% `stop' cast in handle_cast/2.
terminate(_Why, _LastState) ->
    ok.
%% @doc Hot code upgrade: the state is carried over unchanged.
code_change(_PreviousVsn, OldState, _UpgradeExtra) ->
    {ok, OldState}.
%% @doc Sample each counter in StatList once and return [{Stat, Value}] pairs.
%% Despite the name, nothing is printed or written; the result is returned
%% (handy from a shell).
print_stats(LeveldbRef, StatList) ->
    [{StatName, collect_stat(LeveldbRef, StatName)} || StatName <- StatList].
%% @doc Sample every counter in StatList and append one CSV line to File.
%% This is the function run by the process spawned from handle_info/2.
%% Always returns ok.
collect_stats(LeveldbRef, StatList, File) ->
    Samples = [collect_stat(LeveldbRef, StatName) || StatName <- StatList],
    Row = create_log_line(Samples),
    write_log_line(Row, File),
    ok.
%% @doc Read one leveldb counter via eleveldb:status/2, using the property
%% name "leveldb." ++ Stat, and return its value as a string.
%%
%% On failure an error is logged and "" is returned, so the CSV column is
%% left empty. Previously the unexpected-result branch returned the atom
%% `ok' (the result of error_logger:error_msg/2) instead of a field value.
collect_stat(LeveldbRef, Stat) ->
    Property = list_to_binary("leveldb." ++ Stat),
    case eleveldb:status(LeveldbRef, Property) of
        {ok, Value} ->
            binary_to_list(Value);
        error ->
            error_logger:error_msg("Error collecting ~p: No further information available", [Stat]),
            "";
        Unexpected ->
            error_logger:error_msg("Reached default case for ~p: ~p", [Stat, Unexpected]),
            ""
    end.
%% @doc Build one CSV row: a double-quoted UTC timestamp
%% ("YYYY/MM/DD HH:MM:SS") followed by one column per value.
%%
%% Values may be strings (what collect_stat/2 returns), binaries or integers.
%% Any other term becomes an empty column. Calling integer_to_list/1 on every
%% value, as before, raised badarg on the strings from collect_stat/2.
create_log_line(Values) ->
    {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:universal_time(),
    TimeStamp = lists:flatten(
                  io_lib:format("\"~4..0b/~2..0b/~2..0b ~2..0b:~2..0b:~2..0b\"",
                                [Year, Month, Day, Hour, Minute, Second])),
    string:join([TimeStamp | [log_field(V) || V <- Values]], ",").

%% Render a single stat value as a CSV column string.
log_field(V) when is_integer(V) -> integer_to_list(V);
log_field(V) when is_binary(V) -> binary_to_list(V);
log_field(V) when is_list(V) -> V;
log_field(_) -> "".
%% @doc Write the CSV header row: "timestamp" followed by each stat name.
%% Every column is wrapped in double quotes and the row ends with a newline.
%% Returns the result of file:write/2.
write_header_row(StatList, File) ->
    Quote = fun(Name) -> [$"] ++ Name ++ [$"] end,
    Columns = ["\"timestamp\"" | [Quote(Name) || Name <- StatList]],
    Header = string:join(Columns, ","),
    file:write(File, io_lib:format("~s~n", [Header])).
%% @doc Append Line plus a trailing newline to File.
%% Returns the result of file:write/2.
write_log_line(Line, File) ->
    Formatted = io_lib:format("~s~n", [Line]),
    file:write(File, Formatted).
%% @doc Return the value stored under "StateData" in the `{data, ...}'
%% sections of sys:get_status(Pid), or undefined if there is no such entry.
%% NOTE(review): presumably Pid is a riak_core vnode process whose status
%% exposes "StateData"; confirm against the deployed riak_core.
get_vnode_state(Pid) ->
    {status, Pid, _Mod, Status} = sys:get_status(Pid),
    DataItems = lists:flatten([Items || {data, Items} <- lists:flatten(Status)]),
    proplists:get_value("StateData", DataItems).
%% @doc Return the eleveldb handle used by the riak_kv_vnode at partition
%% Idx. Returns undefined when that vnode's backend module is not
%% riak_kv_eleveldb_backend. Crashes (badmatch) if the vnode pid cannot be
%% obtained.
%%
%% NOTE(review): relies on positional element/2 access into private Riak
%% records (presumably vnode state -> riak_kv_vnode state {mod, modstate}
%% -> eleveldb backend state {ref}); verify against the deployed Riak version.
get_ref_to_leveldb(Idx) ->
    {ok, VnodePid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
    KvState = element(4, get_vnode_state(VnodePid)),
    BackendMod = element(3, KvState),
    if
        BackendMod =:= riak_kv_eleveldb_backend ->
            BackendState = element(4, KvState),
            element(2, BackendState);
        true ->
            undefined
    end.
%% @doc For a vnode at partition Idx running riak_kv_multi_backend, return
%% the handle of the riak_kv_eleveldb_backend instance named Name0 (a binary
%% or a string). Returns undefined if the vnode is not using the multi
%% backend, or if no eleveldb backend has that name.
%%
%% NOTE(review): like get_ref_to_leveldb/1, this depends on positional
%% element/2 access into private Riak records; verify per Riak version.
get_ref_to_leveldb(Idx, Name0) ->
    Wanted = case is_binary(Name0) of
                 true -> Name0;
                 false -> list_to_binary(Name0)
             end,
    {ok, VnodePid} = riak_core_vnode_manager:get_vnode_pid(Idx, riak_kv_vnode),
    KvState = element(4, get_vnode_state(VnodePid)),
    case element(3, KvState) of
        riak_kv_multi_backend ->
            Backends = element(2, element(4, KvState)),
            Matches = [BState || {BName, riak_kv_eleveldb_backend, BState} <- Backends,
                                 BName =:= Wanted],
            case Matches of
                [First | _] -> element(2, First);
                [] -> undefined
            end;
        _OtherBackend ->
            undefined
    end.
%% @doc Return the eleveldb handle of the first riak_kv_vnode entry listed by
%% riak_core_vnode_manager:all_vnodes/0, via get_ref_to_leveldb/1.
%% Crashes with badarg (hd/1 of []) if no riak_kv_vnode is listed.
get_ref_to_leveldb() ->
    KvIndexes = [Index || {riak_kv_vnode, Index, _} <- riak_core_vnode_manager:all_vnodes()],
    FirstIndex = hd(KvIndexes),
    get_ref_to_leveldb(FirstIndex).
%% @doc One riak_kv availability poll, driven by `timeout' messages.
%%
%% If riak_core_node_watcher lists riak_kv for this node: log the wait time,
%% send `check' to self to begin sampling, and return State unchanged.
%% Otherwise: log a notice whenever the accumulated wait is a multiple of
%% ?WAIT_PRINT_INTERVAL, re-arm a ?WAIT_POLL_INTERVAL timer that delivers
%% `timeout' again, and add that interval to kv_elapsed_wait. The reported
%% seconds are therefore a sum of poll intervals, not measured wall time.
check_for_kv(State) ->
    Service = riak_kv,
    Elapsed = State#?MODULE.kv_elapsed_wait,
    RunningServices = riak_core_node_watcher:services(node()),
    case lists:member(Service, RunningServices) of
        false ->
            case Elapsed rem ?WAIT_PRINT_INTERVAL =:= 0 of
                true ->
                    error_logger:info_msg("~p: Waiting for service ~p to start (~p seconds)",
                                          [?MODULE, Service, Elapsed div 1000]);
                false ->
                    skip
            end,
            erlang:cancel_timer(State#?MODULE.tref),
            NextTRef = erlang:send_after(?WAIT_POLL_INTERVAL, self(), timeout),
            State#?MODULE{tref = NextTRef,
                          kv_elapsed_wait = Elapsed + ?WAIT_POLL_INTERVAL};
        true ->
            error_logger:info_msg("~p: Wait complete for service ~p (~p seconds)",
                                  [?MODULE, Service, Elapsed div 1000]),
            self() ! check,
            State
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment