Created
February 11, 2024 06:33
-
-
Save nickva/892ab91e9880f0dd311ac4a8f63440ef to your computer and use it in GitHub Desktop.
CouchDB Event Server Benchmark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
% Benchmark module for couch_event_server
%
% Benchmark registering/unregistering N listeners for N dbs. Then benchmark
% sending 10 events from concurrent processes for each of the dbs and measure
% event latency: how long it took for the event to be delivered to the
% listener.
%
% Times are measured in microseconds. Best (minimum) results are returned for
% each of the N cases.
%
% Public API: go/0 runs every benchmark case and prints a table,
% bench/1 runs a single measurement for N listeners and N dbs.
-module(couch_event_bench).

-export([go/0, bench/1]).
% State kept by each listener process.
%   evt_dtsum - sum of the delivery latencies (microseconds) of the events
%               received since the last response
%   evt_cnt   - number of events received since the last response
%   wait_cnt  - minimum number of events the current waiter asked for
%   wait_pid  - pid of the process waiting for the result; undefined (the
%               implicit record default) when nobody is waiting
-record(lst, {
    evt_dtsum = 0,
    evt_cnt = 0,
    wait_cnt = 0,
    wait_pid
}).
% Run the benchmark for each case size and print one row per case.
% Every row reports the best of 5 runs: N, register time, unregister time
% and event latency, in that order. Returns ok.
go() ->
    io:format("N Register Unreg. EvtLatency ~n", []),
    PrintRow = fun(N) ->
        {RegT, EvtT, UnRegT} = best_of(5, N),
        io:format("~p ~p ~p ~p~n", [N, RegT, UnRegT, EvtT])
    end,
    lists:foreach(PrintRow, [1, 10, 100, 1000, 10000]).
% Run bench(N) Tries times and return the minimum of each measurement as
% {MinRegT, MinEvtT, MinUnRegT}. The minimums are taken per column, so they
% may come from different runs.
best_of(Tries, N) ->
    RunOnce = fun(_) -> {_, _, _} = bench(N) end,
    Runs = lists:map(RunOnce, lists:seq(1, Tries)),
    {RegTimes, EvtTimes, UnRegTimes} = lists:unzip3(Runs),
    {lists:min(RegTimes), lists:min(EvtTimes), lists:min(UnRegTimes)}.
% Run one full measurement with N listeners and N randomly named dbs
% (50-character names): register, deliver 10 events per db, unregister.
% All listener processes are killed before returning.
% Returns {RegT, EvtT, UnRegT}.
bench(N) ->
    ListenerPids = gen_listeners(N),
    DbNames = gen_dbs(N, 50),
    RegT = bench_reg(ListenerPids, DbNames),
    EvtT = bench_event(10, ListenerPids, DbNames),
    UnRegT = bench_unreg(ListenerPids),
    lists:foreach(fun(LPid) -> exit(LPid, kill) end, ListenerPids),
    {RegT, EvtT, UnRegT}.
% Register each listener for the db at the same position in Dbs and return
% the average time per registration. Pairing the two lists is deliberately
% done inside the timed section.
bench_reg(Listeners, Dbs) ->
    Started = ts(),
    lists:foreach(
        fun({LPid, Db}) -> reg(LPid, Db) end,
        lists:zip(Listeners, Dbs)
    ),
    Elapsed = ts() - Started,
    Elapsed / length(Listeners).
% Spawn one notifier per db, ask every listener to report once it has seen
% at least EventCnt events, then collect all the reports and wait for the
% notifiers to exit. Returns the average latency per delivered event.
bench_event(EventCnt, Listeners, Dbs) ->
    NotifierRefs = lists:map(fun(Db) -> spawn_notifier(EventCnt, Db) end, Dbs),
    Self = self(),
    lists:foreach(
        fun(LPid) -> LPid ! {get_events, EventCnt, Self} end,
        Listeners
    ),
    {EventCount, EventDtSum} = wait_all_res(length(Listeners), 0, 0),
    wait_notifiers(NotifierRefs),
    EventDtSum / EventCount.
% Unregister every listener and return the average time per call.
bench_unreg(Listeners) ->
    Started = ts(),
    lists:foreach(fun(LPid) -> unreg(LPid) end, Listeners),
    Elapsed = ts() - Started,
    Elapsed / length(Listeners).
% Start a monitored process that sends EventCnt notifications for Db.
% Returns the monitor reference, used later to wait for the process to exit.
spawn_notifier(EventCnt, Db) ->
    Notifier = fun() -> notifier_loop(EventCnt, Db) end,
    {_Pid, MonRef} = spawn_monitor(Notifier),
    MonRef.
% Send Remaining notifications for Db, sleeping a random 1..100 ms before
% each one so that events from different notifiers interleave.
notifier_loop(0, _Db) ->
    ok;
notifier_loop(Remaining, Db) ->
    DelayMs = rand:uniform(100),
    timer:sleep(DelayMs),
    notify(Db),
    notifier_loop(Remaining - 1, Db).
% Block until a 'DOWN' message has been received for every monitor reference
% in Refs, in list order. The exit reason is ignored. Returns ok.
wait_notifiers(Refs) ->
    lists:foreach(
        fun(Ref) ->
            receive
                {'DOWN', Ref, _, _, _} -> ok
            end
        end,
        Refs
    ).
% Send a synchronous {register, Pid, [Db]} request to couch_event_server
% and return its reply.
reg(Pid, Db) ->
    Request = {register, Pid, [Db]},
    gen_server:call(couch_event_server, Request).
% Send a synchronous {unregister, Pid} request to couch_event_server and
% return its reply.
unreg(Pid) ->
    Request = {unregister, Pid},
    gen_server:call(couch_event_server, Request).
% Cast a notification for Db to couch_event_server. The event payload carries
% the send timestamp so the receiving listener can compute delivery latency.
notify(Db) ->
    Event = {evt_t, ts()},
    gen_server:cast(couch_event_server, {notify, Db, Event}).
% Listeners receive notification events and accumulate their latencies.
% Sending {get_events, MinNum, self()} to a listener makes it reply once it
% has received at least MinNum events.
% The reply is {evt_dtsum, EventCount, EventLatencySumUSec}.
start_listener() ->
    InitialState = #lst{},
    spawn(fun() -> loop(InitialState) end).
% Listener receive loop.
%
% On every iteration respond/1 is given the chance to answer the current
% waiter, then one message is handled:
%   - a couch event carrying its send timestamp: the delivery latency is
%     added to the running sum and the event counter is bumped;
%   - get_events while another waiter is already registered: the new
%     requester gets {error, existing_waiter} and the listener keeps running,
%     so the waiter that was registered first still receives its result;
%   - get_events otherwise: the requester becomes the current waiter.
loop(#lst{} = St0) ->
    St = #lst{evt_dtsum = DtSum, evt_cnt = Cnt, wait_pid = WaitPid} = respond(St0),
    receive
        {'$couch_event', _Db, {evt_t, SendT0}} ->
            Dt = ts() - SendT0,
            loop(St#lst{evt_dtsum = DtSum + Dt, evt_cnt = Cnt + 1});
        {get_events, _, WPid} when is_pid(WaitPid) ->
            WPid ! {error, existing_waiter},
            % Keep looping: returning here would end the listener process
            % and leave the existing waiter blocked forever.
            loop(St);
        {get_events, WCnt, WPid} ->
            loop(St#lst{wait_cnt = WCnt, wait_pid = WPid})
    end.
% If a waiter is registered and at least wait_cnt events have been received,
% send it {evt_dtsum, EventCount, LatencySum} and return a reset state.
% Otherwise return the state unchanged.
respond(#lst{evt_dtsum = DtSum, evt_cnt = ECnt, wait_cnt = WCnt, wait_pid = Pid} = St) when
    is_pid(Pid), ECnt >= WCnt
->
    Pid ! {evt_dtsum, ECnt, DtSum},
    St#lst{evt_dtsum = 0, evt_cnt = 0, wait_cnt = 0, wait_pid = undefined};
respond(#lst{} = St) ->
    St.
% Collect N {evt_dtsum, Count, DtSum} replies from the mailbox and return
% the totals as {TotalCount, TotalDtSum}. Any {error, Reason} message raises
% error({error, Reason}). Blocks until all N replies have arrived.
wait_all_res(0, AccEvCnt, AccEvDtSum) ->
    {AccEvCnt, AccEvDtSum};
wait_all_res(N, AccEvCnt, AccEvDtSum) ->
    receive
        {error, Error} ->
            error({error, Error});
        {evt_dtsum, Cnt, DtSum} ->
            wait_all_res(N - 1, AccEvCnt + Cnt, AccEvDtSum + DtSum)
    end.
% Start N listener processes and return their pids.
gen_listeners(N) ->
    lists:map(fun(_) -> start_listener() end, lists:seq(1, N)).
% Generate N random db names, each built by gen_db(Size).
gen_dbs(N, Size) ->
    lists:map(fun(_) -> gen_db(Size) end, lists:seq(1, N)).
% Generate a random db name: Size div 2 random bytes, hex-encoded. The
% result is a binary of 2 * (Size div 2) characters, i.e. Size characters
% when Size is even.
gen_db(Size) ->
    RandBytes = crypto:strong_rand_bytes(Size div 2),
    binary:encode_hex(RandBytes).
% Current Erlang monotonic time in microseconds. Only differences between
% two values are meaningful; the absolute value has no defined epoch.
ts() ->
    erlang:convert_time_unit(erlang:monotonic_time(), native, microsecond).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
With khash
With maps