diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fa4c13bc24af3..9c4fbd719a966 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -459,6 +459,23 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString messageParameters = Map("name" -> "IDENTIFIER", "expr" -> p.identifierExpr.sql) ) + case c: CacheTableAsSelect if c.tempViewName.resolved => + // The parser builds `tempViewName` as either a `Literal[StringType]` (for direct + // identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier` + // that resolves to such a Literal. Validate the post-analysis shape so any future + // construction path that violates the invariant fails loudly here, not deep inside + // execution via `tempViewNameString`. The `resolved` guard ensures that when the + // IDENTIFIER expression itself failed to resolve (e.g. `IDENTIFIER()`), + // we fall through to the catch-all `LogicalPlan` case so the user sees the proper + // `UNRESOLVED_COLUMN` error rather than an internal error. + c.tempViewName match { + case Literal(value, _: StringType) if value != null => // OK + case other => + throw SparkException.internalError( + "CacheTableAsSelect.tempViewName must be a non-null string literal after " + + s"analysis, but got: ${other.sql}") + } + case operator: LogicalPlan => operator transformExpressionsDown { case hof: HigherOrderFunction if hof.arguments.exists { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala index 7150c81ad64ec..cfa6f33588062 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference} -import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, V2WriteCommand} import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -70,6 +71,39 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch] executor.execute(p.planBuilder.apply( IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) + // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan slots + // (`child = query`), so the standard `resolveOperatorsUp` traversal never visits + // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and + // `OverwriteByExpression` carry a parse-time placeholder today, but matching the + // `V2WriteCommand` trait keeps the rule consistent across the family. + case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] => + val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier] + if (p.identifierExpr.resolved && p.childrenResolved) { + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + i.copy(table = executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))) + } else { + i + } + case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] => + val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier] + if (p.identifierExpr.resolved && p.childrenResolved) { + if (referredTempVars.isDefined) { + referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p) + } + executor.execute(p.planBuilder.apply( + IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match { + case nr: NamedRelation => w.withNewTable(nr) + case other => + throw SparkException.internalError( + "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " + + s"into a NamedRelation, but got: ${other.getClass.getName}") + } + } else { + w + } case other => other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index bf9acb775ce10..31c835986e20d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand, V2WriteCommand} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} @@ -179,9 +179,30 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) { case p1 => stop = p1.isInstanceOf[ParameterizedQuery] - p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse { - case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f)) - }) + // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan + // slots, so the standard `resolveOperatorsDown` traversal never visits parameter + // markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` and + // `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...` resolve under the legacy + // parameter-substitution mode (SPARK-46625). Today only the `OverwriteByExpression` + // variant of `V2WriteCommand` is parser-built with a placeholder in `table`; the trait + // match keeps the rule consistent for any future analyzer-built node in the same shape. + val withBoundTable = p1 match { + case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) => + i.copy(table = bind(i.table)(f)) + case w: V2WriteCommand if w.table.containsPattern(PARAMETER) => + bind(w.table)(f) match { + case nr: NamedRelation => w.withNewTable(nr) + case other => + throw SparkException.internalError( + "Parameter binding on V2WriteCommand.table must preserve " + + s"NamedRelation, but got: ${other.getClass.getName}") + } + case other => other + } + withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) ( + f orElse { + case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f)) + }) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index bac5651265d94..a5b467d0f0816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -59,12 +59,22 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode /** * A logical plan placeholder that holds the identifier clause string expression. It will be * replaced by the actual logical plan with the evaluated identifier string. + * + * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g. + * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command. + * + * The parser always places this node inside the command's identifier slot (a child slot for + * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for `InsertIntoStatement.table` and + * `OverwriteByExpression.table` -- handled via explicit cases in `ResolveIdentifierClause` and + * `BindParameters`). It is never the substitution root of a `WITH ... ` subtree, so + * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` produced by + * `CTESubstitution` targets the inner command directly. */ case class PlanWithUnresolvedIdentifier( identifierExpr: Expression, children: Seq[LogicalPlan], planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan) - extends UnresolvedNode { + extends UnresolvedNode with NamedRelation { def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = { this(identifierExpr, Nil, (ident, _) => planBuilder(ident)) @@ -72,6 +82,12 @@ case class PlanWithUnresolvedIdentifier( final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER) + // Placeholder name used by error paths that render `NamedRelation.name` for an unresolved + // table reference -- e.g. `SparkStrategies.extractTableNameForError` and the `r: NamedRelation` + // fallback in `QueryCompilationErrors`. Renders as the SQL text of the identifier expression + // (e.g. `IDENTIFIER(:p)` or `concat('a', 'b')`) so error messages remain informative. + override def name: String = identifierExpr.sql + override protected def withNewChildrenInternal( newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(identifierExpr, newChildren, planBuilder) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a79f64cf53d94..6b08a9281f83a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -933,32 +933,31 @@ class AstBuilder extends DataTypeAstBuilder query: LogicalPlan, queryAliasCtx: TableAliasContext): LogicalPlan = withOrigin(ctx) { ctx match { - // We cannot push withIdentClause() into the write command because: - // 1. `PlanWithUnresolvedIdentifier` is not a NamedRelation - // 2. Write commands do not hold the table logical plan as a child, and we need to add - // additional resolution code to resolve identifiers inside the write commands. + // For all `InsertIntoStatement` / `OverwriteByExpression`-producing branches, build the + // `table` slot directly via `buildWriteTableSlot` so that any + // `PlanWithUnresolvedIdentifier` lives *inside* the command's identifier slot. This + // preserves the `CTEInChildren` shape and lets `CTESubstitution` place `WithCTE` on the + // command's children correctly (SPARK-46625). case table: InsertIntoTableContext => val insertParams = visitInsertIntoTable(table) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, - query = otherPlans.head, - overwrite = false, - writePrivileges = Set(TableWritePrivilege.INSERT), - withSchemaEvolution = table.EVOLUTION() != null) - }) + val privileges = Set(TableWritePrivilege.INSERT) + createInsertIntoStatement( + insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = false, + withSchemaEvolution = table.EVOLUTION() != null) case table: InsertOverwriteTableContext => val insertParams = visitInsertOverwriteTable(table) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, - query = otherPlans.head, - overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), - withSchemaEvolution = table.EVOLUTION() != null) - }) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + createInsertIntoStatement( + insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = true, + withSchemaEvolution = table.EVOLUTION() != null) case ctx: InsertIntoReplaceBooleanCondContext => // Although REPLACE WHERE and REPLACE ON share a unified grammar rule, they have // different SQL semantics: @@ -969,62 +968,56 @@ class AstBuilder extends DataTypeAstBuilder val isInsertReplaceWhere = ctx.WHERE() != null if (isInsertReplaceWhere) { val options = Option(ctx.optionsClause()) - withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => { - val table = createUnresolvedRelation( - ctx = ctx.identifierReference, - ident = ident, - optionsClause = options, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), - isStreaming = false) - val deleteExpr = expression(ctx.replaceCondition) - val isByName = ctx.NAME() != null - if (isByName) { - OverwriteByExpression.byName( - table, - df = otherPlans.head, - deleteExpr, - withSchemaEvolution = ctx.EVOLUTION() != null) - } else { - OverwriteByExpression.byPosition( - table, - query = otherPlans.head, - deleteExpr, - withSchemaEvolution = ctx.EVOLUTION() != null) - } - }) - } else { - val insertParams = visitInsertIntoReplaceOn(ctx) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { - val query = { - val queryAliasOpt = - getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON") - - queryAliasOpt.map { queryAlias => - withOrigin(queryAliasCtx) { - SubqueryAlias(queryAlias, child = otherPlans.head) - } - }.getOrElse(otherPlans.head) - } - createInsertIntoStatement( - insertParams = insertParams, - ident = ident, + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy + // `OverwriteByExpression.table` directly; the materialization happens in + // `ResolveIdentifierClause` via its `OverwriteByExpression` special-case. + val table = buildWriteTableSlot(ctx.identifierReference, options, privileges) + val deleteExpr = expression(ctx.replaceCondition) + val isByName = ctx.NAME() != null + if (isByName) { + OverwriteByExpression.byName( + table, + df = query, + deleteExpr, + withSchemaEvolution = ctx.EVOLUTION() != null) + } else { + OverwriteByExpression.byPosition( + table, query = query, - overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), + deleteExpr, withSchemaEvolution = ctx.EVOLUTION() != null) - }) - } - case ctx: InsertIntoReplaceUsingContext => - val insertParams = visitInsertIntoReplaceUsing(ctx) - withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => { + } + } else { + val insertParams = visitInsertIntoReplaceOn(ctx) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + val finalQuery = { + val queryAliasOpt = + getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON") + queryAliasOpt.map { queryAlias => + withOrigin(queryAliasCtx) { + SubqueryAlias(queryAlias, child = query) + } + }.getOrElse(query) + } createInsertIntoStatement( insertParams = insertParams, - ident = ident, - query = otherPlans.head, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = finalQuery, overwrite = true, - writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), withSchemaEvolution = ctx.EVOLUTION() != null) - }) + } + case ctx: InsertIntoReplaceUsingContext => + val insertParams = visitInsertIntoReplaceUsing(ctx) + val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE) + createInsertIntoStatement( + insertParams = insertParams, + tableSlot = buildWriteTableSlot( + insertParams.relationCtx, insertParams.options, privileges), + query = query, + overwrite = true, + withSchemaEvolution = ctx.EVOLUTION() != null) case dir: InsertOverwriteDirContext => val (isLocal, storage, provider) = visitInsertOverwriteDir(dir) InsertIntoDir(isLocal, storage, provider, query, overwrite = true) @@ -1153,18 +1146,12 @@ class AstBuilder extends DataTypeAstBuilder */ private def createInsertIntoStatement( insertParams: InsertTableParams, - ident: Seq[String], + tableSlot: LogicalPlan, query: LogicalPlan, overwrite: Boolean, - writePrivileges: Set[TableWritePrivilege], withSchemaEvolution: Boolean): InsertIntoStatement = { InsertIntoStatement( - table = createUnresolvedRelation( - ctx = insertParams.relationCtx, - ident = ident, - optionsClause = insertParams.options, - writePrivileges = writePrivileges, - isStreaming = false), + table = tableSlot, partitionSpec = insertParams.partitionSpec, userSpecifiedCols = insertParams.userSpecifiedCols, query = query, @@ -1175,6 +1162,27 @@ class AstBuilder extends DataTypeAstBuilder withSchemaEvolution = withSchemaEvolution) } + /** + * Build the `table` slot of a write command. If the identifier reference is a constant string, + * returns an [[UnresolvedRelation]] directly; otherwise returns a + * [[PlanWithUnresolvedIdentifier]] that materializes into an [[UnresolvedRelation]] once the + * identifier expression is resolved. Both branches produce a [[NamedRelation]], so the result + * fits `NamedRelation`-typed slots (e.g. `OverwriteByExpression.table`) as well as the more + * general `LogicalPlan` slot of `InsertIntoStatement.table`. + * + * Placing the placeholder in the identifier slot (rather than wrapping the entire write command) + * preserves the `CTEInChildren` shape at parse time, so `CTESubstitution` places `WithCTE` on the + * command's children correctly. See SPARK-46625. + */ + private def buildWriteTableSlot( + ctx: IdentifierReferenceContext, + optionsClause: Option[OptionsClauseContext], + writePrivileges: Set[TableWritePrivilege]): NamedRelation = { + withIdentClause(ctx, parts => + createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false)) + .asInstanceOf[NamedRelation] + } + /** * Write to a directory, returning a [[InsertIntoDir]] logical plan. */ @@ -5687,42 +5695,45 @@ class AstBuilder extends DataTypeAstBuilder bucketSpec.map(_.asTransform) ++ clusterBySpec.map(_.asTransform) - val asSelectPlan = Option(ctx.query).map(plan).toSeq - withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => { - val namedConstraints = - constraints.map(c => c.withTableName(identifiers.last)) - val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - collation, serdeInfo, external, namedConstraints) - val identifier = withOrigin(identifierContext) { - UnresolvedIdentifier(identifiers) - } - otherPlans.headOption match { - case Some(_) if columns.nonEmpty => + Option(ctx.query).map(plan) match { + case Some(query) => + // CTAS path: push the identifier placeholder into the `name` slot so that + // `CTESubstitution` sees the `CreateTableAsSelect` (a `CTEInChildren`) directly + // and places `WithCTE` on its children (SPARK-46625). CTAS disallows constraints / + // user-specified columns / non-reference partition columns, so we don't need the + // identifier parts at parse time. + if (columns.nonEmpty) { operationNotAllowed( - "Schema may not be specified in a Create Table As Select (CTAS) statement", - ctx) - - case Some(_) if partCols.nonEmpty => - // non-reference partition columns are not allowed because schema can't be specified + "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx) + } + if (partCols.nonEmpty) { operationNotAllowed( - "Partition column types may not be specified in Create Table As Select (CTAS)", - ctx) - - case Some(_) if constraints.nonEmpty => + "Partition column types may not be specified in Create Table As Select (CTAS)", ctx) + } + if (constraints.nonEmpty) { operationNotAllowed( - "Constraints may not be specified in a Create Table As Select (CTAS) statement", - ctx) - - case Some(query) => - CreateTableAsSelect(identifier, partitioning, query, tableSpec, Map.empty, ifNotExists) - - case _ => + "Constraints may not be specified in a Create Table As Select (CTAS) statement", ctx) + } + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external, constraints = Nil) + val nameSlot = withIdentClause(identifierContext, identifiers => + withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) }) + CreateTableAsSelect(nameSlot, partitioning, query, tableSpec, Map.empty, ifNotExists) + case None => + withIdentClause(identifierContext, identifiers => { + val namedConstraints = + constraints.map(c => c.withTableName(identifiers.last)) + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external, namedConstraints) + val identifier = withOrigin(identifierContext) { + UnresolvedIdentifier(identifiers) + } // Note: table schema includes both the table columns list and the partition columns // with data type. val allColumns = columns ++ partCols CreateTable(identifier, allColumns, partitioning, tableSpec, ignoreIfExists = ifNotExists) - } - }) + }) + } } /** @@ -5771,43 +5782,42 @@ class AstBuilder extends DataTypeAstBuilder clusterBySpec.map(_.asTransform) val identifierContext = ctx.replaceTableHeader().identifierReference() - val asSelectPlan = Option(ctx.query).map(plan).toSeq - withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => { - val namedConstraints = - constraints.map(c => c.withTableName(identifiers.last)) - val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, - collation, serdeInfo, external = false, namedConstraints) - val identifier = withOrigin(identifierContext) { - UnresolvedIdentifier(identifiers) - } - otherPlans.headOption match { - case Some(_) if columns.nonEmpty => + Option(ctx.query).map(plan) match { + case Some(query) => + // RTAS path: push the identifier placeholder into the `name` slot (see CTAS above). + if (columns.nonEmpty) { operationNotAllowed( - "Schema may not be specified in a Replace Table As Select (RTAS) statement", - ctx) - - case Some(_) if partCols.nonEmpty => - // non-reference partition columns are not allowed because schema can't be specified + "Schema may not be specified in a Replace Table As Select (RTAS) statement", ctx) + } + if (partCols.nonEmpty) { operationNotAllowed( - "Partition column types may not be specified in Replace Table As Select (RTAS)", - ctx) - - case Some(_) if constraints.nonEmpty => + "Partition column types may not be specified in Replace Table As Select (RTAS)", ctx) + } + if (constraints.nonEmpty) { operationNotAllowed( - "Constraints may not be specified in a Replace Table As Select (RTAS) statement", - ctx) - - case Some(query) => - ReplaceTableAsSelect(identifier, partitioning, query, tableSpec, - writeOptions = Map.empty, orCreate = orCreate) - - case _ => + "Constraints may not be specified in a Replace Table As Select (RTAS) statement", ctx) + } + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external = false, constraints = Nil) + val nameSlot = withIdentClause(identifierContext, identifiers => + withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) }) + ReplaceTableAsSelect(nameSlot, partitioning, query, tableSpec, + writeOptions = Map.empty, orCreate = orCreate) + case None => + withIdentClause(identifierContext, identifiers => { + val namedConstraints = + constraints.map(c => c.withTableName(identifiers.last)) + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, + collation, serdeInfo, external = false, namedConstraints) + val identifier = withOrigin(identifierContext) { + UnresolvedIdentifier(identifiers) + } // Note: table schema includes both the table columns list and the partition columns // with data type. val allColumns = columns ++ partCols ReplaceTable(identifier, allColumns, partitioning, tableSpec, orCreate = orCreate) - } - }) + }) + } } /** @@ -6549,35 +6559,74 @@ class AstBuilder extends DataTypeAstBuilder * }}} */ override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) { - val query = Option(ctx.query).map(plan) - withIdentClause(ctx.identifierReference, query.toSeq, (ident, children) => { - if (query.isDefined && ident.length > 1) { - val catalogAndNamespace = ident.init - throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError( - catalogAndNamespace.quoted, ctx) - } - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) - val isLazy = ctx.LAZY != null - if (query.isDefined) { + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + val isLazy = ctx.LAZY != null + Option(ctx.query).map(plan) match { + case Some(query) => // Disallow parameter markers in the query of the cache. // We need this limitation because we store the original query text, pre substitution. - // To lift this we would need to reconstitute the query with parameter markers replaced with - // the values given at CACHE TABLE time, or we would need to store the parameter values - // alongside the text. - // The same rule can be found in CREATE VIEW builder. - checkInvalidParameter(query.get, "the query of CACHE TABLE") - CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options) - } else { - CacheTable( - createUnresolvedRelation( - ctx.identifierReference, - ident, - None, - writePrivileges = Set.empty, - isStreaming = false), - ident, isLazy, options) + // To lift this we would need to reconstitute the query with parameter markers replaced + // with the values given at CACHE TABLE time, or we would need to store the parameter + // values alongside the text. The same rule can be found in CREATE VIEW builder. + checkInvalidParameter(query, "the query of CACHE TABLE") + // `CacheTableAsSelect.tempViewName` is an `Expression` slot: a `Literal` for direct + // identifiers and `IDENTIFIER('literal-string')`, or an + // `ExpressionWithUnresolvedIdentifier` for `IDENTIFIER()`. Building the name + // as an expression avoids the wrap-the-whole-command form (where the + // `PlanWithUnresolvedIdentifier` would wrap the entire `CacheTableAsSelect`), which is the + // last shape that motivated the `WithCTE(, _)` workaround chain in SPARK-46625. + val nameExpr = buildCacheTableAsSelectName(ctx.identifierReference, ctx) + CacheTableAsSelect(nameExpr, query, source(ctx.query()), isLazy, options) + case None => + withIdentClause(ctx.identifierReference, ident => { + CacheTable( + createUnresolvedRelation( + ctx.identifierReference, + ident, + None, + writePrivileges = Set.empty, + isStreaming = false), + ident, isLazy, options) + }) + } + } + + /** + * Build the `tempViewName` expression for a `CACHE TABLE ... AS SELECT` command from an + * `identifierReference` context. + * + * `CacheTableAsSelect` requires a single-part temp view name (no catalog/namespace). For direct + * identifiers and `IDENTIFIER('literal-string')` we validate this at parse time and produce a + * non-null string `Literal`. For `IDENTIFIER()` we emit an + * `ExpressionWithUnresolvedIdentifier` whose builder validates the single-part invariant when + * the identifier expression is resolved. + */ + private def buildCacheTableAsSelectName( + ctx: IdentifierReferenceContext, + parentCtx: CacheTableContext): Expression = { + // Use the outer `parentCtx` for the multi-part error so the query context points at the + // whole `CACHE TABLE ... AS ...` statement, not just the identifier reference. The caller + // (`visitCacheTable`) already has `withOrigin(parentCtx)` in scope. + def singlePart(parts: Seq[String]): String = { + if (parts.length > 1) { + throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError( + parts.init.quoted, parentCtx) } - }) + parts.head + } + val exprCtx = ctx.expression + if (exprCtx != null) { + expression(exprCtx) match { + case Literal(value, _: StringType) if value != null => + Literal(singlePart(parseMultipartIdentifier(value.toString))) + case expr => + new ExpressionWithUnresolvedIdentifier( + withOrigin(exprCtx) { expr }, + parts => Literal(singlePart(parts))) + } + } else { + Literal(singlePart(visitMultipartIdentifier(ctx.multipartIdentifier))) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 774c783ecf8a2..4fbe71ed7d3e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike} import org.apache.spark.sql.connector.catalog.ColumnDefaultValue import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.DataType +import org.apache.spark.util.collection.BitSet /** * A logical plan node that contains exactly what was parsed from SQL. @@ -210,6 +211,16 @@ case class InsertIntoStatement( override def child: LogicalPlan = query override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement = copy(query = newChild) + + // `table` is a non-child LogicalPlan slot (`child = query`), so the default tree-pattern + // propagation in TreeNode/QueryPlan does not see patterns inside it. Add `table`'s bits here + // so that `containsPattern(...)` pruning correctly reports patterns living in `table` + // (e.g. `PARAMETER`, `PLAN_WITH_UNRESOLVED_IDENTIFIER`). + override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + bits.union(table.treePatternBits) + bits + } } sealed abstract class InsertReplaceCriteria extends Expression with Unevaluable { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b1ab46ee94817..5dd2f10c89cbe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils +import org.apache.spark.util.collection.BitSet // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame // which is required by the DS v1 API. We need to keep the analyzed input query plan to build @@ -106,6 +107,18 @@ trait V2WriteCommand override def child: LogicalPlan = query + // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan + // does not see patterns inside it. Add `table`'s bits so that `containsPattern(...)` pruning + // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`, + // `PARAMETER`). Only `OverwriteByExpression` is constructed at parse time with a placeholder + // in `table`, but applying this uniformly across all `V2WriteCommand`s keeps the invariant + // consistent for any future analyzer-built node that lands a placeholder in the same slot. + override protected def getDefaultTreePatternBits: BitSet = { + val bits = super.getDefaultTreePatternBits + bits.union(table.treePatternBits) + bits + } + override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved def outputResolved: Boolean = { @@ -1955,7 +1968,7 @@ case class CacheTable( * The logical plan of the CACHE TABLE ... AS SELECT command. */ case class CacheTableAsSelect( - tempViewName: String, + tempViewName: Expression, plan: LogicalPlan, originalText: String, isLazy: Boolean, @@ -1963,6 +1976,19 @@ case class CacheTableAsSelect( isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) extends AnalysisOnlyCommand with CTEInChildren { + + /** + * Returns the temp view name string. Must only be called after analysis, when `tempViewName` + * has been resolved to a non-null string `Literal`. `CheckAnalysis` enforces this invariant. + */ + def tempViewNameString: String = tempViewName match { + case Literal(value, _: StringType) if value != null => value.toString + case other => + throw SparkException.internalError( + "CacheTableAsSelect.tempViewName must be a non-null string literal after analysis, " + + s"but got: ${other.sql}") + } + override protected def withNewChildrenInternal( newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = { assert(!isAnalyzed) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 1ac417ddc9376..4f4aa6e24c5aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2761,7 +2761,7 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("CACHE TABLE t AS SELECT * FROM testData"), CacheTableAsSelect( - "t", + Literal("t"), Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))), "SELECT * FROM testData", false, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3c58298ec9211..ed067a3f00d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -778,7 +778,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case r: CacheTableAsSelect => CacheTableAsSelectExec( - r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil + r.tempViewNameString, r.plan, r.originalText, r.isLazy, r.options, + r.referredTempFunctions) :: Nil case r: UncacheTable => def isTempView(table: LogicalPlan): Boolean = table match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index d6b22431e854e..575fcc058169e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -20,10 +20,12 @@ package org.apache.spark.sql import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.ExtendedAnalysisException +import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, ExpressionWithUnresolvedIdentifier, NameParameterizedQuery, PlanWithUnresolvedIdentifier} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.Limit +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE} import org.apache.spark.sql.catalyst.trees.SQLQueryContext +import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct} import org.apache.spark.sql.internal.SQLConf @@ -2460,4 +2462,206 @@ class ParametersSuite extends SharedSparkSession { spark.sql("SELECT 1", Array.empty[Any]), Row(1)) } + + // SPARK-46625: WITH ... SELECT ... FROM cte + // The placeholder is pushed into the command's identifier slot at parse time, so + // `CTESubstitution` sees the `CTEInChildren` directly and never produces the invalid + // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape. + private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = { + df.queryExecution.analyzed.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape:\n${df.queryExecution.analyzed}") + case _ => + } + } + + test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") { + withTable("t_cte_overwrite") { + sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET") + sql("INSERT INTO t_cte_overwrite VALUES (10)") + val df = spark.sql( + """WITH transformation AS (SELECT 1 AS a) + |INSERT OVERWRITE TABLE IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_overwrite")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_overwrite"), Row(1)) + } + } + + test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") { + withTable("t_cte_into") { + sql("CREATE TABLE t_cte_into (a INT) USING PARQUET") + val df = spark.sql( + """WITH transformation AS (SELECT 7 AS a) + |INSERT INTO IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_into")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_into"), Row(7)) + } + } + + test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte") { + withTable("t_cte_ctas") { + val df = spark.sql( + """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS + |WITH transformation AS (SELECT 3 AS a) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_ctas")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_ctas"), Row(3)) + } + } + + // SPARK-46625: legacy parameter-substitution mode triggers the parameters.scala traversal + // path. The placeholder lives in `InsertIntoStatement.table`, which is *not* a child, so this + // exercises the `InsertIntoStatement` special-case in `BindParameters.bind` that recurses into + // the `table` slot, and the `getDefaultTreePatternBits` override on `InsertIntoStatement` that + // exposes `table`'s tree-pattern bits for pruning. + test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") { + withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> "true") { + withTable("t_legacy_param") { + sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET") + spark.sql( + """WITH transformation AS (SELECT 11 AS a) + |INSERT INTO IDENTIFIER(:tname) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_legacy_param")) + checkAnswer(spark.table("t_legacy_param"), Row(11)) + } + } + } + + // SPARK-46625: INSERT INTO REPLACE WHERE goes through `OverwriteByExpression`, whose `table` + // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends `NamedRelation` so the + // placeholder sits in the slot directly. Verify on the parsed plan that the placeholder lives + // in `OverwriteByExpression.table` rather than wrapping the whole command -- running the + // analyzer fully would require a v2 catalog. + test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... parser") { + // Use a non-literal-string expression so `withIdentClause` produces + // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedRelation`. + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """WITH transformation AS (SELECT 99 AS a) + |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10 + |SELECT * FROM transformation""".stripMargin) + val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan")) + assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier], + s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " + + s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan") + // After CTESubstitution runs, the CTE defs should land on the command's children (because + // OverwriteByExpression is a CTEInChildren) -- never as `WithCTE(OverwriteByExpression, _)`. + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } + + // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in + // `OverwriteByExpression.table`, which is a non-child slot. Verify that + // `BindParameters.bind` reaches into the slot via the explicit `OverwriteByExpression` + // recursion (parameters.scala) and that the `getDefaultTreePatternBits` override on + // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at the rule level + // because driving REPLACE WHERE through full analysis would require a v2 catalog. + test("SPARK-46625: BindParameters recurses into OverwriteByExpression.table") { + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10 + |SELECT 1 AS a""".stripMargin) + val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan")) + // Pruning prerequisite: the PARAMETER bit must be visible at the OverwriteByExpression + // level (it lives inside `table`, which is not a child); this exercises the + // `getDefaultTreePatternBits` override. + assert(overwrite.containsPattern(PARAMETER), + "OverwriteByExpression.getDefaultTreePatternBits must propagate `table`'s PARAMETER bit") + + val bound = BindParameters.apply( + NameParameterizedQuery(parsedPlan, Seq("tname"), Seq(Literal("foo_table")))) + val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression => o }.getOrElse( + fail(s"Expected OverwriteByExpression in bound plan:\n$bound")) + assert(!boundOverwrite.table.containsPattern(PARAMETER), + s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite") + } + + // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot, so an + // `IDENTIFIER()` produces an `ExpressionWithUnresolvedIdentifier` there instead of + // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. Verify on the parsed plan + // that the name slot holds the expression placeholder and no `WithCTE(CTEInChildren, _)` shape + // survives `CTESubstitution` (running the cache through full analysis would require the temp + // view machinery, so this is a parser-level test). + test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") { + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """CACHE TABLE IDENTIFIER('some' || '_view') AS + |WITH transformation AS (SELECT 4 AS a) + |SELECT * FROM transformation""".stripMargin) + val ctas = parsedPlan.collectFirst { case c: CacheTableAsSelect => c }.getOrElse( + fail(s"Expected CacheTableAsSelect in parsed plan:\n$parsedPlan")) + assert(ctas.tempViewName.isInstanceOf[ExpressionWithUnresolvedIdentifier], + s"Expected CacheTableAsSelect.tempViewName to be ExpressionWithUnresolvedIdentifier, " + + s"got ${ctas.tempViewName.getClass.getSimpleName}:\n$parsedPlan") + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } + + // SPARK-46625: Regression for the `if c.tempViewName.resolved` guard in CheckAnalysis. When + // the IDENTIFIER expression itself fails to resolve (e.g. references an unresolved column), + // the guard skips the invariant-validation case so the catch-all `LogicalPlan` case can + // produce `UNRESOLVED_COLUMN`. Without the guard, the invariant case would pre-empt this + // path and throw a `SparkException internal error` instead. + test("SPARK-46625: CACHE TABLE IDENTIFIER() reports UNRESOLVED_COLUMN") { + val ex = intercept[AnalysisException] { + spark.sql("CACHE TABLE IDENTIFIER(unresolved_col) AS SELECT 1 AS a") + } + assert(ex.getCondition != null && ex.getCondition.startsWith("UNRESOLVED_COLUMN"), + s"Expected UNRESOLVED_COLUMN.*, got ${ex.getCondition}: ${ex.getMessage}") + assert(!ex.getMessage.contains("CacheTableAsSelect.tempViewName must be"), + s"Internal-error message leaked into user-facing error: ${ex.getMessage}") + } + + // SPARK-46625: End-to-end CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... -- exercises the + // `tempViewNameString` extraction in `DataSourceV2Strategy` and the `CheckAnalysis` invariant + // case for `CacheTableAsSelect.tempViewName`. The parser-level test above already verifies + // the placement and CTE shape; this one drives the full analysis + execution path. + test("SPARK-46625: CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ...") { + withTempView("t_cte_cache") { + val df = spark.sql( + """CACHE TABLE IDENTIFIER(:tname) AS + |WITH transformation AS (SELECT 21 AS a) + |SELECT * FROM transformation""".stripMargin, + Map("tname" -> "t_cte_cache")) + assertNoWithCTEAroundCTEInChildren(df) + checkAnswer(spark.table("t_cte_cache"), Row(21)) + } + } + + // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name` + // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that + // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full + // analysis would require a v2 catalog, so this is a parser-level test. + test("SPARK-46625: REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") { + // Use a non-literal-string expression so `withIdentClause` produces + // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedIdentifier`. + val parsedPlan = spark.sessionState.sqlParser.parsePlan( + """REPLACE TABLE IDENTIFIER('some' || '_table') USING PARQUET AS + |WITH transformation AS (SELECT 5 AS a) + |SELECT * FROM transformation""".stripMargin) + val rtas = parsedPlan.collectFirst { case r: ReplaceTableAsSelect => r }.getOrElse( + fail(s"Expected ReplaceTableAsSelect in parsed plan:\n$parsedPlan")) + assert(rtas.name.isInstanceOf[PlanWithUnresolvedIdentifier], + s"Expected ReplaceTableAsSelect.name to be PlanWithUnresolvedIdentifier, " + + s"got ${rtas.name.getClass.getSimpleName}:\n$parsedPlan") + val substituted = CTESubstitution.apply(parsedPlan) + substituted.foreach { + case WithCTE(_: CTEInChildren, _) => + fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted") + case _ => + } + } }