diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 45264138596f..52827ac00088 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -39,10 +39,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY; @@ -84,8 +84,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { // If the leader cache is enabled, the batch will be divided by the leader endpoint, // each endpoint has a batch. // This is only used in plain batch since tsfile does not return redirection info. - private final Map endPointToBatch = - new ConcurrentHashMap<>(); + private final Map endPointToBatch = new HashMap<>(); public PipeTransferBatchReqBuilder(final PipeParameters parameters) { final boolean usingTsFileBatch = @@ -178,22 +177,29 @@ public synchronized void onEvent(final TabletInsertionEvent event) public synchronized List> getAllNonEmptyAndShouldEmitBatches() { final List> nonEmptyAndShouldEmitBatches = - new ArrayList<>(); + new ArrayList<>(endPointToBatch.size() + 1); if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) { nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch)); } - endPointToBatch.forEach( - (endPoint, batch) -> { - if (!batch.isEmpty() && batch.shouldEmit()) { - nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch)); - } - }); + for (final Map.Entry entry : endPointToBatch.entrySet()) { + final PipeTabletEventPlainBatch batch = entry.getValue(); + if (!batch.isEmpty() && batch.shouldEmit()) { + nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch)); + } + } return nonEmptyAndShouldEmitBatches; } - public boolean isEmpty() { - return defaultBatch.isEmpty() - && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); + public synchronized boolean isEmpty() { + if (!defaultBatch.isEmpty()) { + return false; + } + for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) { + if (!batch.isEmpty()) { + return false; + } + } + return true; } public synchronized void discardEventsOfPipe( @@ -206,12 +212,13 @@ public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey)); } - public int size() { + public synchronized int size() { try { - return defaultBatch.events.size() - + endPointToBatch.values().stream() - .map(batch -> batch.events.size()) - .reduce(0, Integer::sum); + int size = defaultBatch.events.size(); + for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) { + size += batch.events.size(); + } + return size; } catch (final Exception e) { LOGGER.warn( "Failed to get the size of PipeTransferBatchReqBuilder, return 0. Exception: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index ad3586df830e..9345d2574629 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -63,7 +63,7 @@ public synchronized void startListenAndAssign( }); } - public synchronized void stopListenAndAssign( + public void stopListenAndAssign( final String dataRegionId, final PipeRealtimeDataRegionSource source) { PipeDataRegionAssigner assignerToClose = null;