Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

package org.testcontainers.containers.integration.tqe;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import org.instancio.Instancio;
import org.instancio.Select;
import org.instancio.generators.Generators;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -36,20 +34,17 @@ void testPublishAndConsumeData(TQEVersion version) {
try (TQEClusterFixture fx = new TQEClusterFixture(version)) {
final List<User> users = generateUsers();

Unreliables.retryUntilSuccess(
RETRY_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> {
fx.publisherClient().publish(users, QUEUE_NAME);
return true;
});

// Subscribe before publishing so the consumer stream is active when messages are
// produced — avoids relying on cursor="" replay, which is unreliable while the
// TQE 3.x broker is still settling.
final Set<User> result = new CopyOnWriteArraySet<>();
fx.consumerClient().subscribe(QUEUE_NAME, result::add);

Unreliables.retryUntilSuccess(
RETRY_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> {
fx.consumerClient().subscribe(QUEUE_NAME, result::add);
fx.publisherClient().publish(users, QUEUE_NAME);
return true;
});

Expand All @@ -60,12 +55,13 @@ void testPublishAndConsumeData(TQEVersion version) {
});
}

// Queue config sets deduplication_mode: keep_latest, so duplicate (age, name) payloads collapse
// and the consumer can never receive USERS_COUNT messages. Generate strictly-unique users.
private static List<User> generateUsers() {
return Instancio.ofList(User.class)
.size(USERS_COUNT)
.generate(
Select.field(User::getName), g -> g.string().alphaNumeric().allowEmpty().nullable())
.generate(Select.field(User::getAge), Generators::ints)
.create();
List<User> users = new ArrayList<>(USERS_COUNT);
for (int i = 0; i < USERS_COUNT; i++) {
users.add(new User(i, "user-" + i));
}
return users;
}
}
Loading