Skip to content

Commit 0f9bd91

Browse files
committed
KAFKA-19994: TaskManager may not close all tasks on task timeouts (#21155)
When a TimeoutException occurs while trying to put multiple active tasks back into running, we add the timed-out task back to the state updater so that we retry it. However, if we run into a task timeout (failing to make progress for a long time), we rethrow a StreamsException wrapping the TimeoutException. If we have drained multiple tasks from the state updater at that point, the remaining tasks are lost: they are not added back to the state updater and are therefore not closed correctly. The task directories remain locked, causing issues when trying to replace the stream thread. Reviewers: Matthias J. Sax <[email protected]>
1 parent cb86075 commit 0f9bd91

File tree

2 files changed

+67
-3
lines changed

2 files changed

+67
-3
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.HashSet;
5858
import java.util.Iterator;
5959
import java.util.LinkedHashMap;
60+
import java.util.LinkedHashSet;
6061
import java.util.LinkedList;
6162
import java.util.List;
6263
import java.util.Map;
@@ -978,9 +979,12 @@ private void closeTaskClean(final Task task,
978979
}
979980
}
980981

982+
/**
983+
* @throws StreamsException if fetching committed offsets timed out often enough to exceed task timeout
984+
*/
981985
private void transitRestoredTaskToRunning(final Task task,
982986
final long now,
983-
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
987+
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws StreamsException {
984988
try {
985989
task.completeRestoration(offsetResetter);
986990
tasks.addTask(task);
@@ -1057,8 +1061,22 @@ public Map<TaskId, RuntimeException> collectExceptionsAndFailedTasksFromStateUpd
10571061
private void handleRestoredTasksFromStateUpdater(final long now,
10581062
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
10591063
final Duration timeout = Duration.ZERO;
1060-
for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
1061-
transitRestoredTaskToRunning(task, now, offsetResetter);
1064+
// Create a mutable copy to support iterator.remove()
1065+
final Set<StreamTask> restoredTasks = new LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
1066+
final Iterator<StreamTask> iterator = restoredTasks.iterator();
1067+
1068+
try {
1069+
while (iterator.hasNext()) {
1070+
final Task task = iterator.next();
1071+
transitRestoredTaskToRunning(task, now, offsetResetter);
1072+
iterator.remove(); // Remove successfully transitioned tasks
1073+
}
1074+
} finally {
1075+
// Add back any tasks that we drained from the state updater but didn't
1076+
// successfully transition to running, so that they are closed during shutdown.
1077+
for (final Task task : restoredTasks) {
1078+
stateUpdater.add(task);
1079+
}
10621080
}
10631081
}
10641082

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,6 +1643,52 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
16431643
verifyNoInteractions(consumer);
16441644
}
16451645

1646+
@Test
1647+
public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
1648+
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
1649+
.inState(State.RESTORING)
1650+
.withInputPartitions(taskId00Partitions).build();
1651+
final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
1652+
.inState(State.RESTORING)
1653+
.withInputPartitions(taskId01Partitions).build();
1654+
final StreamTask task3 = statefulTask(taskId02, taskId02ChangelogPartitions)
1655+
.inState(State.RESTORING)
1656+
.withInputPartitions(taskId02Partitions).build();
1657+
1658+
// Use LinkedHashSet to ensure predictable iteration order
1659+
final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>();
1660+
restoredTasks.add(task1);
1661+
restoredTasks.add(task2);
1662+
restoredTasks.add(task3);
1663+
1664+
final TasksRegistry tasks = mock(TasksRegistry.class);
1665+
final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks);
1666+
1667+
// task1 completes successfully, task2 throws StreamsException from maybeInitTaskTimeoutOrThrow
1668+
// task3 is never processed because task2 throws
1669+
final TimeoutException timeoutException = new TimeoutException();
1670+
doThrow(timeoutException).when(task2).completeRestoration(noOpResetter);
1671+
doThrow(new StreamsException("Task timeout exceeded", task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
1672+
1673+
assertThrows(StreamsException.class, () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
1674+
1675+
// task1 should be successfully transitioned
1676+
verify(tasks).addTask(task1);
1677+
verify(consumer).resume(task1.inputPartitions());
1678+
verify(task1).clearTaskTimeout();
1679+
1680+
// task2 should be added back to state updater once in the finally block
1681+
// (the add in the catch block doesn't execute because maybeInitTaskTimeoutOrThrow throws)
1682+
verify(stateUpdater).add(task2);
1683+
verify(tasks, never()).addTask(task2);
1684+
verify(task2, never()).clearTaskTimeout();
1685+
1686+
// task3 should also be added back to state updater in the finally block
1687+
verify(stateUpdater).add(task3);
1688+
verify(tasks, never()).addTask(task3);
1689+
verify(task3, never()).clearTaskTimeout();
1690+
}
1691+
16461692
private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks,
16471693
final TasksRegistry tasks) {
16481694
when(stateUpdater.restoresActiveTasks()).thenReturn(true);

0 commit comments

Comments
 (0)