-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. #25120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,18 +20,24 @@ | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||||||||||||||
| import java.util.ArrayList; | ||||||||||||||||||||||||||||||
| import java.util.Collections; | ||||||||||||||||||||||||||||||
| import java.util.IdentityHashMap; | ||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||
| import java.util.Map; | ||||||||||||||||||||||||||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||||||||||||||||||||||||||
| import java.util.function.Supplier; | ||||||||||||||||||||||||||||||
| import lombok.Getter; | ||||||||||||||||||||||||||||||
| import lombok.RequiredArgsConstructor; | ||||||||||||||||||||||||||||||
| import lombok.extern.slf4j.Slf4j; | ||||||||||||||||||||||||||||||
| import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; | ||||||||||||||||||||||||||||||
| import org.apache.pulsar.common.api.proto.CommandAck.AckType; | ||||||||||||||||||||||||||||||
| import org.apache.pulsar.common.api.proto.MessageMetadata; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||
| * The assigner to assign entries to the proper {@link Consumer} in the shared subscription. | ||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @Slf4j | ||||||||||||||||||||||||||||||
| @RequiredArgsConstructor | ||||||||||||||||||||||||||||||
| public class SharedConsumerAssignor { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
@@ -50,6 +56,8 @@ public class SharedConsumerAssignor { | |||||||||||||||||||||||||||||
| // Process the unassigned messages, e.g. adding them to the replay queue | ||||||||||||||||||||||||||||||
| private final java.util.function.Consumer<EntryAndMetadata> unassignedMessageProcessor; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| private final Subscription subscription; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList, | ||||||||||||||||||||||||||||||
| final int numConsumers) { | ||||||||||||||||||||||||||||||
| assert numConsumers >= 0; | ||||||||||||||||||||||||||||||
|
|
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) { | ||||||||||||||||||||||||||||||
| consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata); | ||||||||||||||||||||||||||||||
| availablePermits--; | ||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||
| final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits); | ||||||||||||||||||||||||||||||
| final String uuid = metadata.getUuid(); | ||||||||||||||||||||||||||||||
| Consumer consumerForUuid = uuidToConsumer.get(uuid); | ||||||||||||||||||||||||||||||
| if (consumerForUuid == null) { | ||||||||||||||||||||||||||||||
| unassignedMessageProcessor.accept(entryAndMetadata); | ||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||
| if (metadata.getChunkId() != 0) { | ||||||||||||||||||||||||||||||
| if (subscription != null) { | ||||||||||||||||||||||||||||||
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | |
| log.warn("[{}][{}] Skip the message because it is not the first chunk." |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message uses double concatenation with a plus sign outside the format arguments, which is unconventional and harder to read. The message should be formatted as a single string template or the concatenation should be inside the curly braces if necessary for readability.
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | |
| + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", | |
| subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), | |
| metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); | |
| // Directly ack the message | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { | |
| log.warn("[{}][{}] Skip the message because of it not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", | |
| subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), | |
| metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); | |
| // Directly ack the message | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When subscription is null or the subscription is a PulsarCompactorSubscription, non-first chunks without a cached consumer are not acknowledged or released (the if block at lines 97-108 doesn't execute acknowledgment). However, the code still proceeds to cache the UUID mapping (line 111) and add the message to consumerToEntries (line 120). This means subsequent chunks with the same UUID will be delivered to this consumer, even though chunk0 was never received. This partially defeats the purpose of the fix, as the subscription will still receive incomplete chunked messages in these cases.
| entryAndMetadata.release(); | |
| } | |
| } | |
| entryAndMetadata.release(); | |
| } else { | |
| // For compactor subscriptions, just release the entry without ack | |
| entryAndMetadata.release(); | |
| } | |
| } else { | |
| // No subscription available, just release the entry | |
| entryAndMetadata.release(); | |
| } | |
| // Do not cache UUID mapping or deliver this non-first chunk | |
| continue; |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states "we should remove the uuid" but should clarify "we should remove the uuid from the cache" for better clarity.
| // The last chunk is received, we should remove the uuid | |
| // The last chunk is received, we should remove the uuid from the cache |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a non-first chunk is acknowledged and released (lines 104-106), the message should not continue to be processed. However, the code proceeds to add this already-released message to consumerToEntries (line 120) and cache its UUID mapping (line 111). This will cause the consumer to receive a message whose buffer has already been released, leading to potential reference counting errors or access to freed memory. After acknowledging and releasing the message, the code should continue to the next iteration of the loop without further processing.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,7 @@ public void prepareData() { | |
| roundRobinConsumerSelector.clear(); | ||
| entryAndMetadataList.clear(); | ||
| replayQueue.clear(); | ||
| assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add); | ||
| assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null); | ||
|
||
| final AtomicLong entryId = new AtomicLong(0L); | ||
| final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList); | ||
| final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an unnecessary blank line between the Javadoc comment and the class annotations. The annotations should immediately follow the Javadoc without a blank line in between.