Created
September 21, 2021 15:26
-
-
Save HaloFour/6cac6653120da96fc1e8e1effca7b5b5 to your computer and use it in GitHub Desktop.
TracingOperator with Context
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.function.BiFunction; | |
import java.util.function.Function; | |
import io.opentelemetry.api.GlobalOpenTelemetry; | |
import io.opentelemetry.api.OpenTelemetry; | |
import io.opentelemetry.api.common.AttributeKey; | |
import io.opentelemetry.api.trace.Span; | |
import io.opentelemetry.api.trace.SpanBuilder; | |
import io.opentelemetry.api.trace.Tracer; | |
import io.opentelemetry.context.Scope; | |
import io.opentelemetry.sdk.OpenTelemetrySdk; | |
import io.opentelemetry.sdk.trace.ReadWriteSpan; | |
import io.opentelemetry.sdk.trace.ReadableSpan; | |
import io.opentelemetry.sdk.trace.SdkTracerProvider; | |
import io.opentelemetry.sdk.trace.SpanProcessor; | |
import org.reactivestreams.Publisher; | |
import org.reactivestreams.Subscription; | |
import reactor.core.CoreSubscriber; | |
import reactor.core.Scannable; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.Operators; | |
import reactor.util.context.Context; | |
public class Example { | |
private static final OpenTelemetry openTelemetry = configureOpenTelemetry(); | |
private static final Tracer tracer = openTelemetry.getTracer("test"); | |
private static final Mono<String> READ_ITEM = readItem(); | |
public static void main(String[] args) { | |
Span span = tracer.spanBuilder("parent").startSpan(); | |
try (Scope ignored = span.makeCurrent()) { | |
String item = READ_ITEM.block(); | |
System.out.println(item); | |
} | |
} | |
static Mono<String> readItem() { | |
return httpGet("https://mydatabase").map(v -> "item: " + v) | |
.transform(TracingOperator.withSpan("readItem")); | |
} | |
static Mono<String> httpGet(String url) { | |
return Mono.defer(() -> Mono.just("http response from " + url)) | |
.transform(TracingOperator.withSpan("httpGet", builder -> builder | |
.setAttribute("url", url) | |
)); | |
} | |
private static OpenTelemetry configureOpenTelemetry() { | |
return OpenTelemetrySdk.builder() | |
.setTracerProvider(SdkTracerProvider.builder() | |
.addSpanProcessor(new SpanProcessor() { | |
@Override | |
public void onStart(io.opentelemetry.context.Context parentContext, ReadWriteSpan span) { | |
} | |
@Override | |
public boolean isStartRequired() { | |
return false; | |
} | |
@Override | |
public void onEnd(ReadableSpan span) { | |
System.out.println(span); | |
} | |
@Override | |
public boolean isEndRequired() { | |
return true; | |
} | |
}) | |
.build()) | |
.buildAndRegisterGlobal(); | |
} | |
} | |
final class TracingOperator<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> { | |
private static final String REACTOR_OTEL_CONTEXT_KEY = "otel-context"; | |
public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName) { | |
return withSpan(spanName, Function.identity()); | |
} | |
public static <T> Function<Publisher<T>, Publisher<T>> withSpan(String spanName, Function<SpanBuilder, SpanBuilder> builder) { | |
return (Function<Publisher<T>, Publisher<T>>) Operators.lift(new TracingOperator<T>(spanName, builder)); | |
} | |
private final Tracer tracer; | |
private final String spanName; | |
private final Function<SpanBuilder, SpanBuilder> builder; | |
private final io.opentelemetry.context.Context assemblyContext; | |
private TracingOperator(String spanName, Function<SpanBuilder, SpanBuilder> builder) { | |
this.tracer = GlobalOpenTelemetry.getTracer("reactor"); | |
this.spanName = spanName; | |
this.builder = builder; | |
this.assemblyContext = io.opentelemetry.context.Context.current(); | |
} | |
@Override | |
public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> subscriber) { | |
Context subscriberContext = subscriber.currentContext(); | |
io.opentelemetry.context.Context parentContext = figureOutParentContext(subscriberContext); | |
Span span = builder.apply(tracer.spanBuilder(spanName).setParent(parentContext)).startSpan(); | |
io.opentelemetry.context.Context spanContext = parentContext.with(span); | |
subscriberContext = subscriberContext.put(REACTOR_OTEL_CONTEXT_KEY, spanContext); | |
return new TracingSubscriber<>(subscriber, subscriberContext, spanContext, span); | |
} | |
/** | |
* We have three potential OTel contexts here and we need to work out which is the best one to use. | |
* | |
* <ul> | |
* <li>The current OTel context</li> | |
* <li>The OTel Context stored in the Reactor Context</li> | |
* <li>The OTel context captured during assembly</li> | |
* </ul> | |
*/ | |
private io.opentelemetry.context.Context figureOutParentContext(Context subscriberContext) { | |
if (subscriberContext.hasKey(REACTOR_OTEL_CONTEXT_KEY)) { | |
return subscriberContext.get(REACTOR_OTEL_CONTEXT_KEY); | |
} | |
return io.opentelemetry.context.Context.current(); | |
} | |
} | |
final class TracingSubscriber<T> implements CoreSubscriber<T>, Subscription { | |
private static final AttributeKey<Boolean> REACTOR_CANCELED_ATTRIBUTE_KEY = AttributeKey.booleanKey("reactor.canceled"); | |
private final CoreSubscriber<T> subscriber; | |
private final Context subscriberContext; | |
private final io.opentelemetry.context.Context otelContext; | |
private final Span span; | |
private Subscription subscription; | |
public TracingSubscriber(CoreSubscriber<T> subscriber, Context subscriberContext, io.opentelemetry.context.Context otelContext, Span span) { | |
this.subscriber = subscriber; | |
this.subscriberContext = subscriberContext; | |
this.otelContext = otelContext; | |
this.span = span; | |
} | |
@Override | |
public synchronized void onSubscribe(Subscription subscription) { | |
if (Operators.validate(this.subscription, subscription)) { | |
this.subscription = subscription; | |
try (Scope scope = this.otelContext.makeCurrent()) { | |
subscriber.onSubscribe(this); | |
} | |
} | |
} | |
@Override | |
public void request(long n) { | |
if (subscription != null) { | |
try (Scope scope = this.otelContext.makeCurrent()) { | |
subscription.request(n); | |
} | |
} | |
} | |
@Override | |
public void cancel() { | |
if (subscription != null) { | |
subscription.cancel(); | |
if (span != null) { | |
span.setAttribute(REACTOR_CANCELED_ATTRIBUTE_KEY, true) | |
.end(); | |
} | |
} | |
} | |
@Override | |
public void onNext(T t) { | |
try (Scope scope = otelContext.makeCurrent()) { | |
subscriber.onNext(t); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
if (span != null) { | |
span.recordException(t) | |
.end(); | |
} | |
try (Scope scope = otelContext.makeCurrent()) { | |
subscriber.onError(t); | |
} | |
} | |
@Override | |
public void onComplete() { | |
if (span != null) { | |
span.end(); | |
} | |
try (Scope scope = otelContext.makeCurrent()) { | |
subscriber.onComplete(); | |
} | |
} | |
@Override | |
public Context currentContext() { | |
return this.subscriberContext; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment