diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java index 74b17f42..ba9fdfb7 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java @@ -5,15 +5,13 @@ package org.testcontainers.containers.integration.tqe; +import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; -import org.instancio.Instancio; -import org.instancio.Select; -import org.instancio.generators.Generators; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; @@ -36,20 +34,17 @@ void testPublishAndConsumeData(TQEVersion version) { try (TQEClusterFixture fx = new TQEClusterFixture(version)) { final List users = generateUsers(); - Unreliables.retryUntilSuccess( - RETRY_TIMEOUT_SECONDS, - TimeUnit.SECONDS, - () -> { - fx.publisherClient().publish(users, QUEUE_NAME); - return true; - }); - + // Subscribe before publishing so the consumer stream is active when messages are + // produced — avoids relying on cursor="" replay, which is unreliable while the + // TQE 3.x broker is still settling. final Set result = new CopyOnWriteArraySet<>(); + fx.consumerClient().subscribe(QUEUE_NAME, result::add); + Unreliables.retryUntilSuccess( RETRY_TIMEOUT_SECONDS, TimeUnit.SECONDS, () -> { - fx.consumerClient().subscribe(QUEUE_NAME, result::add); + fx.publisherClient().publish(users, QUEUE_NAME); return true; }); @@ -60,12 +55,13 @@ void testPublishAndConsumeData(TQEVersion version) { }); } + // Queue config sets deduplication_mode: keep_latest, so duplicate (age, name) payloads collapse + // and the consumer can never receive USERS_COUNT messages. Generate strictly-unique users. private static List generateUsers() { - return Instancio.ofList(User.class) - .size(USERS_COUNT) - .generate( - Select.field(User::getName), g -> g.string().alphaNumeric().allowEmpty().nullable()) - .generate(Select.field(User::getAge), Generators::ints) - .create(); + List users = new ArrayList<>(USERS_COUNT); + for (int i = 0; i < USERS_COUNT; i++) { + users.add(new User(i, "user-" + i)); + } + return users; } }