Last active
September 27, 2021 19:35
-
-
Save lmolkova/156a9e6453f5d3f8242689cfe6a6c07a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import io.opentelemetry.api.trace.Span; | |
import io.opentelemetry.api.trace.StatusCode; | |
import io.opentelemetry.api.trace.Tracer; | |
import io.opentelemetry.context.Scope; | |
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter; | |
import io.opentelemetry.sdk.OpenTelemetrySdk; | |
import io.opentelemetry.sdk.common.CompletableResultCode; | |
import io.opentelemetry.sdk.trace.ReadableSpan; | |
import io.opentelemetry.sdk.trace.SdkTracerProvider; | |
import io.opentelemetry.sdk.trace.data.SpanData; | |
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; | |
import io.opentelemetry.sdk.trace.export.SpanExporter; | |
import org.junit.jupiter.api.AfterAll; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Test; | |
import org.junit.jupiter.api.TestInfo; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import reactor.core.CoreSubscriber; | |
import reactor.core.Scannable; | |
import reactor.core.publisher.Hooks; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.Operators; | |
import reactor.util.context.ContextView; | |
import reactor.util.retry.Retry; | |
import java.time.Duration; | |
import java.util.Collection; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.function.BiFunction; | |
import java.util.stream.IntStream; | |
import static org.junit.jupiter.api.Assertions.assertEquals; | |
import static org.junit.jupiter.api.Assertions.assertFalse; | |
import static org.junit.jupiter.api.Assertions.assertNull; | |
public class Tests { | |
// tracing instrumentation | |
private <T> Mono<T> traceMono(Mono<T> publisher, String spanName) { | |
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` | |
// created for this publisher and with current() span that refer to span created here | |
// without it it runs in the parent scope | |
return DUMMY | |
.flatMap(i -> publisher) | |
.doOnEach((signal) -> { | |
if (signal.isOnError()) { | |
recordException(signal.getContextView(), signal.getThrowable()); | |
endSpan(StatusCode.ERROR, signal.getContextView()); | |
} else if (signal.isOnComplete()){ | |
endSpan(StatusCode.UNSET, signal.getContextView()); | |
} | |
}) | |
.contextWrite(ctx -> startSpan(ctx, spanName)); | |
} | |
private reactor.util.context.Context startSpan(reactor.util.context.Context subscriberContext, String name) { | |
io.opentelemetry.context.Context parent = | |
subscriberContext.getOrDefault("otel-context-key", io.opentelemetry.context.Context.current()); | |
Span span = tracer.spanBuilder(name).setParent(parent).startSpan(); | |
System.out.printf("Starting span '%s', Parent span id %s, started id - %s\n", | |
name, | |
Span.fromContext(parent).getSpanContext().getSpanId(), | |
span.getSpanContext().getSpanId()); | |
return subscriberContext.put("otel-context-key", parent.with(span)); | |
} | |
private void endSpan(StatusCode status, ContextView subscriberContext) { | |
io.opentelemetry.context.Context current = | |
subscriberContext.getOrDefault("otel-context-key", io.opentelemetry.context.Context.current()); | |
Span span = Span.fromContext(current); | |
span.setStatus(status); | |
span.end(); | |
System.out.printf("Ended span '%s', id %s\n", ((ReadableSpan)span).getName(), span.getSpanContext().getSpanId()); | |
} | |
private void recordException(ContextView subscriberContext, Throwable t) { | |
io.opentelemetry.context.Context current = | |
subscriberContext.getOrDefault("otel-context-key", io.opentelemetry.context.Context.current()); | |
Span span = Span.fromContext(current); | |
span.recordException(t); | |
} | |
// end of instrumentation | |
private Tracer tracer; | |
private TestExporter exporter; | |
private static SpanExporter jaeger = JaegerGrpcSpanExporter.builder() | |
.setEndpoint("http://localhost:14250") | |
.build(); | |
private static Mono<String> DUMMY = null; | |
@BeforeAll | |
public static void before() { | |
TracingOperator2.registerOnEachOperator(); | |
// has to capture DUMMY after hook is registered | |
DUMMY = Mono.just("1"); | |
} | |
@AfterAll | |
public static void after() throws InterruptedException { | |
Thread.sleep(1000); | |
jaeger.shutdown(); | |
} | |
@BeforeEach | |
public void setup(TestInfo info) { | |
exporter = new TestExporter(); | |
SdkTracerProvider otelProvider = SdkTracerProvider.builder() | |
.addSpanProcessor(SimpleSpanProcessor.create(exporter)) | |
.addSpanProcessor(SimpleSpanProcessor.create(jaeger)).build(); | |
tracer = OpenTelemetrySdk.builder().setTracerProvider(otelProvider).build().getTracer(info.getDisplayName()); | |
} | |
Mono<Span> outer(Mono<Span> inner) { | |
return inner.transform(n -> traceMono(n, "outer")); | |
} | |
private <T> T runTest(String testName, Mono<T> source) { | |
Span parent = tracer.spanBuilder(testName).startSpan(); | |
try (Scope s = parent.makeCurrent()) { | |
return source.block(); | |
} finally { | |
parent.end(); | |
} | |
} | |
@Test | |
public void testJust() { | |
Mono<Span> source = | |
// oops, since it's just runs synchronously all the way, just is executed before | |
// parent span even starts | |
Mono | |
.just(Span.current()).transform(i -> traceMono(i, "innerJust")) | |
.transform(o -> traceMono(o, "outer")); | |
Span innerCurrent = runTest("testJust", source); | |
// !!! | |
assertFalse(innerCurrent.getSpanContext().isValid()); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerJust")); | |
assertIsParent(exporter.exportedSpans.get("testJust"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testDefer() { | |
Mono<Span> source = | |
// now it's deferred and happens in 'innerDefer' context | |
Mono.defer(() -> { | |
assertEquals("innerDefer", ((ReadableSpan)Span.current()).getName()); | |
return Mono.just(Span.current()); | |
}).transform(i -> traceMono(i, "innerDefer")) | |
.transform(o -> traceMono(o, "outer")); | |
Span innerCurrent = runTest("testDefer", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("innerDefer")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerDefer")); | |
assertIsParent(exporter.exportedSpans.get("testDefer"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testInnerNested() { | |
Mono<Span> source = outer(Mono.defer( () -> { | |
Span inner = tracer.spanBuilder("inner").startSpan(); | |
// DON'T do! | |
// don't make spans current in reactor code, use mono.transform(m -> traceMono()) instead | |
// OR, if you make spans current in reactor code, close them and pass in the reactor context anyway | |
try (Scope s = inner.makeCurrent()) { | |
return Mono.fromCallable(() -> Span.current()).transform(d -> traceMono(d, "innerInner")) | |
.contextWrite(ctx -> { | |
// when it executes, nested is no longer current :( | |
// without explicit context passing, this example will not work: innerInner will be child of 'outer' | |
assertEquals("outer", ((ReadableSpan)Span.current()).getName()); | |
return ctx.put("otel-context-key", io.opentelemetry.context.Context.current().with(inner)); | |
}); | |
} finally { | |
inner.end(); | |
} | |
})); | |
Span innerCurrent = runTest("testInnerNested", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("innerInner")); | |
assertIsParent(exporter.exportedSpans.get("inner"), exporter.exportedSpans.get("innerInner")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("inner")); | |
assertIsParent(exporter.exportedSpans.get("testInnerNested"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testInnerNestedBlocking() { | |
Mono<Span> source = outer(Mono.defer( () -> { | |
Span inner = tracer.spanBuilder("inner").startSpan(); | |
try (Scope s = inner.makeCurrent()) { | |
// sanity check - inner reactor code has no relation to outer | |
Mono.just(Span.current()).transform(d -> traceMono(d, "innerInner")) | |
.subscribe((span) -> assertEquals("inner", ((ReadableSpan)span).getName())); | |
return Mono.just(inner); | |
} finally { | |
inner.end(); | |
} | |
})); | |
Span innerCurrent = runTest("testInnerNestedBlocking", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("inner")); | |
assertIsParent(exporter.exportedSpans.get("inner"), exporter.exportedSpans.get("innerInner")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("inner")); | |
assertIsParent(exporter.exportedSpans.get("testInnerNestedBlocking"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testDelay() { | |
Mono<Span> source = outer(Mono | |
.delay(Duration.ofMillis(1)) | |
.flatMap(l -> Mono.just(Span.current())) | |
.transform(d -> traceMono(d, "innerDelay")) | |
); | |
Span innerCurrent = runTest("testDelay", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("innerDelay")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerDelay")); | |
assertIsParent(exporter.exportedSpans.get("testDelay"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testReuseMono() { | |
Mono<Span> source = outer(Mono | |
.defer(() -> Mono.just(Span.current())) | |
.transform(j -> traceMono(j, "inner"))); | |
IntStream stream = IntStream.range(0, 10); | |
stream.parallel().forEach(i -> { | |
Span innerCurrent = source.block(); | |
assertEquals("inner", ( (ReadableSpan)innerCurrent).getName()); | |
}); | |
assertEquals(20, exporter.exportedSpansById.size()); | |
assertEquals(10, exporter.exportedSpansById.entrySet().stream().filter(e -> e.getValue().getName() == "inner").count()); | |
assertEquals(10, exporter.exportedSpansById.entrySet().stream().filter(e -> e.getValue().getName() == "outer").count()); | |
for (Map.Entry<String, SpanData> exported : exporter.exportedSpansById.entrySet()) { | |
SpanData parent = exporter.exportedSpansById.get(exported.getValue().getParentSpanId()); | |
if (exported.getValue().getName() == "inner") { | |
assertEquals("outer", parent.getName()); | |
} else { | |
assertNull(parent); | |
} | |
} | |
} | |
@Test | |
public void testRetries() { | |
AtomicReference<Integer> attempt = new AtomicReference<>(0); | |
Mono<Span> source = outer( | |
Mono.defer(() -> { | |
Span.current().updateName("inner" + attempt.get()); | |
return (attempt.getAndUpdate(i -> i + 1) % 2 == 0) | |
? Mono.error(new TestException()) | |
: Mono.just(Span.current()); | |
}) | |
.transform(t -> traceMono(t, "inner")) | |
.retryWhen(Retry.fixedDelay(3, Duration.ofMillis(1)).filter(t -> t instanceof TestException)) | |
.transform(t -> traceMono(t, "innerWithRetries"))); | |
Span innerCurrent = runTest("testRetries", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("inner1")); | |
assertIsParent(exporter.exportedSpans.get("innerWithRetries"), exporter.exportedSpans.get("inner0")); | |
assertIsParent(exporter.exportedSpans.get("innerWithRetries"), exporter.exportedSpans.get("inner1")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerWithRetries")); | |
assertEquals(StatusCode.ERROR, exporter.exportedSpans.get("inner0").getStatus().getStatusCode()); | |
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("inner1").getStatus().getStatusCode()); | |
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("innerWithRetries").getStatus().getStatusCode()); | |
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("outer").getStatus().getStatusCode()); | |
assertEquals(StatusCode.UNSET, exporter.exportedSpans.get("testRetries").getStatus().getStatusCode()); | |
} | |
@Test | |
public void testZip() { | |
Mono<Span> inner = Mono | |
.delay(Duration.ofMillis(1)).transform(t -> traceMono(t, "delay")) | |
.zipWith(Mono.just("1").transform(t -> traceMono(t, "just"))) | |
// NOTE: !! current is 'zip' span if Mono.defer or Mono.fromCallable is called. | |
// If Mono.just(Span.current()) was used, current would be the 'outer' span | |
.flatMap(i -> Mono.fromCallable(() -> Span.current()).transform(t -> traceMono(t, "zip"))); | |
Mono<Span> source = outer(inner); | |
Span innerCurrent = runTest("testZip", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("zip")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("delay")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("just")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("zip")); | |
} | |
@Test | |
public void testDeferThen() { | |
Mono<Span> source = outer( | |
Mono.fromCallable(() -> Span.current()).transform(i -> traceMono(i, "fromCallable")) // traces fromCallable | |
.then(Mono.fromCallable(() -> Span.current()).transform(i -> traceMono(i, "then")))); // traces then | |
Span innerCurrent = runTest("testDeferThen", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("then")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("fromCallable")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("then")); | |
assertIsParent(exporter.exportedSpans.get("testDeferThen"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testDeferThenBracketsMatter() { | |
Mono<Span> source = outer( | |
Mono.fromCallable(() -> Span.current()).transform(i -> traceMono(i, "fromCallable")) // traces fromCallable | |
.then(Mono.fromCallable(() -> Span.current())).transform(i -> traceMono(i, "fromCallableAndThen"))); // traces (fromCallable->then) | |
Span innerCurrent = runTest("testDeferThenBracketsMatter", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("fromCallableAndThen")); | |
assertIsParent(exporter.exportedSpans.get("fromCallableAndThen"), exporter.exportedSpans.get("fromCallable")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("fromCallableAndThen")); | |
assertIsParent(exporter.exportedSpans.get("testDeferThenBracketsMatter"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testMap() { | |
Mono<Span> source = outer( | |
Mono.delay(Duration.ofMillis(1)).transform(i -> traceMono(i, "delay")) | |
.map(d -> Span.current()).transform(i -> traceMono(i, "delayAndMap"))); | |
Span innerCurrent = runTest("testMap", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("delayAndMap")); | |
assertIsParent(exporter.exportedSpans.get("delayAndMap"), exporter.exportedSpans.get("delay")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("delayAndMap")); | |
assertIsParent(exporter.exportedSpans.get("testMap"), exporter.exportedSpans.get("outer")); | |
} | |
@Test | |
public void testCallable() { | |
Mono<Span> source = | |
// now it's callable and happens in 'innerCallable' context | |
Mono.fromCallable(() -> { | |
assertEquals("innerCallable", ((ReadableSpan)Span.current()).getName()); | |
return Span.current(); | |
}).transform(i -> traceMono(i, "innerCallable")) | |
.transform(o -> traceMono(o, "outer")); | |
Span innerCurrent = runTest("testCallable", source); | |
assertSpanEquals(innerCurrent, exporter.exportedSpans.get("innerCallable")); | |
assertIsParent(exporter.exportedSpans.get("outer"), exporter.exportedSpans.get("innerCallable")); | |
assertIsParent(exporter.exportedSpans.get("testCallable"), exporter.exportedSpans.get("outer")); | |
} | |
private void assertSpanEquals(Span span, SpanData spanData) { | |
assertSpanEquals(((ReadableSpan) span).toSpanData(), spanData); | |
} | |
private void assertSpanEquals(SpanData expected, SpanData actual) { | |
assertEquals(expected.getName(), actual.getName()); | |
assertEquals(expected.getSpanId(), actual.getSpanId()); | |
assertEquals(expected.getTraceId(), actual.getTraceId()); | |
assertEquals(expected.getParentSpanId(), actual.getParentSpanId()); | |
} | |
private void assertIsParent(SpanData parent, SpanData child) { | |
assertEquals(parent.getTraceId(), child.getTraceId()); | |
assertEquals(parent.getSpanId(), child.getParentSpanId()); | |
} | |
private class TestExporter implements SpanExporter { | |
public Map<String, SpanData> exportedSpans = new ConcurrentHashMap<>(); | |
public Map<String, SpanData> exportedSpansById = new ConcurrentHashMap<>(); | |
@Override | |
public CompletableResultCode export(Collection<SpanData> collection) { | |
for (SpanData sp : collection) { | |
exportedSpans.put(sp.getName(), sp); | |
exportedSpansById.put(sp.getSpanId(), sp); | |
} | |
return CompletableResultCode.ofSuccess(); | |
} | |
@Override | |
public CompletableResultCode flush() { | |
return CompletableResultCode.ofSuccess(); | |
} | |
@Override | |
public CompletableResultCode shutdown() { | |
return CompletableResultCode.ofSuccess(); | |
} | |
} | |
class TestException extends Exception { | |
} | |
} | |
class TracingOperator2 { | |
public static void registerOnEachOperator() { | |
Hooks.onEachOperator(TracingSubscriber.class.getName(), Operators.lift((pub, sub) -> new TracingOperator2.TracingSubscriber<>(sub, sub.currentContext()))); | |
} | |
static class TracingSubscriber<T> implements CoreSubscriber<T> { | |
private io.opentelemetry.context.Context traceContext; | |
private final Subscriber<? super T> subscriber; | |
private final reactor.util.context.Context context; | |
public TracingSubscriber(Subscriber<? super T> subscriber, reactor.util.context.Context ctx) { | |
this(subscriber, ctx, io.opentelemetry.context.Context.current()); | |
} | |
public TracingSubscriber( | |
Subscriber<? super T> subscriber, | |
reactor.util.context.Context ctx, | |
io.opentelemetry.context.Context contextToPropagate) { | |
this.subscriber = subscriber; | |
this.context = ctx; | |
this.traceContext = context.getOrDefault("otel-context-key", contextToPropagate); | |
} | |
@Override | |
public void onSubscribe(Subscription subscription) { | |
withActiveSpan(() -> subscriber.onSubscribe(subscription), "onSubscribe"); | |
} | |
@Override | |
public void onNext(T o) { | |
withActiveSpan(() -> subscriber.onNext(o), "onNext"); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
withActiveSpan(() -> subscriber.onError(throwable), "onError"); | |
} | |
@Override | |
public void onComplete() { | |
withActiveSpan(subscriber::onComplete, "onComplete"); | |
} | |
@Override | |
public reactor.util.context.Context currentContext() { | |
return context; | |
} | |
private void withActiveSpan(Runnable runnable, String callback) { | |
try (Scope ignored = traceContext.makeCurrent()) { | |
/*System.out.printf("<-- Making span current '%s', current id - %s\n", | |
callback, Span.current().getSpanContext().getSpanId());*/ | |
runnable.run(); | |
//System.out.printf(" done %s-->\n", Span.current().getSpanContext().getSpanId()); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment