Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ class FunctionResolution(
* `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating
* a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and
* applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so
* the predicate stays in sync with the lookup loop's actual order.
* the predicate stays in sync with the lookup loop's actual order. Uses the consolidated
* snapshot helper (SPARK-56939) so the (catalog, namespace, path) triple is observed
* atomically.
*/
def isSessionBeforeBuiltinInPath: Boolean = {
val path = catalogManager.sqlResolutionPathEntries(
catalogManager.currentCatalog.name(), catalogManager.currentNamespace.toSeq)
CatalogManager.systemFunctionKindsFromPath(path).headOption
catalogManager.sessionFunctionKindsForUnqualifiedResolution().headOption
.contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,19 @@ class SessionCatalog(
* Session function kinds in resolution order for unqualified lookups: test override if set,
* else live PATH from [[catalogManagerForSessionFunctionKinds]], else
* [[SQLConf.systemPathOrder]].
*
* MUST NOT be called while holding [[SessionCatalog]]'s intrinsic lock (see SPARK-56939):
* the path-driven branch delegates to [[CatalogManager]], which has its own intrinsic lock
* and re-enters this catalog through `USE` paths, so nesting the two locks here would
* deadlock.
*/
private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
sessionFunctionKindsTestOverride.getOrElse {
catalogManagerForSessionFunctionKinds match {
case Some(cm) =>
CatalogManager.systemFunctionKindsFromPath(
cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq))
// Use the consolidated helper so unqualified resolution observes a consistent
// (currentCatalog, currentNamespace, path) triple in a single critical section.
cm.sessionFunctionKindsForUnqualifiedResolution()
case None =>
CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
}
Expand Down Expand Up @@ -2565,11 +2571,13 @@ class SessionCatalog(
* Resolution order follows the configured path (e.g. builtin then session).
*/
def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = {
// Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution order may call
// into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]]), which can
// synchronize on the manager; another
// thread can hold that lock and call into this catalog (e.g. via `setCurrentNamespace`),
// which would deadlock if this method also synchronized on `this`.
// Intentionally not `synchronized` on this [[SessionCatalog]]: resolution order may call
// into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]] via
// [[sessionFunctionKindsInResolutionOrder]]), which synchronizes on the manager. The
// SPARK-56939 fix removed the reverse `CatalogManager -> SessionCatalog` nest from the
// `USE`-style mutators that previously closed the deadlock cycle; keeping this method
// un-synchronized preserves the `SessionCatalog -> CatalogManager` direction as the
// single allowed ordering, so the invariant survives future regressions.
lookupFunctionWithShadowing(name, tableFunctionRegistry, checkBuiltinOperators = false)
}

Expand Down Expand Up @@ -2724,8 +2732,9 @@ class SessionCatalog(
def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
// Intentionally not `synchronized` on this [[SessionCatalog]] (see
// [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp resolution uses
// [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must not run under
// this catalog's intrinsic lock.
// [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]], and SPARK-56939
// requires this catalog's intrinsic lock to NEVER be held when reaching into
// [[CatalogManager]] from a function-resolution path.
if (name.database.isEmpty) {
lookupBuiltinOrTempFunction(name.funcName)
.orElse(lookupBuiltinOrTempTableFunction(name.funcName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,47 @@ class CatalogManager(
}
}

def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
def setCurrentNamespace(namespace: Array[String]): Unit = {
// SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the callbacks below.
// [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly synchronizes on
// [[SessionCatalog]], and concurrent unqualified function resolution acquires the
// [[SessionCatalog]] lock and then reaches into [[CatalogManager]] via
// [[sqlResolutionPathEntries]]; nesting the manager lock outside the catalog lock here
// would invert that order and deadlock. Snapshot the dispatch decision under the lock,
// run callbacks outside it, then publish the new namespace under the lock again.
//
// Concurrency trade-offs versus the pre-SPARK-56939 atomic version (v1-side and
// CM-side drift modes):
//
// (a) v1-side drift. The `isSession` snapshot can drift if a concurrent
// [[setCurrentCatalog]] switches to a v2 catalog between this read and the v1
// callback below -- the callback would still touch `v1.currentDb` even though
// the active catalog is no longer the session catalog. A later switch back to
// the session catalog resets `v1.currentDb` to `default` (see
// [[setCurrentCatalog]]), so long-term state remains consistent; only the
// intermediate observation is novel.
//
// (b) CM-side publish-overwrite drift (sticky). Between the v1 callback returning
// and the publish below, a concurrent [[setCurrentCatalog]] can complete fully
// -- switching `_currentCatalogName` to (say) a v2 catalog and clearing
// `_currentNamespace = None` -- before this method's publish overwrites that
// with `Some(namespace)`. End state: `_currentNamespace = Some(namespace)` is
// published under a different `_currentCatalogName` than the one observed when
// [[isSession]] was snapshotted at the top. Unlike (a) there is no analogous
// auto-recovery; the mismatch sticks until the next `USE`. This is still
// last-writer-wins for two racing `USE` commands, which is the conventional
// expectation, so it is accepted as a trade-off against the deadlock alternative.
val isSession = synchronized(isSessionCatalog(currentCatalog))
if (isSession && namespace.length == 1) {
v1SessionCatalog.setCurrentDatabaseWithNameCheck(
namespace.head,
_ => assertNamespaceExist(namespace))
} else {
assertNamespaceExist(namespace)
}
_currentNamespace = Some(namespace)
synchronized {
_currentNamespace = Some(namespace)
}
}

import CatalogManager.SessionPathEntry
Expand Down Expand Up @@ -221,6 +253,15 @@ class CatalogManager(
* When PATH is enabled and a session path is in effect (stored or via
* [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls back to the legacy
* resolutionSearchPath.
*
* SPARK-56939 note: this is currently the only intentional `CatalogManager.synchronized ->
* SessionCatalog.synchronized` nest left in this class. The transitive call into
* [[v1SessionCatalog.getCurrentDatabase]] happens via [[currentNamespace]], which fetches
* the v1 current database under the CM lock. It is safe today because no code path holds
* [[SessionCatalog]]'s intrinsic lock while waiting on [[CatalogManager]]'s -- the
* SPARK-56939 fix removed every such SC->CM ordering. Any future change that introduces a
* new SC->CM ordering must take `currentPathString` (or any other CM->SC nest) into
* account to avoid resurrecting the deadlock.
*/
def currentPathString: String = synchronized {
import CatalogV2Implicits._
Expand Down Expand Up @@ -265,6 +306,48 @@ class CatalogManager(
currentCatalog, currentNamespace,
currentCatalog, currentNamespace)

/**
* Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] order used by
* unqualified function/table-function resolution.
*
* The `(currentCatalog, _currentNamespace, sessionPath)` triple is read together inside a
* single CM critical section so a concurrent `USE` / `SET PATH` cannot return a torn
* snapshot for those three fields (e.g. catalog from one observation, explicit namespace
* from another).
*
* The `v1SessionCatalog.getCurrentDatabase` read needed for the default-namespace fallback
* is taken OUTSIDE the CM lock and is therefore intentionally racy w.r.t. a concurrent
* `USE SCHEMA`. That staleness is harmless for this helper's output: this method consumes
* `effectiveNs` only to expand `CURRENT_SCHEMA` markers in the SQL path, and
* [[CatalogManager.systemFunctionKindsFromPath]] only retains literal `system.builtin` /
* `system.session` entries from the resolved path -- it never inspects any
* `(catalog, namespace)` derived from `v1`. So if `v1CurrentDb` lags by one `USE SCHEMA`,
* a `CURRENT_SCHEMA` entry might briefly resolve to the previous database, but the kinds
* list (the only thing returned here) is unaffected. Moving the read inside the CM lock
* would re-introduce the SPARK-56939 lock-order inversion this helper exists to avoid.
Comment thread
srielau marked this conversation as resolved.
*
* Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]],
* [[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
* MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this method.
*/
def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionCatalog.SessionFunctionKind] = {
// SPARK-56939: read v1's current database before taking the CM lock; see the method
// doc for why the resulting staleness is harmless for the kinds list.
val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
val pathEntries = synchronized {
val catName = currentCatalog.name()
val effectiveNs: Seq[String] = _currentNamespace.map(_.toSeq).getOrElse {
if (catName == SESSION_CATALOG_NAME) {
Seq(v1CurrentDb)
} else {
currentCatalog.defaultNamespace().toSeq
}
}
sqlResolutionPathEntries(catName, effectiveNs)
}
CatalogManager.systemFunctionKindsFromPath(pathEntries)
}

/**
* True if `system.session` is on the SQL path. Only literal path entries can match: the
* [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] marker expands to
Expand Down Expand Up @@ -330,15 +413,41 @@ class CatalogManager(
catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
}

def setCurrentCatalog(catalogName: String): Unit = synchronized {
// `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
if (currentCatalog.name() != catalogName) {
catalog(catalogName)
_currentCatalogName = Some(catalogName)
_currentNamespace = None
def setCurrentCatalog(catalogName: String): Unit = {
// SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting [[CatalogManager]]'s lock
// across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
// [[SessionCatalog]]) to prevent a lock-order inversion with concurrent unqualified
// function resolution.
val needsSwitch = synchronized {
// `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
if (currentCatalog.name() != catalogName) {
// Force-load the named catalog while holding the manager lock to keep the
// not-found error semantics; if loading fails, throw before mutating state.
catalog(catalogName)
true
} else {
false
}
}
if (needsSwitch) {
// Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
// when we switch back to session catalog, the current namespace definitely is ["default"].
// Run this BEFORE publishing the new catalog name so that if a reader observes the new
Comment thread
srielau marked this conversation as resolved.
// catalog, the v1 state is already consistent with it.
//
// Concurrency trade-off versus the pre-SPARK-56939 atomic version: between this v1 write
// and the publish below, a concurrent reader of `currentNamespace` sees
// `(oldCatalog, v1.currentDb = default)`. When the old catalog is the session catalog
// (the common case for `USE CATALOG`), the user's previous namespace is briefly invisible
// to that reader until the new name is published. The opposite torn observation
// (`newCatalog`, stale `v1.currentDb`) is avoided by this ordering. This trade-off
// (transient invisibility instead of transient inconsistency, exchanged for breaking the
// deadlock cycle) is accepted; the long-term post-switch state is the same as before.
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
synchronized {
_currentCatalogName = Some(catalogName)
_currentNamespace = None
}
}
}

Expand All @@ -348,12 +457,20 @@ class CatalogManager(
}

// Clear all the registered catalogs. Only used in tests.
private[sql] def reset(): Unit = synchronized {
catalogs.clear()
_currentNamespace = None
_currentCatalogName = None
_sessionPath = None
confDefaultPathCache.set(None)
//
// SPARK-56939: apply the same split-lock pattern as [[setCurrentNamespace]] /
// [[setCurrentCatalog]] so the locking contract is uniform across every CM mutator that
// calls back into [[v1SessionCatalog]]. Test-only callers don't race against unqualified
// function resolution today, but keeping the contract symmetric prevents future test
// helpers (e.g. session reset in a concurrent harness) from reintroducing the cycle.
private[sql] def reset(): Unit = {
synchronized {
catalogs.clear()
_currentNamespace = None
_currentCatalogName = None
_sessionPath = None
confDefaultPathCache.set(None)
}
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}
Expand Down
Loading