Original file line number Diff line number Diff line change
@@ -20,18 +20,24 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;

/**
* The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
*/

Copilot AI Jan 2, 2026

There is an unnecessary blank line between the Javadoc comment and the class annotations. The annotations should immediately follow the Javadoc without a blank line in between.

Suggested change

Copilot uses AI. Check for mistakes.
@Slf4j
@RequiredArgsConstructor
public class SharedConsumerAssignor {

@@ -50,6 +56,8 @@ public class SharedConsumerAssignor {
// Process the unassigned messages, e.g. adding them to the replay queue
private final java.util.function.Consumer<EntryAndMetadata> unassignedMessageProcessor;

private final Subscription subscription;

public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList,
final int numConsumers) {
assert numConsumers >= 0;
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>

if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
availablePermits--;
} else {
final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
final String uuid = metadata.getUuid();
Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
unassignedMessageProcessor.accept(entryAndMetadata);
continue;
if (metadata.getChunkId() != 0) {
if (subscription != null) {
log.warn("[{}][{}] Skip the message because of it not the first chunk."
Copilot AI Jan 2, 2026

The error message contains a grammatical error. "because of it not the first chunk" should be "because it is not the first chunk".

Suggested change
log.warn("[{}][{}] Skip the message because of it not the first chunk."
log.warn("[{}][{}] Skip the message because it is not the first chunk."

+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
Comment on lines +98 to +103
Copilot AI Jan 2, 2026

The log message template is split across two string literals joined with "+", which makes it harder to match the placeholders against the arguments. Consider writing the template as a single string literal.

Suggested change
log.warn("[{}][{}] Skip the message because of it not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
log.warn("[{}][{}] Skip the message because of it not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
if (!(subscription instanceof PulsarCompactorSubscription)) {
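
For reference, joining adjacent string literals with "+" yields the same template as a single literal, so the choice here is about readability rather than behavior. The sketch below checks that, and counts the {} placeholders against the six arguments passed in the diff. LogTemplateSketch and countPlaceholders are hypothetical helpers written for this illustration; they are not part of Pulsar or SLF4J.

```java
// Hypothetical helper for illustration; not part of Pulsar or SLF4J.
final class LogTemplateSketch {
    // Template split across two literals, as in the diff under review.
    static final String SPLIT = "[{}][{}] Skip the message because of it not the first chunk."
            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}";

    // The same template written as one literal, as in the suggested change.
    static final String SINGLE =
            "[{}][{}] Skip the message because of it not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}";

    /** Counts non-overlapping "{}" placeholders in a template. */
    static int countPlaceholders(String template) {
        int count = 0;
        int from = 0;
        while ((from = template.indexOf("{}", from)) >= 0) {
            count++;
            from += 2; // step past the placeholder just found
        }
        return count;
    }
}
```

Both constants compare equal, and the placeholder count matches the number of arguments in the log call, so either layout logs the same line.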

subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
entryAndMetadata.release();
}
}
Comment on lines +106 to +108
Copilot AI Jan 2, 2026

When subscription is null or the subscription is a PulsarCompactorSubscription, non-first chunks without a cached consumer are not acknowledged or released (the if block at lines 97-108 doesn't execute acknowledgment). However, the code still proceeds to cache the UUID mapping (line 111) and add the message to consumerToEntries (line 120). This means subsequent chunks with the same UUID will be delivered to this consumer, even though chunk0 was never received. This partially defeats the purpose of the fix, as the subscription will still receive incomplete chunked messages in these cases.

Suggested change
entryAndMetadata.release();
}
}
entryAndMetadata.release();
} else {
// For compactor subscriptions, just release the entry without ack
entryAndMetadata.release();
}
} else {
// No subscription available, just release the entry
entryAndMetadata.release();
}
// Do not cache UUID mapping or deliver this non-first chunk
continue;

}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}

final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid
Copilot AI Jan 2, 2026

The comment says "we should remove the uuid"; "we should remove the uuid from the cache" would state what is actually being removed.

Suggested change
// The last chunk is received, we should remove the uuid
// The last chunk is received, we should remove the uuid from the cache

uuidToConsumer.remove(uuid);
}

consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);
Comment on lines +103 to 120
Copilot AI Jan 2, 2026

When a non-first chunk is acknowledged and released (lines 104-106), the message should not continue to be processed. However, the code proceeds to add this already-released message to consumerToEntries (line 120) and cache its UUID mapping (line 111). This will cause the consumer to receive a message whose buffer has already been released, leading to potential reference counting errors or access to freed memory. After acknowledging and releasing the message, the code should continue to the next iteration of the loop without further processing.
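
The hazard described above can be reproduced with a toy reference-counted entry. This is a minimal sketch: RefCountedEntrySketch is a hypothetical stand-in, not EntryAndMetadata or a Netty buffer, and it only assumes that reading an entry after its count reaches zero is an error.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference-counted entry for illustration; not a Pulsar or Netty class.
final class RefCountedEntrySketch {
    private int refCnt = 1;
    private final String payload;

    RefCountedEntrySketch(String payload) {
        this.payload = payload;
    }

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("already released");
        }
        refCnt--;
    }

    String read() {
        if (refCnt <= 0) {
            throw new IllegalStateException("read after release");
        }
        return payload;
    }

    /** Problematic flow: the entry is released but still handed to the consumer. */
    static List<RefCountedEntrySketch> releaseThenDeliver(RefCountedEntrySketch entry) {
        List<RefCountedEntrySketch> toConsumer = new ArrayList<>();
        entry.release();
        toConsumer.add(entry); // falls through instead of skipping the entry
        return toConsumer;
    }

    /** Safer flow: the entry is released and processing stops for it. */
    static List<RefCountedEntrySketch> releaseAndSkip(RefCountedEntrySketch entry) {
        List<RefCountedEntrySketch> toConsumer = new ArrayList<>();
        entry.release();
        return toConsumer; // nothing is delivered
    }
}
```

In the first flow the consumer ends up holding an entry whose read() throws; in the second the consumer never sees it, which corresponds to adding a continue after the release.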

consumerToPermits.put(consumerForUuid, permits - 1);
if (consumerForUuid == consumer) {
availablePermits--;
}
}
availablePermits--;
}

for (; index < entryAndMetadataList.size(); index++) {
@@ -111,29 +145,4 @@ private Consumer getConsumer(final int numConsumers) {
}
return null;
}

private Consumer getConsumerForUuid(final MessageMetadata metadata,
final Consumer defaultConsumer,
final int currentAvailablePermits) {
final String uuid = metadata.getUuid();
Consumer consumer = uuidToConsumer.get(uuid);
if (consumer == null) {
if (metadata.getChunkId() != 0) {
// Not the first chunk, skip it
return null;
}
consumer = defaultConsumer;
uuidToConsumer.put(uuid, consumer);
}
final int permits = consumerToPermits.computeIfAbsent(consumer, Consumer::getAvailablePermits);
if (permits <= 0) {
return null;
}
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the cache
uuidToConsumer.remove(uuid);
}
consumerToPermits.put(consumer, currentAvailablePermits - 1);
return consumer;
}
}
Original file line number Diff line number Diff line change
@@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription);
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription);
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ public void prepareData() {
roundRobinConsumerSelector.clear();
entryAndMetadataList.clear();
replayQueue.clear();
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);
Copilot AI Jan 2, 2026

The test passes null for the subscription parameter, which means the new logic for handling missing chunk0 (lines 97-108 in SharedConsumerAssignor.java) is not covered by existing tests. Consider adding test cases that:

  1. Verify non-first chunks are properly acknowledged when chunk0 is missing
  2. Test the behavior when subscription is a PulsarCompactorSubscription
  3. Ensure that acknowledged chunks don't get dispatched to consumers
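
The three cases above could be exercised along these lines. This is a minimal sketch built on hypothetical stand-ins (MissingFirstChunkSketch, Chunk, FakeSubscription); it does not use the real SharedConsumerAssignor or its test fixtures. It assumes the skip-and-continue behavior proposed earlier in this review: an orphaned non-first chunk is acknowledged only for a non-compactor subscription, is always released, and is never cached or dispatched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration; none of these types are Pulsar classes.
final class MissingFirstChunkSketch {

    /** Simplified chunk carrying only the fields the decision depends on. */
    static final class Chunk {
        final String uuid;
        final int chunkId;
        final int numChunks;
        boolean released;

        Chunk(String uuid, int chunkId, int numChunks) {
            this.uuid = uuid;
            this.chunkId = chunkId;
            this.numChunks = numChunks;
        }
    }

    /** Records acks; the flag models a compactor subscription. */
    static final class FakeSubscription {
        final boolean compactor;
        final List<Chunk> acked = new ArrayList<>();

        FakeSubscription(boolean compactor) {
            this.compactor = compactor;
        }
    }

    private final Map<String, String> uuidToConsumer = new HashMap<>();

    /** Returns the chunks dispatched to {@code consumer}; orphaned chunks are skipped. */
    List<Chunk> assign(List<Chunk> chunks, String consumer, FakeSubscription subscription) {
        List<Chunk> dispatched = new ArrayList<>();
        for (Chunk chunk : chunks) {
            if (!uuidToConsumer.containsKey(chunk.uuid)) {
                if (chunk.chunkId != 0) {
                    // Assumed behavior: ack unless this is a compactor (or there is
                    // no subscription), release the entry, skip caching and dispatch.
                    if (subscription != null && !subscription.compactor) {
                        subscription.acked.add(chunk);
                    }
                    chunk.released = true;
                    continue;
                }
                uuidToConsumer.put(chunk.uuid, consumer);
            }
            if (chunk.chunkId == chunk.numChunks - 1) {
                uuidToConsumer.remove(chunk.uuid); // last chunk: drop the cached mapping
            }
            dispatched.add(chunk);
        }
        return dispatched;
    }
}
```

A test would feed assign a batch mixing an orphaned chunk with a complete chunked message, then assert on three things: the dispatched list, FakeSubscription.acked, and each chunk's released flag. Running it once with compactor set to false and once with true covers cases 1 and 2; checking that the orphan is absent from the dispatched list covers case 3.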

final AtomicLong entryId = new AtomicLong(0L);
final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList);
final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList);