From 85be70dea8bcb1bc80f4bacd48c8c23e6ca835bc Mon Sep 17 00:00:00 2001 From: yangtao555 Date: Wed, 15 Apr 2026 20:54:39 +0800 Subject: [PATCH 1/2] [fix](mv) Invalidate rewrite cache on constraint changes --- .../java/org/apache/doris/catalog/MTMV.java | 119 ++++++++++++------ .../org/apache/doris/mtmv/BaseTableInfo.java | 11 ++ .../java/org/apache/doris/mtmv/MTMVUtil.java | 67 +++++++++- .../plans/commands/AddConstraintCommand.java | 47 ++++--- .../plans/commands/DropConstraintCommand.java | 16 ++- .../org/apache/doris/persist/EditLog.java | 48 ++++--- .../constraint/ConstraintPersistTest.java | 91 ++++++++++++++ .../doris/nereids/mv/MTMVCacheTest.java | 61 +++++++++ ...test_constraint_change_rewrite_mtmv.groovy | 112 +++++++++++++++++ 9 files changed, 483 insertions(+), 89 deletions(-) create mode 100644 regression-test/suites/mtmv_p0/test_constraint_change_rewrite_mtmv.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 22f1cfdaef9ff6..b67894b2be9a98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -92,6 +92,8 @@ public class MTMV extends OlapTable { private MTMVCache cacheWithGuard; // Cache without SessionVarGuardExpr: used when query session variables match MV creation variables private MTMVCache cacheWithoutGuard; + // Increased every time rewrite cache is invalidated to prevent publishing stale in-flight cache builds. + private transient long rewriteCacheGeneration; private long schemaChangeVersion; @SerializedName(value = "sv") private Map sessionVariables; @@ -212,9 +214,16 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation, MTMVCache mtmvCacheWithGuard = null; MTMVCache mtmvCacheWithoutGuard = null; boolean needUpdateCache = false; + long cacheGeneration = -1; if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread() && !Config.enable_check_compatibility_mode) { needUpdateCache = true; + readMvLock(); + try { + cacheGeneration = rewriteCacheGeneration; + } finally { + readMvUnlock(); + } try { // The replay thread may not have initialized the catalog yet to avoid getting stuck due // to connection issues such as S3, so it is directly set to null @@ -222,12 +231,8 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation, ConnectContext currentContext = ConnectContext.get(); // shouldn't do this while holding mvWriteLock // TODO: these two cache compute share something same, can be simplified in future - mtmvCacheWithGuard = MTMVCache.from(this.getQuerySql(), - MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), - true, true, currentContext, true); - mtmvCacheWithoutGuard = MTMVCache.from(this.getQuerySql(), - MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), - true, true, currentContext, false); + mtmvCacheWithGuard = createRewriteCache(currentContext, true, true); + mtmvCacheWithoutGuard = createRewriteCache(currentContext, true, false); } } catch (Throwable e) { mtmvCacheWithGuard = null; @@ -251,10 +256,12 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation, this.status.setRefreshState(MTMVRefreshState.SUCCESS); this.relation = relation; if (needUpdateCache) { - // Initialize cacheWithGuard, cacheWithoutGuard will be lazily generated when needed - this.cacheWithGuard = mtmvCacheWithGuard; - // Clear the other cache to ensure consistency - this.cacheWithoutGuard = mtmvCacheWithoutGuard; + if (cacheGeneration == rewriteCacheGeneration) { + // Initialize cacheWithGuard, cacheWithoutGuard will be lazily generated when needed + this.cacheWithGuard = mtmvCacheWithGuard; + // Clear the other cache to ensure consistency + this.cacheWithoutGuard = mtmvCacheWithoutGuard; + } } } else { this.status.setRefreshState(MTMVRefreshState.FAIL); @@ -370,7 +377,8 @@ public Set getQueryRewriteConsistencyRelaxedTables() { } /** - * Called when in query, Should use one connection context in query + * Called when in query; should use one connection context for the query. + * Returns the rewrite cache matching the current session variables, rebuilding it on demand. */ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws org.apache.doris.nereids.exceptions.AnalysisException { @@ -388,35 +396,38 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws boolean sessionVarsMatch = SessionVarGuardRewriter.checkSessionVariablesMatch( currentSessionVars, this.sessionVariables); - // Select appropriate cache based on session variable match - readMvLock(); - try { - if (sessionVarsMatch && cacheWithoutGuard != null) { - return cacheWithoutGuard; - } - if (!sessionVarsMatch && cacheWithGuard != null) { - return cacheWithGuard; + while (true) { + long cacheGeneration; + // Select appropriate cache based on session variable match + readMvLock(); + try { + MTMVCache cache = getCache(sessionVarsMatch); + if (cache != null) { + return cache; + } + cacheGeneration = rewriteCacheGeneration; + } finally { + readMvUnlock(); } - } finally { - readMvUnlock(); - } - // Generate cache if not exists - // Concurrent situations may result in duplicate cache generation, - // but we tolerate this in order to prevent nested use of readLock and write MvLock for the table - MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), - MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), - true, false, connectionContext, !sessionVarsMatch); - writeMvLock(); - try { - if (sessionVarsMatch) { - this.cacheWithoutGuard = mtmvCache; - } else { - this.cacheWithGuard = mtmvCache; + // Generate cache if not exists + // Concurrent situations may result in duplicate cache generation, + // but we tolerate this in order to prevent nested use of readLock and write MvLock for the table + MTMVCache mtmvCache = createRewriteCache(connectionContext, false, !sessionVarsMatch); + writeMvLock(); + try { + MTMVCache cache = getCache(sessionVarsMatch); + if (cache != null) { + return cache; + } + if (cacheGeneration != rewriteCacheGeneration) { + continue; + } + setCache(sessionVarsMatch, mtmvCache); + return mtmvCache; + } finally { + writeMvUnlock(); } - return mtmvCache; - } finally { - writeMvUnlock(); } } @@ -446,6 +457,28 @@ public long getSchemaChangeVersion() { } } + /** + * Invalidate rewrite cache after metadata changes such as ADD/DROP CONSTRAINT. + * Bumping the generation prevents any cache built before this call from being published later. + */ + public void invalidateRewriteCache() { + writeMvLock(); + try { + rewriteCacheGeneration++; + cacheWithGuard = null; + cacheWithoutGuard = null; + } finally { + writeMvUnlock(); + } + } + + protected MTMVCache createRewriteCache(ConnectContext currentContext, boolean needLock, + boolean addSessionVarGuard) { + return MTMVCache.from(this.getQuerySql(), + MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), + true, needLock, currentContext, addSessionVarGuard); + } + /** * generateMvPartitionDescs * @@ -552,6 +585,18 @@ public void writeMvUnlock() { this.mvRwLock.writeLock().unlock(); } + private MTMVCache getCache(boolean sessionVarsMatch) { + return sessionVarsMatch ? cacheWithoutGuard : cacheWithGuard; + } + + private void setCache(boolean sessionVarsMatch, MTMVCache cache) { + if (sessionVarsMatch) { + this.cacheWithoutGuard = cache; + } else { + this.cacheWithGuard = cache; + } + } + // toString() is not easy to find where to call the method public String toInfoString() { final StringBuilder sb = new StringBuilder("MTMV{"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 9e80b5ed6e4a40..dfd44dfc7b2d1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.info.TableNameInfo; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.CatalogMgr; @@ -73,6 +74,16 @@ public BaseTableInfo(TableIf table) { this.ctlName = catalog.getName(); } + public BaseTableInfo(TableNameInfo tableNameInfo) { + java.util.Objects.requireNonNull(tableNameInfo, "tableNameInfo is null"); + java.util.Objects.requireNonNull(tableNameInfo.getTbl(), "tableName is null"); + java.util.Objects.requireNonNull(tableNameInfo.getDb(), "dbName is null"); + java.util.Objects.requireNonNull(tableNameInfo.getCtl(), "catalogName is null"); + this.tableName = tableNameInfo.getTbl(); + this.dbName = tableNameInfo.getDb(); + this.ctlName = tableNameInfo.getCtl(); + } + // for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog public BaseTableInfo(OlapTable table, long dbId) { java.util.Objects.requireNonNull(table, "table is null"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 5fb0da7469bb02..324aaf7cee3fd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -23,6 +23,10 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.constraint.Constraint; +import org.apache.doris.catalog.constraint.ForeignKeyConstraint; +import org.apache.doris.catalog.constraint.PrimaryKeyConstraint; +import org.apache.doris.catalog.info.TableNameInfo; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -42,13 +46,21 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; public class MTMVUtil { + private static final Logger LOG = LogManager.getLogger(MTMVUtil.class); + /** * get Table by BaseTableInfo * @@ -98,6 +110,54 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); } + public static List getDependentMtmvsByBaseTables(List baseTableInfos) + throws AnalysisException { + Set uniqueBases = new LinkedHashSet<>(baseTableInfos); + Map mtmvById = new TreeMap<>(); + for (BaseTableInfo base : uniqueBases) { + for (BaseTableInfo mtmvInfo + : Env.getCurrentEnv().getMtmvService().getRelationManager() + .getMtmvsByBaseTable(base)) { + try { + MTMV mtmv = MTMVUtil.getMTMV(mtmvInfo); + mtmvById.put(mtmv.getId(), mtmv); + } catch (AnalysisException e) { + LOG.warn("Skip stale dependent MTMV relation {} for base table {}", + mtmvInfo, base, e); + } + } + } + return new ArrayList<>(mtmvById.values()); + } + + public static List getConstraintRelatedBaseTables( + TableNameInfo tableNameInfo, Constraint constraint) { + List baseTables = new ArrayList<>(); + baseTables.add(new BaseTableInfo(tableNameInfo)); + if (constraint instanceof ForeignKeyConstraint) { + TableNameInfo referencedTableInfo = ((ForeignKeyConstraint) constraint).getReferencedTableName(); + if (referencedTableInfo != null) { + baseTables.add(new BaseTableInfo(referencedTableInfo)); + } + } else if (constraint instanceof PrimaryKeyConstraint) { + for (TableNameInfo fkTableInfo : ((PrimaryKeyConstraint) constraint).getForeignTableInfos()) { + baseTables.add(new BaseTableInfo(fkTableInfo)); + } + } + return baseTables; + } + + public static List getDependentMtmvsByConstraint(TableNameInfo tableNameInfo, Constraint constraint) + throws AnalysisException { + return getDependentMtmvsByBaseTables(getConstraintRelatedBaseTables(tableNameInfo, constraint)); + } + + public static void invalidateRewriteCaches(List dependentMtmvs) { + for (MTMV dependentMtmv : dependentMtmvs) { + dependentMtmv.invalidateRewriteCache(); + } + } + public static TableIf getTable(List names) throws AnalysisException { if (names == null || names.size() != 3) { throw new AnalysisException("size of names need 3, but names is:" + names); @@ -116,12 +176,7 @@ public static TableIf getTable(List names) throws AnalysisException { */ public static boolean mtmvContainsExternalTable(MTMV mtmv) { Set baseTables = mtmv.getRelation().getBaseTablesOneLevelAndFromView(); - for (BaseTableInfo baseTableInfo : baseTables) { - if (!baseTableInfo.isInternalTable()) { - return true; - } - } - return false; + return baseTables.stream().anyMatch(baseTableInfo -> !baseTableInfo.isInternalTable()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java index 05b1149871d886..40de4a6f48fb83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.constraint.ForeignKeyConstraint; import org.apache.doris.catalog.constraint.PrimaryKeyConstraint; @@ -25,6 +26,7 @@ import org.apache.doris.catalog.info.TableNameInfo; import org.apache.doris.common.Pair; import org.apache.doris.info.TableNameInfoUtils; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.PhysicalProperties; @@ -44,6 +46,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Set; /** @@ -73,33 +76,43 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { table.getDatabase().getCatalog(), table.getDatabase(), table); ImmutableList columns = columnsAndTable.first; + Pair, TableNameInfo> referencedColumnsAndTable = null; if (constraint.isForeignKey()) { - Pair, TableIf> refColumnsAndTable - = extractColumnsAndTable(ctx, constraint.toReferenceProject()); + Pair, TableIf> refColumnsAndTable = + extractColumnsAndTable(ctx, constraint.toReferenceProject()); TableIf refTable = refColumnsAndTable.second; TableNameInfo refTableInfo = TableNameInfoUtils.fromCatalogDb( - refTable.getDatabase().getCatalog(), - refTable.getDatabase(), refTable); - ForeignKeyConstraint fkConstraint = new ForeignKeyConstraint( - name, columns, refTableInfo, refColumnsAndTable.first); - Env.getCurrentEnv().getConstraintManager().addConstraint( - tableNameInfo, name, fkConstraint, false); + refTable.getDatabase().getCatalog(), refTable.getDatabase(), refTable); + referencedColumnsAndTable = Pair.of(refColumnsAndTable.first, refTableInfo); + } + if (constraint.isForeignKey()) { + Preconditions.checkState(referencedColumnsAndTable != null); + addConstraintAndInvalidate(tableNameInfo, + new ForeignKeyConstraint(name, columns, + referencedColumnsAndTable.second, referencedColumnsAndTable.first)); } else if (constraint.isPrimaryKey()) { - PrimaryKeyConstraint pkConstraint = new PrimaryKeyConstraint( - name, ImmutableSet.copyOf(columns)); - Env.getCurrentEnv().getConstraintManager().addConstraint( - tableNameInfo, name, pkConstraint, false); + addConstraintAndInvalidate( + tableNameInfo, new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns))); } else if (constraint.isUnique()) { - UniqueConstraint uniqueConstraint = new UniqueConstraint( - name, ImmutableSet.copyOf(columns)); - Env.getCurrentEnv().getConstraintManager().addConstraint( - tableNameInfo, name, uniqueConstraint, false); + addConstraintAndInvalidate( + tableNameInfo, new UniqueConstraint(name, ImmutableSet.copyOf(columns))); + } else { + throw new AnalysisException("Unsupported constraint type: " + constraint); } } + private void addConstraintAndInvalidate( + TableNameInfo tableNameInfo, org.apache.doris.catalog.constraint.Constraint constraint) + throws Exception { + List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tableNameInfo, constraint); + Env.getCurrentEnv().getConstraintManager().addConstraint(tableNameInfo, name, constraint, false); + MTMVUtil.invalidateRewriteCaches(dependentMtmvs); + } + private Pair, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) { NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); - Plan analyzedPlan = planner.planWithLock(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN); + Plan analyzedPlan = planner.planWithLock( + plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN); Set logicalCatalogRelationSet = analyzedPlan .collect(LogicalCatalogRelation.class::isInstance); if (logicalCatalogRelationSet.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java index cdef6dce07c2b9..6ac600b007181b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java @@ -18,9 +18,12 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.catalog.info.TableNameInfo; import org.apache.doris.info.TableNameInfoUtils; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -72,8 +75,14 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + "falling back to name-based lookup: {}", name, e.getMessage()); tableNameInfo = extractTableNameFromPlan(ctx); } - Env.getCurrentEnv().getConstraintManager().dropConstraint( - tableNameInfo, name, false); + Constraint constraint = Env.getCurrentEnv().getConstraintManager().getConstraint(tableNameInfo, name); + if (constraint == null) { + throw new AnalysisException( + String.format("Unknown constraint %s on table %s.", name, tableNameInfo)); + } + List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tableNameInfo, constraint); + Env.getCurrentEnv().getConstraintManager().dropConstraint(tableNameInfo, name, false); + MTMVUtil.invalidateRewriteCaches(dependentMtmvs); } private TableNameInfo extractTableNameFromPlan(ConnectContext ctx) { @@ -90,7 +99,8 @@ private TableNameInfo extractTableNameFromPlan(ConnectContext ctx) { // Fill in default catalog/db from connect context if not specified if (parts.size() == 1) { return new TableNameInfo(ctl, db, parts.get(0)); - } else if (parts.size() == 2) { + } + if (parts.size() == 2) { return new TableNameInfo(ctl, parts.get(0), parts.get(1)); } return new TableNameInfo(parts); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index a6ee2b743442b8..5724fd603e946e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -40,6 +40,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.catalog.info.TableNameInfo; @@ -91,6 +92,7 @@ import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.policy.DropPolicyLog; @@ -1200,38 +1202,32 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { } case OperationType.OP_ADD_CONSTRAINT: { final AlterConstraintLog log = (AlterConstraintLog) journal.getData(); - try { - TableNameInfo tni = log.getTableNameInfo(); - Constraint constraint = log.getConstraint(); - if (tni == null) { - LOG.warn("Failed to replay add constraint {}: " - + "table name could not be resolved", - constraint.getName()); - break; - } - env.getConstraintManager().addConstraint( - tni, constraint.getName(), constraint, true); - } catch (Exception e) { - LOG.warn("Failed to replay add constraint", e); + TableNameInfo tni = log.getTableNameInfo(); + Constraint constraint = log.getConstraint(); + if (tni == null) { + LOG.warn("Skip replaying add constraint {} because table name could not be resolved", + constraint.getName()); + break; } + List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tni, constraint); + env.getConstraintManager().addConstraint( + tni, constraint.getName(), constraint, true); + MTMVUtil.invalidateRewriteCaches(dependentMtmvs); break; } case OperationType.OP_DROP_CONSTRAINT: { final AlterConstraintLog log = (AlterConstraintLog) journal.getData(); - try { - TableNameInfo tni = log.getTableNameInfo(); - Constraint constraint = log.getConstraint(); - if (tni == null) { - LOG.warn("Failed to replay drop constraint {}: " - + "table name could not be resolved", - constraint.getName()); - break; - } - env.getConstraintManager().dropConstraint( - tni, constraint.getName(), true); - } catch (Exception e) { - LOG.warn("Failed to replay drop constraint", e); + TableNameInfo tni = log.getTableNameInfo(); + Constraint constraint = log.getConstraint(); + if (tni == null) { + LOG.warn("Skip replaying drop constraint {} because table name could not be resolved", + constraint.getName()); + break; } + List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tni, constraint); + env.getConstraintManager().dropConstraint( + tni, constraint.getName(), true); + MTMVUtil.invalidateRewriteCaches(dependentMtmvs); break; } case OperationType.OP_ALTER_USER: { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java index 16aba7f94f1dfd..a7d2a00e8e003f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java @@ -19,13 +19,17 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.info.TableNameInfo; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.test.TestExternalCatalog; import org.apache.doris.journal.JournalEntity; +import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.util.PlanPatternMatchSupported; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.persist.AlterConstraintLog; @@ -37,6 +41,8 @@ import com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -324,6 +330,91 @@ void backwardCompatAlterConstraintLogTest() throws Exception { Assertions.assertEquals("pk_compat", log.getConstraint().getName()); } + @Test + void liveConstraintShouldExposeDependentMtmvLookupFailureTest() throws Exception { + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv(), Optional.empty()); + TableNameInfo tableNameInfo = new TableNameInfo(tableIf.getNameWithFullQualifiers()); + String pkName = "pk_live_lookup_failure"; + ConstraintManager mgr = Env.getCurrentEnv().getConstraintManager(); + + try (MockedStatic mtmvUtilMock = Mockito.mockStatic(MTMVUtil.class, Mockito.CALLS_REAL_METHODS)) { + mtmvUtilMock.when(() -> MTMVUtil.getDependentMtmvsByBaseTables(Mockito.anyList())) + .thenThrow(new AnalysisException("unexpected relation lookup failure")); + + Assertions.assertThrows(Exception.class, () -> addConstraint( + "alter table t1 add constraint " + pkName + " primary key (k1)")); + Assertions.assertNull(mgr.getConstraint(tableNameInfo, pkName)); + } + + addConstraint("alter table t1 add constraint " + pkName + " primary key (k1)"); + Assertions.assertNotNull(mgr.getConstraint(tableNameInfo, pkName)); + + try (MockedStatic mtmvUtilMock = Mockito.mockStatic(MTMVUtil.class, Mockito.CALLS_REAL_METHODS)) { + mtmvUtilMock.when(() -> MTMVUtil.getDependentMtmvsByBaseTables(Mockito.anyList())) + .thenThrow(new AnalysisException("unexpected relation lookup failure")); + + Assertions.assertThrows(Exception.class, () -> dropConstraint( + "alter table t1 drop constraint " + pkName)); + Assertions.assertNotNull(mgr.getConstraint(tableNameInfo, pkName)); + } finally { + if (mgr.getConstraint(tableNameInfo, pkName) != null) { + mgr.dropConstraint(tableNameInfo, pkName, true); + } + } + } + + @Test + void invalidateRewriteCachesShouldExposeFailureTest() { + MTMV dependentMtmv = Mockito.mock(MTMV.class); + Mockito.doThrow(new RuntimeException("invalidate failed")) + .when(dependentMtmv).invalidateRewriteCache(); + + Assertions.assertThrows(RuntimeException.class, + () -> MTMVUtil.invalidateRewriteCaches(Lists.newArrayList(dependentMtmv))); + } + + @Test + void replayConstraintShouldInvalidateDependentMtmvCacheTest() throws Exception { + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv(), Optional.empty()); + TableNameInfo tableNameInfo = new TableNameInfo(tableIf.getNameWithFullQualifiers()); + PrimaryKeyConstraint pk = new PrimaryKeyConstraint("pk_replay_cache", + com.google.common.collect.ImmutableSet.of("k1")); + MTMV dependentMtmv = Mockito.mock(MTMV.class); + ConstraintManager mgr = Env.getCurrentEnv().getConstraintManager(); + + try (MockedStatic mtmvUtilMock = Mockito.mockStatic(MTMVUtil.class, Mockito.CALLS_REAL_METHODS)) { + mtmvUtilMock.when(() -> MTMVUtil.getDependentMtmvsByBaseTables(Mockito.anyList())) + .thenAnswer(invocation -> { + List baseTableInfos = invocation.getArgument(0); + Assertions.assertEquals(1, baseTableInfos.size()); + Assertions.assertEquals(tableNameInfo.getCtl(), baseTableInfos.get(0).getCtlName()); + Assertions.assertEquals(tableNameInfo.getDb(), baseTableInfos.get(0).getDbName()); + Assertions.assertEquals(tableNameInfo.getTbl(), baseTableInfos.get(0).getTableName()); + return Lists.newArrayList(dependentMtmv); + }); + + JournalEntity addJournal = new JournalEntity(); + addJournal.setData(new AlterConstraintLog(pk, tableNameInfo)); + addJournal.setOpCode(OperationType.OP_ADD_CONSTRAINT); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, addJournal); + Mockito.verify(dependentMtmv).invalidateRewriteCache(); + + JournalEntity dropJournal = new JournalEntity(); + dropJournal.setData(new AlterConstraintLog(pk, tableNameInfo)); + dropJournal.setOpCode(OperationType.OP_DROP_CONSTRAINT); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, dropJournal); + Mockito.verify(dependentMtmv, Mockito.times(2)).invalidateRewriteCache(); + } finally { + if (mgr.getConstraint(tableNameInfo, pk.getName()) != null) { + mgr.dropConstraint(tableNameInfo, pk.getName(), true); + } + } + } + public static class RefreshCatalogProvider implements TestExternalCatalog.TestCatalogProvider { public static final Map>> MOCKED_META; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MTMVCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MTMVCacheTest.java index 629b988b984b05..08c545911926c6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MTMVCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MTMVCacheTest.java @@ -27,13 +27,21 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SqlModeHelper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Relevant test case about mtmv cache. @@ -80,6 +88,15 @@ void testMTMVCacheIsCorrect() throws Exception { Assertions.assertTrue(aggregate.get().getOutputExpressions().stream() .noneMatch(expr -> expr.containsType(SessionVarGuardExpr.class))); + mtmv.invalidateRewriteCache(); + MTMVCache cacheAfterInvalidate = mtmv.getOrGenerateCache(connectContext); + Assertions.assertNotSame(cacheWithoutGuard, cacheAfterInvalidate); + Optional> aggregateAfterInvalidate = + cacheAfterInvalidate.getAllRulesRewrittenPlanAndStructInfo().key() + .collectFirst(LogicalAggregate.class::isInstance); + Assertions.assertTrue(aggregateAfterInvalidate.isPresent()); + + // set guard check session var connectContext.getSessionVariable().setSqlMode(SqlModeHelper.MODE_NO_UNSIGNED_SUBTRACTION); CascadesContext c2 = createCascadesContext( "select T1.id, sum(score) from T1 group by T1.id;", @@ -105,4 +122,48 @@ void testMTMVCacheIsCorrect() throws Exception { .anyMatch(expr -> expr.containsType(SessionVarGuardExpr.class))); dropMvByNereids("drop materialized view mv1"); } + + @Test + void testInvalidateShouldNotPublishInFlightRewriteCache() throws Exception { + ControlledCacheMTMV mtmv = new ControlledCacheMTMV(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future cacheFuture = executor.submit(() -> mtmv.getOrGenerateCache(connectContext)); + Assertions.assertTrue(mtmv.firstBuildStarted.await(5, TimeUnit.SECONDS)); + + mtmv.invalidateRewriteCache(); + mtmv.releaseFirstBuild.countDown(); + + MTMVCache generatedCache = cacheFuture.get(5, TimeUnit.SECONDS); + Assertions.assertNotSame(mtmv.firstCache, generatedCache); + Assertions.assertSame(mtmv.secondCache, generatedCache); + Assertions.assertSame(generatedCache, mtmv.getOrGenerateCache(connectContext)); + } finally { + executor.shutdownNow(); + } + } + + private static class ControlledCacheMTMV extends MTMV { + private final CountDownLatch firstBuildStarted = new CountDownLatch(1); + private final CountDownLatch releaseFirstBuild = new CountDownLatch(1); + private final AtomicInteger buildCount = new AtomicInteger(); + private final MTMVCache firstCache = new MTMVCache(null, null, null, Collections.emptyList()); + private final MTMVCache secondCache = new MTMVCache(null, null, null, Collections.emptyList()); + + @Override + protected MTMVCache createRewriteCache(ConnectContext currentContext, boolean needLock, + boolean addSessionVarGuard) { + if (buildCount.incrementAndGet() == 1) { + firstBuildStarted.countDown(); + try { + Assertions.assertTrue(releaseFirstBuild.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return firstCache; + } + return secondCache; + } + } } diff --git a/regression-test/suites/mtmv_p0/test_constraint_change_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_constraint_change_rewrite_mtmv.groovy new file mode 100644 index 00000000000000..9ae0679ca4b118 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_constraint_change_rewrite_mtmv.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_constraint_change_rewrite_mtmv", "mtmv") { + String db = context.config.getDbNameByFile(context.file) + String p = "test_constraint_change_rewrite_mtmv" + String tOrders = "${p}_orders" + String tLineitem = "${p}_lineitem" + String mvName = "${p}_mv" + + sql """use ${db}""" + sql """SET enable_nereids_planner=true""" + sql """SET enable_fallback_to_original_planner=false""" + sql """SET enable_materialized_view_rewrite=true""" + sql """SET enable_nereids_timeout = false""" + + sql """DROP MATERIALIZED VIEW IF EXISTS ${mvName}""" + sql """DROP TABLE IF EXISTS ${tOrders}""" + sql """DROP TABLE IF EXISTS ${tLineitem}""" + + sql """CREATE TABLE `${tOrders}` ( + `o_orderkey` BIGINT NOT NULL, + `o_custkey` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`) + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1");""" + + sql """CREATE TABLE `${tLineitem}` ( + `l_orderkey` BIGINT NOT NULL, + `l_linenumber` INT NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`l_orderkey`) + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1");""" + + sql """ + INSERT INTO ${tOrders} ( + o_orderkey, o_custkey, o_comment, o_orderdate + ) VALUES + (1001, 3001, 'order1001', '2024-01-15'), + (1002, 3002, 'order1002', '2024-02-20'), + (1003, 3003, 'order1003', '2024-03-05'); + """ + + sql """ + INSERT INTO ${tLineitem} ( + l_orderkey, l_linenumber, l_comment, l_shipdate + ) VALUES + (1001, 1, 'order1001_line1', '2024-01-18'), + (1002, 1, 'order1002_line1', '2024-02-22'), + (1003, 1, 'order1003_line1', '2024-03-08'); + """ + + sql """ANALYZE TABLE ${tLineitem} WITH SYNC;""" + sql """ANALYZE TABLE ${tOrders} WITH SYNC;""" + sql """ALTER TABLE ${tLineitem} MODIFY COLUMN l_comment SET STATS ('row_count'='3');""" + sql """ALTER TABLE ${tOrders} MODIFY COLUMN o_comment SET STATS ('row_count'='3');""" + + def mvSql = """ + SELECT + o_orderkey, o_custkey, o_comment, + l_orderkey, l_comment + FROM ${tOrders} + INNER JOIN ${tLineitem} + ON o_orderkey = l_orderkey + """ + create_async_mv(db, mvName, mvSql) + + def querySql = """SELECT o_orderkey, o_custkey, o_comment FROM ${tOrders}""" + // Phase 1: No PK/FK yet; the lineitem side of the MV join cannot be eliminated by PK-FK rules, rewrite should fail. + mv_rewrite_fail(querySql, mvName) + + // Phase 2: Add PK on lineitem and FK on orders referencing lineitem; query can match the MV safely, rewrite should succeed. + sql """ALTER TABLE ${tLineitem} ADD CONSTRAINT ${p}_lineitem_pk + PRIMARY KEY (l_orderkey)""" + sql """ALTER TABLE ${tOrders} ADD CONSTRAINT ${p}_orders_fk + FOREIGN KEY (o_orderkey) REFERENCES ${tLineitem}(l_orderkey)""" + mv_rewrite_success_without_check_chosen(querySql, mvName) + + // Phase 3: Drop the FK only; join elimination no longer has FK support, rewrite should fail again. + sql """ALTER TABLE ${tOrders} DROP CONSTRAINT ${p}_orders_fk""" + mv_rewrite_fail(querySql, mvName) + + // Phase 4: Re-add FK so rewrite succeeds again; then drop PK on lineitem (referenced side loses primary key), rewrite should fail; + sql """ALTER TABLE ${tOrders} ADD CONSTRAINT ${p}_orders_fk + FOREIGN KEY (o_orderkey) REFERENCES ${tLineitem}(l_orderkey)""" + mv_rewrite_success_without_check_chosen(querySql, mvName) + + sql """ALTER TABLE ${tLineitem} DROP CONSTRAINT ${p}_lineitem_pk""" + mv_rewrite_fail(querySql, mvName) +} From ebb4ed78c368a249b3c9bf071e33733386b4a5e0 Mon Sep 17 00:00:00 2001 From: yangtao555 Date: Fri, 17 Apr 2026 19:48:03 +0800 Subject: [PATCH 2/2] Avoid blocking constraint changes on MTMV cache invalidation failures --- .../java/org/apache/doris/mtmv/MTMVUtil.java | 9 ++- .../plans/commands/AddConstraintCommand.java | 7 +-- .../plans/commands/DropConstraintCommand.java | 3 +- .../org/apache/doris/persist/EditLog.java | 8 ++- .../constraint/ConstraintPersistTest.java | 61 ++++++++++++++++++- 5 files changed, 75 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 324aaf7cee3fd0..db198546a5b9b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -152,9 +152,14 @@ public static List getDependentMtmvsByConstraint(TableNameInfo tableNameIn return getDependentMtmvsByBaseTables(getConstraintRelatedBaseTables(tableNameInfo, constraint)); } - public static void invalidateRewriteCaches(List dependentMtmvs) { + public static void invalidateRewriteCachesBestEffort(List dependentMtmvs, String reason) { for (MTMV dependentMtmv : dependentMtmvs) { - dependentMtmv.invalidateRewriteCache(); + try { + dependentMtmv.invalidateRewriteCache(); + } catch (Exception e) { + LOG.warn("Failed to invalidate rewrite cache for dependent MTMV {}: {}", + dependentMtmv.getName(), reason, e); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java index 40de4a6f48fb83..7f9d2c468f2fb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java @@ -43,8 +43,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Set; @@ -54,8 +52,6 @@ */ public class AddConstraintCommand extends Command implements ForwardWithSync { - public static final Logger LOG = LogManager.getLogger(AddConstraintCommand.class); - private final String name; private final Constraint constraint; @@ -106,7 +102,8 @@ private void addConstraintAndInvalidate( throws Exception { List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tableNameInfo, constraint); Env.getCurrentEnv().getConstraintManager().addConstraint(tableNameInfo, name, constraint, false); - MTMVUtil.invalidateRewriteCaches(dependentMtmvs); + MTMVUtil.invalidateRewriteCachesBestEffort(dependentMtmvs, + String.format("after add constraint %s on table %s", constraint.getName(), tableNameInfo)); } private Pair, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java index 6ac600b007181b..fb8d506ef97c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java @@ -82,7 +82,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tableNameInfo, constraint); Env.getCurrentEnv().getConstraintManager().dropConstraint(tableNameInfo, name, false); - MTMVUtil.invalidateRewriteCaches(dependentMtmvs); + MTMVUtil.invalidateRewriteCachesBestEffort(dependentMtmvs, + String.format("after drop constraint %s on table %s", constraint.getName(), tableNameInfo)); } private TableNameInfo extractTableNameFromPlan(ConnectContext ctx) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 5724fd603e946e..5ae2af5419f3c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1212,7 +1212,9 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tni, constraint); env.getConstraintManager().addConstraint( tni, constraint.getName(), constraint, true); - MTMVUtil.invalidateRewriteCaches(dependentMtmvs); + MTMVUtil.invalidateRewriteCachesBestEffort(dependentMtmvs, + String.format("when replaying add constraint %s on table %s", + constraint.getName(), tni)); break; } case OperationType.OP_DROP_CONSTRAINT: { @@ -1227,7 +1229,9 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { List dependentMtmvs = MTMVUtil.getDependentMtmvsByConstraint(tni, constraint); env.getConstraintManager().dropConstraint( tni, constraint.getName(), true); - MTMVUtil.invalidateRewriteCaches(dependentMtmvs); + MTMVUtil.invalidateRewriteCachesBestEffort(dependentMtmvs, + String.format("when replaying drop constraint %s on table %s", + constraint.getName(), tni)); break; } case OperationType.OP_ALTER_USER: { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java index a7d2a00e8e003f..7a40236d1334b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java @@ -366,13 +366,33 @@ void liveConstraintShouldExposeDependentMtmvLookupFailureTest() throws Exception } @Test - void invalidateRewriteCachesShouldExposeFailureTest() { + void liveConstraintShouldIgnoreDependentMtmvInvalidateFailureTest() throws Exception { + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv(), Optional.empty()); + TableNameInfo tableNameInfo = new TableNameInfo(tableIf.getNameWithFullQualifiers()); + String pkName = "pk_live_invalidate_failure"; + ConstraintManager mgr = Env.getCurrentEnv().getConstraintManager(); MTMV dependentMtmv = Mockito.mock(MTMV.class); Mockito.doThrow(new RuntimeException("invalidate failed")) .when(dependentMtmv).invalidateRewriteCache(); - Assertions.assertThrows(RuntimeException.class, - () -> MTMVUtil.invalidateRewriteCaches(Lists.newArrayList(dependentMtmv))); + try (MockedStatic mtmvUtilMock = Mockito.mockStatic(MTMVUtil.class, Mockito.CALLS_REAL_METHODS)) { + mtmvUtilMock.when(() -> MTMVUtil.getDependentMtmvsByBaseTables(Mockito.anyList())) + .thenReturn(Lists.newArrayList(dependentMtmv)); + + Assertions.assertDoesNotThrow(() -> addConstraint( + "alter table t1 add constraint " + pkName + " primary key (k1)")); + Assertions.assertNotNull(mgr.getConstraint(tableNameInfo, pkName)); + + Assertions.assertDoesNotThrow(() -> dropConstraint( + "alter table t1 drop constraint " + pkName)); + Assertions.assertNull(mgr.getConstraint(tableNameInfo, pkName)); + } finally { + if (mgr.getConstraint(tableNameInfo, pkName) != null) { + mgr.dropConstraint(tableNameInfo, pkName, true); + } + } } @Test @@ -415,6 +435,41 @@ void replayConstraintShouldInvalidateDependentMtmvCacheTest() throws Exception { } } + @Test + void replayConstraintShouldIgnoreDependentMtmvInvalidateFailureTest() throws Exception { + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv(), Optional.empty()); + TableNameInfo tableNameInfo = new TableNameInfo(tableIf.getNameWithFullQualifiers()); + PrimaryKeyConstraint pk = new PrimaryKeyConstraint("pk_replay_invalidate_failure", + com.google.common.collect.ImmutableSet.of("k1")); + MTMV dependentMtmv = Mockito.mock(MTMV.class); + Mockito.doThrow(new RuntimeException("invalidate failed")) + .when(dependentMtmv).invalidateRewriteCache(); + ConstraintManager mgr = Env.getCurrentEnv().getConstraintManager(); + + try (MockedStatic mtmvUtilMock = Mockito.mockStatic(MTMVUtil.class, Mockito.CALLS_REAL_METHODS)) { + mtmvUtilMock.when(() -> MTMVUtil.getDependentMtmvsByBaseTables(Mockito.anyList())) + .thenReturn(Lists.newArrayList(dependentMtmv)); + + JournalEntity addJournal = new JournalEntity(); + addJournal.setData(new AlterConstraintLog(pk, tableNameInfo)); + addJournal.setOpCode(OperationType.OP_ADD_CONSTRAINT); + Assertions.assertDoesNotThrow(() -> EditLog.loadJournal(Env.getCurrentEnv(), 0L, addJournal)); + Assertions.assertNotNull(mgr.getConstraint(tableNameInfo, pk.getName())); + + JournalEntity dropJournal = new JournalEntity(); + dropJournal.setData(new AlterConstraintLog(pk, tableNameInfo)); + dropJournal.setOpCode(OperationType.OP_DROP_CONSTRAINT); + Assertions.assertDoesNotThrow(() -> EditLog.loadJournal(Env.getCurrentEnv(), 0L, dropJournal)); + Assertions.assertNull(mgr.getConstraint(tableNameInfo, pk.getName())); + } finally { + if (mgr.getConstraint(tableNameInfo, pk.getName()) != null) { + mgr.dropConstraint(tableNameInfo, pk.getName(), true); + } + } + } + public static class RefreshCatalogProvider implements TestExternalCatalog.TestCatalogProvider { public static final Map>> MOCKED_META;