Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
Expand Down Expand Up @@ -53,8 +55,10 @@ public void methodAdvice(MethodTransformer transformer) {
.and(takesArgument(0, named("io.lettuce.core.protocol.RedisCommand")))),
getClass().getName() + "$Capture");
transformer.applyAdvice(
isMethod().and(namedOneOf("complete", "completeExceptionally", "onComplete", "encode")),
isMethod().and(namedOneOf("complete", "completeExceptionally", "encode")),
getClass().getName() + "$Activate");
transformer.applyAdvice(
isMethod().and(named("onComplete")), getClass().getName() + "$SuppressAsyncPropagation");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve propagation for user onComplete callbacks

This matcher now applies to every AsyncCommand.onComplete invocation, not just Lettuce's internal registrations. When application code registers a completion callback under an active request span via onComplete, the new advice disables async propagation while onComplete delegates to the underlying CompletableFuture, so that callback and any async work it schedules are no longer linked to the request/Redis trace; previously this path was covered by Activate and propagated the command context. Please narrow the suppression to the internal registration path or keep user-visible onComplete callbacks propagating.

Useful? React with 👍 / 👎.

transformer.applyAdvice(
isMethod().and(named("cancel")).and(takesArguments(boolean.class)),
getClass().getName() + "$Cancel");
Expand Down Expand Up @@ -91,4 +95,22 @@ public static void before(@Advice.This AsyncCommand asyncCommand) {
InstrumentationContext.get(AsyncCommand.class, State.class), asyncCommand);
}
}

public static final class SuppressAsyncPropagation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean before() {
if (isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(false);
return true;
}
return false;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(@Advice.Enter final boolean restore) {
if (restore) {
setAsyncPropagationEnabled(true);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package datadog.trace.instrumentation.lettuce5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.lettuce5.LettuceClientDecorator.DECORATE;
import static datadog.trace.instrumentation.lettuce5.LettuceInstrumentationUtil.disableAsyncPropagation;
import static datadog.trace.instrumentation.lettuce5.LettuceInstrumentationUtil.restoreAsyncPropagation;

import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisURI;
Expand All @@ -14,37 +14,43 @@

public class ConnectionFutureAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(@Advice.Argument(1) final RedisURI redisUri) {
final AgentSpan span =
public static void onEnter(
@Advice.Argument(1) final RedisURI redisUri, @Advice.Local("ddSpan") AgentSpan span) {
span =
startSpan(
LettuceClientDecorator.REDIS_CLIENT.toString(), LettuceClientDecorator.OPERATION_NAME);
DECORATE.afterStart(span);
span.setResourceName(DECORATE.resourceNameForConnection(redisUri));
DECORATE.onConnection(span, redisUri);
return activateSpan(span);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final AgentScope scope,
@Advice.Local("ddSpan") final AgentSpan span,
@Advice.Thrown final Throwable throwable,
@Advice.Argument(1) final RedisURI redisUri,
@Advice.Return(readOnly = false)
ConnectionFuture<? extends StatefulConnection> connectionFuture) {
final AgentSpan span = scope.span();
if (span == null) {
return;
}
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
scope.close();
span.finish();
return;
}
connectionFuture =
connectionFuture.whenComplete(
new ConnectionContextBiConsumer(
redisUri, InstrumentationContext.get(StatefulConnection.class, RedisURI.class))
.andThen(new LettuceAsyncBiConsumer<>(span)));
scope.close();
final boolean restoreCompletionCallbackPropagation = disableAsyncPropagation();
try {
connectionFuture =
connectionFuture.whenComplete(
new ConnectionContextBiConsumer(
redisUri,
InstrumentationContext.get(StatefulConnection.class, RedisURI.class))
.andThen(new LettuceAsyncBiConsumer<>(span)));
} finally {
restoreAsyncPropagation(restoreCompletionCallbackPropagation);
}
// span finished by LettuceAsyncBiConsumer
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.lettuce5.LettuceClientDecorator.DECORATE;
import static datadog.trace.instrumentation.lettuce5.LettuceInstrumentationUtil.disableAsyncPropagation;
import static datadog.trace.instrumentation.lettuce5.LettuceInstrumentationUtil.expectsResponse;
import static datadog.trace.instrumentation.lettuce5.LettuceInstrumentationUtil.restoreAsyncPropagation;

import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand All @@ -20,7 +22,9 @@ public class LettuceAsyncCommandsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.Argument(0) final RedisCommand command,
@Advice.This final AbstractRedisAsyncCommands thiz) {
@Advice.This final AbstractRedisAsyncCommands thiz,
@Advice.Local("commandExpectsResponse") boolean commandExpectsResponse,
@Advice.Local("restoreAsyncPropagation") boolean restoreAsyncPropagation) {

final AgentSpan span =
startSpan(
Expand All @@ -32,33 +36,53 @@ public static AgentScope onEnter(
.get(thiz.getConnection()));
DECORATE.onCommand(span, command);

return activateSpan(span);
final AgentScope scope = activateSpan(span);
commandExpectsResponse = expectsResponse(command);
if (!commandExpectsResponse) {
restoreAsyncPropagation = disableAsyncPropagation();
}

return scope;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Argument(0) final RedisCommand command,
@Advice.Enter final AgentScope scope,
@Advice.Local("commandExpectsResponse") final boolean commandExpectsResponse,
@Advice.Local("restoreAsyncPropagation") final boolean restoreAsyncPropagation,
@Advice.Thrown final Throwable throwable,
@Advice.Return AsyncCommand<?, ?, ?> asyncCommand) {

final AgentSpan span = scope.span();
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
scope.close();
span.finish();
return;
}
try {
if (scope == null) {
return;
}
final AgentSpan span = scope.span();
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
return;
}

// close spans on error or normal completion
if (expectsResponse(command)) {
asyncCommand.whenComplete(new LettuceAsyncBiConsumer<>(span));
} else {
DECORATE.beforeFinish(span);
span.finish();
// close spans on error or normal completion
if (commandExpectsResponse) {
final boolean restoreCompletionCallbackPropagation = disableAsyncPropagation();
try {
asyncCommand.whenComplete(new LettuceAsyncBiConsumer<>(span));
} finally {
restoreAsyncPropagation(restoreCompletionCallbackPropagation);
}
} else {
DECORATE.beforeFinish(span);
span.finish();
}
// span may be finished by LettuceAsyncBiConsumer
} finally {
restoreAsyncPropagation(restoreAsyncPropagation);
if (scope != null) {
scope.close();
}
}
scope.close();
// span may be finished by LettuceAsyncBiConsumer
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.trace.instrumentation.lettuce5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;

import io.lettuce.core.protocol.RedisCommand;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -75,4 +78,18 @@ public static String getCommandName(final RedisCommand command) {
}
return commandName;
}

public static boolean disableAsyncPropagation() {
if (isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(false);
return true;
}
return false;
}

public static void restoreAsyncPropagation(final boolean restore) {
if (restore) {
setAsyncPropagationEnabled(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ abstract class Lettuce5ClientTestBase extends VersionedNamingTestBase {
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands

@Override
boolean useStrictTraceWrites() {
// latest seems leaking continuations that terminates later hence the strict trace will discard our spans.
!isLatestDepTest
}


def setup() {
redisServer.start()
println "Using redis: $redisServer.redisURI"
Expand Down
Loading