Last active
April 9, 2024 10:19
-
-
Save JoeZ99/c5636b9e0f693557a5b526fa65dc8633 to your computer and use it in GitHub Desktop.
A simple elixir "memory monitor"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MemoryMonitor do | |
@moduledoc """ | |
This module implements a GenServer that monitors the memory usage of a process. | |
It periodically checks the memory consumption. | |
""" | |
use GenServer | |
@time_to_check :timer.seconds(10) | |
@doc """ | |
Starts the MemoryMonitor process linked to the caller process with given options. | |
""" | |
@spec start_link(pid()) :: GenServer.on_start() | |
def start_link(pid_to_monitor) do | |
GenServer.start_link(__MODULE__, pid_to_monitor) | |
end | |
@impl GenServer | |
@spec init(pid) :: {:ok, pid} | |
def init(pid) do | |
Process.send_after(self(), :check, @time_to_check) | |
Process.monitor(pid) | |
{:ok, {0, pid}} | |
end | |
@doc """ | |
Handles incoming messages to the GenServer. | |
- `:check` - Performs memory usage checks on the monitored process | |
""" | |
@impl GenServer | |
def handle_info(:check, {_, pid_to_monitor}) do | |
Process.send_after(self(), :check, @time_to_check) | |
memory_used = get_memory_usage(pid_to_monitor) | |
{:noreply, {memory_used, pid_to_monitor}} | |
end | |
# when the monitored process dies, die | |
def handle_info({:DOWN, _ref, :process, pid, _}, {_, pid}), do: {:stop, :normal, nil} | |
@spec get_memory_usage(pid) :: non_neg_integer | |
defp get_memory_usage(pid_to_monitor) do | |
processes_tree = get_processes_tree(pid_to_monitor, MapSet.new([])) | |
{bin_mem_size, _} = get_bin_memory(processes_tree) | |
process_mem_size = get_heap_memory(processes_tree) | |
bin_mem_size + process_mem_size | |
end | |
@spec get_processes_tree(pid | port, MapSet.t()) :: map | |
defp get_processes_tree(pid, used_pids) when is_pid(pid) do | |
if MapSet.member?(used_pids, pid), | |
do: nil, | |
else: do_get_processes_tree(pid, used_pids) | |
end | |
# the linked resource may be a port, not a pid | |
defp get_processes_tree(_, _), do: nil | |
@spec do_get_processes_tree(pid | port, MapSet.t()) :: map | |
defp do_get_processes_tree(pid, used_pids) when is_pid(pid) do | |
used_pids = MapSet.put(used_pids, pid) | |
{process_extra_info, process_info} = | |
pid | |
|> Process.info([:dictionary, :current_function, :status, :links, :memory, :binary]) | |
|> Keyword.split([:dictionary, :current_function, :status]) | |
if child?(process_extra_info, used_pids) || MapSet.size(used_pids) == 1 do | |
process_info | |
|> Map.new() | |
|> Map.update(:links, [], &get_allowed_processes(&1, used_pids)) | |
else | |
nil | |
end | |
end | |
@spec get_allowed_processes(pid, MapSet.t()) :: [map] | |
defp get_allowed_processes(pids, used_pids) do | |
pids | |
|> Enum.map(&get_processes_tree(&1, used_pids)) | |
|> Enum.reject(&is_nil/1) | |
end | |
@spec get_bin_memory(map, {integer, MapSet.t()}) :: {integer, MapSet.t()} | |
defp get_bin_memory( | |
%{binary: binaries, links: links}, | |
{mem_used, used_bin_refs} \\ {0, MapSet.new([])} | |
) do | |
{bin_mem, used_bin_refs} = Enum.reduce(binaries, {mem_used, used_bin_refs}, &maybe_sum/2) | |
Enum.reduce(links, {bin_mem, used_bin_refs}, &get_bin_memory/2) | |
end | |
@spec maybe_sum({integer, integer, integer}, {integer, MapSet.t()}) :: {integer, MapSet.t()} | |
defp maybe_sum({bin_ref, mem, _}, {total_mem, used_bin_refs}) do | |
if MapSet.member?(used_bin_refs, bin_ref), | |
do: {total_mem, used_bin_refs}, | |
else: {total_mem + mem, MapSet.put(used_bin_refs, bin_ref)} | |
end | |
@spec get_heap_memory(map) :: integer | |
defp get_heap_memory(%{memory: mem, links: links}) do | |
links | |
|> Enum.map(&get_heap_memory/1) | |
|> Enum.sum() | |
|> Kernel.+(mem) | |
end | |
@spec child?(keyword, MapSet.t()) :: boolean | |
defp child?(process_extra_info, used_pids) do | |
dictionary = Keyword.get(process_extra_info, :dictionary) | |
status = Keyword.get(process_extra_info, :status) | |
{module, _, _} = Keyword.get(process_extra_info, :current_function) | |
parents = get_parents(dictionary) | |
# if it's Task.asyn_stream monitoring process, then its status is :waiting | |
!MapSet.disjoint?(parents, used_pids) || (module == Task.Supervised && status == :waiting) | |
end | |
@spec get_parents(keyword) :: MapSet.t() | |
defp get_parents(dictionary) do | |
ancestors = Keyword.get(dictionary, :"$ancestors", []) | |
callers = Keyword.get(dictionary, :"$callers", []) | |
MapSet.new(ancestors ++ callers) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment