Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions python/pyspark/sql/tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,47 @@ def test_catalog_analyze_table(self):
spark.sql(f"INSERT INTO {t} VALUES (1)")
spark.catalog.analyzeTable(t, noScan=True)

def test_path_current_path_disabled(self):
# current_path() is a regular builtin and resolves even when
# spark.sql.path.enabled is false. The DataFrame and SQL surfaces must agree.
from pyspark.sql.functions import current_path

spark = self.spark
with self.sql_conf({"spark.sql.path.enabled": False}):
sql_form = spark.sql("SELECT current_path()").collect()[0][0]
self.assertIsInstance(sql_form, str)
self.assertNotEqual(sql_form, "")
api_form = spark.range(1).select(current_path()).collect()[0][0]
self.assertEqual(sql_form, api_form)

def test_path_set_path_and_current_path(self):
# SET PATH is parsed and applied; current_path() reflects it
# over both the SQL and DataFrame surfaces. Restores DEFAULT_PATH on exit.
from pyspark.sql.functions import current_path

spark = self.spark
with self.sql_conf({"spark.sql.path.enabled": True}):
try:
spark.sql("SET PATH = spark_catalog.default, system.builtin")
sql_form = spark.sql("SELECT current_path()").collect()[0][0]
self.assertEqual(sql_form, "spark_catalog.default,system.builtin")
api_form = spark.range(1).select(current_path()).collect()[0][0]
self.assertEqual(sql_form, api_form)
finally:
spark.sql("SET PATH = DEFAULT_PATH")

def test_path_set_path_rejected_when_disabled(self):
# SET PATH must raise UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED
# when the feature flag is off (covers both classic and Connect error paths).
spark = self.spark
with self.sql_conf({"spark.sql.path.enabled": False}):
with self.assertRaises(AnalysisException) as ctx:
spark.sql("SET PATH = spark_catalog.default")
self.assertEqual(
ctx.exception.getCondition(),
"UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED",
)


class CatalogTests(CatalogTestsMixin, ReusedSQLTestCase):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ singleTableSchema
: colTypeList EOF
;

singlePathElementList
: pathElement (COMMA pathElement)* EOF
;

singleRoutineParamList
: colDefinitionList EOF
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
* views, and functions.
* <p>
* Catalog implementations must implement this marker interface to be loaded by
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
* {@link org.apache.spark.sql.connector.catalog.Catalogs#load(String,SQLConf)}.
* The loader will instantiate catalog classes using the
* required public no-arg constructor. After creating an instance, it will be configured by calling
* {@link #initialize(String, CaseInsensitiveStringMap)}.
* {@link #initialize(String,CaseInsensitiveStringMap)}.
* <p>
* Catalog implementations are registered to a name by adding a configuration option to Spark:
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
Expand All @@ -56,8 +57,8 @@ public interface CatalogPlugin {
/**
* Called to get this catalog's name.
* <p>
* This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
* called to pass the catalog's name.
* This method is only called after
* {@link #initialize(String,CaseInsensitiveStringMap)} is called to pass the catalog's name.
*/
String name();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,27 +338,37 @@ class Analyzer(
AnalysisContext.reset()
try {
AnalysisHelper.markInAnalyzer {
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
runWithSessionConf(runAnalysis())
}
} finally {
AnalysisContext.reset()
}
} else {
AnalysisContext.withNewAnalysisContext {
AnalysisHelper.markInAnalyzer {
sessionConf match {
case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
case None => runAnalysis()
}
runWithSessionConf(runAnalysis())
}
}
}
}
}

/**
* Runs `thunk` under the analyzer's [[sessionConf]] for analyzer isolation, but yields to any
* outer [[SQLConf.withExistingConf]] scope (e.g. a SQL UDF / view body that pinned the
* creation-time configs). Falls through unchanged when [[sessionConf]] is unset, or when the
* outer scope already installed a different conf -- otherwise the outer scope's conf would be
* silently clobbered.
*/
private def runWithSessionConf[T](thunk: => T): T = sessionConf match {
case None => thunk
case Some(c) =>
SQLConf.getExistingConfIfSet match {
case Some(outer) if outer ne c => thunk
case _ => SQLConf.withExistingConf(c) { thunk }
}
}

/**
* Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
* lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by
Expand Down Expand Up @@ -392,13 +402,8 @@ class Analyzer(
}
}

private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match {
// Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view configs).
// Otherwise, run analysis with the captured session conf for analyzer isolation.
case Some(c) if SQLConf.get ne c => super.execute(plan)
case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) }
case None => super.execute(plan)
}
private def executeSameContext(plan: LogicalPlan): LogicalPlan =
runWithSessionConf(super.execute(plan))

def resolver: Resolver = conf.resolver

Expand Down Expand Up @@ -1977,14 +1982,15 @@ class Analyzer(
* This is used for special syntax transformations (e.g., COUNT(*) -> COUNT(1)) that
* should only apply to builtin functions, not to user-defined functions.
*
* In legacy mode (sessionOrder="first"), temp functions shadow builtins, so an
* unqualified name that matches a temp function should NOT be treated as builtin.
* When the effective SQL PATH puts `system.session` before `system.builtin`, temp
* functions shadow builtins, so an unqualified name that matches a temp function
* should NOT be treated as builtin.
*/
private def matchesFunctionName(nameParts: Seq[String], expectedName: String): Boolean = {
if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, expectedName)) {
return false
}
if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == "first") {
if (nameParts.size == 1 && functionResolution.isSessionBeforeBuiltinInPath) {
val v1Catalog = catalogManager.v1SessionCatalog
!v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,27 @@ class FunctionResolution(

private val trimWarningEnabled = new AtomicBoolean(true)

/** Returns the current catalog path, preferring the view's context if resolving a view. */
private def currentCatalogPath: Seq[String] = {
val ctx = AnalysisContext.get.catalogAndNamespace
if (ctx.nonEmpty) ctx
else (Seq(catalogManager.currentCatalog.name) ++ catalogManager.currentNamespace).toSeq
}

/** True if nameParts is 3-part and the first part is the system catalog name. */
private def isSystemCatalogQualified(nameParts: Seq[String]): Boolean =
nameParts.length == 3 &&
nameParts.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME)

/**
* True iff `system.session` is searched before `system.builtin` in the effective SQL PATH.
*
* Drives the `count(*) -> count(1)` rewrite (which must skip transformation when a temp
* `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating
* a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and
* applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so
* the predicate stays in sync with the lookup loop's actual order. Uses the consolidated
* snapshot helper (SPARK-56939) so the (catalog, namespace, path) triple is observed
* atomically.
*/
def isSessionBeforeBuiltinInPath: Boolean = {
catalogManager.sessionFunctionKindsForUnqualifiedResolution().headOption
.contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
}

/**
* Produces the ordered list of candidate names for resolution. Expansion happens in two cases:
*
Expand All @@ -101,18 +110,10 @@ class FunctionResolution(
* directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays
* aligned with relation order.
*/
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = {
AnalysisContext.get.resolutionPathEntries match {
case Some(entries) if conf.pathEnabled => entries
case _ =>
val pathDefault = currentCatalogPath
catalogManager.sqlResolutionPathEntries(
pathDefault.head,
pathDefault.tail.toSeq,
catalogManager.currentCatalog.name,
catalogManager.currentNamespace.toSeq)
}
}
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] =
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)

private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = {
if (nameParts.size == 1) {
Expand Down Expand Up @@ -370,7 +371,20 @@ class FunctionResolution(
if (nameParts.length == 1) {
// Must match [[resolutionCandidates]] / [[resolveFunction]]: single-part names use PATH +
// session order, not only the current namespace (LookupCatalog single-part rule).
for (candidate <- resolutionCandidates(nameParts)) {
// `system.session.<name>` and `system.builtin.<name>` candidates were already resolved by
// [[lookupBuiltinOrTempFunction]] / [[lookupBuiltinOrTempTableFunction]] above (they
// route through `identifierFromSystemNameParts`, which only accepts those two
// namespaces); skip them here to avoid redundant catalog calls. Other `system.<x>`
// namespaces -- if any are ever added -- still go through persistent lookup.
val persistentCandidates = resolutionCandidates(nameParts).filterNot { c =>
c.length >= 2 &&
c.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && {
val ns = c(1)
ns.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) ||
ns.equalsIgnoreCase(CatalogManager.BUILTIN_NAMESPACE)
}
}
for (candidate <- persistentCandidates) {
try {
candidate match {
case CatalogAndIdentifier(catalog, ident) =>
Expand All @@ -380,7 +394,12 @@ class FunctionResolution(
case _ =>
}
} catch {
case NonFatal(_) =>
// Only treat explicit "not found" / "forbidden" signals as a miss. Any other failure
// (e.g. permission denied, transient catalog error) propagates.
case _: NoSuchFunctionException
| _: NoSuchNamespaceException
| _: CatalogNotFoundException =>
case e: AnalysisException if e.getCondition == "FORBIDDEN_OPERATION" =>
}
}
return FunctionType.NotFound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,9 @@ class RelationResolution(
* When PATH is disabled, legacy resolution rules apply.
*/
private def relationResolutionEntries: Seq[Seq[String]] = {
val pinned = AnalysisContext.get.resolutionPathEntries
if (pinned.isDefined && conf.pathEnabled) {
pinned.get
} else {
val expandCatalog = catalogManager.currentCatalog.name
val expandNamespace = catalogManager.currentNamespace.toSeq
val (pathCatalog, pathNamespace) =
if (isResolvingView) {
val p = AnalysisContext.get.catalogAndNamespace
(p.head, p.tail.toSeq)
} else {
(expandCatalog, expandNamespace)
}
catalogManager.sqlResolutionPathEntries(
pathCatalog,
pathNamespace,
expandCatalog,
expandNamespace)
}
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateVariable(identifiers, _, _) =>
// We resolve only UnresolvedIdentifiers, and pass on the other nodes
val resolved = identifiers.map {
case UnresolvedIdentifier(nameParts, _) =>
case u @ UnresolvedIdentifier(nameParts, _) =>
if (withinLocalVariableScope) {
if (c.replace) {
throw new AnalysisException(
Expand All @@ -67,26 +67,22 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val resolvedIdentifier
= catalogManager.tempVariableManager.qualify(nameParts.last)

assertValidSessionVariableNameParts(nameParts, resolvedIdentifier)
assertValidSessionVariableNameParts(nameParts, resolvedIdentifier, u.origin)
resolvedIdentifier
}
case plan => plan
}
c.copy(names = resolved)

case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
case d @ DropVariable(u @ UnresolvedIdentifier(nameParts, _), _) =>
if (withinLocalVariableScope) {
throw new AnalysisException(
"UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", Map.empty)
}
if (nameParts.length == 1 &&
!catalogManager.sessionScopeUnqualifiedAllowed(
catalogManager.currentCatalog.name(),
catalogManager.currentNamespace.toSeq)) {
throw QueryCompilationErrors.unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
}
// DDL on session variables targets `system.session` directly; the SQL path only applies
// to DML (see [[VariableResolution.allowUnqualifiedSessionTempVariableLookup]]).
val resolved = catalogManager.tempVariableManager.qualify(nameParts.last)
assertValidSessionVariableNameParts(nameParts, resolved)
assertValidSessionVariableNameParts(nameParts, resolved, u.origin)
d.copy(name = resolved)

case CreateFunction(UnresolvedIdentifier(nameParts, _), _, _, _, _)
Expand Down Expand Up @@ -221,13 +217,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

private def assertValidSessionVariableNameParts(
nameParts: Seq[String],
resolvedIdentifier: ResolvedIdentifier): Unit = {
resolvedIdentifier: ResolvedIdentifier,
origin: Origin): Unit = {
if (!validSessionVariableName(nameParts)) {
throw QueryCompilationErrors.unresolvedVariableError(
nameParts,
Seq(
Seq(Seq(
resolvedIdentifier.catalog.name(),
resolvedIdentifier.identifier.namespace().head)
resolvedIdentifier.identifier.namespace().head)),
origin
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ResolveFetchCursor(val catalogManager: CatalogManager) extends Rule[Logica
nameParts = u.nameParts
) match {
case Some(variable) => variable.copy(canFold = false)
case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
case _ => throw unresolvedVariableError(
u.nameParts, variableResolution.searchPathEntriesForError, u.origin)
}

case other => throw SparkException.internalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica
nameParts = u.nameParts
) match {
case Some(variable) => variable.copy(canFold = false)
case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
case _ =>
throw unresolvedVariableError(
u.nameParts,
variableResolution.searchPathEntriesForError,
u.origin)
}

case other => throw SparkException.internalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,25 @@ class VariableResolution(
* (PATH enabled and explicitly set).
*/
private def allowUnqualifiedSessionTempVariableLookup(nameParts: Seq[String]): Boolean = {
if (nameParts.length != 1) return true
catalogManager.sessionScopeUnqualifiedAllowed(
catalogManager.currentCatalog.name(),
catalogManager.currentNamespace.toSeq)
nameParts.length != 1 || catalogManager.isSystemSessionOnPath
}

/**
* Search-path entries to report in `UNRESOLVED_VARIABLE` for DML lookups (`SET VAR`,
* `FETCH ... INTO`). The full SQL path is reported regardless of how the name was
* qualified, matching the convention used by `TABLE_OR_VIEW_NOT_FOUND` and
* `UNRESOLVED_ROUTINE`. Keeping the rendering qualification-independent also avoids
* re-shaping the error if Spark ever grows struct-field assignment, where 2-part forms
* become genuinely ambiguous.
*
* DDL paths (`DECLARE` / `DROP` name validation in
* [[org.apache.spark.sql.catalyst.analysis.ResolveCatalogs]]) do not consult the SQL path
* and report `[system.session]` directly at their throw site.
*/
def searchPathEntriesForError: Seq[Seq[String]] = {
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)
}

/**
Expand Down
Loading