From 4787a27bc777e2b4b0d5430990ff67aeff277eea Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 15:37:57 +0800 Subject: [PATCH] Pipe: stop historical source supply after drop --- .../db/pipe/agent/task/PipeDataNodeTask.java | 2 +- ...icalDataRegionTsFileAndDeletionSource.java | 104 +++++++++++++----- 2 files changed, 75 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java index 0d4e50cabef1f..c5e78685b9d42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTask.java @@ -68,9 +68,9 @@ public void create() { @Override public void drop() { final long startTime = System.currentTimeMillis(); - sourceStage.drop(); processorStage.drop(); sinkStage.drop(); + sourceStage.drop(); LOGGER.info( DataNodePipeMessages.DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS, this, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index e71d80a61a6bc..898bc9464d1f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -78,6 +78,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -162,10 +163,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private boolean isForwardingPipeRequests; private volatile boolean hasBeenStarted = false; + private volatile boolean hasBeenClosed = false; private Queue pendingQueue; private final Map> filteredTsFileResources2TableNames = new HashMap<>(); + private final Set pinnedTsFileResources = new HashSet<>(); private final Map pendingResource2ReplicateIndexForIoTV2 = new HashMap<>(); private int extractedHistoricalTsFileCount = 0; @@ -480,6 +483,9 @@ private ProgressIndex extractRecoverProgressIndex(RecoverProgressIndex toBeTrans @Override public synchronized void start() { + if (hasBeenClosed) { + return; + } if (!shouldExtractInsertion && !shouldExtractDeletion) { hasBeenStarted = true; return; @@ -508,12 +514,21 @@ public synchronized void start() { if (shouldExtractInsertion) { flushTsFilesForExtraction(dataRegion); + if (hasBeenClosed) { + return; + } extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList); } + if (hasBeenClosed) { + return; + } if (shouldExtractDeletion) { Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } + if (hasBeenClosed) { + return; + } // Sort tsFileResource and deletionResource long startTime = System.currentTimeMillis(); @@ -561,6 +576,9 @@ private void extractTsFiles( final DataRegion dataRegion, final long startHistoricalExtractionTime, final List originalResourceList) { + if (hasBeenClosed) { + return; + } final TsFileManager tsFileManager = dataRegion.getTsFileManager(); tsFileManager.readLock(); try { @@ -580,7 +598,8 @@ private void extractTsFiles( .peek(originalResourceList::add) .filter( resource -> - isHistoricalSourceEnabled + !hasBeenClosed + && isHistoricalSourceEnabled && // Some resource is marked as deleted but not removed from the list. !resource.isDeleted() @@ -613,7 +632,8 @@ && mayTsFileResourceOverlappedWithPattern(resource))) .peek(originalResourceList::add) .filter( resource -> - isHistoricalSourceEnabled + !hasBeenClosed + && isHistoricalSourceEnabled && // Some resource is marked as deleted but not removed from the list. !resource.isDeleted() @@ -645,11 +665,15 @@ && mayTsFileResourceOverlappedWithPattern(resource))) .keySet() .removeIf( resource -> { + if (hasBeenClosed) { + return true; + } // Pin the resource, in case the file is removed by compaction or anything. // Will unpin it after the PipeTsFileInsertionEvent is created and pinned. try { PipeDataNodeResourceManager.tsfile() .pinTsFileResource(resource, shouldTransferModFile, pipeName); + pinnedTsFileResources.add(resource); return false; } catch (final IOException e) { LOGGER.warn( @@ -821,17 +845,21 @@ private void extractDeletions( @Override public synchronized Event supply() { + if (hasBeenClosed) { + return null; + } if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { start(); } - if (Objects.isNull(pendingQueue)) { + if (hasBeenClosed || Objects.isNull(pendingQueue)) { return null; } final PersistentResource resource = pendingQueue.peek(); if (resource == null) { - return supplyTerminateEvent(); + final Event event = supplyTerminateEvent(); + return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event; } if (resource instanceof TsFileResource) { @@ -839,17 +867,26 @@ public synchronized Event supply() { if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { clearReplicateIndexForResource(tsFileResource); pendingQueue.poll(); - return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + final Event event = supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event; } final Event event = supplyTsFileEvent(tsFileResource); pendingQueue.poll(); - return event; + return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event; } final Event event = supplyDeletionEvent((DeletionResource) resource); pendingQueue.poll(); - return event; + return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event; + } + + private Event clearReferenceCountAndReturnNull(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event) + .clearReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName()); + } + return null; } private Event supplyTerminateEvent() { @@ -909,6 +946,8 @@ protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileRes dataRegionId, resource.getTsFilePath(), e); + } finally { + pinnedTsFileResources.remove(resource); } } } @@ -1003,6 +1042,8 @@ protected Event supplyTsFileEvent(final TsFileResource resource) { pipeName, dataRegionId, resource.getTsFilePath()); + } finally { + pinnedTsFileResources.remove(resource); } } } @@ -1086,8 +1127,9 @@ protected void clearReplicateIndexForResource(final PersistentResource resource) public synchronized boolean hasConsumedAll() { // If the pendingQueue is null when the function is called, it implies that the extractor only // extracts deletion thus the historical event has nothing to consume. - return hasBeenStarted - && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent); + return hasBeenClosed + || hasBeenStarted + && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent); } @Override @@ -1096,30 +1138,32 @@ public int getPendingQueueSize() { } @Override - public synchronized void close() { - if (!isTerminateSignalSent) { - PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId); - } - if (Objects.nonNull(pendingQueue)) { - pendingQueue.forEach( + public void close() { + hasBeenClosed = true; + synchronized (this) { + if (!isTerminateSignalSent) { + PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId); + } + pinnedTsFileResources.forEach( resource -> { - if (resource instanceof TsFileResource) { - try { - PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource( - (TsFileResource) resource, shouldTransferModFile, pipeName); - } catch (final IOException e) { - LOGGER.warn( - DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING, - pipeName, - dataRegionId, - ((TsFileResource) resource).getTsFilePath()); - } + try { + PipeDataNodeResourceManager.tsfile() + .unpinTsFileResource(resource, shouldTransferModFile, pipeName); + } catch (final IOException e) { + LOGGER.warn( + DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING, + pipeName, + dataRegionId, + resource.getTsFilePath()); } }); - pendingQueue.clear(); - pendingQueue = null; + pinnedTsFileResources.clear(); + if (Objects.nonNull(pendingQueue)) { + pendingQueue.clear(); + pendingQueue = null; + } + filteredTsFileResources2TableNames.clear(); + pendingResource2ReplicateIndexForIoTV2.clear(); } - pendingResource2ReplicateIndexForIoTV2.clear(); } }