diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/RetryPolicy.java b/durabletask-client/src/main/java/io/dapr/durabletask/RetryPolicy.java index 9efd912b1..ba7b34a90 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/RetryPolicy.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/RetryPolicy.java @@ -14,8 +14,9 @@ package io.dapr.durabletask; import javax.annotation.Nullable; + import java.time.Duration; -import java.util.Objects; +import java.util.Random; /** * A declarative retry policy that can be configured for activity or sub-orchestration calls. @@ -27,6 +28,7 @@ public final class RetryPolicy { private double backoffCoefficient = 1.0; private Duration maxRetryInterval = Duration.ZERO; private Duration retryTimeout = Duration.ZERO; + private double jitterFactor = 0.0; /** * Creates a new {@code RetryPolicy} object. @@ -173,4 +175,54 @@ public Duration getMaxRetryInterval() { public Duration getRetryTimeout() { return this.retryTimeout; } + + /** + * Sets the jitter factor applied to the computed retry delay. + * + *

A value between 0.0 (no jitter) and 1.0 (up to 100% reduction). For each retry, the delay + * is reduced by a random fraction in the range {@code [0, jitterFactor)}, using a deterministic + * seed derived from the first-attempt timestamp and the attempt number. The seed must be + * deterministic: the delay drives the {@code finalFireAt} of a durable timer, and if replay + * computes a different value, the timer-chain check may create spurious sub-timers that shift + * subsequent sequence IDs and cause a NonDeterministicOrchestratorException. + * This desynchronizes concurrent workflow retries and avoids thundering herd behaviour.

+ * + * @param jitterFactor the jitter factor; must be between 0.0 and 1.0 inclusive + * @return this retry policy object + * @throws IllegalArgumentException if {@code jitterFactor} is outside [0.0, 1.0] + */ + public RetryPolicy setJitterFactor(double jitterFactor) { + if (!Double.isFinite(jitterFactor) || jitterFactor < 0.0 || jitterFactor > 1.0) { + throw new IllegalArgumentException("The value for jitterFactor must be between 0.0 and 1.0 inclusive."); + } + this.jitterFactor = jitterFactor; + return this; + } + + /** + * Gets the configured jitter factor. + * + * @return the configured jitter factor + */ + public double getJitterFactor() { + return this.jitterFactor; + } + + /** + * Applies jitter to a delay value, reducing it by a deterministic random fraction + * in [0, jitterFactor). The result is guaranteed to be at least 1ms. + * + * @param delayInMillis the base delay in milliseconds (must be positive) + * @param jitterFactor the jitter factor in [0.0, 1.0] + * @param seed deterministic seed for the random number generator + * @return the jittered delay in milliseconds, always >= 1 + */ + static long applyJitter(long delayInMillis, double jitterFactor, long seed) { + if (jitterFactor > 0.0) { + double reduction = new Random(seed).nextDouble() * jitterFactor; + delayInMillis = (long) (delayInMillis * (1.0 - reduction)); + return Math.max(delayInMillis, 1); + } + return delayInMillis; + } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 1ecdcde7e..a12514b70 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -1503,7 +1503,7 @@ private Duration getNextDelay() { (long) Helpers.powExact(this.policy.getBackoffCoefficient(), this.attemptNumber)); } catch (ArithmeticException overflowException) { if (maxDelayInMillis > 0) { - return this.policy.getMaxRetryInterval(); + nextDelayInMillis = maxDelayInMillis; } else { // If no maximum is specified, just throw throw new ArithmeticException("The retry policy calculation resulted in an arithmetic " @@ -1513,10 +1513,17 @@ private Duration getNextDelay() { // NOTE: A max delay of zero or less is interpreted to mean no max delay if (nextDelayInMillis > maxDelayInMillis && maxDelayInMillis > 0) { - return this.policy.getMaxRetryInterval(); - } else { - return Duration.ofMillis(nextDelayInMillis); + nextDelayInMillis = maxDelayInMillis; } + + // Apply jitter: reduce delay by a random fraction in [0, jitterFactor). + // Seed is deterministic so that replay computes the same finalFireAt, preventing + // the createTimerChain callback from creating spurious extra sub-timers. + long seed = this.firstAttempt.toEpochMilli() + this.attemptNumber; + nextDelayInMillis = RetryPolicy.applyJitter( + nextDelayInMillis, this.policy.getJitterFactor(), seed); + + return Duration.ofMillis(nextDelayInMillis); } // If there's no declarative retry policy defined, then the custom code retry handler diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/RetryPolicyTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/RetryPolicyTest.java new file mode 100644 index 000000000..e0c4b6bc9 --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/RetryPolicyTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; + +public class RetryPolicyTest { + + // ---- default value ---- + + @Test + void jitterFactorDefaultsToZero() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + assertEquals(0.0, policy.getJitterFactor()); + } + + // ---- valid boundary values ---- + + @Test + void jitterFactorZeroIsAccepted() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + RetryPolicy returned = policy.setJitterFactor(0.0); + assertEquals(0.0, policy.getJitterFactor()); + assertSame(policy, returned, "setJitterFactor should return this for chaining"); + } + + @Test + void jitterFactorOneIsAccepted() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + policy.setJitterFactor(1.0); + assertEquals(1.0, policy.getJitterFactor()); + } + + @Test + void jitterFactorMidRangeIsAccepted() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + policy.setJitterFactor(0.5); + assertEquals(0.5, policy.getJitterFactor()); + } + + // ---- invalid values ---- + + @Test + void jitterFactorBelowZeroThrows() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + assertThrows(IllegalArgumentException.class, () -> policy.setJitterFactor(-0.1)); + } + + @Test + void jitterFactorAboveOneThrows() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + assertThrows(IllegalArgumentException.class, () -> policy.setJitterFactor(1.1)); + } + + @Test + void jitterFactorNaNThrows() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + assertThrows(IllegalArgumentException.class, () -> policy.setJitterFactor(Double.NaN)); + } + + @Test + void jitterFactorPositiveInfinityThrows() { + RetryPolicy policy = new RetryPolicy(3, Duration.ofSeconds(1)); + assertThrows(IllegalArgumentException.class, () -> policy.setJitterFactor(Double.POSITIVE_INFINITY)); + } + + + /** + * With jitterFactor=0 the delay must be unchanged. + */ + @Test + void zeroJitterLeavesDelayUnchanged() { + long baseDelayMillis = 3000L; + assertEquals(baseDelayMillis, RetryPolicy.applyJitter(baseDelayMillis, 0.0, 42L)); + } + + /** + * With jitterFactor=1.0 the delay must never drop below 1ms, + * even when nextDouble() approaches 1.0. + */ + @Test + void jitterWithMaxFactorNeverProducesZeroDelay() { + long baseDelayMillis = 1L; // smallest meaningful delay + + for (int seed = 0; seed < 1000; seed++) { + long result = RetryPolicy.applyJitter(baseDelayMillis, 1.0, seed); + assertTrue(result >= 1, + "Delay must be at least 1ms, but was " + result + " for seed " + seed); + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java index b0e72f917..575d4afdb 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java @@ -24,9 +24,10 @@ public final class WorkflowTaskRetryPolicy { private final Double backoffCoefficient; private final Duration maxRetryInterval; private final Duration retryTimeout; + private final Double jitterFactor; /** - * Constructor for WorkflowTaskRetryPolicy. + * Constructor for WorkflowTaskRetryPolicy (without jitter). * @param maxNumberOfAttempts Maximum number of attempts to retry the workflow. * @param firstRetryInterval Interval to wait before the first retry. * @param backoffCoefficient Coefficient to increase the retry interval. @@ -39,12 +40,36 @@ public WorkflowTaskRetryPolicy( Double backoffCoefficient, Duration maxRetryInterval, Duration retryTimeout + ) { + this(maxNumberOfAttempts, firstRetryInterval, backoffCoefficient, + maxRetryInterval, retryTimeout, null); + } + + /** + * Constructor for WorkflowTaskRetryPolicy. + * @param maxNumberOfAttempts Maximum number of attempts to retry the workflow. + * @param firstRetryInterval Interval to wait before the first retry. + * @param backoffCoefficient Coefficient to increase the retry interval. + * @param maxRetryInterval Maximum interval to wait between retries. + * @param retryTimeout Timeout for the whole retry process. + * @param jitterFactor Jitter factor between 0.0 and 1.0; reduces each retry delay by a random + * fraction in [0, jitterFactor) to desynchronize concurrent retries. + * 0.0 disables jitter (default). + */ + public WorkflowTaskRetryPolicy( + Integer maxNumberOfAttempts, + Duration firstRetryInterval, + Double backoffCoefficient, + Duration maxRetryInterval, + Duration retryTimeout, + Double jitterFactor ) { this.maxNumberOfAttempts = maxNumberOfAttempts; this.firstRetryInterval = firstRetryInterval; this.backoffCoefficient = backoffCoefficient; this.maxRetryInterval = maxRetryInterval; this.retryTimeout = retryTimeout; + this.jitterFactor = jitterFactor; } public int getMaxNumberOfAttempts() { @@ -67,6 +92,10 @@ public Duration getRetryTimeout() { return retryTimeout; } + public double getJitterFactor() { + return jitterFactor != null ? jitterFactor : 0.0; + } + public static Builder newBuilder() { return new Builder(); } @@ -78,6 +107,7 @@ public static class Builder { private Double backoffCoefficient = 1.0; private Duration maxRetryInterval; private Duration retryTimeout; + private Double jitterFactor = 0.0; private Builder() { } @@ -92,7 +122,8 @@ public WorkflowTaskRetryPolicy build() { this.firstRetryInterval, this.backoffCoefficient, this.maxRetryInterval, - this.retryTimeout + this.retryTimeout, + this.jitterFactor ); } @@ -176,6 +207,26 @@ public Builder setRetryTimeout(Duration retryTimeout) { return this; } + + /** + * Set the jitter factor applied to the computed retry delay. + * + *

A value between 0.0 (no jitter, default) and 1.0 (up to 100% reduction). For each retry, + * the computed delay is reduced by a random fraction in [0, jitterFactor). + * This desynchronizes concurrent workflow retries and avoids thundering herd behaviour.

+ * + * @param jitterFactor Jitter factor between 0.0 and 1.0 inclusive + * @return This builder + */ + public Builder setJitterFactor(double jitterFactor) { + if (!Double.isFinite(jitterFactor) || jitterFactor < 0.0 || jitterFactor > 1.0) { + throw new IllegalArgumentException("The value for jitterFactor must be between 0.0 and 1.0 inclusive."); + } + + this.jitterFactor = jitterFactor; + + return this; + } } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index 507fadc93..4d5ef5e34 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -287,6 +287,7 @@ private RetryPolicy toRetryPolicy(WorkflowTaskRetryPolicy workflowTaskRetryPolic if (workflowTaskRetryPolicy.getRetryTimeout() != null) { retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout()); } + retryPolicy.setJitterFactor(workflowTaskRetryPolicy.getJitterFactor()); return retryPolicy; } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index f1a54690b..eb9b8ee5d 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -563,4 +563,44 @@ public void callActivityRetryPolicyDefaultMaxRetryIntervalShouldBeZeroWhenNotSet assertEquals(Duration.ZERO, captor.getValue().getRetryPolicy().getMaxRetryInterval()); } + + @Test + public void callActivityRetryPolicyJitterFactorShouldBePropagated() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + double expectedJitterFactor = 0.5; + WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(5) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setJitterFactor(expectedJitterFactor) + .build(); + WorkflowTaskOptions options = new WorkflowTaskOptions(retryPolicy); + ArgumentCaptor captor = ArgumentCaptor.forClass(TaskOptions.class); + + context.callActivity(expectedName, expectedInput, options, String.class); + + verify(mockInnerContext, times(1)) + .callActivity(eq(expectedName), eq(expectedInput), captor.capture(), eq(String.class)); + + assertEquals(expectedJitterFactor, captor.getValue().getRetryPolicy().getJitterFactor()); + } + + @Test + public void callActivityRetryPolicyDefaultJitterFactorShouldBeZeroWhenNotSet() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(5) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .build(); + WorkflowTaskOptions options = new WorkflowTaskOptions(retryPolicy); + ArgumentCaptor captor = ArgumentCaptor.forClass(TaskOptions.class); + + context.callActivity(expectedName, expectedInput, options, String.class); + + verify(mockInnerContext, times(1)) + .callActivity(eq(expectedName), eq(expectedInput), captor.capture(), eq(String.class)); + + assertEquals(0.0, captor.getValue().getRetryPolicy().getJitterFactor()); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskRetryPolicyTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskRetryPolicyTest.java new file mode 100644 index 000000000..f4c36f8e2 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskRetryPolicyTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package io.dapr.workflows; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; + +public class WorkflowTaskRetryPolicyTest { + + // ---- default value ---- + + @Test + void jitterFactorDefaultsToZero() { + WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder().build(); + assertEquals(0.0, policy.getJitterFactor()); + } + + /** + * When the policy is constructed via the all-args constructor with a null + * jitterFactor (e.g. deserialisation path), getJitterFactor() must still + * return 0.0 rather than throw a NullPointerException. + */ + @Test + void jitterFactorNullInConstructorReturnsZero() { + WorkflowTaskRetryPolicy policy = new WorkflowTaskRetryPolicy( + 3, + Duration.ofSeconds(1), + 1.0, + null, + null, + null // jitterFactor = null + ); + assertEquals(0.0, policy.getJitterFactor()); + } + + // ---- valid boundary values ---- + + @Test + void jitterFactorZeroIsAccepted() { + WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder() + .setJitterFactor(0.0) + .build(); + assertEquals(0.0, policy.getJitterFactor()); + } + + @Test + void jitterFactorOneIsAccepted() { + WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder() + .setJitterFactor(1.0) + .build(); + assertEquals(1.0, policy.getJitterFactor()); + } + + @Test + void jitterFactorMidRangeIsAccepted() { + WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder() + .setJitterFactor(0.3) + .build(); + assertEquals(0.3, policy.getJitterFactor()); + } + + // ---- invalid values ---- + + @Test + void jitterFactorBelowZeroThrows() { + WorkflowTaskRetryPolicy.Builder builder = WorkflowTaskRetryPolicy.newBuilder(); + assertThrows(IllegalArgumentException.class, () -> builder.setJitterFactor(-0.1)); + } + + @Test + void jitterFactorAboveOneThrows() { + WorkflowTaskRetryPolicy.Builder builder = WorkflowTaskRetryPolicy.newBuilder(); + assertThrows(IllegalArgumentException.class, () -> builder.setJitterFactor(1.1)); + } + + @Test + void jitterFactorNaNThrows() { + WorkflowTaskRetryPolicy.Builder builder = WorkflowTaskRetryPolicy.newBuilder(); + assertThrows(IllegalArgumentException.class, () -> builder.setJitterFactor(Double.NaN)); + } + + @Test + void jitterFactorPositiveInfinityThrows() { + WorkflowTaskRetryPolicy.Builder builder = WorkflowTaskRetryPolicy.newBuilder(); + assertThrows(IllegalArgumentException.class, () -> builder.setJitterFactor(Double.POSITIVE_INFINITY)); + } + + // ---- builder chaining ---- + + @Test + void builderReturnsItself() { + WorkflowTaskRetryPolicy.Builder builder = WorkflowTaskRetryPolicy.newBuilder(); + assertSame(builder, builder.setJitterFactor(0.5)); + } + + // ---- coexistence with other fields ---- + + @Test + void jitterFactorDoesNotAffectOtherFields() { + WorkflowTaskRetryPolicy policy = WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(5) + .setFirstRetryInterval(Duration.ofSeconds(2)) + .setBackoffCoefficient(2.0) + .setJitterFactor(0.25) + .build(); + + assertEquals(5, policy.getMaxNumberOfAttempts()); + assertEquals(Duration.ofSeconds(2), policy.getFirstRetryInterval()); + assertEquals(2.0, policy.getBackoffCoefficient()); + assertEquals(0.25, policy.getJitterFactor()); + } +}