Created
February 19, 2018 15:51
-
-
Save benmmurphy/66e2498ba27ca9a46fd29e89f43402b6 to your computer and use it in GitHub Desktop.
collect_acks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(collect_acks_bench). | |
-export([bench_fifo/0, bench_lifo/0, bench_multiple/1]). | |
precondition_failed(S, _W) -> | |
throw(S). | |
%% NB: returns acks in youngest-first order | |
collect_acks(Q, 0, true) -> | |
{lists:reverse(queue:to_list(Q)), queue:new()}; | |
collect_acks(Q, DeliveryTag, Multiple) -> | |
collect_acks([], [], Q, DeliveryTag, Multiple). | |
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> | |
case queue:out(Q) of | |
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, | |
QTail} -> | |
if CurrentDeliveryTag == DeliveryTag -> | |
{[UnackedMsg | ToAcc], | |
case PrefixAcc of | |
[] -> QTail; | |
_ -> queue:join( | |
queue:from_list(lists:reverse(PrefixAcc)), | |
QTail) | |
end}; | |
Multiple -> | |
collect_acks([UnackedMsg | ToAcc], PrefixAcc, | |
QTail, DeliveryTag, Multiple); | |
true -> | |
collect_acks(ToAcc, [UnackedMsg | PrefixAcc], | |
QTail, DeliveryTag, Multiple) | |
end; | |
{empty, _} -> | |
precondition_failed("unknown delivery tag ~w", [DeliveryTag]) | |
end. | |
repeat(0, F) -> | |
ok; | |
repeat(N, F) -> | |
F(), | |
repeat(N - 1, F). | |
bench_fifo() -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
queue:in({E, ctag, msg}, Acc) | |
end, queue:new(), Acks), | |
timer:tc(fun() -> | |
repeat(1000, fun() -> | |
lists:foldl(fun(E, Acc) -> | |
{_Result, NewQueue} = collect_acks(Acc, E, false), | |
NewQueue | |
end, Q, Acks) | |
end) | |
end). | |
bench_multiple(N) -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
queue:in({E, ctag, msg}, Acc) | |
end, queue:new(), Acks), | |
timer:tc(fun() -> | |
repeat(1000, fun() -> | |
collect_acks(Q, N, true) | |
end) | |
end). | |
bench_lifo() -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
queue:in({E, ctag, msg}, Acc) | |
end, queue:new(), Acks), | |
lists:foldl(fun(E, Acc) -> | |
{Result, NewQueue} = collect_acks(Acc, E, false), | |
NewQueue | |
end, Q, lists:reverse(Acks)). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(collect_acks_bench_new). | |
-export([bench_fifo/0, bench_lifo/0, bench_multiple/1]). | |
precondition_failed(S, _W) -> | |
throw(S). | |
%% TODO: implement DeliveryTag==0 fast path? | |
collect_acks(Q, DeliveryTag, true) -> | |
collect_acks_multiple([], Q, DeliveryTag); | |
collect_acks(Q, DeliveryTag, false) -> | |
case take_any(DeliveryTag, Q) of | |
error -> | |
precondition_failed("unknown delivery tag ~w", [DeliveryTag]); | |
{UnackedMessage, Q1} -> | |
{[UnackedMessage], Q1} | |
end. | |
%% reimplementation of gb_trees:take_any that is on erlang R20+ | |
take_any(Key, Tree) -> | |
case gb_trees:is_defined(Key, Tree) of | |
true -> | |
Value = gb_trees:get(Key, Tree), | |
Tree1 = gb_trees:delete(Key, Tree), | |
{Value, Tree1}; | |
false -> | |
error | |
end. | |
collect_acks_multiple(ToAcc, Q, DeliveryTag) -> | |
% if the tag does not exist we still ack tags < than the specified tag | |
case gb_trees:is_empty(Q) of | |
true -> | |
{ToAcc, Q}; | |
false -> | |
{NextDeliveryTag, NextUnackedMessage, Q1} = gb_trees:take_smallest(Q), | |
case NextDeliveryTag =< DeliveryTag of | |
true -> | |
collect_acks_multiple([NextUnackedMessage | ToAcc], Q1, DeliveryTag); | |
false -> | |
{ToAcc, Q} | |
end | |
end. | |
repeat(0, F) -> | |
ok; | |
repeat(N, F) -> | |
F(), | |
repeat(N - 1, F). | |
bench_fifo() -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
gb_trees:insert(E, {E, ctag, msg}, Acc) | |
end, gb_trees:empty(), Acks), | |
timer:tc(fun() -> | |
repeat(1000, fun() -> | |
lists:foldl(fun(E, Acc) -> | |
{_Result, NewQueue} = collect_acks(Acc, E, false), | |
NewQueue | |
end, Q, Acks) | |
end) | |
end). | |
bench_multiple(N) -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
gb_trees:insert(E, {E, ctag, msg}, Acc) | |
end, gb_trees:empty(), Acks), | |
timer:tc(fun() -> | |
repeat(1000, fun() -> | |
{_Result, NewQueue} = collect_acks(Q, N, true) | |
end) | |
end). | |
bench_lifo() -> | |
Acks = lists:seq(1, 8000), | |
Q = lists:foldl(fun(E, Acc) -> | |
gb_trees:insert(E, {E, ctag, msg}, Acc) | |
end, gb_trees:empty(), Acks), | |
lists:foldl(fun(E, Acc) -> | |
{Result, NewQueue} = collect_acks(Acc, E, false), | |
NewQueue | |
end, Q, lists:reverse(Acks)). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment