Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public void create() {
@Override
public void drop() {
final long startTime = System.currentTimeMillis();
sourceStage.drop();
processorStage.drop();
sinkStage.drop();
sourceStage.drop();
LOGGER.info(
DataNodePipeMessages.DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS,
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -162,10 +163,12 @@
private boolean isForwardingPipeRequests;

private volatile boolean hasBeenStarted = false;
private volatile boolean hasBeenClosed = false;

private Queue<PersistentResource> pendingQueue;
private final Map<TsFileResource, Set<String>> filteredTsFileResources2TableNames =
new HashMap<>();
private final Set<TsFileResource> pinnedTsFileResources = new HashSet<>();
private final Map<PersistentResource, Long> pendingResource2ReplicateIndexForIoTV2 =
new HashMap<>();
private int extractedHistoricalTsFileCount = 0;
Expand Down Expand Up @@ -480,6 +483,9 @@

@Override
public synchronized void start() {
if (hasBeenClosed) {
return;
}
if (!shouldExtractInsertion && !shouldExtractDeletion) {
hasBeenStarted = true;
return;
Expand Down Expand Up @@ -508,12 +514,21 @@

if (shouldExtractInsertion) {
flushTsFilesForExtraction(dataRegion);
if (hasBeenClosed) {
return;
}
extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList);
}
if (hasBeenClosed) {
return;
}
if (shouldExtractDeletion) {
Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId))
.ifPresent(manager -> extractDeletions(manager, originalResourceList));
}
if (hasBeenClosed) {
return;
}

// Sort tsFileResource and deletionResource
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -561,6 +576,9 @@
final DataRegion dataRegion,
final long startHistoricalExtractionTime,
final List<PersistentResource> originalResourceList) {
if (hasBeenClosed) {
return;
}
final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
try {
Expand All @@ -580,7 +598,8 @@
.peek(originalResourceList::add)
.filter(
resource ->
isHistoricalSourceEnabled
!hasBeenClosed
&& isHistoricalSourceEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
Expand Down Expand Up @@ -613,7 +632,8 @@
.peek(originalResourceList::add)
.filter(
resource ->
isHistoricalSourceEnabled
!hasBeenClosed
&& isHistoricalSourceEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
Expand Down Expand Up @@ -645,11 +665,15 @@
.keySet()
.removeIf(
resource -> {
if (hasBeenClosed) {
return true;
}
// Pin the resource, in case the file is removed by compaction or anything.
// Will unpin it after the PipeTsFileInsertionEvent is created and pinned.
try {
PipeDataNodeResourceManager.tsfile()
.pinTsFileResource(resource, shouldTransferModFile, pipeName);
pinnedTsFileResources.add(resource);
return false;
} catch (final IOException e) {
LOGGER.warn(
Expand Down Expand Up @@ -820,36 +844,49 @@
}

@Override
public synchronized Event supply() {

Check failure on line 847 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6rWCuichQzNLXloTSq&open=AZ6rWCuichQzNLXloTSq&pullRequest=17877
if (hasBeenClosed) {
return null;
}
if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
start();
}

if (Objects.isNull(pendingQueue)) {
if (hasBeenClosed || Objects.isNull(pendingQueue)) {
return null;
}

final PersistentResource resource = pendingQueue.peek();
if (resource == null) {
return supplyTerminateEvent();
final Event event = supplyTerminateEvent();
return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
}

if (resource instanceof TsFileResource) {
final TsFileResource tsFileResource = (TsFileResource) resource;
if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
clearReplicateIndexForResource(tsFileResource);
pendingQueue.poll();
return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
final Event event = supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
}

final Event event = supplyTsFileEvent(tsFileResource);
pendingQueue.poll();
return event;
return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
}

final Event event = supplyDeletionEvent((DeletionResource) resource);
pendingQueue.poll();
return event;
return hasBeenClosed ? clearReferenceCountAndReturnNull(event) : event;
}

private Event clearReferenceCountAndReturnNull(final Event event) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
.clearReferenceCount(PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
}
return null;
}

private Event supplyTerminateEvent() {
Expand Down Expand Up @@ -909,6 +946,8 @@
dataRegionId,
resource.getTsFilePath(),
e);
} finally {
pinnedTsFileResources.remove(resource);
}
}
}
Expand Down Expand Up @@ -1003,6 +1042,8 @@
pipeName,
dataRegionId,
resource.getTsFilePath());
} finally {
pinnedTsFileResources.remove(resource);
}
}
}
Expand Down Expand Up @@ -1086,8 +1127,9 @@
public synchronized boolean hasConsumedAll() {
// If the pendingQueue is null when the function is called, it implies that the extractor only
// extracts deletion thus the historical event has nothing to consume.
return hasBeenStarted
&& (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent);
return hasBeenClosed
|| hasBeenStarted
&& (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && isTerminateSignalSent);
}

@Override
Expand All @@ -1096,30 +1138,32 @@
}

@Override
public synchronized void close() {
if (!isTerminateSignalSent) {
PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
}
if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
public void close() {
hasBeenClosed = true;
synchronized (this) {
if (!isTerminateSignalSent) {
PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
}
pinnedTsFileResources.forEach(
resource -> {
if (resource instanceof TsFileResource) {
try {
PipeDataNodeResourceManager.tsfile()
.unpinTsFileResource(
(TsFileResource) resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
LOGGER.warn(
DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING,
pipeName,
dataRegionId,
((TsFileResource) resource).getTsFilePath());
}
try {
PipeDataNodeResourceManager.tsfile()
.unpinTsFileResource(resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
LOGGER.warn(
DataNodePipeMessages.PIPE_FAILED_TO_UNPIN_TSFILERESOURCE_AFTER_DROPPING,
pipeName,
dataRegionId,
resource.getTsFilePath());
}
});
pendingQueue.clear();
pendingQueue = null;
pinnedTsFileResources.clear();
if (Objects.nonNull(pendingQueue)) {
pendingQueue.clear();
pendingQueue = null;
}
filteredTsFileResources2TableNames.clear();
pendingResource2ReplicateIndexForIoTV2.clear();
}
pendingResource2ReplicateIndexForIoTV2.clear();
}
}
Loading