Skip to content

Instantly share code, notes, and snippets.

@nickva
Created January 28, 2025 00:53
Show Gist options
  • Save nickva/f2175a6bee8399f7f9ed38d5e9d3be5d to your computer and use it in GitHub Desktop.
Save nickva/f2175a6bee8399f7f9ed38d5e9d3be5d to your computer and use it in GitHub Desktop.
Benchmark couch work queue
-module(bench).
-export([main/1]).
producer(Q) ->
receive
{produce, Pid, N, Size} ->
{ok, Dt} = produce(Q, N, erlang:monotonic_time(), Size),
Pid ! {produced, Dt},
producer(Q)
end.
produce(_Q, 0, T0, _Size) ->
{ok, erlang:monotonic_time() - T0};
produce(Q, N, T0, Size) when is_integer(N), N>0 ->
TItem = erlang:monotonic_time(),
ok = couch_work_queue:queue(Q, {TItem, lists:duplicate(Size, {[<<"x">>, <<"y">>]})}),
garbage_collect(),
produce(Q, N - 1, T0, Size).
consumer(Q) ->
receive
{consume, Pid, N, MaxSize} ->
{ok, Dt, DtItemMin, DtItemMax} = consume(Q, N, MaxSize, undefined, 1 bsl 63, 0),
Pid ! {consumed, Dt, DtItemMin, DtItemMax},
consumer(Q)
end.
consume(_Q, 0, _MaxSize, T0, DtItemMin, DtItemMax) ->
{ok, erlang:monotonic_time() - T0, DtItemMin, DtItemMax};
consume(Q, N, MaxSize, T0, DtItemMin, DtItemMax) ->
{ok, Items} = couch_work_queue:dequeue(Q, MaxSize),
TNow = erlang:monotonic_time(),
T = case T0 of
undefined -> TNow;
_ -> T0
end,
{DtItemMin1, DtItemMax1} = lists:foldl(fun({T0Item, Item}, {AccMin, AccMax}) ->
_ = Item,
DtItem = TNow - T0Item,
{min(DtItem, AccMin), max(DtItem, AccMax)}
end, {DtItemMin, DtItemMax}, Items),
consume(Q, N - length(Items), MaxSize, T, DtItemMin1, DtItemMax1).
main({N, MaxSize, MaxItems, DequeueSize, Size}) ->
{ok, Q} = couch_work_queue:new([{max_size, MaxSize}, {max_items, MaxItems}]),
QRef = monitor(process, Q),
%unlink(Q),
{Producer, PRef} = spawn_monitor(fun() -> producer(Q) end),
{Consumer, CRef} = spawn_monitor(fun() -> consumer(Q) end),
timer:sleep(1000),
Consumer ! {consume, self(), N, DequeueSize},
Producer ! {produce, self(), N, Size},
ProducerDt = receive {produced, Dt} -> Dt end,
{ConsumerDt, IMinDt, IMaxDt} = receive {consumed, CDt, Min, Max} -> {CDt, Min, Max} end,
ok = couch_work_queue:close(Q),
exit(Producer, kill),
exit(Consumer, kill),
receive {'DOWN', QRef, _, _, _} -> ok end,
receive {'DOWN', PRef, _, _, _} -> ok end,
receive {'DOWN', CRef, _, _, _} -> ok end,
#{
p_dt => erlang:convert_time_unit(ProducerDt, native, microsecond),
c_dt => erlang:convert_time_unit(ConsumerDt, native, microsecond),
imin_dt => erlang:convert_time_unit(IMinDt, native, microsecond),
imax_dt => erlang:convert_time_unit(IMaxDt, native, microsecond)
}.
@nickva
Copy link
Author

nickva commented Jan 28, 2025

Erlang/OTP 25 [erts-13.2.2.11] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit:ns]

Eshell V13.2.2.11  (abort with ^G)
([email protected])1> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 103772749,imax_dt => 33037,imin_dt => 599,
  p_dt => 103773912}
([email protected])2> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 69403578,imax_dt => 2242,imin_dt => 597,
  p_dt => 69405621}
([email protected])3> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 85111027,imax_dt => 19165,imin_dt => 597,
  p_dt => 85112678}
Erlang/OTP 27 [erts-15.1.2] [source] [64-bit] [smp:12:12] [ds:12:12:16] [async-threads:1]

Eshell V15.1.2 (press Ctrl+G to abort, type help(). for help)
([email protected])1> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 110633029,imax_dt => 63506,imin_dt => 702,
  p_dt => 110634677}
([email protected])2> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 109516657,imax_dt => 75298,imin_dt => 696,
  p_dt => 109518259}
([email protected])3> bench:main({100000, 100000, 100, all, 10000}).
#{c_dt => 104687478,imax_dt => 16934,imin_dt => 925,
  p_dt => 104688759}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment