Skip to content

Commit ed425ba

Browse files
gaurav-narula authored and chia7712 committed
KAFKA-19990 gracefully handle exceptions when handling AllocateProducerIdsResponse (#21135)
The handler in `RPCProducerIdManager` does not handle authentication exceptions and version-mismatch exceptions gracefully. This change ensures that we retry on such failures and adds unit tests for these scenarios. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 0f9bd91 commit ed425ba

File tree

2 files changed

+122
-25
lines changed

2 files changed

+122
-25
lines changed

transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java

Lines changed: 28 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,8 @@ public class RPCProducerIdManager implements ProducerIdManager {
5151
private final String logPrefix;
5252

5353
private final int brokerId;
54-
private final Time time;
54+
// Visible for testing
55+
final Time time;
5556
private final Supplier<Long> brokerEpochSupplier;
5657
private final NodeToControllerChannelManager controllerChannel;
5758

@@ -129,9 +130,7 @@ protected void sendRequest() {
129130

130131
@Override
131132
public void onComplete(ClientResponse response) {
132-
if (response.responseBody() instanceof AllocateProducerIdsResponse) {
133-
handleAllocateProducerIdsResponse((AllocateProducerIdsResponse) response.responseBody());
134-
}
133+
handleAllocateProducerIdsResponse(response);
135134
}
136135

137136
@Override
@@ -142,7 +141,30 @@ public void onTimeout() {
142141
});
143142
}
144143

145-
protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
144+
private void handleUnsuccessfulResponse() {
145+
// There is no need to compare and set because only one thread
146+
// handles the AllocateProducerIds response.
147+
backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
148+
requestInFlight.set(false);
149+
}
150+
151+
protected void handleAllocateProducerIdsResponse(ClientResponse clientResponse) {
152+
if (clientResponse.authenticationException() != null) {
153+
log.error("{} Unable to allocate producer id because of an authentication exception", logPrefix, clientResponse.authenticationException());
154+
handleUnsuccessfulResponse();
155+
return;
156+
}
157+
if (clientResponse.versionMismatch() != null) {
158+
log.error("{} Unable to allocate producer id because of a version mismatch exception", logPrefix, clientResponse.versionMismatch());
159+
handleUnsuccessfulResponse();
160+
return;
161+
}
162+
if (!clientResponse.hasResponse()) {
163+
log.error("{} Unable to allocate producer id because of empty response from controller", logPrefix);
164+
handleUnsuccessfulResponse();
165+
return;
166+
}
167+
AllocateProducerIdsResponse response = (AllocateProducerIdsResponse) clientResponse.responseBody();
146168
var data = response.data();
147169
var successfulResponse = false;
148170
var errors = Errors.forCode(data.errorCode());
@@ -161,10 +183,7 @@ protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse res
161183
log.error("{} Received error code {} from the controller.", logPrefix, errors);
162184
}
163185
if (!successfulResponse) {
164-
// There is no need to compare and set because only one thread
165-
// handles the AllocateProducerIds response.
166-
backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
167-
requestInFlight.set(false);
186+
handleUnsuccessfulResponse();
168187
}
169188
}
170189

transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java

Lines changed: 94 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,10 @@
1616
*/
1717
package org.apache.kafka.coordinator.transaction;
1818

19+
import org.apache.kafka.clients.ClientResponse;
20+
import org.apache.kafka.common.errors.AuthenticationException;
1921
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
22+
import org.apache.kafka.common.errors.UnsupportedVersionException;
2023
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
2124
import org.apache.kafka.common.protocol.Errors;
2225
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
@@ -61,43 +64,82 @@ class MockProducerIdManager extends RPCProducerIdManager {
6164
private final ExecutorService brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor();
6265
private final int idLen;
6366
private Long idStart;
67+
private boolean hasAuthenticationException;
68+
private boolean hasVersionMismatch;
69+
private boolean hasNoResponse;
6470

6571
MockProducerIdManager(int brokerId,
6672
long idStart,
6773
int idLen,
6874
Queue<Errors> errorQueue,
6975
boolean isErroneousBlock,
70-
Time time) {
76+
Time time,
77+
boolean hasAuthenticationException,
78+
boolean hasVersionMismatch,
79+
boolean hasNoResponse) {
7180
super(brokerId, time, () -> 1L, brokerToController);
7281
this.idStart = idStart;
7382
this.idLen = idLen;
7483
this.errorQueue = errorQueue;
7584
this.isErroneousBlock = isErroneousBlock;
85+
this.hasAuthenticationException = hasAuthenticationException;
86+
this.hasVersionMismatch = hasVersionMismatch;
87+
this.hasNoResponse = hasNoResponse;
88+
}
89+
90+
private ClientResponse createClientResponse(
91+
AuthenticationException authenticationException,
92+
UnsupportedVersionException versionException,
93+
AllocateProducerIdsResponse response
94+
) {
95+
return new ClientResponse(null, null, null, time.milliseconds(), time.milliseconds(),
96+
false, versionException, authenticationException, response);
7697
}
7798

7899
@Override
79100
protected void sendRequest() {
80101
brokerToControllerRequestExecutor.submit(() -> {
102+
if (hasAuthenticationException) {
103+
handleAllocateProducerIdsResponse(createClientResponse(new AuthenticationException("Auth Failure"), null, null));
104+
hasAuthenticationException = false; // reset so retry works
105+
return;
106+
}
107+
if (hasVersionMismatch) {
108+
handleAllocateProducerIdsResponse(createClientResponse(null, new UnsupportedVersionException("Version Mismatch"), null));
109+
hasVersionMismatch = false; // reset so retry works
110+
return;
111+
}
112+
if (hasNoResponse) {
113+
handleAllocateProducerIdsResponse(createClientResponse(null, null, null));
114+
hasNoResponse = false; // reset so retry works
115+
return;
116+
}
81117
Errors error = errorQueue.poll();
82118
if (error == null || error == Errors.NONE) {
83-
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
84-
new AllocateProducerIdsResponseData()
85-
.setProducerIdStart(idStart)
86-
.setProducerIdLen(idLen)
87-
));
119+
handleAllocateProducerIdsResponse(createClientResponse(
120+
null,
121+
null,
122+
new AllocateProducerIdsResponse(
123+
new AllocateProducerIdsResponseData()
124+
.setProducerIdStart(idStart)
125+
.setProducerIdLen(idLen)
126+
)));
88127
if (!isErroneousBlock) {
89128
idStart += idLen;
90129
}
91130
} else {
92-
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
93-
new AllocateProducerIdsResponseData().setErrorCode(error.code())
94-
));
131+
handleAllocateProducerIdsResponse(createClientResponse(
132+
null,
133+
null,
134+
new AllocateProducerIdsResponse(
135+
new AllocateProducerIdsResponseData().setErrorCode(error.code())
136+
)));
95137
}
96138
}, 0);
97139
}
98140

99141
@Override
100-
protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
142+
protected void handleAllocateProducerIdsResponse(ClientResponse response) {
101143
super.handleAllocateProducerIdsResponse(response);
102144
capturedFailure.set(nextProducerIdBlock.get() == null);
103145
}
@@ -112,7 +154,7 @@ public void testConcurrentGeneratePidRequests(int idBlockLen) throws Interrupted
112154
var numThreads = 5;
113155
var latch = new CountDownLatch(idBlockLen * 3);
114156
var manager = new MockProducerIdManager(0, 0, idBlockLen,
115-
new ConcurrentLinkedQueue<>(), false, Time.SYSTEM);
157+
new ConcurrentLinkedQueue<>(), false, Time.SYSTEM, false, false, false);
116158
var requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads);
117159
Map<Long, Integer> pidMap = new ConcurrentHashMap<>();
118160

@@ -149,7 +191,7 @@ public void testConcurrentGeneratePidRequests(int idBlockLen) throws Interrupted
149191
@EnumSource(value = Errors.class, names = {"UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"})
150192
public void testUnrecoverableErrors(Errors error) throws Exception {
151193
var time = new MockTime();
152-
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE, error), false, time);
194+
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE, error), false, time, false, false, false);
153195
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
154196
verifyFailureWithoutGenerateProducerId(manager);
155197

@@ -159,20 +201,56 @@ public void testUnrecoverableErrors(Errors error) throws Exception {
159201

160202
@Test
161203
public void testInvalidRanges() throws InterruptedException {
162-
var manager = new MockProducerIdManager(0, -1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
204+
var manager = new MockProducerIdManager(0, -1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
163205
verifyFailure(manager);
164206

165-
manager = new MockProducerIdManager(0, 0, -1, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
207+
manager = new MockProducerIdManager(0, 0, -1, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
166208
verifyFailure(manager);
167209

168-
manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
210+
manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
169211
verifyFailure(manager);
170212
}
171213

172214
@Test
173215
public void testRetryBackoff() throws Exception {
174216
var time = new MockTime();
175-
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.UNKNOWN_SERVER_ERROR), false, time);
217+
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.UNKNOWN_SERVER_ERROR), false, time, false, false, false);
218+
219+
verifyFailure(manager);
220+
221+
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
222+
time.sleep(RETRY_BACKOFF_MS);
223+
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
224+
}
225+
226+
@Test
227+
public void testRetryBackoffOnAuthException() throws Exception {
228+
var time = new MockTime();
229+
var manager = new MockProducerIdManager(0, 0, 1, new ConcurrentLinkedQueue<>(), false, time, true, false, false);
230+
231+
verifyFailure(manager);
232+
233+
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
234+
time.sleep(RETRY_BACKOFF_MS);
235+
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
236+
}
237+
238+
@Test
239+
public void testRetryBackoffOnVersionMismatch() throws Exception {
240+
var time = new MockTime();
241+
var manager = new MockProducerIdManager(0, 0, 1, new ConcurrentLinkedQueue<>(), false, time, false, true, false);
242+
243+
verifyFailure(manager);
244+
245+
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
246+
time.sleep(RETRY_BACKOFF_MS);
247+
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
248+
}
249+
250+
@Test
251+
public void testRetryBackoffOnNoResponse() throws Exception {
252+
var time = new MockTime();
253+
var manager = new MockProducerIdManager(0, 0, 1, new ConcurrentLinkedQueue<>(), false, time, false, false, true);
176254

177255
verifyFailure(manager);
178256

0 commit comments

Comments
 (0)