Skip to content

Instantly share code, notes, and snippets.

@maxlapshin
Last active April 30, 2025 20:48
Show Gist options
  • Save maxlapshin/bafa13a0f17810a69dd893100da57be7 to your computer and use it in GitHub Desktop.
Save maxlapshin/bafa13a0f17810a69dd893100da57be7 to your computer and use it in GitHub Desktop.
OpenTelemetry tracing
-module(otel).
-include_lib("kernel/include/logger.hrl").
-include_lib("corelib/include/corelib_openapi.hrl").
-export([start_trace/0, start_trace/1]).
-export([end_trace/0]).
-export([start_span/0, start_span/1]).
-export([end_span/0, end_span/1]).
-export([setattrs/1, setname/1]).
-export([get_context/0, set_context/1]).
-export([clear_context/0]).
-export([traceparent/0]).
-export([lhttpc_header/0]).
-export([get_stats/0]).
-export([load_config/1]).
-export([is_enabled/0, is_overloaded/0]).
-export([sampling_allows/0, duration_threshold/0, service_name/0]).
-export([get_trace/1]).
% tests
-export([reset/0]).
% https://www.w3.org/TR/trace-context/#version-format
-define(TRACEPARENT_VERSION, "00").
% https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/trace/v1/trace_service.proto
% https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
-type trace_id() :: binary().
-type span_id() :: binary().
-type span_kind() :: server | client | producer | consumer | internal.
% Applies the OpenTelemetry collector URL taken from the config map.
%
% The `env_opentelemetry_url' value of the `corelib' application env is used
% only when the config map has no `opentelemetry_url' and that env value is a
% binary. When the selected URL differs from the `opentelemetry_url' currently
% stored in the application env, it is stored, the collector is asked to
% re-read it and the statistics are reset. Always returns `ok'.
load_config(#{} = Env) ->
    FromEnv = application:get_env(corelib, env_opentelemetry_url, undefined),
    FromConfig = maps:get(opentelemetry_url, Env, undefined),
    Wanted =
        if
            FromConfig =:= undefined, is_binary(FromEnv) ->
                ?LOG_INFO("Environment opentelemetry url is used: ~p", [FromEnv]),
                FromEnv;
            true ->
                FromConfig
        end,
    Active = application:get_env(corelib, opentelemetry_url, undefined),
    case Active =:= Wanted of
        true ->
            do_nothing;
        false ->
            application:set_env(corelib, opentelemetry_url, Wanted),
            gen_server:call(otel_collector, update),
            otel_stats:reset()
    end,
    ok.
% Reads the `otel_enabled' persistent term; `false' when it was never set.
is_enabled() ->
    persistent_term:get(otel_enabled, false).
% Reads the `otel_overloaded' persistent term; `false' when it was never set.
is_overloaded() ->
    persistent_term:get(otel_overloaded, false).
% Reads the `otel_service_name' persistent term, defaulting to <<"streamer">>.
service_name() ->
    persistent_term:get(otel_service_name, <<"streamer">>).
% Decides whether a new trace passes sampling.
%
% With no `otel_sampling' persistent term (or `undefined') every trace passes.
% Otherwise a uniform draw from 1..100 is compared to the stored percentage.
sampling_allows() ->
    Percent = persistent_term:get(otel_sampling, undefined),
    Percent =:= undefined orelse rand:uniform(100) =< Percent.
% Reads the `otel_duration_threshold' persistent term; `undefined' when unset.
duration_threshold() ->
    persistent_term:get(otel_duration_threshold, undefined).
% Starts a new trace in the calling process.
%
% A trace is started only when tracing is enabled, the collector is not
% overloaded and sampling allows it. On success the hex trace id is stored in
% the process dictionary together with an empty span stack and returned.
% Otherwise a drop counter is reported to otel_stats and `false' is returned.
-spec start_trace() -> trace_id() | false.
start_trace() ->
    Enabled = is_enabled(),
    Overloaded = is_overloaded(),
    Sampled = sampling_allows(),
    case Enabled andalso not Overloaded andalso Sampled of
        true ->
            % 128-bit id: 32 bits of wall-clock seconds followed by 96 random bits.
            Now = erlang:system_time(seconds),
            R1 = rand:uniform(1 bsl 32),
            R2 = rand:uniform(1 bsl 32),
            R3 = rand:uniform(1 bsl 32),
            TraceId = hexify:hex(<<Now:32, R1:32, R2:32, R3:32>>),
            put('$otel_trace_id', TraceId),
            put('$otel_span', []),
            TraceId;
        false ->
            otel_stats:put_counters(drop_counters(Overloaded, Sampled)),
            false
    end.

% Picks the counter describing why a trace was not started.
drop_counters(true, _Sampled) -> #{drop_overload_count => 1};
drop_counters(_Overloaded, false) -> #{drop_sampling_count => 1};
drop_counters(_Overloaded, _Sampled) -> #{}.
% Starts or continues a trace from an incoming `traceparent' header value.
%
% A version "00" header continues the remote trace when tracing is enabled:
% its trace id and parent span id become the process context and the trace id
% is returned; `false' is returned when tracing is disabled. A "00" header
% with fewer than two fields, a missing header (`undefined') or a binary that
% is not a version "00" traceparent all start a fresh trace via start_trace/0,
% so malformed input from a peer does not crash the caller.
-spec start_trace(binary() | undefined) -> trace_id() | false.
start_trace(<<?TRACEPARENT_VERSION, "-", TraceParent/binary>>) ->
    case is_enabled() of
        true ->
            case binary:split(TraceParent, <<"-">>, [global]) of
                [TraceId, ParentSpanId | _] ->
                    set_context([#{traceId => TraceId, spanId => ParentSpanId}]),
                    TraceId;
                _ ->
                    start_trace()
            end;
        false ->
            false
    end;
start_trace(undefined) ->
    start_trace();
start_trace(Unsupported) when is_binary(Unsupported) ->
    % Unknown version or garbage header: ignore it and restart the trace.
    start_trace().
% Finishes the trace of the calling process.
%
% When a trace id is present it is passed to otel_collector:save_trace/1.
% The process context is cleared in every case.
-spec end_trace() -> ok.
end_trace() ->
    TraceId = get('$otel_trace_id'),
    TraceId =:= undefined orelse otel_collector:save_trace(TraceId),
    clear_context(),
    ok.
% Opens a span with no predefined fields; see start_span/1.
-spec start_span() -> span_id() | false.
start_span() ->
    start_span(#{}).
% #{kind => .., name => ..}
% Opens a new span on top of the span stack of the calling process.
%
% Returns `false' when the process has no active trace. Otherwise the span is
% given a start time, a span id derived from the current time in microseconds,
% the id of the span currently on top of the stack as its parent (`undefined'
% for an empty stack), the trace id and an `attributes' map (the supplied one
% or an empty map). The span is pushed onto the stack and its id is returned.
-spec start_span(#{kind => span_kind(), name => binary(), attributes => #{_ => _}}) -> span_id() | false.
start_span(#{} = Span) ->
    case get('$otel_trace_id') of
        undefined ->
            false;
        TraceId ->
            Micros = erlang:system_time(micro_seconds),
            SpanId = hexify:hex(<<(Micros div 1000000):32, (Micros rem 1000000):32>>),
            Stack = get('$otel_span'),
            Parent =
                case Stack of
                    [#{spanId := TopId} | _] -> TopId;
                    [] -> undefined
                end,
            Fields = #{
                startTimeUnixNano => erlang:system_time(nano_seconds),
                spanId => SpanId,
                parentSpanId => Parent,
                attributes => maps:get(attributes, Span, #{}),
                traceId => TraceId
            },
            put('$otel_span', [maps:merge(Span, Fields) | Stack]),
            SpanId
    end.
% Builds the `traceparent' value for the span on top of the stack.
%
% Returns <<"00-TraceId-SpanId-01">>, or `undefined' when the calling process
% has no span carrying both a span id and a trace id.
traceparent() ->
    Stack = get('$otel_span'),
    case Stack of
        [#{traceId := Trace, spanId := Span} | _] ->
            <<"00-", Trace/binary, "-", Span/binary, "-01">>;
        _ ->
            undefined
    end.
% Returns the traceparent as a string header list for lhttpc:
% one `{"Traceparent", Value}' pair, or `[]' when there is no current span.
lhttpc_header() ->
    Value = traceparent(),
    if
        Value =:= undefined -> [];
        true -> [{"Traceparent", binary_to_list(Value)}]
    end.
% Returns the span stack of the calling process so it can be handed to
% set_context/1, or `undefined' when the top entry lacks a span id or trace id
% (including the case of no stack at all).
get_context() ->
    Stack = get('$otel_span'),
    case Stack of
        [#{spanId := _, traceId := _} | _] -> Stack;
        _ -> undefined
    end.
% Installs a span stack as the tracing context of the calling process.
%
% `undefined' is accepted and ignored (returns `ok'). For a stack, the trace
% id of its top entry becomes the current trace id; the return value is
% whatever put/2 returns for the span stack, i.e. the previous stack.
set_context(undefined) ->
    ok;
set_context([#{traceId := TraceId, spanId := _} | _] = Stack) ->
    put('$otel_trace_id', TraceId),
    put('$otel_span', Stack).
% Removes the trace id and the span stack from the process dictionary.
clear_context() ->
    lists:foreach(fun erlang:erase/1, ['$otel_trace_id', '$otel_span']),
    ok.
% Closes the current span without a status; see end_span/1.
end_span() ->
    end_span(undefined).
% Closes the span on top of the stack and hands it to the collector.
%
% The span gets an end time and a status derived from `Status':
% `undefined' -> empty status, `ok' -> code ok, a binary -> code error with
% that message, anything else -> code error with the term printed via ~p.
% Does nothing when the process has no open span. Always returns `ok'.
end_span(Status) ->
    case get('$otel_span') of
        [Current | Rest] ->
            Closed = Current#{
                endTimeUnixNano => erlang:system_time(nano_seconds),
                status => span_status(Status)
            },
            otel_collector:save_span(Closed),
            put('$otel_span', Rest),
            ok;
        _ ->
            ok
    end.

% Translates the caller-supplied status into the span's status map.
span_status(undefined) ->
    #{};
span_status(ok) ->
    #{code => ok};
span_status(Message) when is_binary(Message) ->
    #{code => error, message => Message};
span_status(Other) ->
    #{code => error, message => iolist_to_binary(io_lib:format("~p", [Other]))}.
% Merges attributes into the span on top of the stack.
%
% Keys may be binaries or atoms (atoms are converted to binaries). Values may
% be binaries, character lists (converted to binaries, including the empty
% string), numbers, booleans or other atoms (converted to binaries); any other
% key or value is a caller error and crashes. New values override existing
% attributes with the same key.
%
% Returns `false' when there is no span to annotate: no span stack, an empty
% stack, or a top entry without an `attributes' map (such as a context
% restored from a traceparent header). Otherwise returns the result of put/2,
% i.e. the previous span stack.
setattrs(#{} = NewAttrs) ->
    case get('$otel_span') of
        [#{attributes := Attrs} = Span | Spans] ->
            Normalized = maps:fold(
                fun(K, V, Acc) ->
                    Acc#{normalize_attr_key(K) => normalize_attr_value(V)}
                end,
                #{},
                NewAttrs
            ),
            put('$otel_span', [Span#{attributes => maps:merge(Attrs, Normalized)} | Spans]);
        _ ->
            false
    end.

% Attribute keys are stored as binaries.
normalize_attr_key(K) when is_binary(K) -> K;
normalize_attr_key(K) when is_atom(K) -> atom_to_binary(K, latin1).

% Attribute values are stored as binaries, numbers or booleans.
normalize_attr_value(V) when is_binary(V) -> V;
normalize_attr_value([C | _] = V) when is_integer(C) -> iolist_to_binary(V);
normalize_attr_value([]) -> <<>>;
normalize_attr_value(V) when is_number(V) -> V;
normalize_attr_value(V) when is_boolean(V) -> V;
normalize_attr_value(V) when is_atom(V) -> atom_to_binary(V, latin1).
% Sets the `name' of the span on top of the stack.
% Returns `ok', or `false' when the calling process has no open span.
setname(Name) ->
    Stack = get('$otel_span'),
    case Stack of
        [#{} = Top | Rest] ->
            put('$otel_span', [maps:put(name, Name, Top) | Rest]),
            ok;
        _ ->
            false
    end.
% Looks up the spans still held by the collector for `TraceId'
% (delegates to otel_collector:read/1).
get_trace(TraceId) ->
    otel_collector:read(TraceId).
% Returns the counters accumulated by the otel_stats process.
-spec get_stats() -> opentelemetry_stats_map().
get_stats() ->
    otel_stats:get_stats().
% Test helper: resets the collector first, then the statistics.
reset() ->
    otel_collector:reset(),
    otel_stats:reset().
-module(otel_collector).
-include_lib("kernel/include/logger.hrl").
-include_lib("corelib/include/corelib_openapi.hrl").
-export([save_span/1, save_trace/1, read/1]).
-export([flush/0]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_info/2, terminate/2]).
% tests
-export([reset/0]).
% Hands a finished span to the collector process with a synchronous call.
% The span must carry a `spanId'. Always returns `ok'.
save_span(#{spanId := _} = Span) ->
    gen_server:call(otel_collector, Span),
    ok.
% Notifies the collector process that the trace `TraceId' has ended.
% Always returns `ok'.
save_trace(TraceId) ->
    Request = {save_trace, TraceId},
    gen_server:call(otel_collector, Request),
    ok.
% Returns `#{spans => Spans}' with every span stored in the `otel_spans'
% table for `TraceId', or `undefined' when the table holds none.
read(TraceId) ->
    Pick = fun
        ({_SpanId, T, Span}) when T == TraceId -> {true, Span};
        (_) -> false
    end,
    case lists:filtermap(Pick, ets:tab2list(otel_spans)) of
        [] -> undefined;
        Found -> #{spans => Found}
    end.
% Takes all buffered spans from the collector process and uploads them.
%
% The upload runs in the calling process. Nothing is uploaded (and an empty
% counter map is returned) when there are no spans or no URL is configured.
-spec flush() -> opentelemetry_stats_map().
flush() ->
    {ok, URL, Spans} = gen_server:call(otel_collector, flush),
    maybe_upload(URL, Spans).

% Uploads only a non-empty batch to a configured URL.
maybe_upload(URL, [_ | _] = Spans) when URL =/= undefined -> upload(URL, Spans);
maybe_upload(_URL, _Spans) -> #{}.
% Encodes the spans as one OTLP/JSON trace packet and POSTs it to `URL'.
%
% Returns a counter map: `sent_count' when the server answers 200, otherwise
% `drop_network_count' (non-200 answer or transport error), each holding the
% number of spans in the batch.
-spec upload(url(), list()) -> opentelemetry_stats_map().
upload(URL, Spans) ->
    Encoded = [encode_span(S) || S <- Spans],
    SpanCount = length(Spans),
    Packet = #{
        resourceSpans => [#{
            resource => #{
                attributes => [
                    #{key => <<"service.name">>, value => #{stringValue => otel:service_name()}},
                    #{key => <<"service.instance.id">>, value => #{stringValue => corelib:instance_id()}},
                    #{key => <<"host.name">>, value => #{stringValue => corelib:hostname()}},
                    #{key => <<"service.version">>, value => #{stringValue => base:version(describe)}}
                ]
            },
            scopeSpans => [#{
                spans => Encoded
            }]
        }]
    },
    JSON = corejson:encode(Packet),
    % URL may be "http://localhost:4318/v1/traces"
    case lhttpc:request(URL, post, [{"Content-Type", "application/json"}], JSON, 5000) of
        {ok, {{Code, _}, _, Body}} ->
            Msg = response_message(Body),
            ?LOG_DEBUG(#{code => Code, spans => SpanCount, response => Msg}),
            case Code of
                200 -> #{sent_count => SpanCount};
                _ -> #{drop_network_count => SpanCount}
            end;
        {error, E} ->
            ?LOG_INFO(#{error => E}),
            #{drop_network_count => SpanCount}
    end.

% Converts one internal span map into its wire form: attributes become a
% key/value list, status and kind become numeric codes, and a missing parent
% becomes an empty binary. The span must have `attributes', `traceId' and
% `spanId'.
encode_span(#{attributes := Attrs} = Span0) ->
    Span1 = Span0#{
        attributes => [
            #{key => K, value => encode_attr_value(V)}
         || {K, V} <- maps:to_list(Attrs), V =/= undefined
        ]
    },
    Span2 = encode_status(Span1),
    Span3 = Span2#{kind => kind_code(maps:get(kind, Span2, internal))},
    Span3#{
        traceId => maps:get(traceId, Span3),
        spanId => maps:get(spanId, Span3),
        parentSpanId =>
            case maps:get(parentSpanId, Span3, undefined) of
                undefined -> <<>>;
                ParentId -> ParentId
            end
    }.

% Wraps an attribute value in the typed value object used on the wire.
encode_attr_value(V) when is_binary(V) -> #{stringValue => V};
encode_attr_value(V) when is_integer(V) -> #{intValue => V};
encode_attr_value(V) when is_boolean(V) -> #{boolValue => V};
encode_attr_value(V) when is_atom(V) -> #{stringValue => atom_to_binary(V, latin1)};
encode_attr_value(V) when is_number(V) -> #{doubleValue => V};
% io_lib:format/2 takes a LIST of arguments, so the value is wrapped.
encode_attr_value(V) -> #{stringValue => iolist_to_binary(io_lib:format("~p", [V]))}.

% Replaces the symbolic status code with its numeric value (ok -> 1, error -> 2).
% An `ok' status keeps only the code; an `error' status keeps its other fields.
encode_status(#{status := #{code := ok}} = Span) ->
    Span#{status => #{code => 1}};
encode_status(#{status := #{code := error} = Status} = Span) ->
    Span#{status => Status#{code => 2}};
encode_status(Span) ->
    Span.

% Numeric span kind; anything unknown or missing is sent as internal (1).
kind_code(internal) -> 1;
kind_code(server) -> 2;
kind_code(client) -> 3;
kind_code(producer) -> 4;
kind_code(consumer) -> 5;
kind_code(_) -> 1.

% Extracts the "message" field of a JSON response body for logging.
% Returns <<>> when the body is not JSON or is not an object with that field.
response_message(Body) ->
    Decoded =
        try json:decode(Body)
        catch _:_ ->
            ?LOG_DEBUG("Unable to decode json: ~p", [binary:part(Body, 0, min(byte_size(Body), 30))]),
            #{}
        end,
    case Decoded of
        #{<<"message">> := Message} -> Message;
        _ -> <<>>
    end.
% Test helper: resets the collector state, erases the persistent terms
% published by this module and empties the `otel_spans' table.
reset() ->
    gen_server:call(otel_collector, reset),
    reset_persistent_terms(),
    ets:delete_all_objects(otel_spans).
% Erases every persistent term used by the tracing modules, in a fixed order.
% Returns the result of the last persistent_term:erase/1 call.
reset_persistent_terms() ->
    Keys = [
        otel_enabled,
        otel_overloaded,
        otel_sampling,
        otel_service_name,
        otel_duration_threshold
    ],
    lists:foldl(fun(Key, _Prev) -> persistent_term:erase(Key) end, false, Keys).
% Starts the collector gen_server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
% State of the otel_collector gen_server.
-record(collector, {
enabled = false, % true while a URL is configured; mirrored in the otel_enabled persistent term
overloaded = false, % true once span_count has reached rate_limit; mirrored in otel_overloaded
url, % upload endpoint with the query string removed, or undefined
sampling, % integer from the `sampling' query parameter, or undefined
duration_threshold, % integer from the `duration_threshold' query parameter, or undefined
span_count = 0, % spans saved since the counter was last reset (timer tick or update)
rate_limit, % integer from the `rate_limit' query parameter, or undefined (no limit)
limit_timer % reference of the pending flush_limit_timer timer
}).
% gen_server callback: creates the public named `otel_spans' table and
% schedules the first flush_limit_timer tick one second from now.
% NOTE(review): the state starts without a URL; the configuration is applied
% only when the `update' call is handled.
init([]) ->
    otel_spans = ets:new(otel_spans, [public, named_table]),
    Timer = erlang:send_after(1000, self(), flush_limit_timer),
    {ok, #collector{limit_timer = Timer}}.
% Applies a configured endpoint URL to the collector state.
%
% `undefined' only clears the URL. Otherwise the query string is split off
% the URL and interpreted as settings: `rate_limit', `sampling' and
% `duration_threshold' (integers, `undefined' when absent) and `service_name'
% (defaults to <<"streamer">>). Sampling, duration threshold and service name
% are published as persistent terms; the URL without its query and the numeric
% settings are stored in the returned state.
read_url(undefined, Collector) ->
    Collector#collector{
        url = undefined
    };
read_url(FullURL, Collector) ->
    Parsed = uri_string:parse(iolist_to_binary(FullURL)),
    Query = maps:get(query, Parsed, <<>>),
    URL = uri_string:recompose(maps:remove(query, Parsed)),
    Params = uri_string:dissect_query(Query),
    RateLimit = query_int(<<"rate_limit">>, Params),
    Sampling = query_int(<<"sampling">>, Params),
    DurationThreshold = query_int(<<"duration_threshold">>, Params),
    persistent_term:put(otel_sampling, Sampling),
    % The threshold term must carry the threshold, not the sampling percentage.
    persistent_term:put(otel_duration_threshold, DurationThreshold),
    ServiceName = proplists:get_value(<<"service_name">>, Params, <<"streamer">>),
    persistent_term:put(otel_service_name, ServiceName),
    Collector#collector{
        url = URL,
        rate_limit = RateLimit,
        sampling = Sampling,
        duration_threshold = DurationThreshold
    }.

% Reads an integer query parameter; `undefined' when the parameter is absent.
query_int(Name, Params) ->
    case proplists:get_value(Name, Params) of
        undefined -> undefined;
        Value -> binary_to_integer(Value)
    end.
% gen_server callback.
%   update            - re-reads the URL from the application env, refreshes
%                       the enabled/overloaded flags and zeroes the span counter
%   flush             - replies with the URL and all buffered spans, emptying
%                       the `otel_spans' table
%   {save_trace, Id}  - acknowledged; the state is left unchanged
%   span map          - stores the span (see save_span1/2)
%   reset             - restores the default state, keeping only the timer
handle_call(update, _From, #collector{} = State) ->
    URL = application:get_env(corelib, opentelemetry_url, undefined),
    Updated = set_enabled_flag(read_url(URL, State)),
    {reply, ok, Updated#collector{span_count = 0}};
handle_call(flush, _From, #collector{url = URL} = State) ->
    Buffered = [Span || {_SpanId, _TraceId, Span} <- ets:tab2list(otel_spans)],
    ets:delete_all_objects(otel_spans),
    {reply, {ok, URL, Buffered}, State};
handle_call({save_trace, _TraceId}, _From, #collector{} = State) ->
    {reply, ok, State};
handle_call(#{spanId := _} = Span, _From, #collector{} = State) ->
    {reply, ok, save_span1(Span, State)};
handle_call(reset, _From, #collector{limit_timer = Timer}) ->
    {reply, ok, #collector{limit_timer = Timer}}.
% gen_server callback.
%   flush_limit_timer - starts a new rate-limit window: re-arms the timer,
%                       zeroes the span counter and refreshes the flags
%   span map          - stores the span (see save_span1/2)
%   anything else     - logged and ignored, so a stray message cannot crash
%                       the process that owns the `otel_spans' table
handle_info(flush_limit_timer, #collector{limit_timer = OldTimer} = Collector) ->
    is_reference(OldTimer) andalso erlang:cancel_timer(OldTimer),
    T = erlang:system_time(milli_seconds),
    T1 = (T div 1000)*1000 + 1500 - T, % Time left until the middle of the next second
    LimitTimer = erlang:send_after(T1, self(), flush_limit_timer),
    {noreply, set_enabled_flag(Collector#collector{limit_timer = LimitTimer, span_count = 0})};
handle_info(#{spanId := _} = Span, #collector{} = Collector) ->
    Collector1 = save_span1(Span, Collector),
    {noreply, Collector1};
handle_info(Msg, #collector{} = Collector) ->
    ?LOG_INFO(#{unknown_msg => Msg}),
    {noreply, Collector}.
% Recomputes the `enabled' and `overloaded' flags from the state.
%
% Overloaded means an integer rate limit exists and the span counter has
% reached it; enabled means a URL is configured. A persistent term is written
% only when its flag differs from the value remembered in the state.
set_enabled_flag(
    #collector{
        url = URL,
        enabled = WasEnabled,
        overloaded = WasOverloaded,
        rate_limit = Limit,
        span_count = Count
    } = Collector
) ->
    NowOverloaded = is_integer(Limit) andalso Count >= Limit,
    NowOverloaded =:= WasOverloaded orelse
        persistent_term:put(otel_overloaded, NowOverloaded),
    NowEnabled = URL =/= undefined,
    NowEnabled =:= WasEnabled orelse
        persistent_term:put(otel_enabled, NowEnabled),
    Collector#collector{enabled = NowEnabled, overloaded = NowOverloaded}.
% Stores the span in `otel_spans' keyed by span id, bumps the span counter
% and refreshes the enabled/overloaded flags.
save_span1(#{spanId := SpanId, traceId := TraceId} = Span, #collector{span_count = Count} = Collector) ->
    Row = {SpanId, TraceId, Span},
    ets:insert(otel_spans, Row),
    set_enabled_flag(Collector#collector{span_count = Count + 1}).
% gen_server callback: nothing to clean up.
terminate(_Reason, _State) ->
    ok.
-module(otel_stats).
-export([start_link/0]).
-include_lib("kernel/include/logger.hrl").
-include_lib("corelib/include/corelib_openapi.hrl").
-export([init/1, handle_info/2, handle_call/3, terminate/2, get_stats/0, put_counters/1]).
% tests
-export([reset/0]).
% Starts the statistics gen_server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
% Fetches the accumulated counters from the statistics process.
-spec get_stats() -> opentelemetry_stats_map().
get_stats() ->
    gen_server:call(otel_stats, get_stats).
% Sends counter increments to the statistics process without waiting.
% Returns the message that was sent.
-spec put_counters(opentelemetry_stats_map()) -> {counters, opentelemetry_stats_map()}.
put_counters(#{} = Counters) ->
    erlang:send(otel_stats, {counters, Counters}).
% Test helper: asks the statistics process to drop all counters.
reset() ->
    gen_server:call(otel_stats, reset).
% State of the otel_stats gen_server.
-record(stats, {
counters = #{} % running totals, keyed by counter name
}).
% gen_server callback: starts with no counters.
init([]) ->
    InitialState = #stats{},
    {ok, InitialState}.
% gen_server callback.
%   get_stats - replies with the current counters
%   reset     - drops all counters
handle_call(get_stats, _From, #stats{counters = Totals} = State) ->
    {reply, Totals, State};
handle_call(reset, _From, #stats{} = State) ->
    Cleared = State#stats{counters = #{}},
    {reply, ok, Cleared}.
% gen_server callback: adds `{counters, Map}' increments to the running
% totals; any other message is logged and ignored.
handle_info({counters, Increments}, #stats{counters = Totals} = State) ->
    Summed = accumulate_counters(Totals, Increments),
    {noreply, State#stats{counters = Summed}};
handle_info(Other, #stats{} = State) ->
    ?LOG_INFO(#{unknown_msg => Other}),
    {noreply, State}.
% gen_server callback: nothing to clean up.
terminate(_Reason, _State) ->
    ok.
% Adds two counter maps key by key. Keys present in only one map keep their
% value; keys present in both hold the sum.
-spec accumulate_counters(opentelemetry_stats_map(), opentelemetry_stats_map()) -> opentelemetry_stats_map().
accumulate_counters(#{} = InCounters, #{} = Counters) ->
    Add = fun({Key, Value}, Acc) ->
        Sum = Value + maps:get(Key, Acc, 0),
        maps:put(Key, Sum, Acc)
    end,
    lists:foldl(Add, Counters, maps:to_list(InCounters)).
-module(otel_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
% Starts the supervisor, registered locally under the module name.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
% supervisor callback: one_for_one, at most 5 restarts in 10 seconds.
% Children, in start order: otel_collector, otel_uploader, otel_stats; each
% is a transient worker with a 100 ms shutdown timeout.
init([]) ->
    Worker = fun(Module) ->
        {Module, {Module, start_link, []}, transient, 100, worker, []}
    end,
    Children = [Worker(M) || M <- [otel_collector, otel_uploader, otel_stats]],
    {ok, {{one_for_one, 5, 10}, Children}}.
-module(otel_uploader).
-export([start_link/0]).
-include_lib("kernel/include/logger.hrl").
-define(TIMEOUT, 5000).
-export([init/1, handle_info/2, handle_call/3, terminate/2]).
% tests
-export([flush/0]).
% Starts the uploader gen_server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
% Test helper: makes the uploader process flush now and waits for it.
flush() ->
    gen_server:call(otel_uploader, flush).
% State of the otel_uploader gen_server.
-record(uploader, {
timer % reference of the pending `flush' timer
}).
% gen_server callback: schedules the first `flush' after ?TIMEOUT milliseconds.
init([]) ->
    Timer = erlang:send_after(?TIMEOUT, self(), flush),
    {ok, #uploader{timer = Timer}}.
% gen_server callback: a synchronous `flush' runs the same code as the timer
% driven one and replies `ok' once it has finished.
handle_call(flush, _From, #uploader{} = State) ->
    {noreply, Flushed} = handle_info(flush, State),
    {reply, ok, Flushed}.
% gen_server callback.
%   flush - cancels the pending timer, flushes the collector, reports the
%           resulting counters to otel_stats and schedules the next flush
%   other - logged and ignored
handle_info(flush, #uploader{timer = OldTimer} = State) ->
    erlang:cancel_timer(OldTimer),
    otel_stats:put_counters(otel_collector:flush()),
    NextTimer = erlang:send_after(?TIMEOUT, self(), flush),
    {noreply, State#uploader{timer = NextTimer}};
handle_info(Other, #uploader{} = State) ->
    ?LOG_INFO(#{unknown_msg => Other}),
    {noreply, State}.
% gen_server callback: nothing to clean up.
terminate(_Reason, _State) ->
    ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment