Skip to content

Instantly share code, notes, and snippets.

@nickva
Created June 24, 2019 13:30
Show Gist options
  • Select an option

  • Save nickva/096df2c1c6a1bb3106a0026f14d454c3 to your computer and use it in GitHub Desktop.

Select an option

Save nickva/096df2c1c6a1bb3106a0026f14d454c3 to your computer and use it in GitHub Desktop.
Benchmark for couch_jobs branch
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_jobs_bench).
-export([
start/0,
start/1,
stop/0,
job_creator/4,
job_acceptor/2,
reset/0
]).
-define(INIT_MIN, 1 bsl 52).
-define(MAX_RATE, 1 bsl 30).
-define(TYPE_TIMEOUT, 15000).
-define(START_TIME, '$couch_jobs_start_time').
-define(STOP_TIMER, '$couch_jobs_stop_timer').
-define(CREATORS, '$couch_jobs_creators').
-define(ACCEPTORS, '$couch_jobs_acceptors').
-define(OPTS, '$couch_job_opts').
start() ->
start(#{}).
start(#{} = Opts0) ->
Duration = maps:get(duration, Opts0, 15),
Creators = maps:get(creators, Opts0, 10000),
Acceptors = maps:get(acceptors, Opts0, 50),
Types = maps:get(types, Opts0, 10),
Rate = maps:get(rate, Opts0, max),
Opts = Opts0#{
duration => Duration,
creators => Creators,
acceptors => Acceptors,
types => Types,
rate => Rate,
start_time => couch_log_util:iso8601_timestamp()
},
acceptors_start(Acceptors, Types),
timer:sleep(1000),
creators_start(Creators, Rate, Types),
put(?START_TIME, erlang:system_time()),
put(?OPTS, Opts),
show_opts(Opts),
case Duration > 0 of
true ->
io:format("~nWaiting for ~p seconds ... ~n", [Duration]),
timer:sleep(Duration * 1000),
io:format("~n"),
stop();
false ->
io:format("~n Call ~p:stop() to stop the benchmark~n", [?MODULE]),
ok
end.
stop() ->
io:format("Stopping couch jobs bench~n", []),
StopTime = erlang:system_time(),
Stats = creators_stop(),
acceptors_stop(),
Opts = case get(?OPTS) of undefined -> #{}; Map -> Map end,
Res = case get(?START_TIME) of
undefined ->
io:format("warning: couldn't find start_time~n", []),
maps:merge(Stats, Opts);
StartTime when is_integer(StartTime) ->
erase(?START_TIME),
DtNative = StopTime - StartTime,
DtSec = erlang:convert_time_unit(DtNative, native, second),
#{n := N} = Stats,
Stats1 = Stats#{
rate_per_sec => trunc(N / DtSec),
seconds => DtSec
},
maps:merge(Stats1, Opts)
end,
show_opts(Res),
ok.
creators_start(Total, max, Types) ->
creators_start(Total, ?MAX_RATE, Types);
creators_start(Total, Rate, Types) when is_integer(Total),
is_integer(Rate), is_integer(Types), Total >
0, Rate > 0, Types > 0, Total >= Types ->
Sleep = trunc(1000 / (Rate / Total)),
lists:foreach(fun(T) ->
couch_jobs:set_type_timeout(T, ?TYPE_TIMEOUT)
end, lists:seq(0, Types - 1)),
couch_jobs_server:force_check_types(),
Stats = stats_new(),
Res =[spawn_monitor(?MODULE, job_creator, [Id rem Types, Id, Sleep, Stats])
|| Id <- lists:seq(0, Total - 1)
],
put(?CREATORS, Res),
Res.
creators_stop() ->
case get(?CREATORS) of
undefined ->
#{};
PidRefs when is_list(PidRefs) ->
erase(?CREATORS),
creators_stop(PidRefs)
end.
creators_stop(PidRefs) ->
lists:foreach(fun({Pid, _Ref}) -> Pid ! stop end, PidRefs),
lists:foreach(fun({Pid, _Ref}) ->
spawn(fun() -> timer:sleep(5000), exit(Pid, kill) end)
end, PidRefs),
stats_finalize(creators_wait(PidRefs)).
job_creator(Type, Id, Sleep, #{} = OldStats) ->
%Id = fabric2_util:uuid(),
Sleep1 = trunc(Sleep * 0.5) + rand:uniform(max(1, Sleep)) - 1,
case Sleep1 > 0 of
true -> timer:sleep(Sleep1);
false -> ok
end,
T1 = erlang:system_time(),
case couch_jobs:add(undefined, Type, Id, #{}, 0) of
{error, no_type_timeout} ->
% Jobs namespace was cleared, or timeout reset
% exit normal in that case.
exit(normal);
ok ->
ok
end,
T2 = erlang:system_time(),
DtAdd = T2 - T1,
{DtWait, DtAccept} = case couch_jobs:subscribe(Type, Id) of
{error, not_found} ->
{erlang:system_time() - T2, 0};
{ok, finished, #{<<"dt_accept">> := DtAcc}} ->
{erlang:system_time() - T2, DtAcc};
{ok, SubId, _, _} ->
%io:format(" * waiting ~b ~b subid:~p~n", [Type, Id, SubId]),
T3 = erlang:system_time(),
{_, _, finished, #{<<"dt_accept">> := DtAcc}} = couch_jobs:wait(SubId, finished, infinity),
{erlang:system_time() - T3, DtAcc}
end,
Stats = stats_agg(#{
n => 1,
dt_add => DtAdd,
dt_wait => DtWait,
dt_accept => DtAccept,
dt_creator => erlang:system_time() - T1
}, OldStats),
couch_jobs:remove(undefined, Type, Id),
receive
stop ->
exit({exit_creator, Stats})
after
0 -> ok
end,
job_creator(Type, Id, Sleep, Stats).
acceptors_start(Total, Types) when is_integer(Total), is_integer(Types),
Total >= Types ->
Res = [spawn_monitor(?MODULE, job_acceptor, [Id rem Types, Id])
|| Id <- lists:seq(0, Total - 1)
],
put(?ACCEPTORS, Res),
Res.
acceptors_stop() ->
case get(?ACCEPTORS) of
undefined ->
{error, could_not_find_default_pid_refs};
PidRefs when is_list(PidRefs) ->
erase(?ACCEPTORS),
acceptors_stop(PidRefs)
end.
acceptors_stop(PidRefs) ->
lists:foreach(fun({Pid, _Ref}) -> exit(Pid, kill) end, PidRefs),
acceptors_wait(PidRefs).
reset() ->
creators_stop(),
acceptors_stop(),
application:stop(couch_jobs),
couch_jobs_fdb:clear_jobs(),
application:start(couch_jobs).
% Private functions
creators_wait(PidRefs) ->
creators_wait(PidRefs, stats_new()).
creators_wait([], StatsAcc) ->
StatsAcc;
creators_wait([{_Pid, Ref} | PidRefs], StatsAcc) ->
receive
{'DOWN', Ref, _, _, {exit_creator, Stats}} ->
creators_wait(PidRefs, stats_agg(Stats, StatsAcc));
{'DOWN', Ref, _, _, _} ->
creators_wait(PidRefs, StatsAcc)
end.
acceptors_wait([]) ->
ok;
acceptors_wait([{_Pid, Ref} | PidRefs]) ->
receive
{'DOWN', Ref, _, _, _} -> acceptors_wait(PidRefs)
end.
job_acceptor(Type, Id) ->
T0 = erlang:system_time(),
Job = case couch_jobs:accept(Type) of
{error, not_found} ->
exit({exit_acceptor, not_found});
{ok, Job0, _} ->
Job0
end,
DtAccept = erlang:system_time() - T0,
%io:format("* accepted ~p ~p dt:~p finishing it~n", [Type, Id, DtAccept]),
case couch_jobs:finish(undefined, Job, #{<<"dt_accept">> => DtAccept}) of
{error, halt} -> exit(normal);
ok -> ok
end,
job_acceptor(Type, Id).
stats_agg(#{} = Stats0, #{} = Stats1) ->
maps:fold(fun
(n, N, Acc) ->
Acc#{n := N + maps:get(n, Acc, 0)};
(Field, Val, Acc) ->
Val1 = stats_add(Val, maps:get(Field, Acc)),
Acc#{Field := Val1}
end, Stats0, Stats1).
stats_finalize(#{n := N} = Stats) ->
Finalized = maps:fold(fun(Field, {Sum, Min, Max}, Acc) ->
Val = stats_fin(N, Sum, Min, Max),
Acc#{Field => Val}
end, #{}, maps:without([n], Stats)),
Finalized#{n => N}.
stats_new() ->
#{
n => 0,
dt_add => {0, ?INIT_MIN, 0},
dt_wait => {0, ?INIT_MIN, 0},
dt_accept => {0, ?INIT_MIN, 0},
dt_creator => {0, ?INIT_MIN, 0}
}.
stats_add({Sum1, Min1, Max1}, {Sum2, Min2, Max2}) ->
{Sum1 + Sum2, min(Min1, Min2), max(Max1, Max2)};
stats_add(V, {Sum, Min, Max}) when is_integer(V) ->
{Sum + V, min(V, Min), max(V, Max)};
stats_add({_, _, _} = FullVal, V) when is_integer(V) ->
stats_add(V, FullVal).
stats_fin(0, _, _, _) ->
{0, 0, 0};
stats_fin(N, Sum, Min, Max) ->
Avg = trunc(Sum / N),
AvgMSec = erlang:convert_time_unit(Avg, native, millisecond),
MinMSec = erlang:convert_time_unit(Min, native, millisecond),
MaxMSec = erlang:convert_time_unit(Max, native, millisecond),
{AvgMSec, MinMSec, MaxMSec}.
show_opts(Opts) ->
io:format("~n"),
io:format("------------------------------------------- ~n", []),
KVs = lists:sort(maps:to_list(Opts)),
lists:foreach(fun({K, V}) ->
case is_list(V) of
true ->
io:format(" ~10s : ~s~n", [K, V]);
false ->
io:format(" ~10s : ~p~n", [K, V])
end
end, KVs),
io:format("------------------------------------------- ~n", []).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment