Last active
July 8, 2019 11:59
-
-
Save slashdotdash/2dba20ae50d006d8cd1e165769bc2caa to your computer and use it in GitHub Desktop.
AWS CloudWatch reporter for Commanded telemetry.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule CloudWatchReporter do
  @moduledoc """
  Reports BEAM VM and Commanded telemetry events to AWS CloudWatch.

  `init/1` attaches `handle_event/4` to the events listed in `@events`. The
  handler runs in whichever process emits an event: it turns the event into
  CloudWatch metric datums and casts them to this GenServer. The GenServer
  buffers them and publishes the buffer through `ExAws` once a minute, and
  once more when the process terminates.
  """

  use GenServer
  require Logger

  alias ExAws.Cloudwatch

  @namespace "My/App"

  @handler_id "cloudwatch-instrumenter"

  @events [
    [:vm, :memory],
    [:vm, :total_run_queue_lengths],
    [:commanded, :command, :dispatch, :start],
    [:commanded, :command, :dispatch, :success],
    [:commanded, :command, :dispatch, :failure],
    [:commanded, :event, :published]
  ]

  # Number of datums sent per PutMetricData request.
  # NOTE(review): presumably chosen to stay within CloudWatch's per-request
  # limit — confirm against the current AWS quota before changing it.
  @max_batch_size 20

  # Maps each "Memory" dimension value to its key in the `[:vm, :memory]`
  # measurements.
  # See http://erlang.org/doc/man/erlang.html#memory-0
  @memory_metrics %{
    "Atom" => :atom,
    "Binary" => :binary,
    "Code" => :code,
    "ETS" => :ets,
    "Processes" => :processes,
    "System" => :system,
    "Total" => :total
  }

  # Maps each "Queue" dimension value to its key in the
  # `[:vm, :total_run_queue_lengths]` measurements.
  # See https://hexdocs.pm/telemetry_poller/Telemetry.Poller.html#module-run-queue-lengths
  @queue_length_metrics %{
    "CPU" => :cpu,
    "IO" => :io,
    "Total" => :total
  }

  @doc """
  Starts the reporter and registers it under the module name.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl GenServer
  def init(_args) do
    # Trap exits so `terminate/2` runs on shutdown and flushes buffered metrics.
    Process.flag(:trap_exit, true)

    :ok = Environment.init()

    # The result is deliberately ignored. After a restart the handlers may
    # already be attached under the same id (:telemetry does not tie handlers
    # to the attaching process), and the existing attachment keeps working.
    :telemetry.attach_many(@handler_id, @events, &__MODULE__.handle_event/4, nil)

    enqueue_send_metrics_timer()

    {:ok, []}
  end

  @impl GenServer
  def handle_call(:get_queue, _from, queue) do
    {:reply, {:ok, queue}, queue}
  end

  @impl GenServer
  def handle_cast({:put_metrics, metrics}, queue) do
    # The queue is kept newest-first; `do_send_metrics/1` reverses it back
    # into arrival order before sending.
    {:noreply, Enum.reverse(metrics, queue)}
  end

  @impl GenServer
  def handle_info(:send_metrics, queue) do
    do_send_metrics(queue)
    enqueue_send_metrics_timer()
    {:noreply, []}
  end

  def handle_info(message, queue) do
    Logger.info(fn ->
      "CloudWatch reporter unexpectedly received message: " <> inspect(message)
    end)

    {:noreply, queue}
  end

  # Send any queued metrics on process shutdown.
  @impl GenServer
  def terminate(_reason, queue) do
    do_send_metrics(queue)
    :ok
  end

  @doc """
  Telemetry handler: converts a telemetry event into CloudWatch datums and
  casts them to the reporter process.

  It runs in the process that emitted the event, and it returns `:ok`. The
  VM handlers skip missing or non-integer measurements instead of raising,
  because :telemetry detaches any handler that raises.
  """
  def handle_event([:vm, :memory], measurements, _metadata, _opts) do
    measurements
    |> host_metrics(@memory_metrics, "VMMemory", "Bytes", "Memory")
    |> put_metrics()
  end

  def handle_event([:vm, :total_run_queue_lengths], measurements, _metadata, _opts) do
    measurements
    |> host_metrics(@queue_length_metrics, "VMQueueLength", "Count", "Queue")
    |> put_metrics()
  end

  def handle_event([:commanded, :command, :dispatch, :start], _measurements, metadata, _opts) do
    %{command: command} = metadata

    put_metrics([
      datum("CommandDispatch", 1, "None", command_dimensions(command), DateTime.utc_now())
    ])
  end

  def handle_event([:commanded, :command, :dispatch, :success], measurements, metadata, _opts) do
    %{duration: duration} = measurements
    %{command: command} = metadata

    dimensions = command_dimensions(command)
    timestamp = DateTime.utc_now()

    put_metrics([
      datum("CommandDispatchSuccess", 1, "None", dimensions, timestamp),
      datum("CommandDispatchDuration", duration, "Microseconds", dimensions, timestamp)
    ])
  end

  def handle_event([:commanded, :command, :dispatch, :failure], _measurements, metadata, _opts) do
    %{command: command} = metadata

    put_metrics([
      datum("CommandDispatchFailure", 1, "None", command_dimensions(command), DateTime.utc_now())
    ])
  end

  def handle_event([:commanded, :event, :published], measurements, metadata, _opts) do
    %{timestamp: timestamp} = measurements
    %{event: event} = metadata

    put_metrics([
      datum("EventPublish", 1, "None", %{"Event" => module_name(event)}, to_utc_datetime(timestamp))
    ])
  end

  # Builds one datum for each integer measurement named in `mapping`. Each
  # datum is tagged with the host name and `dimension => label`. Missing or
  # non-integer measurements are skipped, so the telemetry handler never
  # raises and never gets detached.
  defp host_metrics(measurements, mapping, metric_name, unit, dimension) do
    hostname = Environment.hostname()
    timestamp = DateTime.utc_now()

    Enum.flat_map(mapping, fn {label, key} ->
      case Map.get(measurements, key) do
        value when is_integer(value) ->
          dimensions = %{"Hostname" => hostname, dimension => label}
          [datum(metric_name, value, unit, dimensions, timestamp)]

        _missing_or_invalid ->
          []
      end
    end)
  end

  # Builds the dimension map shared by all command-dispatch metrics.
  defp command_dimensions(command), do: %{"Command" => module_name(command)}

  # Builds a single datum in the keyword shape passed to
  # `ExAws.Cloudwatch.put_metric_data/2`.
  defp datum(metric_name, value, unit, dimensions, timestamp) do
    [
      metric_name: metric_name,
      value: value,
      unit: unit,
      dimensions: dimensions,
      timestamp: timestamp
    ]
  end

  # Normalises an event timestamp to a UTC `DateTime`. It assumes naive
  # timestamps are UTC and falls back to "now" for anything else.
  defp to_utc_datetime(%NaiveDateTime{} = naive), do: DateTime.from_naive!(naive, "Etc/UTC")
  defp to_utc_datetime(%DateTime{} = datetime), do: datetime
  defp to_utc_datetime(_other), do: DateTime.utc_now()

  defp put_metrics([]), do: :ok
  defp put_metrics(metrics), do: GenServer.cast(__MODULE__, {:put_metrics, metrics})

  # Sends the newest-first queue to CloudWatch in arrival order, in batches of
  # at most @max_batch_size datums.
  defp do_send_metrics([]), do: :ok

  defp do_send_metrics(queue) do
    queue
    |> Enum.reverse()
    |> Enum.chunk_every(@max_batch_size)
    |> Enum.each(&send_batch/1)
  end

  # Sends one batch. A failure is logged and the batch is dropped (no retry).
  defp send_batch(batch) do
    request = Cloudwatch.put_metric_data(batch, @namespace)

    case ExAws.request(request) do
      {:ok, _result} ->
        :ok

      {:error, error} = reply ->
        Logger.warn(fn -> "Failed to put CloudWatch metric data due to: " <> inspect(error) end)
        reply
    end
  end

  # Send metrics to CloudWatch with at least a one minute delay.
  defp enqueue_send_metrics_timer do
    Process.send_after(self(), :send_metrics, :timer.minutes(1))
  end

  # Returns the last segment of a struct's module name, e.g. `"URI"` for `%URI{}`.
  defp module_name(%module{}), do: module |> Module.split() |> List.last()
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Environment do
  @moduledoc """
  Holds the host name used to tag metrics. The value lives in the `:my_app`
  application environment under the `:hostname` key.
  """

  @doc """
  Makes sure `:hostname` is present in the `:my_app` application environment.

  A binary that is already configured is kept as-is. When nothing is
  configured, the name reported by `:inet.gethostname/0` is stored. Returns
  `:ok`. Any other configured value raises `CaseClauseError`.
  """
  def init do
    case hostname() do
      nil ->
        Application.put_env(:my_app, :hostname, inet_hostname())

      configured when is_binary(configured) ->
        :ok
    end
  end

  @doc """
  Returns the configured host name, or `nil` when none has been set.
  """
  def hostname, do: Application.get_env(:my_app, :hostname)

  # Asks the runtime for this machine's host name and converts the returned
  # charlist to a binary.
  defp inet_hostname do
    {:ok, charlist} = :inet.gethostname()
    List.to_string(charlist)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment