Last active
April 26, 2023 15:37
-
-
Save 62mkv/779ff24367248519b467b25f60202067 to your computer and use it in GitHub Desktop.
Example of TracingExecutionListener, adapted from https://github.com/ttddyy/r2dbc-proxy-examples/blob/master/listener-example/src/main/java/io/r2dbc/examples/TracingExecutionListener.java to use OpenTelemetry instead of Zipkin/Brave
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import io.opentelemetry.api.trace.SpanKind; | |
import io.opentelemetry.api.trace.Tracer; | |
import io.opentelemetry.api.trace.Span; | |
import io.r2dbc.proxy.core.*; | |
import io.r2dbc.proxy.listener.ProxyMethodExecutionListener; | |
import static java.util.stream.Collectors.joining; | |
/** | |
* Listener to create spans for R2DBC SPI operations. | |
* | |
* @author Tadaya Tsuyukubo | |
*/ | |
public class TracingExecutionListener implements ProxyMethodExecutionListener { | |
private static final String TAG_CONNECTION_ID = "connectionId"; | |
private static final String TAG_CONNECTION_CREATE_THREAD_ID = "threadIdOnCreate"; | |
private static final String TAG_CONNECTION_CLOSE_THREAD_ID = "threadIdOnClose"; | |
private static final String TAG_CONNECTION_CREATE_THREAD_NAME = "threadNameOnCreate"; | |
private static final String TAG_CONNECTION_CLOSE_THREAD_NAME = "threadNameOnClose"; | |
private static final String TAG_THREAD_ID = "threadId"; | |
private static final String TAG_THREAD_NAME = "threadName"; | |
private static final String TAG_QUERIES = "queries"; | |
private static final String TAG_BATCH_SIZE = "batchSize"; | |
private static final String TAG_QUERY_TYPE = "type"; | |
private static final String TAG_QUERY_SUCCESS = "success"; | |
private static final String TAG_QUERY_MAPPED_RESULT_COUNT = "mappedResultCount"; | |
private static final String TAG_TRANSACTION_SAVEPOINT = "savepoint"; | |
private static final String TAG_TRANSACTION_COUNT = "transactionCount"; | |
private static final String TAG_COMMIT_COUNT = "commitCount"; | |
private static final String TAG_ROLLBACK_COUNT = "rollbackCount"; | |
static final String CONNECTION_SPAN_KEY = "connectionSpan"; | |
static final String TRANSACTION_SPAN_KEY = "transactionSpan"; | |
static final String QUERY_SPAN_KEY = "querySpan"; | |
private final Tracer tracer; | |
public TracingExecutionListener(Tracer tracer) { | |
this.tracer = tracer; | |
} | |
@Override | |
public void beforeCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) { | |
Span connectionSpan = this.tracer.spanBuilder("r2dbc:connection") | |
.setSpanKind(SpanKind.CLIENT) | |
.startSpan(); | |
// store the span for retrieval at "afterCreateOnConnectionFactory" | |
methodExecutionInfo.getValueStore().put("initialConnectionSpan", connectionSpan); | |
} | |
@Override | |
public void afterCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) { | |
// retrieve the span created at "beforeCreateOnConnectionFactory" | |
Span connectionSpan = methodExecutionInfo.getValueStore().get("initialConnectionSpan", Span.class); | |
Throwable thrown = methodExecutionInfo.getThrown(); | |
if (thrown != null) { | |
connectionSpan | |
.recordException(thrown) | |
.end(); | |
return; | |
} | |
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo(); | |
String connectionId = connectionInfo.getConnectionId(); | |
connectionSpan | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_CONNECTION_CREATE_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId())) | |
.setAttribute(TAG_CONNECTION_CREATE_THREAD_NAME, methodExecutionInfo.getThreadName()) | |
.updateName("Connection created"); | |
// store the span in connection scoped value store | |
connectionInfo.getValueStore().put(CONNECTION_SPAN_KEY, connectionSpan); | |
} | |
@Override | |
public void afterCloseOnConnection(MethodExecutionInfo methodExecutionInfo) { | |
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo(); | |
String connectionId = connectionInfo.getConnectionId(); | |
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class); | |
if (connectionSpan == null) { | |
return; // already closed | |
} | |
Throwable thrown = methodExecutionInfo.getThrown(); | |
if (thrown != null) { | |
connectionSpan.recordException(thrown); | |
} | |
connectionSpan | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_CONNECTION_CLOSE_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId())) | |
.setAttribute(TAG_CONNECTION_CLOSE_THREAD_NAME, methodExecutionInfo.getThreadName()) | |
.setAttribute(TAG_TRANSACTION_COUNT, String.valueOf(connectionInfo.getTransactionCount())) | |
.setAttribute(TAG_COMMIT_COUNT, String.valueOf(connectionInfo.getCommitCount())) | |
.setAttribute(TAG_ROLLBACK_COUNT, String.valueOf(connectionInfo.getRollbackCount())) | |
.end(); | |
} | |
@Override | |
public void beforeQuery(QueryExecutionInfo queryExecutionInfo) { | |
String connectionId = queryExecutionInfo.getConnectionInfo().getConnectionId(); | |
String queries = queryExecutionInfo.getQueries().stream() | |
.map(QueryInfo::getQuery) | |
.collect(joining(", ")); | |
Span querySpan = this.tracer | |
.spanBuilder("r2dbc:query") | |
.setSpanKind(SpanKind.CLIENT) | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_QUERY_TYPE, queryExecutionInfo.getType().toString()) | |
.setAttribute(TAG_QUERIES, queries) | |
.startSpan(); | |
if (ExecutionType.BATCH == queryExecutionInfo.getType()) { | |
querySpan.setAttribute(TAG_BATCH_SIZE, Integer.toString(queryExecutionInfo.getBatchSize())); | |
} | |
// pass the query span to "afterQuery" method | |
queryExecutionInfo.getValueStore().put(QUERY_SPAN_KEY, querySpan); | |
} | |
@Override | |
public void afterQuery(QueryExecutionInfo queryExecutionInfo) { | |
Span querySpan = queryExecutionInfo.getValueStore().get(QUERY_SPAN_KEY, Span.class); | |
querySpan | |
.setAttribute(TAG_THREAD_ID, String.valueOf(queryExecutionInfo.getThreadId())) | |
.setAttribute(TAG_THREAD_NAME, queryExecutionInfo.getThreadName()) | |
.setAttribute(TAG_QUERY_SUCCESS, Boolean.toString(queryExecutionInfo.isSuccess())); | |
Throwable thrown = queryExecutionInfo.getThrowable(); | |
if (thrown != null) { | |
querySpan.recordException(thrown); | |
} else { | |
querySpan.setAttribute(TAG_QUERY_MAPPED_RESULT_COUNT, Integer.toString(queryExecutionInfo.getCurrentResultCount())); | |
} | |
querySpan.end(); | |
} | |
@Override | |
public void beforeBeginTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) { | |
Span transactionSpan = this.tracer.spanBuilder("r2dbc:transaction") | |
.setSpanKind(SpanKind.CLIENT) | |
.startSpan(); | |
methodExecutionInfo.getConnectionInfo().getValueStore().put(TRANSACTION_SPAN_KEY, transactionSpan); | |
} | |
@Override | |
public void afterCommitTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) { | |
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo(); | |
String connectionId = connectionInfo.getConnectionId(); | |
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class); | |
if (transactionSpan != null) { | |
transactionSpan | |
.updateName("Commit") | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId())) | |
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName()) | |
.end(); | |
} | |
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class); | |
if (connectionSpan == null) { | |
return; | |
} | |
connectionSpan.updateName("Transaction commit"); | |
} | |
@Override | |
public void afterRollbackTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) { | |
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo(); | |
String connectionId = connectionInfo.getConnectionId(); | |
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class); | |
if (transactionSpan != null) { | |
transactionSpan | |
.updateName("Rollback") | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId())) | |
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName()) | |
.end(); | |
} | |
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class); | |
connectionSpan.updateName("Transaction rollback"); | |
} | |
@Override | |
public void afterRollbackTransactionToSavepointOnConnection(MethodExecutionInfo methodExecutionInfo) { | |
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo(); | |
String connectionId = connectionInfo.getConnectionId(); | |
String savepoint = (String) methodExecutionInfo.getMethodArgs()[0]; | |
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class); | |
if (transactionSpan != null) { | |
transactionSpan | |
.updateName("Rollback to savepoint") | |
.setAttribute(TAG_TRANSACTION_SAVEPOINT, savepoint) | |
.setAttribute(TAG_CONNECTION_ID, connectionId) | |
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId())) | |
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName()) | |
.end(); | |
} | |
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class); | |
connectionSpan.updateName("Transaction rollback to savepoint"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment