Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Assert;

import java.time.Duration;
Expand All @@ -44,6 +46,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ConsensusSubscriptionTableITSupport {
Expand All @@ -52,6 +55,7 @@ final class ConsensusSubscriptionTableITSupport {

private static final AtomicInteger IDENTIFIER = new AtomicInteger(0);
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(1);
private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofMinutes(2);
private static final int QUIET_ROUNDS_AFTER_DATA = 3;
private static final int QUIET_ROUNDS_WITHOUT_DATA = 8;

Expand Down Expand Up @@ -116,7 +120,7 @@ static void createConsensusTopic(
final String topicName,
final String databasePattern,
final String tablePattern,
final String columnPattern)
final String columnFilter)
throws Exception {
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
Expand All @@ -131,13 +135,28 @@ static void createConsensusTopic(
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
config.put(TopicConstant.DATABASE_KEY, databasePattern);
config.put(TopicConstant.TABLE_KEY, tablePattern);
if (columnPattern != null) {
config.put(TopicConstant.COLUMN_KEY, columnPattern);
if (columnFilter != null) {
config.put(TopicConstant.COLUMN_FILTER_KEY, columnFilter);
}
session.createTopic(topicName, config);
}
}

static void alterConsensusTopicColumnFilter(final String topicName, final String columnFilter)
throws Exception {
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());

try (final ISubscriptionTableSession session =
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
session.open();

final Properties config = new Properties();
config.put(TopicConstant.COLUMN_FILTER_KEY, columnFilter);
session.alterTopic(topicName, config);
}
}

static SubscriptionTablePullConsumer createConsumer(
final String consumerId, final String consumerGroupId) throws Exception {
final SubscriptionTablePullConsumer consumer =
Expand Down Expand Up @@ -191,6 +210,49 @@ static Set<String> insertRows(
return rowKeys;
}

static Set<String> insertRows(
final String database,
final String tableName,
final long startTimestampInclusive,
final int rowCount,
final boolean includeS2,
final boolean includeS3,
final boolean flush)
throws Exception {
final Set<String> rowKeys = new LinkedHashSet<>();

try (final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("use " + database);
for (int row = 0; row < rowCount; row++) {
final long timestamp = startTimestampInclusive + row;
final StringBuilder columns = new StringBuilder("tag1, s1");
final StringBuilder values =
new StringBuilder(
String.format(
Locale.ROOT, "'%s', %d", tableName + "_tag_" + timestamp, timestamp * 10L));
if (includeS2) {
columns.append(", s2");
values.append(String.format(Locale.ROOT, ", %.1f", timestamp + 0.5d));
}
if (includeS3) {
columns.append(", s3");
values.append(timestamp % 2 == 0 ? ", true" : ", false");
}
columns.append(", time");
values.append(", ").append(timestamp);
session.executeNonQueryStatement(
String.format(
Locale.ROOT, "insert into %s(%s) values (%s)", tableName, columns, values));
rowKeys.add(rowKey(database, tableName, timestamp));
}
if (flush) {
session.executeNonQueryStatement("flush");
}
}

return rowKeys;
}

static ConsumedRecords pollAndCommitUntilAtLeast(
final SubscriptionTablePullConsumer consumer,
final int expectedUniqueRows,
Expand All @@ -207,28 +269,34 @@ static ConsumedRecords pollAndCommitUntilAtLeast(
final Duration pollTimeout)
throws Exception {
final ConsumedRecords consumed = new ConsumedRecords();
int emptyRounds = 0;

for (int round = 0; round < maxPollRounds; round++) {
final List<SubscriptionMessage> messages = consumer.poll(pollTimeout);
if (messages.isEmpty()) {
emptyRounds++;
if (consumed.getUniqueRowCount() >= expectedUniqueRows
&& emptyRounds >= QUIET_ROUNDS_AFTER_DATA) {
break;
}
if (consumed.getUniqueRowCount() == 0
&& expectedUniqueRows == 0
&& emptyRounds >= QUIET_ROUNDS_WITHOUT_DATA) {
break;
}
continue;
}
final AtomicInteger emptyRounds = new AtomicInteger(0);
awaitDrain(maxPollRounds, pollTimeout)
.untilAsserted(
() -> {
pollAndCommitOnce(consumer, pollTimeout, consumed, emptyRounds);
Assert.assertTrue(
atLeastTimeoutMessage(expectedUniqueRows, consumed),
hasDrainedAtLeast(consumed, expectedUniqueRows, emptyRounds.get()));
});

emptyRounds = 0;
consumed.merge(consumeMessages(messages));
consumer.commitSync(messages);
}
return consumed;
}

static ConsumedRecords pollAndCommitUntilContains(
final SubscriptionTablePullConsumer consumer,
final Set<String> expectedRowKeys,
final int maxPollRounds)
throws Exception {
final ConsumedRecords consumed = new ConsumedRecords();
final AtomicInteger emptyRounds = new AtomicInteger(0);
awaitDrain(maxPollRounds, DEFAULT_POLL_TIMEOUT)
.untilAsserted(
() -> {
pollAndCommitOnce(consumer, DEFAULT_POLL_TIMEOUT, consumed, emptyRounds);
Assert.assertTrue(
containsTimeoutMessage(expectedRowKeys, consumed),
hasDrainedExpectedKeys(consumed, expectedRowKeys, emptyRounds.get()));
});

return consumed;
}
Expand All @@ -251,24 +319,15 @@ static ConsumedRecords pollWithInfoAndCommitUntilAtLeast(
final Duration pollTimeout)
throws Exception {
final ConsumedRecords consumed = new ConsumedRecords();
int emptyRounds = 0;

for (int round = 0; round < maxPollRounds; round++) {
final PollResult pollResult = consumer.pollWithInfo(topicNames, pollTimeout.toMillis());
final List<SubscriptionMessage> messages = pollResult.getMessages();
if (messages.isEmpty()) {
emptyRounds++;
if (consumed.getUniqueRowCount() >= expectedUniqueRows
&& emptyRounds >= QUIET_ROUNDS_AFTER_DATA) {
break;
}
continue;
}

emptyRounds = 0;
consumed.merge(consumeMessages(messages));
consumer.commitSync(messages);
}
final AtomicInteger emptyRounds = new AtomicInteger(0);
awaitDrain(maxPollRounds, pollTimeout)
.untilAsserted(
() -> {
pollWithInfoAndCommitOnce(consumer, topicNames, pollTimeout, consumed, emptyRounds);
Assert.assertTrue(
atLeastTimeoutMessage(expectedUniqueRows, consumed),
hasDrainedAtLeast(consumed, expectedUniqueRows, emptyRounds.get()));
});

return consumed;
}
Expand All @@ -278,7 +337,8 @@ static void assertExactRowKeys(
Assert.assertTrue(
"Unexpected duplicate row keys: " + consumed.getDuplicateRowKeys(),
consumed.getDuplicateRowKeys().isEmpty());
Assert.assertEquals(expectedRowKeys, consumed.getRowKeys());
Assert.assertEquals(
rowKeyDiffMessage(expectedRowKeys, consumed), expectedRowKeys, consumed.getRowKeys());
Assert.assertEquals(expectedRowKeys.size(), consumed.getRowCount());
}

Expand Down Expand Up @@ -384,6 +444,110 @@ private static ConsumedRecords consumeMessages(final List<SubscriptionMessage> m
return consumed;
}

private static void pollAndCommitOnce(
final SubscriptionTablePullConsumer consumer,
final Duration pollTimeout,
final ConsumedRecords consumed,
final AtomicInteger emptyRounds)
throws Exception {
final List<SubscriptionMessage> messages = consumer.poll(pollTimeout);
if (messages.isEmpty()) {
emptyRounds.incrementAndGet();
return;
}

emptyRounds.set(0);
consumed.merge(consumeMessages(messages));
consumer.commitSync(messages);
}

private static void pollWithInfoAndCommitOnce(
final SubscriptionTablePullConsumer consumer,
final Set<String> topicNames,
final Duration pollTimeout,
final ConsumedRecords consumed,
final AtomicInteger emptyRounds)
throws Exception {
final PollResult pollResult = consumer.pollWithInfo(topicNames, pollTimeout.toMillis());
final List<SubscriptionMessage> messages = pollResult.getMessages();
if (messages.isEmpty()) {
emptyRounds.incrementAndGet();
return;
}

emptyRounds.set(0);
consumed.merge(consumeMessages(messages));
consumer.commitSync(messages);
}

private static ConditionFactory awaitDrain(
final int legacyMaxPollRounds, final Duration pollTimeout) {
final Duration drainTimeout = drainTimeout(legacyMaxPollRounds, pollTimeout);
return Awaitility.await()
.pollInSameThread()
.pollDelay(0, TimeUnit.MILLISECONDS)
.pollInterval(1, TimeUnit.MILLISECONDS)
.atMost(drainTimeout.toMillis(), TimeUnit.MILLISECONDS);
}

private static Duration drainTimeout(final int legacyMaxPollRounds, final Duration pollTimeout) {
final long legacyTimeoutMillis =
Math.max(0L, legacyMaxPollRounds) * Math.max(1L, pollTimeout.toMillis());
final Duration legacyTimeout = Duration.ofMillis(legacyTimeoutMillis);
return legacyTimeout.compareTo(DEFAULT_DRAIN_TIMEOUT) > 0
? legacyTimeout
: DEFAULT_DRAIN_TIMEOUT;
}

private static boolean hasDrainedAtLeast(
final ConsumedRecords consumed, final int expectedUniqueRows, final int emptyRounds) {
if (expectedUniqueRows == 0) {
return consumed.getUniqueRowCount() == 0 && emptyRounds >= QUIET_ROUNDS_WITHOUT_DATA;
}
return consumed.getUniqueRowCount() >= expectedUniqueRows
&& emptyRounds >= QUIET_ROUNDS_AFTER_DATA;
}

private static boolean hasDrainedExpectedKeys(
final ConsumedRecords consumed, final Set<String> expectedRowKeys, final int emptyRounds) {
return consumed.getRowKeys().containsAll(expectedRowKeys)
&& emptyRounds >= QUIET_ROUNDS_AFTER_DATA;
}

private static String atLeastTimeoutMessage(
final int expectedUniqueRows, final ConsumedRecords consumed) {
return "Expected at least "
+ expectedUniqueRows
+ " unique row keys before the subscription drain timeout, but collected "
+ consumed.getUniqueRowCount()
+ ". Consumed records: "
+ consumed;
}

private static String containsTimeoutMessage(
final Set<String> expectedRowKeys, final ConsumedRecords consumed) {
return "Expected row keys were not fully collected before the subscription drain timeout. "
+ rowKeyDiffMessage(expectedRowKeys, consumed);
}

private static String rowKeyDiffMessage(
final Set<String> expectedRowKeys, final ConsumedRecords consumed) {
final Set<String> missingRowKeys = new LinkedHashSet<>(expectedRowKeys);
missingRowKeys.removeAll(consumed.getRowKeys());
final Set<String> unexpectedRowKeys = new LinkedHashSet<>(consumed.getRowKeys());
unexpectedRowKeys.removeAll(expectedRowKeys);
return "expected="
+ expectedRowKeys
+ ", actual="
+ consumed.getRowKeys()
+ ", missing="
+ missingRowKeys
+ ", unexpected="
+ unexpectedRowKeys
+ ", consumed="
+ consumed;
}

static final class TestIdentifiers {

private final String database;
Expand Down Expand Up @@ -425,6 +589,14 @@ String database(final String suffix) {
String topic(final String suffix) {
return topic + "_" + suffix;
}

String consumerGroup(final String suffix) {
return consumerGroupId + "_" + suffix;
}

String consumer(final String suffix) {
return consumerId + "_" + suffix;
}
}

static final class ConsumedRecords {
Expand Down
Loading
Loading