Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
Expand Down Expand Up @@ -84,8 +84,7 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
// If the leader cache is enabled, the batch will be divided by the leader endpoint,
// each endpoint has a batch.
// This is only used in plain batch since tsfile does not return redirection info.
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
new ConcurrentHashMap<>();
private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = new HashMap<>();

public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
final boolean usingTsFileBatch =
Expand Down Expand Up @@ -178,22 +177,29 @@ public synchronized void onEvent(final TabletInsertionEvent event)
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyAndShouldEmitBatches() {
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches =
new ArrayList<>();
new ArrayList<>(endPointToBatch.size() + 1);
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
}
endPointToBatch.forEach(
(endPoint, batch) -> {
if (!batch.isEmpty() && batch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
}
});
for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry : endPointToBatch.entrySet()) {
final PipeTabletEventPlainBatch batch = entry.getValue();
if (!batch.isEmpty() && batch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
}
}
return nonEmptyAndShouldEmitBatches;
}

public boolean isEmpty() {
return defaultBatch.isEmpty()
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
public synchronized boolean isEmpty() {
if (!defaultBatch.isEmpty()) {
return false;
}
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
if (!batch.isEmpty()) {
return false;
}
}
return true;
}

public synchronized void discardEventsOfPipe(
Expand All @@ -206,12 +212,13 @@ public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
}

public int size() {
public synchronized int size() {
try {
return defaultBatch.events.size()
+ endPointToBatch.values().stream()
.map(batch -> batch.events.size())
.reduce(0, Integer::sum);
int size = defaultBatch.events.size();
for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
size += batch.events.size();
}
return size;
} catch (final Exception e) {
LOGGER.warn(
"Failed to get the size of PipeTransferBatchReqBuilder, return 0. Exception: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public synchronized void startListenAndAssign(
});
}

public synchronized void stopListenAndAssign(
public void stopListenAndAssign(
final String dataRegionId, final PipeRealtimeDataRegionSource source) {
PipeDataRegionAssigner assignerToClose = null;

Expand Down
Loading