Skip to content

Instantly share code, notes, and snippets.

@lmolkova
Last active September 27, 2021 19:35
Show Gist options
  • Save lmolkova/156a9e6453f5d3f8242689cfe6a6c07a to your computer and use it in GitHub Desktop.
Save lmolkova/156a9e6453f5d3f8242689cfe6a6c07a to your computer and use it in GitHub Desktop.
package org.example;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
public class Tests {
// tracing instrumentation
/**
 * Wraps {@code publisher} in a span called {@code spanName}.
 *
 * <p>The span is started when the subscriber context is written (see {@code startSpan}) and is
 * ended from the terminal signal: an error is recorded and ends the span with
 * {@link StatusCode#ERROR}, completion ends it with {@link StatusCode#UNSET}.
 *
 * @param publisher the source to trace
 * @param spanName  name given to the span that covers the subscription
 * @return a Mono emitting the same signals as {@code publisher}
 */
private <T> Mono<T> traceMono(Mono<T> publisher, String spanName) {
    // Deliberate hack (per the original author): going through DUMMY.flatMap makes 'publisher'
    // run from the onNext callback of the TracingSubscriber created for this chain, where the
    // span started below is current. Without it 'publisher' runs in the parent scope.
    Mono<T> viaDummy = DUMMY.flatMap(ignored -> publisher);

    Mono<T> spanEnding = viaDummy.doOnEach(signal -> {
        if (signal.isOnComplete()) {
            endSpan(StatusCode.UNSET, signal.getContextView());
            return;
        }
        if (signal.isOnError()) {
            recordException(signal.getContextView(), signal.getThrowable());
            endSpan(StatusCode.ERROR, signal.getContextView());
        }
        // onNext / onSubscribe signals leave the span untouched
    });

    return spanEnding.contextWrite(ctx -> startSpan(ctx, spanName));
}
/**
 * Starts a span named {@code name} and stores it in the Reactor context.
 *
 * <p>The parent is the OpenTelemetry context found under {@code "otel-context-key"} in
 * {@code subscriberContext}; when that key is absent the current OpenTelemetry context is used.
 *
 * @param subscriberContext Reactor context of the subscriber
 * @param name              name of the span to start
 * @return {@code subscriberContext} with {@code "otel-context-key"} pointing at the parent
 *         context extended with the new span
 */
private reactor.util.context.Context startSpan(reactor.util.context.Context subscriberContext, String name) {
    io.opentelemetry.context.Context ambient = io.opentelemetry.context.Context.current();
    io.opentelemetry.context.Context parent = subscriberContext.getOrDefault("otel-context-key", ambient);

    Span span = tracer.spanBuilder(name).setParent(parent).startSpan();

    String parentSpanId = Span.fromContext(parent).getSpanContext().getSpanId();
    String startedSpanId = span.getSpanContext().getSpanId();
    System.out.printf("Starting span '%s', Parent span id %s, started id - %s\n", name, parentSpanId, startedSpanId);

    io.opentelemetry.context.Context withSpan = parent.with(span);
    return subscriberContext.put("otel-context-key", withSpan);
}
/**
 * Ends the span stored in the Reactor context with the given status.
 *
 * <p>The span is read from the OpenTelemetry context found under {@code "otel-context-key"};
 * when that key is absent the current OpenTelemetry context is used instead.
 *
 * @param status            status to set on the span before ending it
 * @param subscriberContext Reactor context view carried by the terminal signal
 */
private void endSpan(StatusCode status, ContextView subscriberContext) {
    io.opentelemetry.context.Context current =
        subscriberContext.getOrDefault("otel-context-key", io.opentelemetry.context.Context.current());
    Span span = Span.fromContext(current);
    span.setStatus(status);
    span.end();

    // Span.fromContext may hand back a span that is not an SDK ReadableSpan (e.g. when the
    // fallback context carries no span); only SDK spans expose a name, so guard the cast
    // instead of failing the reactive chain with a ClassCastException.
    String name = (span instanceof ReadableSpan) ? ((ReadableSpan) span).getName() : "<unnamed>";
    System.out.printf("Ended span '%s', id %s\n", name, span.getSpanContext().getSpanId());
}
/**
 * Records {@code t} on the span stored in the Reactor context.
 *
 * <p>The span is looked up under {@code "otel-context-key"}; when that key is absent the
 * current OpenTelemetry context is used instead.
 *
 * @param subscriberContext Reactor context view carried by the error signal
 * @param t                 the error to record
 */
private void recordException(ContextView subscriberContext, Throwable t) {
    io.opentelemetry.context.Context ambient = io.opentelemetry.context.Context.current();
    io.opentelemetry.context.Context otelContext = subscriberContext.getOrDefault("otel-context-key", ambient);
    Span.fromContext(otelContext).recordException(t);
}
// end of instrumentation
// Tracer used by the instrumentation helpers; rebuilt for every test in setup().
private Tracer tracer;
// In-memory exporter the assertions read from; recreated for every test in setup().
private TestExporter exporter;
// Exporter shared by all tests and shut down once in after(); never reassigned, hence final.
private static final SpanExporter jaeger = JaegerGrpcSpanExporter.builder()
    .setEndpoint("http://localhost:14250")
    .build();
// Seed Mono used by traceMono(); assigned in before() because it has to be assembled
// after the tracing hook is registered.
private static Mono<String> DUMMY = null;
// One-time setup for the class: installs the tracing hook, then assembles the shared DUMMY Mono.
@BeforeAll
public static void before() {
TracingOperator2.registerOnEachOperator();
// Statement order matters: DUMMY has to be assembled after the hook is registered
// so that it is captured by the hook.
DUMMY = Mono.just("1");
}
/**
 * One-time teardown: gives pending exports time to finish, shuts the shared Jaeger exporter
 * down and removes the global Reactor hook installed in {@code before()}.
 *
 * @throws InterruptedException if the grace-period sleep is interrupted
 */
@AfterAll
public static void after() throws InterruptedException {
    try {
        // Grace period kept from the original code, presumably for in-flight exports to Jaeger.
        Thread.sleep(1000);
        // shutdown() returns a result code; wait on it (bounded) instead of dropping it.
        jaeger.shutdown().join(10, java.util.concurrent.TimeUnit.SECONDS);
    } finally {
        // Hooks are JVM-global: remove ours (same key as registerOnEachOperator uses)
        // so it does not leak into other test classes.
        Hooks.resetOnEachOperator(TracingOperator2.TracingSubscriber.class.getName());
    }
}
/**
 * Per-test setup: creates a fresh in-memory exporter and a tracer named after the test.
 * Spans are sent to the in-memory exporter first and to the shared Jaeger exporter second.
 *
 * @param info JUnit test metadata; its display name becomes the tracer name
 */
@BeforeEach
public void setup(TestInfo info) {
    exporter = new TestExporter();

    SdkTracerProvider otelProvider = SdkTracerProvider.builder()
        .addSpanProcessor(SimpleSpanProcessor.create(exporter))
        .addSpanProcessor(SimpleSpanProcessor.create(jaeger))
        .build();

    OpenTelemetrySdk sdk = OpenTelemetrySdk.builder()
        .setTracerProvider(otelProvider)
        .build();
    tracer = sdk.getTracer(info.getDisplayName());
}
/**
 * Traces {@code inner} under a span named "outer".
 *
 * @param inner the Mono to wrap
 * @return {@code inner} wrapped by {@code traceMono} via {@code transform}
 */
Mono<Span> outer(Mono<Span> inner) {
    return inner.transform(
        upstream -> traceMono(upstream, "outer"));
}
/**
 * Blocks on {@code source} while a span named {@code testName} is current.
 *
 * @param testName name of the span that acts as the root for the test
 * @param source   the pipeline to run
 * @return the value {@code source.block()} returns
 */
private <T> T runTest(String testName, Mono<T> source) {
    Span parent = tracer.spanBuilder(testName).startSpan();
    try (Scope ignored = parent.makeCurrent()) {
        T result = source.block();
        return result;
    } finally {
        // runs after the scope is closed, whether block() returned or threw
        parent.end();
    }
}
/**
 * {@code Mono.just(Span.current())} evaluates {@code Span.current()} right here, while the
 * pipeline is being assembled and before runTest starts the parent span, so the emitted span
 * is not valid even though the span hierarchy itself is correct.
 */
@Test
public void testJust() {
    Span capturedAtAssembly = Span.current();
    Mono<Span> inner = Mono
        .just(capturedAtAssembly)
        .transform(m -> traceMono(m, "innerJust"));
    Mono<Span> source = inner.transform(m -> traceMono(m, "outer"));

    Span innerCurrent = runTest("testJust", source);

    // the value was captured outside of any span
    assertFalse(innerCurrent.getSpanContext().isValid());

    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertIsParent(outerSpan, exporter.exportedSpans.get("innerJust"));
    assertIsParent(exporter.exportedSpans.get("testJust"), outerSpan);
}
/**
 * With {@code Mono.defer} the supplier runs at subscription time, inside the 'innerDefer' span,
 * so {@code Span.current()} observed there is the traced span.
 */
@Test
public void testDefer() {
    Mono<Span> deferred = Mono.defer(() -> {
        assertEquals("innerDefer", ((ReadableSpan) Span.current()).getName());
        return Mono.just(Span.current());
    });
    Mono<Span> source = deferred
        .transform(m -> traceMono(m, "innerDefer"))
        .transform(m -> traceMono(m, "outer"));

    Span innerCurrent = runTest("testDefer", source);

    SpanData innerDefer = exporter.exportedSpans.get("innerDefer");
    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, innerDefer);
    assertIsParent(outerSpan, innerDefer);
    assertIsParent(exporter.exportedSpans.get("testDefer"), outerSpan);
}
// Starts a manual 'inner' span inside reactor code and shows that 'innerInner' only becomes its
// child because the span is passed explicitly through the Reactor context.
// Asserted hierarchy: testInnerNested -> outer -> inner -> innerInner.
@Test
public void testInnerNested() {
Mono<Span> source = outer(Mono.defer( () -> {
Span inner = tracer.spanBuilder("inner").startSpan();
// DON'T do this!
// Don't make spans current in reactor code; use mono.transform(m -> traceMono()) instead.
// OR, if you do make spans current in reactor code, close them and pass them in the reactor context anyway.
try (Scope s = inner.makeCurrent()) {
return Mono.fromCallable(() -> Span.current()).transform(d -> traceMono(d, "innerInner"))
.contextWrite(ctx -> {
// By the time this runs, 'inner' is no longer current :(
// Without explicit context passing this example does not work: 'innerInner' would be a child of 'outer'.
assertEquals("outer", ((ReadableSpan)Span.current()).getName());
// hand 'inner' to the traced chain through the key startSpan() reads its parent from
return ctx.put("otel-context-key", io.opentelemetry.context.Context.current().with(inner));
});
} finally {
// 'inner' is ended as soon as the Mono has been assembled and returned
inner.end();
}
}));
Span innerCurrent = runTest("testInnerNested", source);
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("innerInner"));
assertIsParent(exporter.exportedSpans.get("inner"), exporter.exportedSpans.get("innerInner"));
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("inner"));
assertIsParent(exporter.exportedSpans.get("testInnerNested"), exporter.exportedSpans.get("outer"));
}
/**
 * Subscribes to an independent, nested pipeline while a manually started 'inner' span is
 * current and checks that the nested pipeline is parented to 'inner'.
 * Asserted hierarchy: testInnerNestedBlocking -> outer -> inner -> innerInner.
 */
@Test
public void testInnerNestedBlocking() {
    Mono<Span> source = outer(Mono.defer(() -> {
        Span inner = tracer.spanBuilder("inner").startSpan();
        try (Scope ignored = inner.makeCurrent()) {
            // sanity check - inner reactor code has no relation to outer
            AtomicReference<Span> observed = new AtomicReference<>();
            Mono.just(Span.current()).transform(d -> traceMono(d, "innerInner"))
                .subscribe(observed::set);
            // Assert outside of the subscribe() callback: an assertion error thrown inside a
            // consumer-only subscribe() is handled by Reactor's dropped-error path and would not
            // fail the test. Thrown here, it becomes an error of the deferred Mono and surfaces
            // from block(). The nested chain has no asynchronous operators, so the value is
            // expected to be available as soon as subscribe() returns.
            assertEquals("inner", ((ReadableSpan) observed.get()).getName());
            return Mono.just(inner);
        } finally {
            inner.end();
        }
    }));
    Span innerCurrent = runTest("testInnerNestedBlocking", source);
    assertSpanEquals(innerCurrent, exporter.exportedSpans.get("inner"));
    assertIsParent(exporter.exportedSpans.get("inner"), exporter.exportedSpans.get("innerInner"));
    assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("inner"));
    assertIsParent(exporter.exportedSpans.get("testInnerNestedBlocking"), exporter.exportedSpans.get("outer"));
}
/**
 * A delayed source emits asynchronously; the span observed after the delay must still be
 * 'innerDelay', which in turn is a child of 'outer'.
 */
@Test
public void testDelay() {
    Mono<Span> delayed = Mono
        .delay(Duration.ofMillis(1))
        .flatMap(tick -> Mono.just(Span.current()));
    Mono<Span> source = outer(delayed.transform(m -> traceMono(m, "innerDelay")));

    Span innerCurrent = runTest("testDelay", source);

    SpanData innerDelay = exporter.exportedSpans.get("innerDelay");
    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, innerDelay);
    assertIsParent(outerSpan, innerDelay);
    assertIsParent(exporter.exportedSpans.get("testDelay"), outerSpan);
}
/**
 * Subscribes to the same assembled Mono ten times from a parallel stream and checks that every
 * subscription produced its own 'outer' span (a root, since no test span is current here) with
 * its own 'inner' child.
 */
@Test
public void testReuseMono() {
    Mono<Span> source = outer(Mono
        .defer(() -> Mono.just(Span.current()))
        .transform(j -> traceMono(j, "inner")));
    IntStream.range(0, 10).parallel().forEach(i -> {
        Span innerCurrent = source.block();
        assertEquals("inner", ((ReadableSpan) innerCurrent).getName());
    });

    assertEquals(20, exporter.exportedSpansById.size());
    // Span names are compared with equals(): '==' only holds while both sides happen to be the
    // same interned literal.
    long innerCount = exporter.exportedSpansById.values().stream()
        .filter(span -> "inner".equals(span.getName()))
        .count();
    long outerCount = exporter.exportedSpansById.values().stream()
        .filter(span -> "outer".equals(span.getName()))
        .count();
    assertEquals(10, innerCount);
    assertEquals(10, outerCount);

    for (SpanData exported : exporter.exportedSpansById.values()) {
        SpanData parent = exporter.exportedSpansById.get(exported.getParentSpanId());
        if ("inner".equals(exported.getName())) {
            assertEquals("outer", parent.getName());
        } else {
            // 'outer' spans have no exported parent
            assertNull(parent);
        }
    }
}
// Traces each retry attempt as its own 'inner' span (renamed to inner0, inner1 by attempt number)
// under a single 'innerWithRetries' span that covers the whole retried chain.
@Test
public void testRetries() {
// attempt counter, starts at 0 and is incremented on every subscription of the deferred Mono
AtomicReference<Integer> attempt = new AtomicReference<>(0);
Mono<Span> source = outer(
Mono.defer(() -> {
// rename the current span so that each attempt can be looked up by name after export
Span.current().updateName("inner" + attempt.get());
// getAndUpdate returns the previous value: even attempts fail, odd attempts succeed
return (attempt.getAndUpdate(i -> i + 1) % 2 == 0)
? Mono.error(new TestException())
: Mono.just(Span.current());
})
.transform(t -> traceMono(t, "inner"))
// retry up to 3 times, 1 ms apart, and only for TestException
.retryWhen(Retry.fixedDelay(3, Duration.ofMillis(1)).filter(t -> t instanceof TestException))
.transform(t -> traceMono(t, "innerWithRetries")));
Span innerCurrent = runTest("testRetries", source);
// the emitted span comes from the second (successful) attempt
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("inner1"));
assertIsParent(exporter.exportedSpans.get("innerWithRetries"), exporter.exportedSpans.get("inner0"));
assertIsParent(exporter.exportedSpans.get("innerWithRetries"), exporter.exportedSpans.get("inner1"));
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerWithRetries"));
// only the failed attempt carries an ERROR status; everything above it ends as UNSET
assertEquals(StatusCode.ERROR, exporter.exportedSpans.get("inner0").getStatus().getStatusCode());
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("inner1").getStatus().getStatusCode());
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("innerWithRetries").getStatus().getStatusCode());
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("outer").getStatus().getStatusCode());
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("testRetries").getStatus().getStatusCode());
}
/**
 * Zips two individually traced sources and traces a third step on the zipped result; all three
 * spans ('delay', 'just', 'zip') are expected to be direct children of 'outer'.
 */
@Test
public void testZip() {
    Mono<Long> tracedDelay = Mono
        .delay(Duration.ofMillis(1))
        .transform(t -> traceMono(t, "delay"));
    Mono<String> tracedJust = Mono
        .just("1")
        .transform(t -> traceMono(t, "just"));

    // NOTE: the current span is 'zip' only because Mono.fromCallable (or Mono.defer) is used.
    // With Mono.just(Span.current()) the current span would be 'outer'.
    Mono<Span> inner = tracedDelay
        .zipWith(tracedJust)
        .flatMap(pair -> Mono.fromCallable(() -> Span.current()).transform(t -> traceMono(t, "zip")));

    Span innerCurrent = runTest("testZip", outer(inner));

    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, exporter.exportedSpans.get("zip"));
    assertIsParent(outerSpan, exporter.exportedSpans.get("delay"));
    assertIsParent(outerSpan, exporter.exportedSpans.get("just"));
    assertIsParent(outerSpan, exporter.exportedSpans.get("zip"));
}
/**
 * Traces the two sides of {@code then(...)} separately: 'fromCallable' and 'then' are siblings
 * under 'outer', and the emitted span is the one observed in the 'then' step.
 */
@Test
public void testDeferThen() {
    // traces fromCallable
    Mono<Span> first = Mono
        .fromCallable(() -> Span.current())
        .transform(m -> traceMono(m, "fromCallable"));
    // traces then
    Mono<Span> second = Mono
        .fromCallable(() -> Span.current())
        .transform(m -> traceMono(m, "then"));
    Mono<Span> source = outer(first.then(second));

    Span innerCurrent = runTest("testDeferThen", source);

    SpanData outerSpan = exporter.exportedSpans.get("outer");
    SpanData thenSpan = exporter.exportedSpans.get("then");
    assertSpanEquals(innerCurrent, thenSpan);
    assertIsParent(outerSpan, exporter.exportedSpans.get("fromCallable"));
    assertIsParent(outerSpan, thenSpan);
    assertIsParent(exporter.exportedSpans.get("testDeferThen"), outerSpan);
}
/**
 * Same operators as testDeferThen, but the second {@code transform} is applied to the result of
 * {@code then(...)}: 'fromCallableAndThen' covers the whole (fromCallable -> then) chain and
 * 'fromCallable' becomes its child.
 */
@Test
public void testDeferThenBracketsMatter() {
    // traces fromCallable
    Mono<Span> first = Mono
        .fromCallable(() -> Span.current())
        .transform(m -> traceMono(m, "fromCallable"));
    Mono<Span> chained = first.then(Mono.fromCallable(() -> Span.current()));
    // traces (fromCallable -> then)
    Mono<Span> tracedChain = chained.transform(m -> traceMono(m, "fromCallableAndThen"));
    Mono<Span> source = outer(tracedChain);

    Span innerCurrent = runTest("testDeferThenBracketsMatter", source);

    SpanData chainSpan = exporter.exportedSpans.get("fromCallableAndThen");
    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, chainSpan);
    assertIsParent(chainSpan, exporter.exportedSpans.get("fromCallable"));
    assertIsParent(outerSpan, chainSpan);
    assertIsParent(exporter.exportedSpans.get("testDeferThenBracketsMatter"), outerSpan);
}
/**
 * Traces a delay and then a {@code map} applied on top of it: the mapper observes the
 * 'delayAndMap' span, and 'delay' is a child of 'delayAndMap'.
 */
@Test
public void testMap() {
    Mono<Long> tracedDelay = Mono
        .delay(Duration.ofMillis(1))
        .transform(m -> traceMono(m, "delay"));
    Mono<Span> mapped = tracedDelay.map(tick -> Span.current());
    Mono<Span> source = outer(mapped.transform(m -> traceMono(m, "delayAndMap")));

    Span innerCurrent = runTest("testMap", source);

    SpanData delayAndMap = exporter.exportedSpans.get("delayAndMap");
    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, delayAndMap);
    assertIsParent(delayAndMap, exporter.exportedSpans.get("delay"));
    assertIsParent(outerSpan, delayAndMap);
    assertIsParent(exporter.exportedSpans.get("testMap"), outerSpan);
}
/**
 * With {@code Mono.fromCallable} the callable runs at subscription time, inside the
 * 'innerCallable' span, so {@code Span.current()} observed there is the traced span.
 */
@Test
public void testCallable() {
    Mono<Span> callable = Mono.fromCallable(() -> {
        assertEquals("innerCallable", ((ReadableSpan) Span.current()).getName());
        return Span.current();
    });
    Mono<Span> source = callable
        .transform(m -> traceMono(m, "innerCallable"))
        .transform(m -> traceMono(m, "outer"));

    Span innerCurrent = runTest("testCallable", source);

    SpanData innerCallable = exporter.exportedSpans.get("innerCallable");
    SpanData outerSpan = exporter.exportedSpans.get("outer");
    assertSpanEquals(innerCurrent, innerCallable);
    assertIsParent(outerSpan, innerCallable);
    assertIsParent(exporter.exportedSpans.get("testCallable"), outerSpan);
}
/**
 * Compares a live span with exported span data.
 *
 * @param span     span expected to be an SDK {@link ReadableSpan}; it is cast without a check
 * @param spanData exported data the span is compared against
 */
private void assertSpanEquals(Span span, SpanData spanData) {
    SpanData snapshot = ((ReadableSpan) span).toSpanData();
    assertSpanEquals(snapshot, spanData);
}
/**
 * Asserts that two span snapshots describe the same span: same name, span id, trace id and
 * parent span id, checked in that order.
 *
 * @param expected reference snapshot
 * @param actual   snapshot under test
 */
private void assertSpanEquals(SpanData expected, SpanData actual) {
    String expectedName = expected.getName();
    assertEquals(expectedName, actual.getName());

    String expectedSpanId = expected.getSpanId();
    assertEquals(expectedSpanId, actual.getSpanId());

    String expectedTraceId = expected.getTraceId();
    assertEquals(expectedTraceId, actual.getTraceId());

    String expectedParentId = expected.getParentSpanId();
    assertEquals(expectedParentId, actual.getParentSpanId());
}
/**
 * Asserts that {@code child} belongs to the same trace as {@code parent} and names
 * {@code parent} as its parent span.
 *
 * @param parent exported data of the expected parent span
 * @param child  exported data of the expected child span
 */
private void assertIsParent(SpanData parent, SpanData child) {
    String traceId = parent.getTraceId();
    assertEquals(traceId, child.getTraceId());

    String parentSpanId = parent.getSpanId();
    assertEquals(parentSpanId, child.getParentSpanId());
}
private class TestExporter implements SpanExporter {
public Map<String, SpanData> exportedSpans = new ConcurrentHashMap<>();
public Map<String, SpanData> exportedSpansById = new ConcurrentHashMap<>();
@Override
public CompletableResultCode export(Collection<SpanData> collection) {
for (SpanData sp : collection) {
exportedSpans.put(sp.getName(), sp);
exportedSpansById.put(sp.getSpanId(), sp);
}
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
/**
 * Marker exception for the retry test: the retry filter lets only this type through.
 * Carries no message and no cause.
 */
class TestException extends Exception {
    // Exception is Serializable; pin the version id instead of relying on the computed default.
    private static final long serialVersionUID = 1L;
}
}
/**
 * Reactor hook that makes an OpenTelemetry context current around every subscriber callback.
 */
class TracingOperator2 {
    /**
     * Registers a global {@code onEachOperator} hook, keyed by the {@link TracingSubscriber}
     * class name, that wraps each subscriber in a {@link TracingSubscriber}.
     */
    public static void registerOnEachOperator() {
        Hooks.onEachOperator(
            TracingSubscriber.class.getName(),
            Operators.lift((pub, sub) -> new TracingOperator2.TracingSubscriber<>(sub, sub.currentContext())));
    }

    /**
     * Subscriber decorator: delegates every signal to the wrapped subscriber while
     * {@code traceContext} is the current OpenTelemetry context.
     */
    static class TracingSubscriber<T> implements CoreSubscriber<T> {
        // Fixed at construction time, so it can be final.
        private final io.opentelemetry.context.Context traceContext;
        private final Subscriber<? super T> subscriber;
        private final reactor.util.context.Context context;

        /**
         * Uses the OpenTelemetry context that is current at construction time as the fallback.
         *
         * @param subscriber downstream subscriber to delegate to
         * @param ctx        Reactor context of the downstream subscriber
         */
        public TracingSubscriber(Subscriber<? super T> subscriber, reactor.util.context.Context ctx) {
            this(subscriber, ctx, io.opentelemetry.context.Context.current());
        }

        /**
         * @param subscriber         downstream subscriber to delegate to
         * @param ctx                Reactor context of the downstream subscriber
         * @param contextToPropagate OpenTelemetry context used when {@code ctx} has no
         *                           {@code "otel-context-key"} entry
         */
        public TracingSubscriber(
            Subscriber<? super T> subscriber,
            reactor.util.context.Context ctx,
            io.opentelemetry.context.Context contextToPropagate) {
            this.subscriber = subscriber;
            this.context = ctx;
            // The context stored in the Reactor context wins over the fallback.
            this.traceContext = ctx.getOrDefault("otel-context-key", contextToPropagate);
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            withActiveSpan(() -> subscriber.onSubscribe(subscription));
        }

        @Override
        public void onNext(T o) {
            withActiveSpan(() -> subscriber.onNext(o));
        }

        @Override
        public void onError(Throwable throwable) {
            withActiveSpan(() -> subscriber.onError(throwable));
        }

        @Override
        public void onComplete() {
            withActiveSpan(subscriber::onComplete);
        }

        /** Returns the Reactor context this subscriber was created with. */
        @Override
        public reactor.util.context.Context currentContext() {
            return context;
        }

        /** Runs {@code runnable} with {@code traceContext} current; the scope is closed afterwards. */
        private void withActiveSpan(Runnable runnable) {
            try (Scope ignored = traceContext.makeCurrent()) {
                runnable.run();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment