Skip to content

Commit 1fb4c6a

Browse files
authored
#minor Adding Dynamic Workflows support for SdkTestingExecutor (#318)
# TL;DR Adds support for testing `SdkDynamicWorkflowTask` (dynamic workflows) using `SdkTestingExecutor`, enabling assertions on their generated DAGs similarly to regular workflows. ## Type - [ ] Bug Fix - [x] Feature - [ ] Plugin ## Are all requirements met? - [x] Code completed - [x] Smoke tested - [x] Unit tests added - [x] Code documentation added - [x] Any pending items have an associated Issue ## Complete description This PR extends `SdkTestingExecutor` to support dynamic workflows (`SdkDynamicWorkflowTask`) by wrapping them into a lightweight delegating workflow internally. This allows developers to test the structure and outputs of dynamic workflows using the same mechanisms used to test regular workflows, improving consistency and coverage in unit testing Flyte tasks. There were no existing issues or tracking requests for this feature — it was implemented to fill a current gap in testing capabilities for dynamic workflows in `flytekit-testing`. ## Tracking Issue NA ## Follow-up issue NA --------- Signed-off-by: Rodolfo Carvalho <[email protected]>
1 parent 152f2e9 commit 1fb4c6a

File tree

4 files changed

+218
-0
lines changed

4 files changed

+218
-0
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2025 Flyte Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.flytekit.testing;
18+
19+
import java.util.Objects;
20+
import org.flyte.flytekit.SdkDynamicWorkflowTask;
21+
import org.flyte.flytekit.SdkType;
22+
import org.flyte.flytekit.SdkTypes;
23+
import org.flyte.flytekit.SdkWorkflow;
24+
import org.flyte.flytekit.SdkWorkflowBuilder;
25+
26+
public class SdkDynamicWorkflowTaskDelegatingWorkflow<InputT, OutputT>
27+
extends SdkWorkflow<Void, OutputT> {
28+
private final SdkDynamicWorkflowTask<InputT, OutputT> delegate;
29+
private final InputT input;
30+
31+
public SdkDynamicWorkflowTaskDelegatingWorkflow(
32+
SdkDynamicWorkflowTask<InputT, OutputT> delegate,
33+
InputT input,
34+
SdkType<OutputT> outputSdkType) {
35+
super(SdkTypes.nulls(), outputSdkType);
36+
this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null");
37+
this.input = input;
38+
}
39+
40+
@Override
41+
protected OutputT expand(SdkWorkflowBuilder builder, Void ignored) {
42+
return delegate.run(builder, this.input);
43+
}
44+
}

flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,41 @@ public static SdkTestingExecutor of(
116116
.build();
117117
}
118118

119+
/**
120+
* Creates a new {@link SdkTestingExecutor} for testing a {@link SdkDynamicWorkflowTask}. This
121+
* method wraps the given dynamic workflow task and its input into a delegating workflow, allowing
122+
* the task to be executed and tested in isolation.
123+
*
124+
* @param task the dynamic workflow task to test
125+
* @param input the input to the dynamic workflow task
126+
* @param outputType the expected output type of the dynamic workflow task
127+
* @param <InputT> the type of the input
128+
* @param <OutputT> the type of the output
129+
* @return a new {@link SdkTestingExecutor} instance
130+
* <p>Example usage:
131+
* <pre>{@code
132+
* int expected = 6;
133+
*
134+
* SumIfEvenDynamicWorkflowTask.Output output =
135+
* SdkTestingExecutor.of(
136+
* new SumIfEvenDynamicWorkflowTask(),
137+
* SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)),
138+
* JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
139+
* .withTaskOutput(
140+
* new SumTask(),
141+
* SumTask.SumInput.create(of(2), of(4)),
142+
* SumTask.SumOutput.create(of(expected)))
143+
* .execute()
144+
* .getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
145+
*
146+
* assertEquals(expected, output.c().get());
147+
* }</pre>
148+
*/
149+
public static <InputT, OutputT> SdkTestingExecutor of(
150+
SdkDynamicWorkflowTask<InputT, OutputT> task, InputT input, SdkType<OutputT> outputType) {
151+
return of(new SdkDynamicWorkflowTaskDelegatingWorkflow<>(task, input, outputType));
152+
}
153+
119154
@AutoValue
120155
public abstract static class Result {
121156
abstract Map<String, Literal> literalMap();
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2025 Flyte Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.flytekit.testing;
18+
19+
import static org.flyte.flytekit.SdkBindingDataFactory.of;
20+
import static org.junit.jupiter.api.Assertions.*;
21+
22+
import org.flyte.flytekit.jackson.JacksonSdkType;
23+
import org.junit.jupiter.api.Test;
24+
25+
public class SdkDynamicWorkflowTaskDelegatingWorkflowTest {
26+
@Test
27+
public void testDelegatingWorkflow_EvenValues() {
28+
int expected = 6;
29+
30+
SumIfEvenDynamicWorkflowTask.Output output =
31+
SdkTestingExecutor.of(
32+
new SumIfEvenDynamicWorkflowTask(),
33+
SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)),
34+
JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
35+
.withTaskOutput(
36+
new SumTask(),
37+
SumTask.SumInput.create(of(2), of(4)),
38+
SumTask.SumOutput.create(of(expected)))
39+
.execute()
40+
.getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
41+
assertEquals(expected, output.c().get());
42+
}
43+
44+
@Test
45+
public void testDelegatingWorkflow_Odd() {
46+
int expected = 0;
47+
48+
SumIfEvenDynamicWorkflowTask.Output output =
49+
SdkTestingExecutor.of(
50+
new SumIfEvenDynamicWorkflowTask(),
51+
SumIfEvenDynamicWorkflowTask.Input.create(of(1), of(4)),
52+
JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
53+
.withTaskOutput(
54+
new SumTask(),
55+
SumTask.SumInput.create(of(0), of(0)),
56+
SumTask.SumOutput.create(of(expected)))
57+
.execute()
58+
.getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
59+
assertEquals(expected, output.c().get());
60+
}
61+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2025 Flyte Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.flytekit.testing;
18+
19+
import static org.flyte.flytekit.SdkBindingDataFactory.of;
20+
21+
import com.google.auto.service.AutoService;
22+
import com.google.auto.value.AutoValue;
23+
import org.flyte.flytekit.*;
24+
import org.flyte.flytekit.jackson.JacksonSdkType;
25+
26+
@AutoService(SumIfEvenDynamicWorkflowTask.class)
27+
public class SumIfEvenDynamicWorkflowTask
28+
extends SdkDynamicWorkflowTask<
29+
SumIfEvenDynamicWorkflowTask.Input, SumIfEvenDynamicWorkflowTask.Output> {
30+
@AutoValue
31+
public abstract static class Input {
32+
33+
abstract SdkBindingData<Long> a();
34+
35+
abstract SdkBindingData<Long> b();
36+
37+
static Input create(SdkBindingData<Long> a, SdkBindingData<Long> b) {
38+
return new AutoValue_SumIfEvenDynamicWorkflowTask_Input(a, b);
39+
}
40+
}
41+
42+
@AutoValue
43+
public abstract static class Output {
44+
45+
abstract SdkBindingData<Long> c();
46+
47+
static Output create(SdkBindingData<Long> c) {
48+
return new AutoValue_SumIfEvenDynamicWorkflowTask_Output(c);
49+
}
50+
}
51+
52+
public SumIfEvenDynamicWorkflowTask() {
53+
super(JacksonSdkType.of(Input.class), JacksonSdkType.of(Output.class));
54+
}
55+
56+
@Override
57+
public Output run(SdkWorkflowBuilder builder, Input input) {
58+
/*
59+
* This is to demonstrate that we can use concrete values in the dynamic workflow task
60+
*/
61+
long aConcreteValue = input.a().get();
62+
long bConcreteValue = input.b().get();
63+
64+
SumTask.SumOutput outputs =
65+
builder
66+
.apply(
67+
SdkConditions.when(
68+
"is-even",
69+
SdkConditions.isTrue(
70+
of(aConcreteValue % 2 == 0 && bConcreteValue % 2 == 0)),
71+
new SumTask(),
72+
SumTask.SumInput.create(input.a(), input.b()))
73+
.otherwise("is-odd", new SumTask(), SumTask.SumInput.create(of(0L), of(0L))))
74+
.getOutputs();
75+
76+
return Output.create(outputs.c());
77+
}
78+
}

0 commit comments

Comments
 (0)