Last active
April 30, 2025 20:48
-
-
Save maxlapshin/bafa13a0f17810a69dd893100da57be7 to your computer and use it in GitHub Desktop.
OpenTelemetry tracing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(otel). | |
-include_lib("kernel/include/logger.hrl"). | |
-include_lib("corelib/include/corelib_openapi.hrl"). | |
-export([start_trace/0, start_trace/1]). | |
-export([end_trace/0]). | |
-export([start_span/0, start_span/1]). | |
-export([end_span/0, end_span/1]). | |
-export([setattrs/1, setname/1]). | |
-export([get_context/0, set_context/1]). | |
-export([clear_context/0]). | |
-export([traceparent/0]). | |
-export([lhttpc_header/0]). | |
-export([get_stats/0]). | |
-export([load_config/1]). | |
-export([is_enabled/0, is_overloaded/0]). | |
-export([sampling_allows/0, duration_threshold/0, service_name/0]). | |
-export([get_trace/1]). | |
% tests | |
-export([reset/0]). | |
% https://www.w3.org/TR/trace-context/#version-format | |
-define(TRACEPARENT_VERSION, "00"). | |
% https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/trace/v1/trace_service.proto | |
% https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto | |
-type trace_id() :: binary(). | |
-type span_id() :: binary(). | |
-type span_kind() :: server | client | producer | consumer | internal. | |
%% Applies the OpenTelemetry exporter URL taken from a configuration map.
%%
%% The `opentelemetry_url` key of Env wins; when it is missing, the corelib
%% application env `env_opentelemetry_url` is used instead, but only if it is a
%% binary. When the resulting URL differs from the corelib env `opentelemetry_url`,
%% that env is overwritten, the otel_collector server is asked to reload it
%% (`update` call) and the otel_stats counters are reset. Always returns ok.
load_config(#{} = Env) ->
    FromEnv = application:get_env(corelib, env_opentelemetry_url, undefined),
    Wanted =
        case maps:get(opentelemetry_url, Env, undefined) of
            undefined when is_binary(FromEnv) ->
                ?LOG_INFO("Environment opentelemetry url is used: ~p", [FromEnv]),
                FromEnv;
            FromConfig ->
                FromConfig
        end,
    Unchanged = application:get_env(corelib, opentelemetry_url, undefined) =:= Wanted,
    if
        Unchanged ->
            ok;
        true ->
            application:set_env(corelib, opentelemetry_url, Wanted),
            gen_server:call(otel_collector, update),
            otel_stats:reset()
    end,
    ok.
%% True when the collector has an exporter URL configured
%% (persistent term `otel_enabled`; false when never set).
is_enabled() ->
    Default = false,
    persistent_term:get(otel_enabled, Default).

%% True while the collector's span rate limit is exhausted
%% (persistent term `otel_overloaded`; false when never set).
is_overloaded() ->
    Default = false,
    persistent_term:get(otel_overloaded, Default).

%% `service.name` resource attribute for exported spans
%% (persistent term `otel_service_name`; <<"streamer">> when never set).
service_name() ->
    Default = <<"streamer">>,
    persistent_term:get(otel_service_name, Default).
%% Sampling decision for a new locally started trace.
%% Without an `otel_sampling` percentage every trace is kept; otherwise a trace is
%% kept when a uniform draw from 1..100 does not exceed that percentage.
sampling_allows() ->
    Percent = persistent_term:get(otel_sampling, undefined),
    Percent =:= undefined orelse rand:uniform(100) =< Percent.
%% Value stored in the persistent term `otel_duration_threshold` (written by
%% otel_collector when an exporter URL is applied), or undefined when unset.
duration_threshold() ->
    Default = undefined,
    persistent_term:get(otel_duration_threshold, Default).
%% Starts a new locally rooted trace in the calling process.
%%
%% Returns the new trace id, or false when no trace is started:
%%   * tracing disabled -> false; nothing is counted and no message is sent
%%     (previously every call sent a stats message, possibly counting a bogus drop);
%%   * rate limit hit   -> false, `drop_overload_count` is incremented in otel_stats;
%%   * sampled out      -> false, `drop_sampling_count` is incremented in otel_stats.
%% On success the trace id and an empty span stack are stored in the process dictionary.
-spec start_trace() -> trace_id() | false.
start_trace() ->
    case is_enabled() of
        false ->
            false;
        true ->
            start_enabled_trace(is_overloaded())
    end.

%% Applies the overload and sampling checks once tracing is known to be enabled.
start_enabled_trace(true) ->
    drop_trace(drop_overload_count);
start_enabled_trace(false) ->
    case sampling_allows() of
        true -> new_trace();
        false -> drop_trace(drop_sampling_count)
    end.

%% Reports one dropped trace under Counter to otel_stats and returns false.
drop_trace(Counter) ->
    otel_stats:put_counters(#{Counter => 1}),
    false.

%% Builds a trace id from 16 bytes (32-bit unix seconds followed by 96 random bits)
%% passed through hexify:hex/1, and installs it with an empty span stack.
new_trace() ->
    T = erlang:system_time(second),
    TraceId = hexify:hex(<<T:32,
                           (rand:uniform(1 bsl 32)):32,
                           (rand:uniform(1 bsl 32)):32,
                           (rand:uniform(1 bsl 32)):32>>),
    put('$otel_trace_id', TraceId),
    put('$otel_span', []),
    TraceId.
%% Continues a trace from an incoming W3C `traceparent` value, or starts a new one.
%%
%% A version-00 value "00-<trace-id>-<parent-id>-<flags>" whose trace-id is 32 hex
%% digits and whose parent-id is 16 hex digits (neither all zeros) is adopted when
%% tracing is enabled. The remote parent becomes the bottom of the span stack and
%% its trace id is returned. Adopted traces bypass sampling and the rate limit.
%% Returns false when tracing is disabled.
%%
%% `undefined`, or any other binary (unknown version, malformed or all-zero ids),
%% starts a fresh trace via start_trace/0, as the W3C spec asks. Such header values
%% come from untrusted clients and used to crash the caller with function_clause.
-spec start_trace(binary() | undefined) -> trace_id() | false.
start_trace(<<?TRACEPARENT_VERSION, "-", TraceParent/binary>>) ->
    case is_enabled() of
        true ->
            continue_trace(binary:split(TraceParent, <<"-">>, [global]));
        false ->
            false
    end;
start_trace(undefined) ->
    start_trace();
start_trace(Other) when is_binary(Other) ->
    start_trace().

%% Adopts the split "<trace-id>-<parent-id>[-...]" fields, or falls back to a new trace.
continue_trace([TraceId, ParentSpanId | _]) ->
    case is_valid_id(TraceId, 32) andalso is_valid_id(ParentSpanId, 16) of
        true ->
            set_context([#{traceId => TraceId, spanId => ParentSpanId}]),
            TraceId;
        false ->
            start_trace()
    end;
continue_trace(_) ->
    start_trace().

%% True when Id is exactly Len hex digits and not all zeros. Upper-case digits are
%% tolerated to stay lenient with non-conforming clients.
is_valid_id(Id, Len) when byte_size(Id) =:= Len ->
    IsHex = fun(C) ->
        (C >= $0 andalso C =< $9) orelse
            (C >= $a andalso C =< $f) orelse
            (C >= $A andalso C =< $F)
    end,
    lists:all(IsHex, binary_to_list(Id)) andalso Id =/= binary:copy(<<"0">>, Len);
is_valid_id(_, _) ->
    false.
%% Finishes this process's trace: a present trace id is passed to
%% otel_collector:save_trace/1, then the trace context is erased. Always returns ok.
-spec end_trace() -> ok.
end_trace() ->
    TraceId = get('$otel_trace_id'),
    TraceId =:= undefined orelse otel_collector:save_trace(TraceId),
    clear_context(),
    ok.
%% Opens an unnamed span with no kind; same as start_span(#{}).
-spec start_span() -> span_id() | false.
start_span() ->
    start_span(#{}).

%% Opens a span as a child of the innermost open span (or as a root of the current
%% trace) and pushes it on this process's span stack.
%%
%% Span may carry `kind`, `name` and `attributes`; any other keys are kept as well.
%% Returns the new span id, or false when the process has no active trace.
-spec start_span(#{kind => span_kind(), name => binary(), attributes => #{_ => _}}) -> span_id() | false.
start_span(#{} = Span) ->
    case get('$otel_trace_id') of
        undefined ->
            false;
        TraceId ->
            % 64 random, never-zero bits (W3C forbids an all-zero parent-id). The id used
            % to be derived from the microsecond clock, so spans started in the same
            % microsecond shared an id and overwrote each other in the collector's
            % span table, which is keyed by span id.
            SpanId = hexify:hex(<<(rand:uniform((1 bsl 64) - 1)):64>>),
            Spans = get('$otel_span'),
            ParentSpanId = case Spans of
                [#{spanId := PSI} | _] -> PSI;
                [] -> undefined
            end,
            Span1 = Span#{
                startTimeUnixNano => erlang:system_time(nanosecond),
                spanId => SpanId,
                parentSpanId => ParentSpanId,
                attributes => maps:get(attributes, Span, #{}),
                traceId => TraceId
            },
            put('$otel_span', [Span1 | Spans]),
            SpanId
    end.
%% W3C `traceparent` value for the innermost span context of this process, formatted
%% as "00-<traceId>-<spanId>-01" (always flagged sampled), or undefined when
%% there is none.
traceparent() ->
    Stack = get('$otel_span'),
    case Stack of
        [#{traceId := Trace, spanId := Span} | _] ->
            <<"00-", Trace/binary, "-", Span/binary, "-01">>;
        _ ->
            undefined
    end.
%% lhttpc request headers that propagate the current span: a single
%% {"Traceparent", String} pair when a span context exists, otherwise [].
lhttpc_header() ->
    [{"Traceparent", binary_to_list(Value)} || Value <- [traceparent()], Value =/= undefined].
%% The whole span stack of this process, for handing to set_context/1 in another
%% process. Returns undefined when the stack is missing or empty, or when its top
%% entry lacks spanId/traceId.
get_context() ->
    Context = get('$otel_span'),
    case Context of
        [#{spanId := _, traceId := _} | _] -> Context;
        _ -> undefined
    end.
%% Installs a span stack captured by get_context/0 in this process.
%% `undefined` is a no-op returning ok. For a stack, the trace id of its top entry
%% and the stack itself are stored; the result is put/2's return value, i.e. the
%% previous span stack (or undefined).
set_context(undefined) ->
    ok;
set_context([#{traceId := Trace, spanId := _} | _] = Stack) ->
    put('$otel_trace_id', Trace),
    put('$otel_span', Stack).
%% Forgets this process's trace id and span stack. Always returns ok.
clear_context() ->
    lists:foreach(fun erlang:erase/1, ['$otel_trace_id', '$otel_span']).
%% Closes the innermost open span with an empty status.
end_span() ->
    end_span(undefined).

%% Closes the innermost open span: stamps endTimeUnixNano, sets its status from
%% Status (see span_status/1), passes it to otel_collector:save_span/1 and pops it.
%% Does nothing when no span is open. Always returns ok.
end_span(Status) ->
    case get('$otel_span') of
        [Current | Rest] ->
            EndTime = erlang:system_time(nano_seconds),
            Closed = Current#{endTimeUnixNano => EndTime, status => span_status(Status)},
            otel_collector:save_span(Closed),
            put('$otel_span', Rest),
            ok;
        _ ->
            ok
    end.

%% Maps an end_span/1 argument to a span status map:
%%   undefined -> #{}; ok -> code ok; binary -> code error with that message;
%%   any other term -> code error with its ~p rendering as the message.
span_status(undefined) ->
    #{};
span_status(ok) ->
    #{code => ok};
span_status(Message) when is_binary(Message) ->
    #{code => error, message => Message};
span_status(Reason) ->
    #{code => error, message => iolist_to_binary(io_lib:format("~p", [Reason]))}.
%% Merges NewAttrs into the attributes of the innermost open span of this process.
%%
%% Keys become binaries: atoms via UTF-8, iolists/strings via iolist_to_binary/1,
%% and any other term via its ~p rendering. Values are normalised for the exporter:
%%   binary | number | boolean -> kept as is
%%   undefined                 -> kept, so otel_collector:upload/2 omits the attribute
%%                                (previously it became <<"undefined">>)
%%   any other atom            -> UTF-8 binary
%%   iolist / string           -> binary
%%   any other term            -> ~p rendering (previously a case_clause crash in
%%                                the traced process)
%% Returns false when no span is open; this includes a trace with an empty span stack,
%% which used to crash. Otherwise returns the previous span stack, as put/2 does.
setattrs(#{} = NewAttrs) ->
    case get('$otel_span') of
        [#{} = Span | Spans] ->
            Normalised = maps:fold(
                fun(K, V, Acc) -> Acc#{attr_key(K) => attr_value(V)} end,
                #{},
                NewAttrs
            ),
            % Spans adopted from a remote parent (set_context/1) may lack `attributes`.
            Attrs = maps:merge(maps:get(attributes, Span, #{}), Normalised),
            put('$otel_span', [Span#{attributes => Attrs} | Spans]);
        _ ->
            false
    end.

%% Attribute key as a binary.
attr_key(K) when is_binary(K) -> K;
attr_key(K) when is_atom(K) -> atom_to_binary(K, utf8);
attr_key(K) -> attr_text(K).

%% Attribute value in a form otel_collector:upload/2 can encode.
attr_value(V) when is_binary(V); is_number(V); is_boolean(V); V =:= undefined -> V;
attr_value(V) when is_atom(V) -> atom_to_binary(V, utf8);
attr_value(V) -> attr_text(V).

%% Renders a term as a binary: iolists by concatenation, anything else via ~p.
attr_text(T) when is_list(T) ->
    try
        iolist_to_binary(T)
    catch
        error:badarg -> iolist_to_binary(io_lib:format("~p", [T]))
    end;
attr_text(T) ->
    iolist_to_binary(io_lib:format("~p", [T])).
%% Renames the innermost open span. Returns ok, or false when no span is open.
setname(Name) ->
    Stack = get('$otel_span'),
    case Stack of
        [Top | Rest] when is_map(Top) ->
            put('$otel_span', [maps:put(name, Name, Top) | Rest]),
            ok;
        _ ->
            false
    end.
%% Spans of TraceId still buffered in the collector, as returned by
%% otel_collector:read/1.
get_trace(TraceId) -> otel_collector:read(TraceId).

%% Accumulated export counters kept by otel_stats.
-spec get_stats() -> opentelemetry_stats_map().
get_stats() -> otel_stats:get_stats().

%% Test helper: resets the collector, then the stats counters.
reset() ->
    otel_collector:reset(),
    otel_stats:reset().
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(otel_collector). | |
-include_lib("kernel/include/logger.hrl"). | |
-include_lib("corelib/include/corelib_openapi.hrl"). | |
-export([save_span/1, save_trace/1, read/1]). | |
-export([flush/0]). | |
-export([start_link/0]). | |
-export([init/1, handle_call/3, handle_info/2, terminate/2]). | |
% tests | |
-export([reset/0]). | |
%% Synchronously hands one finished span (a map with at least `spanId`) to the
%% collector server. Always returns ok.
save_span(#{spanId := _} = Span) ->
    _ = gen_server:call(?MODULE, Span),
    ok.

%% Synchronously notifies the collector that TraceId has ended. Always returns ok.
save_trace(TraceId) ->
    _ = gen_server:call(?MODULE, {save_trace, TraceId}),
    ok.
%% Returns #{spans => Spans} with every span of TraceId still buffered in the
%% otel_spans table (i.e. not yet flushed), or undefined when there are none.
%% A match specification filters inside ETS, so only the matching spans are copied
%% to the caller. The old version copied the whole table with tab2list/1 and then
%% filtered it.
read(TraceId) ->
    % Rows are {SpanId, TraceId, Span}. The guard compares against a constant, so
    % TraceId is never interpreted as a match-spec pattern variable.
    MatchSpec = [{{'_', '$1', '$2'}, [{'==', '$1', {const, TraceId}}], ['$2']}],
    case ets:select(otel_spans, MatchSpec) of
        [] -> undefined;
        Spans -> #{spans => Spans}
    end.
%% Drains every buffered span from the collector and uploads them.
%% Returns upload/2's stats map, or #{} when there is no URL or no span to send.
-spec flush() -> opentelemetry_stats_map().
flush() ->
    {ok, URL, Spans} = gen_server:call(otel_collector, flush),
    HasWork = URL =/= undefined andalso is_list(Spans) andalso Spans =/= [],
    case HasWork of
        true -> upload(URL, Spans);
        false -> #{}
    end.
%% Posts Spans as one OTLP/HTTP JSON trace export request to URL, with a single
%% resource (service/host attributes) and a single scope.
%% Returns #{sent_count => N} on HTTP 200. On any other status or a transport
%% error it returns #{drop_network_count => N}, where N is the number of spans.
-spec upload(url(), list()) -> opentelemetry_stats_map().
upload(URL, Spans) ->
    Packet = #{
        resourceSpans => [#{
            resource => #{
                attributes => [
                    #{key => <<"service.name">>, value => #{stringValue => otel:service_name()}},
                    #{key => <<"service.instance.id">>, value => #{stringValue => corelib:instance_id()}},
                    #{key => <<"host.name">>, value => #{stringValue => corelib:hostname()}},
                    #{key => <<"service.version">>, value => #{stringValue => base:version(describe)}}
                ]
            },
            scopeSpans => [#{
                spans => [encode_span(S) || S <- Spans]
            }]
        }]
    },
    JSON = corejson:encode(Packet),
    Count = length(Spans),
    % URL may be "http://localhost:4318/v1/traces"
    case lhttpc:request(URL, post, [{"Content-Type", "application/json"}], JSON, 5000) of
        {ok, {{Code, _}, _, Body}} ->
            ?LOG_DEBUG(#{code => Code, spans => Count, response => response_message(Body)}),
            case Code of
                200 -> #{sent_count => Count};
                _ -> #{drop_network_count => Count}
            end;
        {error, E} ->
            ?LOG_INFO(#{error => E}),
            #{drop_network_count => Count}
    end.

%% Converts one collected span into its OTLP JSON shape. Attributes become a
%% key/value list with undefined values omitted; a span without attributes no
%% longer crashes the uploader. Status and kind become OTLP enum numbers, and a
%% missing parent span id becomes <<>>.
encode_span(#{traceId := _, spanId := _} = Span) ->
    Attrs = [#{key => K, value => encode_value(V)}
             || {K, V} <- maps:to_list(maps:get(attributes, Span, #{})), V =/= undefined],
    ParentSpanId = case maps:get(parentSpanId, Span, undefined) of
        undefined -> <<>>;
        PSpanId -> PSpanId
    end,
    encode_status(Span#{
        attributes => Attrs,
        kind => encode_kind(maps:get(kind, Span, internal)),
        parentSpanId => ParentSpanId
    }).

%% OTLP Status.code: 1 = OK, 2 = ERROR (an error message, if any, is kept).
encode_status(#{status := #{code := ok}} = Span) -> Span#{status => #{code => 1}};
encode_status(#{status := #{code := error} = Status} = Span) -> Span#{status => Status#{code => 2}};
encode_status(Span) -> Span.

%% OTLP SpanKind numbers; unknown or missing kinds are exported as INTERNAL (1).
encode_kind(internal) -> 1;
encode_kind(server) -> 2;
encode_kind(client) -> 3;
encode_kind(producer) -> 4;
encode_kind(consumer) -> 5;
encode_kind(_) -> 1.

%% OTLP AnyValue for one attribute value.
encode_value(V) when is_binary(V) -> #{stringValue => V};
encode_value(V) when is_integer(V) -> #{intValue => V};
encode_value(V) when is_boolean(V) -> #{boolValue => V};
encode_value(V) when is_atom(V) -> #{stringValue => atom_to_binary(V, utf8)};
encode_value(V) when is_number(V) -> #{doubleValue => V};
% The term must be wrapped in a list: io_lib:format("~p", V) raised badarg.
encode_value(V) -> #{stringValue => iolist_to_binary(io_lib:format("~p", [V]))}.

%% The "message" field of a JSON response body, for debug logging only.
%% Never raises: undecodable, non-object or non-binary bodies yield <<>>.
response_message(Body) when is_binary(Body) ->
    try json:decode(Body) of
        #{<<"message">> := Msg} -> Msg;
        _ -> <<>>
    catch
        _:_ ->
            ?LOG_DEBUG("Unable to decode json: ~p", [binary:part(Body, 0, min(byte_size(Body), 30))]),
            <<>>
    end;
response_message(_) ->
    <<>>.
%% Test helper: resets the collector state, erases every otel persistent term and
%% drops all buffered spans (returns ets:delete_all_objects/1's result).
reset() ->
    gen_server:call(otel_collector, reset),
    reset_persistent_terms(),
    ets:delete_all_objects(otel_spans).

%% Erases the otel persistent terms; returns the result of the last erase.
reset_persistent_terms() ->
    Keys = [otel_enabled, otel_overloaded, otel_sampling, otel_service_name, otel_duration_threshold],
    lists:foldl(fun(Key, _Previous) -> persistent_term:erase(Key) end, false, Keys).
%% Starts the collector server, registered locally under the module name.
start_link() ->
    Name = {local, ?MODULE},
    gen_server:start_link(Name, ?MODULE, [], []).
%% Collector server state.
-record(collector, {
    enabled = false,     % last value published as persistent term otel_enabled
    overloaded = false,  % last value published as persistent term otel_overloaded
    url,                 % exporter URL without query string; undefined = disabled
    sampling,            % `sampling` URL parameter, or undefined
    duration_threshold,  % `duration_threshold` URL parameter, or undefined
    span_count = 0,      % spans saved since the last flush_limit_timer tick
    rate_limit,          % `rate_limit` URL parameter (spans per tick), or undefined
    limit_timer          % timer reference of the pending flush_limit_timer message
}).

%% Creates the public otel_spans buffer table, arms the rate-limit timer and applies
%% the exporter URL already stored in the corelib env `opentelemetry_url`.
%%
%% Reading the env here matters after a restart. otel:load_config/1 only calls
%% `update` when the URL changes, so a restarted collector otherwise stayed without
%% a URL forever, while the otel_enabled/otel_overloaded persistent terms (which
%% outlive this process) could still claim tracing was on.
init([]) ->
    ets:new(otel_spans, [public, named_table]),
    LimitTimer = erlang:send_after(1000, self(), flush_limit_timer),
    % `unknown` never equals a boolean, so set_enabled_flag/1 below republishes both
    % persistent terms unconditionally instead of trusting stale values.
    Fresh = #collector{limit_timer = LimitTimer, enabled = unknown, overloaded = unknown},
    URL = application:get_env(corelib, opentelemetry_url, undefined),
    Configured =
        try
            read_url(URL, Fresh)
        catch
            Class:Reason ->
                % A bad URL must not turn every restart into a crash loop; stay disabled.
                ?LOG_ERROR(#{opentelemetry_url => URL, class => Class, reason => Reason}),
                Fresh
        end,
    {ok, set_enabled_flag(Configured)}.
%% Applies an exporter URL to the collector state.
%%
%% undefined clears the URL, which disables export. Otherwise the query string is
%% stripped from the URL, and these optional parameters are read from it:
%%   rate_limit=N          spans accepted per flush_limit_timer tick
%%   sampling=P            percentage of locally started traces kept; also
%%                         published as persistent term otel_sampling
%%   duration_threshold=N  published as persistent term otel_duration_threshold
%%   service_name=S        published as persistent term otel_service_name
%%                         (default <<"streamer">>)
%% e.g. "http://localhost:4318/v1/traces?sampling=10&rate_limit=1000".
%% Raises if a numeric parameter is not an integer.
read_url(undefined, Collector) ->
    Collector#collector{url = undefined};
read_url(FullURL, Collector) ->
    Parsed = uri_string:parse(iolist_to_binary(FullURL)),
    URL = uri_string:recompose(maps:remove(query, Parsed)),
    Params = uri_string:dissect_query(maps:get(query, Parsed, <<>>)),
    RateLimit = int_param(<<"rate_limit">>, Params),
    Sampling = int_param(<<"sampling">>, Params),
    DurationThreshold = int_param(<<"duration_threshold">>, Params),
    persistent_term:put(otel_sampling, Sampling),
    % This used to store Sampling by mistake.
    persistent_term:put(otel_duration_threshold, DurationThreshold),
    ServiceName = proplists:get_value(<<"service_name">>, Params, <<"streamer">>),
    persistent_term:put(otel_service_name, ServiceName),
    Collector#collector{
        url = URL,
        rate_limit = RateLimit,
        sampling = Sampling,
        duration_threshold = DurationThreshold
    }.

%% Integer value of query parameter Name, or undefined when it is absent.
int_param(Name, Params) ->
    case proplists:get_value(Name, Params) of
        undefined -> undefined;
        Value -> binary_to_integer(Value)
    end.
%% Synchronous requests:
%%   update          re-read corelib env `opentelemetry_url`, republish the flags and
%%                   zero the span counter;
%%   flush           reply {ok, URL, Spans} and empty the span buffer;
%%   {save_trace, _} accepted, currently a no-op;
%%   #{spanId := _}  buffer one finished span;
%%   reset           back to a fresh state, keeping only the running limit timer.
handle_call(update, _From, #collector{} = State) ->
    NewURL = application:get_env(corelib, opentelemetry_url, undefined),
    Updated = set_enabled_flag(read_url(NewURL, State)),
    {reply, ok, Updated#collector{span_count = 0}};
handle_call(flush, _From, #collector{url = URL} = State) ->
    Buffered = [Span || {_SpanId, _TraceId, Span} <- ets:tab2list(otel_spans)],
    ets:delete_all_objects(otel_spans),
    {reply, {ok, URL, Buffered}, State};
handle_call({save_trace, _TraceId}, _From, #collector{} = State) ->
    {reply, ok, State};
handle_call(#{spanId := _} = Span, _From, #collector{} = State) ->
    {reply, ok, save_span1(Span, State)};
handle_call(reset, _From, #collector{limit_timer = Timer}) ->
    {reply, ok, #collector{limit_timer = Timer}}.
%% flush_limit_timer: re-arms itself for the middle of the next wall-clock second,
%% zeroes the rate-limit span counter and republishes the flags.
%% A span map sent as a plain message is buffered like a save_span/1 request.
%% Any other message is logged and ignored, as in otel_stats and otel_uploader,
%% instead of crashing the collector. A crash would drop the otel_spans table this
%% process owns.
handle_info(flush_limit_timer, #collector{limit_timer = OldTimer} = Collector) ->
    is_reference(OldTimer) andalso erlang:cancel_timer(OldTimer),
    T = erlang:system_time(millisecond),
    % Time left until the middle of the next second (501..1500 ms).
    T1 = (T div 1000) * 1000 + 1500 - T,
    LimitTimer = erlang:send_after(T1, self(), flush_limit_timer),
    {noreply, set_enabled_flag(Collector#collector{limit_timer = LimitTimer, span_count = 0})};
handle_info(#{spanId := _} = Span, #collector{} = Collector) ->
    {noreply, save_span1(Span, Collector)};
handle_info(Msg, #collector{} = Collector) ->
    ?LOG_INFO(#{unknown_msg => Msg}),
    {noreply, Collector}.
%% Recomputes the enabled flag (a URL is set) and the overloaded flag (span_count
%% reached an integer rate_limit). Each persistent term is rewritten only when its
%% value differs from the one recorded in the state; the new values are then stored
%% in the state.
set_enabled_flag(#collector{url = URL, enabled = WasEnabled, overloaded = WasOverloaded} = Collector) ->
    #collector{rate_limit = Limit, span_count = Count} = Collector,
    Overloaded = is_integer(Limit) andalso Count >= Limit,
    Overloaded =:= WasOverloaded orelse persistent_term:put(otel_overloaded, Overloaded),
    Enabled = URL =/= undefined,
    Enabled =:= WasEnabled orelse persistent_term:put(otel_enabled, Enabled),
    Collector#collector{enabled = Enabled, overloaded = Overloaded}.
%% Buffers Span in otel_spans as {SpanId, TraceId, Span}, counts it against the
%% rate limit and republishes the flags.
save_span1(#{spanId := SpanId, traceId := TraceId} = Span, #collector{span_count = Count} = Collector) ->
    ets:insert(otel_spans, {SpanId, TraceId, Span}),
    set_enabled_flag(Collector#collector{span_count = Count + 1}).

terminate(_Reason, _State) ->
    ok.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(otel_stats). | |
-export([start_link/0]). | |
-include_lib("kernel/include/logger.hrl"). | |
-include_lib("corelib/include/corelib_openapi.hrl"). | |
-export([init/1, handle_info/2, handle_call/3, terminate/2, get_stats/0, put_counters/1]). | |
% tests | |
-export([reset/0]). | |
%% Starts the stats server, registered locally as otel_stats.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Currently accumulated counters.
-spec get_stats() -> opentelemetry_stats_map().
get_stats() ->
    gen_server:call(?MODULE, get_stats).

%% Asynchronously adds Counters to the accumulated ones; returns the sent message.
-spec put_counters(opentelemetry_stats_map()) -> {counters, opentelemetry_stats_map()}.
put_counters(#{} = Counters) ->
    erlang:send(?MODULE, {counters, Counters}).

%% Test helper: clears all counters.
reset() ->
    gen_server:call(?MODULE, reset).
%% Stats server state: counter name => accumulated value.
-record(stats, {
    counters = #{}
}).

%% Starts with no counters.
init([]) ->
    State = #stats{},
    {ok, State}.
%% get_stats replies with the counters; reset clears them.
handle_call(get_stats, _From, #stats{counters = Current} = State) ->
    {reply, Current, State};
handle_call(reset, _From, #stats{} = State) ->
    {reply, ok, State#stats{counters = #{}}}.

%% {counters, Map} adds Map to the accumulated counters; anything else is logged.
handle_info({counters, Incoming}, #stats{counters = Current} = State) ->
    {noreply, State#stats{counters = accumulate_counters(Current, Incoming)}};
handle_info(Msg, #stats{} = State) ->
    ?LOG_INFO(#{unknown_msg => Msg}),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.
%% Key-wise sum of two counter maps: every key of InCounters is added to the same
%% key of Counters (missing keys count as 0), and keys found only in Counters are
%% kept as they are.
-spec accumulate_counters(opentelemetry_stats_map(), opentelemetry_stats_map()) -> opentelemetry_stats_map().
accumulate_counters(#{} = InCounters, #{} = Counters) ->
    maps:merge(Counters, #{K => V + maps:get(K, Counters, 0) || K := V <- InCounters}).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(otel_sup). | |
-behaviour(supervisor). | |
-export([start_link/0]). | |
-export([init/1]). | |
%% Starts the otel supervision tree, registered locally under the module name.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Children are started in dependency order and stopped in reverse:
%%   otel_collector - owns the span buffer and the enabled/overloaded flags;
%%   otel_stats     - accumulates counters reported by otel and otel_uploader;
%%   otel_uploader  - periodically flushes the collector and reports to otel_stats,
%%                    so it starts last and is shut down first.
%% Map child specs default `modules` to [Module], which the release handler needs
%% for code upgrades; the previous tuple specs listed no modules at all.
init([]) ->
    Worker = fun(Id) ->
        #{id => Id,
          start => {Id, start_link, []},
          restart => transient,
          shutdown => 100,
          type => worker}
    end,
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
    {ok, {SupFlags, [Worker(otel_collector), Worker(otel_stats), Worker(otel_uploader)]}}.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(otel_uploader). | |
-export([start_link/0]). | |
-include_lib("kernel/include/logger.hrl"). | |
-define(TIMEOUT, 5000). | |
-export([init/1, handle_info/2, handle_call/3, terminate/2]). | |
% tests | |
-export([flush/0]). | |
%% Starts the uploader server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Test helper: forces an immediate synchronous flush and upload.
%% The flush performs an HTTP POST with its own 5000 ms timeout. The default
%% 5000 ms gen_server:call timeout could therefore expire while the upload was still
%% legitimately running, so this waits up to 15 s instead.
flush() ->
    gen_server:call(otel_uploader, flush, 15000).
%% Uploader state: the pending periodic flush timer.
-record(uploader, {
    timer   % reference returned by erlang:send_after/3 for the next `flush`
}).

%% Schedules the first periodic flush ?TIMEOUT ms after start.
init([]) ->
    {ok, #uploader{timer = erlang:send_after(?TIMEOUT, self(), flush)}}.
%% `flush` runs the same work as the periodic timer message, then replies ok.
handle_call(flush, _From, #uploader{} = State) ->
    {noreply, Flushed} = handle_info(flush, State),
    {reply, ok, Flushed}.
%% `flush`: cancels the pending timer, drains and uploads the collector's spans,
%% reports the resulting counters to otel_stats and schedules the next flush.
%% Other messages are logged and ignored.
handle_info(flush, #uploader{timer = Pending} = State) ->
    erlang:cancel_timer(Pending),
    otel_stats:put_counters(otel_collector:flush()),
    {noreply, State#uploader{timer = erlang:send_after(?TIMEOUT, self(), flush)}};
handle_info(Msg, #uploader{} = State) ->
    ?LOG_INFO(#{unknown_msg => Msg}),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment