Skip to content

Instantly share code, notes, and snippets.

@nickva
Created November 7, 2023 06:17
Show Gist options
  • Save nickva/31232b3ed9f57ce8f96b2e4ecfdec524 to your computer and use it in GitHub Desktop.
Save nickva/31232b3ed9f57ce8f96b2e4ecfdec524 to your computer and use it in GitHub Desktop.
Reproducer for a binary memory leak across a dist channel on OTP 24+
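%
% Usage (the node names below are placeholders; use any two distinct names):
%
% $ erl -name n1@127.0.0.1
% > c(distblockleak).
%
% $ erl -name n2@127.0.0.1
% > c(distblockleak), distblockleak:go('n1@127.0.0.1', "./junk.bin", 10000, 100, 200).
%
% Arguments to go/5: Node, File, LenMBs, ServerProcs, CallerProcs.
%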
-module(distblockleak).
-behaviour(gen_server).
-export([
go/5,
stop/0,
get_stats_int/0
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
terminate/2
]).
% Bytes in one mebibyte. The body is parenthesised so the macro expands
% safely inside larger expressions: `bsl` shares a precedence level with
% `+`/`-`, so an unparenthesised `X + ?MB` would parse as `(X + 1) bsl 20`.
-define(MB, (1 bsl 20)).
% Entry point: tear down any previous run, then start a fresh one.
%
%   Node        - remote node on which the file-reading gen_servers are started
%   File        - path of the data file that the servers pread from
%   LenMBs      - size of the data file in MBs (created if missing/too small)
%   ServerProcs - number of gen_servers to start on Node
%   CallerProcs - number of local caller processes hammering those servers
%
% Everything that is spawned is linked to a tiny "sup" process registered
% under the module name so stop/0 can kill the lot. Returns `started`.
go(Node, File, LenMBs, ServerProcs, CallerProcs) ->
    stop(),
    FileSize = write(File, LenMBs),
    SupPid = sup_start(),
    register(?MODULE, SupPid),
    ServerPids = spawn_servers(Node, File, ServerProcs),
    CallerPids = spawn_callers(ServerPids, FileSize, CallerProcs),
    StatsPid = spawn(fun() -> stats(Node) end),
    sup_add(StatsPid),
    % Callers block on this message so that all of them start together.
    lists:foreach(fun(Caller) -> Caller ! start_calling end, CallerPids),
    started.
% Kill the process registered under the module name, if there is one.
%
% Returns `not_running` when nothing is registered, `stopped` otherwise.
%
% exit/2 only sends an asynchronous exit signal, so the target is monitored
% and we wait for its 'DOWN' message before returning. Without that wait a
% caller that immediately re-registers the same name (as go/5 does) can race
% with the dying process and fail with badarg.
stop() ->
    case whereis(?MODULE) of
        undefined ->
            not_running;
        Pid when is_pid(Pid) ->
            Ref = erlang:monitor(process, Pid),
            % Unlink first so that the kill does not propagate to the caller.
            unlink(Pid),
            exit(Pid, kill),
            receive
                {'DOWN', Ref, process, Pid, _Reason} -> ok
            end,
            stopped
    end.
% Make sure File holds at least LenMBs MBs of data and return the number of
% bytes callers may read from.
%
% When a large enough file is already present it is reused as is and
% LenMBs * 1MB is returned. Otherwise the file is truncated and refilled
% with LenMBs copies of a single random 1MB block, and the resulting file
% size is returned.
%
% The file is opened only when it actually has to be (re)written and is
% closed in an `after` clause, so no descriptor is leaked either on the
% "already there" path or when truncating/writing fails.
write(File, LenMBs) ->
    case already_there(File, LenMBs) of
        true ->
            LenMBs * ?MB;
        false ->
            {ok, Fd} = file:open(File, [append, raw, binary]),
            try
                ok = file:truncate(Fd),
                write_blocks(Fd, rand:bytes(?MB), LenMBs)
            after
                file:close(Fd)
            end
    end.
% Skip the slow rewrite of the data file when a big enough one exists.
%
% Returns true (after printing a note) when File can be opened for reading
% and its size is at least LenMBs MBs; returns false when it is smaller,
% cannot be opened, or its size cannot be determined.
%
already_there(File, LenMBs) ->
    case file:open(File, [read, raw, binary]) of
        {ok, Fd} ->
            try
                already_there_check(file:position(Fd, eof), File, LenMBs)
            after
                file:close(Fd)
            end;
        _ ->
            false
    end.

% Decide from the result of seeking to eof whether the file is big enough.
already_there_check({ok, Size}, File, LenMBs) ->
    BigEnough = Size >= LenMBs * ?MB,
    if
        BigEnough ->
            io:format("File ~p already there and >= ~p MBs~n", [File, LenMBs]);
        true ->
            ok
    end,
    BigEnough;
already_there_check(_, _File, _LenMBs) ->
    false.
% Write Block to Fd the given number of times, then sync and return the
% position of the end of the file (i.e. its size in bytes).
write_blocks(Fd, _Block, 0) ->
    file:sync(Fd),
    {ok, Total} = file:position(Fd, eof),
    Total;
write_blocks(Fd, Block, Remaining) ->
    ok = file:write(Fd, Block),
    write_blocks(Fd, Block, Remaining - 1).
% Start N instances of this module's gen_server on Node, each serving File.
% Every server is handed to the local sup process (which links to it).
% Returns the list of server pids.
spawn_servers(Node, File, N) ->
    StartOne = fun() ->
        {ok, Server} = erpc:call(Node, gen_server, start, [?MODULE, [File], []]),
        sup_add(Server),
        Server
    end,
    [StartOne() || _ <- lists:seq(1, N)].
% Spawn N caller processes that will issue requests against Servers.
% Returns whatever caller_start/2 returns for each of them, as a list.
spawn_callers(Servers, LenBytes, N) ->
    lists:map(
        fun(_) -> caller_start(Servers, LenBytes) end,
        lists:seq(1, N)
    ).
% Spawn one caller process and register it with the sup process.
%
% The new process seeds its own random generator, waits for the
% `start_calling` message and then loops over a shuffled copy of Servers.
% The result of sup_add/1 is returned.
caller_start(Servers, LenBytes) ->
    Caller = fun() ->
        rand:seed(exsss, os:timestamp()),
        receive
            start_calling -> ok
        end,
        caller_loop(shuffle(Servers), LenBytes)
    end,
    CallerPid = spawn(Caller),
    sup_add(CallerPid).
% Endless request loop of one caller process.
%
% Takes the server at the head of the list, issues one random pread request
% to it from a separate monitored process, then rotates that server to the
% back of the list and repeats. There is no clause for an empty server list.
%
% NOTE(review): killing the requesting process while its call may still be
% in flight looks like the step that provokes the leak - confirm.
caller_loop([S | Rest], LenBytes) ->
% The request runs in its own process; the reply becomes its exit reason.
{Pid, Ref} = spawn_monitor(fun() ->
R = gen_server:call(S, rand_pread(LenBytes), infinity),
exit(R)
end),
receive
{'DOWN', Ref, _, _, _} ->
ok
% Wait a random 1..1000 ms for the request process to finish.
after rand:uniform(1000) ->
% Too slow: drop the monitor (flushing a pending 'DOWN', if any) and
% kill the request process.
erlang:demonitor(Ref, [flush]),
exit(Pid, kill)
end,
% Waits up to 1 ms for a bare `ok` message. No sender of such a message is
% visible in this file, so this presumably acts as a short pause.
receive ok -> ok after 1 -> ok end,
caller_loop(Rest ++ [S], LenBytes).
% Build a random read request: a position in 1..LenBytes and a length in
% 1..16MB (1 bsl 24), in the shape understood by handle_call/3.
rand_pread(LenBytes) ->
    Pos = rand:uniform(LenBytes),
    Len = rand:uniform(1 bsl 24),
    {pread, Pos, Len}.
% Minimal stand-in for a supervisor: its only job is to keep every process
% of a run linked together so that one kill tears the whole thing down and
% a new run can be started.
%
% Spawns the sup process linked to the caller, with exit trapping enabled
% and no tracked pids, and returns its pid.
sup_start() ->
    Init = fun() ->
        process_flag(trap_exit, true),
        sup([])
    end,
    spawn_link(Init).
% Receive loop of the sup process. Pids is the list of tracked processes.
%
%   {add, Pid, From}  - link to Pid, acknowledge with {ok, self()} and
%                       start tracking it
%   {'EXIT', Pid, _}  - stop tracking Pid
sup(Pids) ->
    Tracked =
        receive
            {add, Pid, From} ->
                link(Pid),
                From ! {ok, self()},
                [Pid | Pids];
            {'EXIT', Pid, _Reason} ->
                lists:delete(Pid, Pids)
        end,
    sup(Tracked).
% Ask the sup process registered under the module name to link to Pid and
% wait for its acknowledgement. Returns Pid so calls can be chained.
sup_add(Pid) ->
    SupPid = whereis(?MODULE),
    erlang:send(SupPid, {add, Pid, self()}),
    receive
        {ok, SupPid} -> ok
    end,
    Pid.
% Return the elements of L in random order: tag each element with a random
% float, sort on the tags, then strip the tags again.
shuffle(L) ->
    Tagged = lists:map(fun(Elem) -> {rand:uniform(), Elem} end, L),
    {_Tags, Shuffled} = lists:unzip(lists:sort(Tagged)),
    Shuffled.
% Endless reporting loop. Once per second (plus the duration of the call):
%
%   * fetch process info for all processes on Node, using a random erpc
%     timeout of 1..90000 ms, and print how many processes were reported
%     and how long the call took - or that it timed out;
%   * print the local tcp_inet allocation histograms.
%
% Any erpc failure other than a timeout is not caught here.
stats(Node) ->
    timer:sleep(1000),
    io:format("process_info:~n", []),
    Started = erlang:monotonic_time(millisecond),
    Elapsed = fun() -> erlang:monotonic_time(millisecond) - Started end,
    try get_stats(Node, rand:uniform(90000)) of
        [_ | _] = Procs ->
            Msec = Elapsed(),
            io:format(" > procs: ~p msec: ~p~n", [length(Procs), Msec])
    catch
        error:{erpc, timeout} ->
            TimeoutMsec = Elapsed(),
            io:format(" > TIMEOUT msec: ~p~n", [TimeoutMsec])
    end,
    io:format(" > dist tcp_inet allocations: ~p~n", [tcp_inet()]),
    stats(Node).
% Return the `tcp_inet` entry of the allocation map reported by
% instrument:allocations/0. Crashes with badmatch when the call fails or
% the map has no such entry.
tcp_inet() ->
    {ok, {_HistogramStart, _UnscannedSize, #{tcp_inet := DriverAllocs}}} =
        instrument:allocations(),
    DriverAllocs.
% Run get_stats_int/0 on Node via erpc and return its result.
% Timeout is passed straight to erpc:call/5.
get_stats(Node, Timeout) ->
    NoArgs = [],
    erpc:call(Node, ?MODULE, get_stats_int, NoArgs, Timeout).
% Collect the message queue length and process dictionary of every process
% on the node this runs on (get_stats/2 invokes it remotely through erpc).
% Returns one entry per pid returned by processes/0.
get_stats_int() ->
    Items = [message_queue_len, dictionary],
    lists:map(
        fun(P) -> (catch process_info(P, Items)) end,
        processes()
    ).
% gen_server callback: open File and keep the descriptor in the state map
% under the `fd` key. Crashes with badmatch if the file cannot be opened.
init([File]) ->
    Modes = [read, raw, binary, append],
    {ok, Fd} = file:open(File, Modes),
    State = #{fd => Fd},
    {ok, State}.
% gen_server callback: close the file descriptor held in the state.
terminate(_Why, #{fd := Handle}) ->
    file:close(Handle).
% gen_server callback: serve {pread, Pos, Len} by reading Len bytes at Pos
% from the state's file descriptor. Replies with the bytes read, or with an
% empty binary for any other pread result (such as eof or an error).
% Any other request crashes the server with function_clause.
handle_call({pread, Pos, Len}, _From, #{fd := Fd} = St) ->
    Reply = pread_reply(file:pread(Fd, Pos, Len)),
    {reply, Reply, St}.

% Map a file:pread/3 result onto the binary sent back to the caller.
pread_reply({ok, Bin}) -> Bin;
pread_reply(_) -> <<>>.
% gen_server callback: this server accepts no casts, so any cast stops it
% with reason {unexpected_cast, Msg}.
handle_cast(Msg, #{} = St) ->
    StopReason = {unexpected_cast, Msg},
    {stop, StopReason, St}.
@nickva
Copy link
Author

nickva commented Nov 7, 2023

OTP 25

process_info:
  > procs: 153 msec: 312
  > dist tcp_inet allocations: #{binary =>
                                     {5,27,34,91,180,393,699,1450,2969,755875,
                                      0,0,0,0,0,0,0,0},
                                 driver_tid =>
                                     {10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 driver_tsd =>
                                     {10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 drv_binary =>
                                     {0,0,0,0,0,0,0,0,0,34,0,0,0,0,0,0,0,0},
                                 drv_internal =>
                                     {0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}}
process_info:
  > procs: 153 msec: 42
  > dist tcp_inet allocations: #{binary =>
                                     {6,27,34,91,182,396,707,1488,3017,768247,
                                      0,0,0,0,0,0,0,0},
                                 driver_tid =>
                                     {10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 driver_tsd =>
                                     {10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 drv_binary =>
                                     {0,0,0,0,0,0,0,0,0,36,0,0,0,0,0,0,0,0},
                                 drv_internal =>
                                     {0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}}
process_info:
([email protected])2> distblockleak:stop().
stopped

([email protected])3> recon:bin_leak(1).
[{<0.88.0>,-3,
  [{current_function,{disk_log,loop,1}},
   {initial_call,{proc_lib,init_p,5}}]}]

([email protected])4> erlang:memory().
[{total,52375171672},
 {processes,43839592},
 {processes_used,43839336},
 {system,52331332080},
 {atom,491713},
 {atom_used,462311},
 {binary,52286491928},
 {code,8859086},
 {ets,523320}]

@nickva
Copy link
Author

nickva commented Nov 7, 2023

OTP 23

 process_info:
  > procs: 152 msec: 74
  > dist tcp_inet allocations: #{driver_tid =>
                                     {13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 driver_tsd =>
                                     {13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 drv_binary =>
                                     {0,0,0,0,0,0,0,0,0,2477,0,0,0,0,0,0,0,0},
                                 drv_internal =>
                                     {0,13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}}
process_info:
  > procs: 152 msec: 64
  > dist tcp_inet allocations: #{driver_tid =>
                                     {13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 driver_tsd =>
                                     {13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
                                 drv_binary =>
                                     {0,0,0,0,0,0,0,0,0,2402,0,0,0,0,0,0,0,0},
                                 drv_internal =>
                                     {0,13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}}
([email protected])4> distblockleak:stop().
stopped

([email protected])7> erlang:memory().
[{total,59003920},
 {processes,11925736},
 {processes_used,11924568},
 {system,47078184},
 {atom,450745},
 {atom_used,447806},
 {binary,2044544},
 {code,7817291},
 {ets,518264}]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment