Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 82 additions & 37 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class MTMV extends OlapTable {
private MTMVCache cacheWithGuard;
// Cache without SessionVarGuardExpr: used when query session variables match MV creation variables
private MTMVCache cacheWithoutGuard;
// Increased every time rewrite cache is invalidated to prevent publishing stale in-flight cache builds.
private transient long rewriteCacheGeneration;
private long schemaChangeVersion;
@SerializedName(value = "sv")
private Map<String, String> sessionVariables;
Expand Down Expand Up @@ -212,22 +214,25 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
MTMVCache mtmvCacheWithGuard = null;
MTMVCache mtmvCacheWithoutGuard = null;
boolean needUpdateCache = false;
long cacheGeneration = -1;
if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread()
&& !Config.enable_check_compatibility_mode) {
needUpdateCache = true;
readMvLock();
try {
cacheGeneration = rewriteCacheGeneration;
} finally {
readMvUnlock();
}
try {
// The replay thread may not have initialized the catalog yet to avoid getting stuck due
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
ConnectContext currentContext = ConnectContext.get();
// shouldn't do this while holding mvWriteLock
// TODO: these two cache compute share something same, can be simplified in future
mtmvCacheWithGuard = MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, true, currentContext, true);
mtmvCacheWithoutGuard = MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, true, currentContext, false);
mtmvCacheWithGuard = createRewriteCache(currentContext, true, true);
mtmvCacheWithoutGuard = createRewriteCache(currentContext, true, false);
}
} catch (Throwable e) {
mtmvCacheWithGuard = null;
Expand All @@ -251,10 +256,12 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
this.status.setRefreshState(MTMVRefreshState.SUCCESS);
this.relation = relation;
if (needUpdateCache) {
// Initialize cacheWithGuard, cacheWithoutGuard will be lazily generated when needed
this.cacheWithGuard = mtmvCacheWithGuard;
// Clear the other cache to ensure consistency
this.cacheWithoutGuard = mtmvCacheWithoutGuard;
if (cacheGeneration == rewriteCacheGeneration) {
// Initialize cacheWithGuard, cacheWithoutGuard will be lazily generated when needed
this.cacheWithGuard = mtmvCacheWithGuard;
// Clear the other cache to ensure consistency
this.cacheWithoutGuard = mtmvCacheWithoutGuard;
}
}
} else {
this.status.setRefreshState(MTMVRefreshState.FAIL);
Expand Down Expand Up @@ -370,7 +377,8 @@ public Set<TableNameInfo> getQueryRewriteConsistencyRelaxedTables() {
}

/**
* Called when in query, Should use one connection context in query
* Called when in query; should use one connection context for the query.
* Returns the rewrite cache matching the current session variables, rebuilding it on demand.
*/
public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws
org.apache.doris.nereids.exceptions.AnalysisException {
Expand All @@ -388,35 +396,38 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws
boolean sessionVarsMatch = SessionVarGuardRewriter.checkSessionVariablesMatch(
currentSessionVars, this.sessionVariables);

// Select appropriate cache based on session variable match
readMvLock();
try {
if (sessionVarsMatch && cacheWithoutGuard != null) {
return cacheWithoutGuard;
}
if (!sessionVarsMatch && cacheWithGuard != null) {
return cacheWithGuard;
while (true) {
long cacheGeneration;
// Select appropriate cache based on session variable match
readMvLock();
try {
MTMVCache cache = getCache(sessionVarsMatch);
if (cache != null) {
return cache;
}
cacheGeneration = rewriteCacheGeneration;
} finally {
readMvUnlock();
}
} finally {
readMvUnlock();
}

// Generate cache if not exists
// Concurrent situations may result in duplicate cache generation,
// but we tolerate this in order to prevent nested use of readLock and write MvLock for the table
MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, false, connectionContext, !sessionVarsMatch);
writeMvLock();
try {
if (sessionVarsMatch) {
this.cacheWithoutGuard = mtmvCache;
} else {
this.cacheWithGuard = mtmvCache;
// Generate cache if not exists
// Concurrent situations may result in duplicate cache generation,
// but we tolerate this in order to prevent nested use of readLock and write MvLock for the table
MTMVCache mtmvCache = createRewriteCache(connectionContext, false, !sessionVarsMatch);
writeMvLock();
try {
MTMVCache cache = getCache(sessionVarsMatch);
if (cache != null) {
return cache;
}
if (cacheGeneration != rewriteCacheGeneration) {
continue;
}
setCache(sessionVarsMatch, mtmvCache);
return mtmvCache;
} finally {
writeMvUnlock();
}
return mtmvCache;
} finally {
writeMvUnlock();
}
}

Expand Down Expand Up @@ -446,6 +457,28 @@ public long getSchemaChangeVersion() {
}
}

/**
* Invalidate rewrite cache after metadata changes such as ADD/DROP CONSTRAINT.
* Bumping the generation prevents any cache built before this call from being published later.
*/
public void invalidateRewriteCache() {
writeMvLock();
try {
rewriteCacheGeneration++;
cacheWithGuard = null;
cacheWithoutGuard = null;
} finally {
writeMvUnlock();
}
}

protected MTMVCache createRewriteCache(ConnectContext currentContext, boolean needLock,
boolean addSessionVarGuard) {
return MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, needLock, currentContext, addSessionVarGuard);
}

/**
* generateMvPartitionDescs
*
Expand Down Expand Up @@ -552,6 +585,18 @@ public void writeMvUnlock() {
this.mvRwLock.writeLock().unlock();
}

private MTMVCache getCache(boolean sessionVarsMatch) {
return sessionVarsMatch ? cacheWithoutGuard : cacheWithGuard;
}

private void setCache(boolean sessionVarsMatch, MTMVCache cache) {
if (sessionVarsMatch) {
this.cacheWithoutGuard = cache;
} else {
this.cacheWithGuard = cache;
}
}

// toString() is not easy to find where to call the method
public String toInfoString() {
final StringBuilder sb = new StringBuilder("MTMV{");
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
Expand Down Expand Up @@ -73,6 +74,16 @@ public BaseTableInfo(TableIf table) {
this.ctlName = catalog.getName();
}

public BaseTableInfo(TableNameInfo tableNameInfo) {
java.util.Objects.requireNonNull(tableNameInfo, "tableNameInfo is null");
java.util.Objects.requireNonNull(tableNameInfo.getTbl(), "tableName is null");
java.util.Objects.requireNonNull(tableNameInfo.getDb(), "dbName is null");
java.util.Objects.requireNonNull(tableNameInfo.getCtl(), "catalogName is null");
this.tableName = tableNameInfo.getTbl();
this.dbName = tableNameInfo.getDb();
this.ctlName = tableNameInfo.getCtl();
}

// for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog
public BaseTableInfo(OlapTable table, long dbId) {
java.util.Objects.requireNonNull(table, "table is null");
Expand Down
72 changes: 66 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.catalog.constraint.ForeignKeyConstraint;
import org.apache.doris.catalog.constraint.PrimaryKeyConstraint;
import org.apache.doris.catalog.info.TableNameInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
Expand All @@ -42,13 +46,21 @@

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

public class MTMVUtil {

private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);

/**
* get Table by BaseTableInfo
*
Expand Down Expand Up @@ -98,6 +110,59 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
}

public static List<MTMV> getDependentMtmvsByBaseTables(List<BaseTableInfo> baseTableInfos)
throws AnalysisException {
Set<BaseTableInfo> uniqueBases = new LinkedHashSet<>(baseTableInfos);
Map<Long, MTMV> mtmvById = new TreeMap<>();
for (BaseTableInfo base : uniqueBases) {
for (BaseTableInfo mtmvInfo
: Env.getCurrentEnv().getMtmvService().getRelationManager()
.getMtmvsByBaseTable(base)) {
try {
MTMV mtmv = MTMVUtil.getMTMV(mtmvInfo);
mtmvById.put(mtmv.getId(), mtmv);
} catch (AnalysisException e) {
LOG.warn("Skip stale dependent MTMV relation {} for base table {}",
mtmvInfo, base, e);
}
}
}
return new ArrayList<>(mtmvById.values());
}

public static List<BaseTableInfo> getConstraintRelatedBaseTables(
TableNameInfo tableNameInfo, Constraint constraint) {
List<BaseTableInfo> baseTables = new ArrayList<>();
baseTables.add(new BaseTableInfo(tableNameInfo));
if (constraint instanceof ForeignKeyConstraint) {
TableNameInfo referencedTableInfo = ((ForeignKeyConstraint) constraint).getReferencedTableName();
if (referencedTableInfo != null) {
baseTables.add(new BaseTableInfo(referencedTableInfo));
}
} else if (constraint instanceof PrimaryKeyConstraint) {
for (TableNameInfo fkTableInfo : ((PrimaryKeyConstraint) constraint).getForeignTableInfos()) {
baseTables.add(new BaseTableInfo(fkTableInfo));
}
}
return baseTables;
}

public static List<MTMV> getDependentMtmvsByConstraint(TableNameInfo tableNameInfo, Constraint constraint)
throws AnalysisException {
return getDependentMtmvsByBaseTables(getConstraintRelatedBaseTables(tableNameInfo, constraint));
}

public static void invalidateRewriteCachesBestEffort(List<MTMV> dependentMtmvs, String reason) {
for (MTMV dependentMtmv : dependentMtmvs) {
try {
dependentMtmv.invalidateRewriteCache();
} catch (Exception e) {
LOG.warn("Failed to invalidate rewrite cache for dependent MTMV {}: {}",
dependentMtmv.getName(), reason, e);
}
}
}

public static TableIf getTable(List<String> names) throws AnalysisException {
if (names == null || names.size() != 3) {
throw new AnalysisException("size of names need 3, but names is:" + names);
Expand All @@ -116,12 +181,7 @@ public static TableIf getTable(List<String> names) throws AnalysisException {
*/
public static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTablesOneLevelAndFromView();
for (BaseTableInfo baseTableInfo : baseTables) {
if (!baseTableInfo.isInternalTable()) {
return true;
}
}
return false;
return baseTables.stream().anyMatch(baseTableInfo -> !baseTableInfo.isInternalTable());
}

/**
Expand Down
Loading
Loading