From 978724b6ef6e1f3d16493bb0829ca5efbb489f21 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 16 Jun 2026 18:58:47 +0800 Subject: [PATCH] Fix idempotent table cache update handling --- .../table/DataNodeTableCache.java | 37 +++++++++++--- .../table/DataNodeTableCacheTest.java | 48 +++++++++++++++++++ 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index d8d8f4fe19b95..11e80f5a63e8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -179,7 +179,14 @@ public void rollbackUpdateTable(String database, final String tableName, final S // If rename table if (Objects.nonNull(oldName)) { // Equals to commit update - final TsTable oldTable = preUpdateTableMap.get(database).get(oldName).getLeft(); + final TsTable oldTable = getTableFromPreUpdateMap(database, oldName); + if (Objects.isNull(oldTable)) { + LOGGER.info( + "Skip rollback renaming old table {}.{} because it has been handled.", + database, + oldName); + return; + } // Cannot be rolled back, consider: // 1. Fetched a written CN table // 2. CN rollback because of timeout @@ -200,24 +207,42 @@ public void rollbackUpdateTable(String database, final String tableName, final S } private void removeTableFromPreUpdateMap(final String database, final String tableName) { - preUpdateTableMap.compute( + preUpdateTableMap.computeIfPresent( database, (k, v) -> { - if (v == null) { - throw new IllegalStateException(); + final Pair tableVersionPair = v.get(tableName); + if (Objects.nonNull(tableVersionPair)) { + tableVersionPair.setLeft(null); } - v.get(tableName).setLeft(null); return v; }); } + private @Nullable TsTable getTableFromPreUpdateMap( + final String database, final String tableName) { + final Map> tableMap = preUpdateTableMap.get(database); + if (Objects.isNull(tableMap)) { + return null; + } + final Pair tableVersionPair = tableMap.get(tableName); + return Objects.nonNull(tableVersionPair) ? tableVersionPair.getLeft() : null; + } + @Override public void commitUpdateTable( String database, final String tableName, final @Nullable String oldName) { database = PathUtils.unQualifyDatabaseName(database); readWriteLock.writeLock().lock(); try { - final TsTable newTable = preUpdateTableMap.get(database).get(tableName).getLeft(); + final TsTable newTable = getTableFromPreUpdateMap(database, tableName); + if (Objects.isNull(newTable)) { + LOGGER.info( + "Skip commit-update table {}.{} because it has been handled.", database, tableName); + if (Objects.nonNull(oldName)) { + removeTableFromPreUpdateMap(database, oldName); + } + return; + } // Cannot be committed, consider: // 1. Fetched a non-changed CN table // 2. CN is changed diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java index 2d6114363a509..4b33991e3d7ed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java @@ -19,6 +19,12 @@ package org.apache.iotdb.db.schemaengine.table; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.junit.Assert; import org.junit.Test; @@ -28,6 +34,8 @@ public class DataNodeTableCacheTest { private static final String DATABASE = "interrupted_fetch_database"; + private static final String TABLE_CACHE_TEST_DATABASE = "root.table_cache_test"; + private static final String TABLE_NAME = "table1"; @Test public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception { @@ -50,9 +58,49 @@ public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception { } } + @Test + public void commitUpdateTableIsIdempotent() { + final DataNodeTableCache cache = DataNodeTableCache.getInstance(); + cache.invalid(TABLE_CACHE_TEST_DATABASE); + try { + cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null); + + cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null); + cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null); + + Assert.assertEquals( + TABLE_NAME, cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME).getTableName()); + } finally { + cache.invalid(TABLE_CACHE_TEST_DATABASE); + } + } + + @Test + public void commitAfterRollbackUpdateTableIsIgnored() { + final DataNodeTableCache cache = DataNodeTableCache.getInstance(); + cache.invalid(TABLE_CACHE_TEST_DATABASE); + try { + cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null); + + cache.rollbackUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null); + cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null); + + Assert.assertNull(cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, false)); + } finally { + cache.invalid(TABLE_CACHE_TEST_DATABASE); + } + } + private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache) throws Exception { final Field field = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore"); field.setAccessible(true); return (Semaphore) field.get(cache); } + + private TsTable createTable(final String tableName) { + final TsTable table = new TsTable(tableName); + table.addColumnSchema( + new FieldColumnSchema("s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.GZIP)); + return table; + } }