Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,14 @@ public void rollbackUpdateTable(String database, final String tableName, final S
// If rename table
if (Objects.nonNull(oldName)) {
// Equals to commit update
final TsTable oldTable = preUpdateTableMap.get(database).get(oldName).getLeft();
final TsTable oldTable = getTableFromPreUpdateMap(database, oldName);
if (Objects.isNull(oldTable)) {
LOGGER.info(
"Skip rollback renaming old table {}.{} because it has been handled.",
database,
oldName);
return;
}
// Cannot be rolled back, consider:
// 1. Fetched a written CN table
// 2. CN rollback because of timeout
Expand All @@ -200,24 +207,42 @@ public void rollbackUpdateTable(String database, final String tableName, final S
}

private void removeTableFromPreUpdateMap(final String database, final String tableName) {
preUpdateTableMap.compute(
preUpdateTableMap.computeIfPresent(
database,
(k, v) -> {
if (v == null) {
throw new IllegalStateException();
final Pair<TsTable, Long> tableVersionPair = v.get(tableName);
if (Objects.nonNull(tableVersionPair)) {
tableVersionPair.setLeft(null);
}
v.get(tableName).setLeft(null);
return v;
});
}

private @Nullable TsTable getTableFromPreUpdateMap(
final String database, final String tableName) {
final Map<String, Pair<TsTable, Long>> tableMap = preUpdateTableMap.get(database);
if (Objects.isNull(tableMap)) {
return null;
}
final Pair<TsTable, Long> tableVersionPair = tableMap.get(tableName);
return Objects.nonNull(tableVersionPair) ? tableVersionPair.getLeft() : null;
}

@Override
public void commitUpdateTable(
String database, final String tableName, final @Nullable String oldName) {
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
final TsTable newTable = preUpdateTableMap.get(database).get(tableName).getLeft();
final TsTable newTable = getTableFromPreUpdateMap(database, tableName);
if (Objects.isNull(newTable)) {
LOGGER.info(
"Skip commit-update table {}.{} because it has been handled.", database, tableName);
if (Objects.nonNull(oldName)) {
removeTableFromPreUpdateMap(database, oldName);
}
return;
}
// Cannot be committed, consider:
// 1. Fetched a non-changed CN table
// 2. CN is changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

package org.apache.iotdb.db.schemaengine.table;

import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -28,6 +34,8 @@
public class DataNodeTableCacheTest {

private static final String DATABASE = "interrupted_fetch_database";
private static final String TABLE_CACHE_TEST_DATABASE = "root.table_cache_test";
private static final String TABLE_NAME = "table1";

@Test
public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
Expand All @@ -50,9 +58,49 @@ public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
}
}

@Test
public void commitUpdateTableIsIdempotent() {
final DataNodeTableCache cache = DataNodeTableCache.getInstance();
cache.invalid(TABLE_CACHE_TEST_DATABASE);
try {
cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null);

cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);

Assert.assertEquals(
TABLE_NAME, cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME).getTableName());
} finally {
cache.invalid(TABLE_CACHE_TEST_DATABASE);
}
}

@Test
public void commitAfterRollbackUpdateTableIsIgnored() {
final DataNodeTableCache cache = DataNodeTableCache.getInstance();
cache.invalid(TABLE_CACHE_TEST_DATABASE);
try {
cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null);

cache.rollbackUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);

Assert.assertNull(cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, false));
} finally {
cache.invalid(TABLE_CACHE_TEST_DATABASE);
}
}

private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache) throws Exception {
final Field field = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
field.setAccessible(true);
return (Semaphore) field.get(cache);
}

private TsTable createTable(final String tableName) {
final TsTable table = new TsTable(tableName);
table.addColumnSchema(
new FieldColumnSchema("s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.GZIP));
return table;
}
}
Loading