From e7d15181bf6b1003245d7c5bbcccd12cbd5939e0 Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 26 Jun 2026 14:00:31 +0300 Subject: [PATCH 1/2] fix(tests): stabilize TQEClusterIntegrationTest against queue dedup The TQE queue config (both 2.x and 3.x) sets deduplication_mode: keep_latest, so duplicate (age, name) payloads collapse on the broker. The previous Instancio generator with allowEmpty().nullable() names periodically produced collisions (1 in ~30 runs), leaving the consumer unable to ever receive USERS_COUNT messages and timing out at 60s. Generate strictly-unique deterministic users so dedup is a no-op. Also subscribe before publishing so the consumer stream is active when messages are produced, avoiding reliance on cursor="" replay while the broker is settling. Co-Authored-By: Claude Opus 4.7 --- .../tqe/TQEClusterIntegrationTest.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java index 74b17f42..42b1affb 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java @@ -5,15 +5,13 @@ package org.testcontainers.containers.integration.tqe; +import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; -import org.instancio.Instancio; -import org.instancio.Select; -import org.instancio.generators.Generators; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; @@ -36,20 +34,23 @@ void testPublishAndConsumeData(TQEVersion version) { try (TQEClusterFixture fx = new TQEClusterFixture(version)) { final List users = generateUsers(); + // Subscribe before publishing so the consumer stream is active when messages are + // produced — avoids relying on cursor="" replay, which is unreliable while the + // TQE 3.x broker is still settling. + final Set result = new CopyOnWriteArraySet<>(); Unreliables.retryUntilSuccess( RETRY_TIMEOUT_SECONDS, TimeUnit.SECONDS, () -> { - fx.publisherClient().publish(users, QUEUE_NAME); + fx.consumerClient().subscribe(QUEUE_NAME, result::add); return true; }); - final Set result = new CopyOnWriteArraySet<>(); Unreliables.retryUntilSuccess( RETRY_TIMEOUT_SECONDS, TimeUnit.SECONDS, () -> { - fx.consumerClient().subscribe(QUEUE_NAME, result::add); + fx.publisherClient().publish(users, QUEUE_NAME); return true; }); @@ -60,12 +61,13 @@ void testPublishAndConsumeData(TQEVersion version) { }); } + // Queue config sets deduplication_mode: keep_latest, so duplicate (age, name) payloads collapse + // and the consumer can never receive USERS_COUNT messages. Generate strictly-unique users. private static List generateUsers() { - return Instancio.ofList(User.class) - .size(USERS_COUNT) - .generate( - Select.field(User::getName), g -> g.string().alphaNumeric().allowEmpty().nullable()) - .generate(Select.field(User::getAge), Generators::ints) - .create(); + List users = new ArrayList<>(USERS_COUNT); + for (int i = 0; i < USERS_COUNT; i++) { + users.add(new User(i, "user-" + i)); + } + return users; } } From 8b3a02dd5adce7d0d8ca67563c49b979e27ff977 Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 26 Jun 2026 14:11:27 +0300 Subject: [PATCH 2/2] refactor(tests): drop no-op retry wrapper around TQE subscribe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit subscribe() returns void immediately after kicking off the async stream, so retryUntilSuccess cannot retry a failure — the wrapper is dead weight. Co-Authored-By: Claude Opus 4.7 --- .../integration/tqe/TQEClusterIntegrationTest.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java index 42b1affb..ba9fdfb7 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java @@ -38,13 +38,7 @@ void testPublishAndConsumeData(TQEVersion version) { // produced — avoids relying on cursor="" replay, which is unreliable while the // TQE 3.x broker is still settling. final Set result = new CopyOnWriteArraySet<>(); - Unreliables.retryUntilSuccess( - RETRY_TIMEOUT_SECONDS, - TimeUnit.SECONDS, - () -> { - fx.consumerClient().subscribe(QUEUE_NAME, result::add); - return true; - }); + fx.consumerClient().subscribe(QUEUE_NAME, result::add); Unreliables.retryUntilSuccess( RETRY_TIMEOUT_SECONDS,