From 5f450677ab84e2ad7351ac2886844c81a02fc619 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 19 May 2026 09:48:44 +0200 Subject: [PATCH 1/3] [SPARK-56939][SQL] Resolve deadlock between USE and function lookup ### What changes were proposed in this pull request? Break the SessionCatalog/CatalogManager lock-order inversion that can deadlock concurrent `USE SCHEMA` / `USE CATALOG` and unqualified function resolution on the same SparkSession. - `CatalogManager.setCurrentNamespace` / `setCurrentCatalog`: snapshot the dispatch decision under the manager's intrinsic lock, run the `v1SessionCatalog` callbacks OUTSIDE the lock, then publish the new state under the lock again. This stops the "manager lock then catalog lock" arm of the cycle. - Add `CatalogManager.sessionFunctionKindsForUnqualifiedResolution()` that snapshots `(currentCatalog, currentNamespace, sessionPath)` in a single critical section. The `v1SessionCatalog.getCurrentDatabase` read needed for the default-namespace fallback is taken BEFORE the manager lock, so the helper never re-introduces the deadlock cycle while still avoiding torn-state observations under racing path updates. - Route `SessionCatalog.sessionFunctionKindsInResolutionOrder` and `FunctionResolution.isSessionBeforeBuiltinInPath` through that single helper, so the lookup loop and the `session-before-builtin` predicate share one consistent snapshot. - Tighten the doc comments on the affected methods to document the locking contract and prevent future regressions. ### Why are the changes needed? After SPARK-56750 wired `CatalogManager` into `SessionCatalog` as the live source for path-driven session function kinds, two paths form a lock-order inversion: - Arm 1 (`SessionCatalog.synchronized` -> `CatalogManager.synchronized`): unqualified function resolution evaluating the live PATH reaches into `CatalogManager.sqlResolutionPathEntries` (which synchronizes on the manager) while holding the catalog's intrinsic lock at peer call sites. - Arm 2 (`CatalogManager.synchronized` -> `SessionCatalog.synchronized`): `setCurrentNamespace` / `setCurrentCatalog` hold the manager's lock and then call back into `v1SessionCatalog.setCurrentDatabase*`, which synchronizes on `SessionCatalog`. Two threads sharing a SparkSession -- one running any SQL with an unqualified function reference, the other running `USE SCHEMA` / `USE CATALOG` -- can wedge on each other's intrinsic locks. The hazard is independent of `spark.sql.functionResolution.sessionOrder`: Arm 1 still acquires the manager lock just to read what the order is, and Arm 2 has nothing to do with order at all. ### Does this PR introduce _any_ user-facing change? No. This is a concurrency fix; serial behavior is unchanged. The only observable difference under contention is that the session no longer deadlocks. ### How was this patch tested? - New regression test in `SetPathSuite`, `SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock`, that follows the JIRA's exact repro: one thread alternates `USE SCHEMA s1` / `USE SCHEMA s2`, another runs unqualified `count(*)` queries. Without the fix the threads hang on each other's intrinsic locks; with the fix the test completes within the 30s budget. - `build/sbt 'sql/testOnly *SetPathSuite'` -- 60/60 pass. - `build/sbt 'catalyst/testOnly *SessionCatalogSuite *CatalogManagerSuite *FunctionResolution*'` -- 119/119 pass. - `build/sbt 'sql/testOnly *SQLFunctionSuite *SQLViewSuite'` -- 70/70 pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor / Claude Opus 4.7 --- .../analysis/FunctionResolution.scala | 8 +- .../sql/catalyst/catalog/SessionCatalog.scala | 10 ++- .../connector/catalog/CatalogManager.scala | 81 ++++++++++++++++--- .../org/apache/spark/sql/SetPathSuite.scala | 77 ++++++++++++++++++ 4 files changed, 161 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index e3dbcc4b6ef7e..4f6aee03967cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -83,12 +83,12 @@ class FunctionResolution( * `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating * a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and * applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so - * the predicate stays in sync with the lookup loop's actual order. + * the predicate stays in sync with the lookup loop's actual order. Uses the consolidated + * snapshot helper (SPARK-56939) so the (catalog, namespace, path) triple is observed + * atomically. */ def isSessionBeforeBuiltinInPath: Boolean = { - val path = catalogManager.sqlResolutionPathEntries( - catalogManager.currentCatalog.name(), catalogManager.currentNamespace.toSeq) - CatalogManager.systemFunctionKindsFromPath(path).headOption + catalogManager.sessionFunctionKindsForUnqualifiedResolution().headOption .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 71653eec139b3..7a04664359a68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -154,13 +154,19 @@ class SessionCatalog( * Session function kinds in resolution order for unqualified lookups: test override if set, * else live PATH from [[catalogManagerForSessionFunctionKinds]], else * [[SQLConf.systemPathOrder]]. + * + * MUST NOT be called while holding [[SessionCatalog]]'s intrinsic lock (see SPARK-56939): + * the path-driven branch delegates to [[CatalogManager]], which has its own intrinsic lock + * and re-enters this catalog through `USE` paths, so nesting the two locks here would + * deadlock. */ private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] = sessionFunctionKindsTestOverride.getOrElse { catalogManagerForSessionFunctionKinds match { case Some(cm) => - CatalogManager.systemFunctionKindsFromPath( - cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq)) + // Use the consolidated helper so unqualified resolution observes a consistent + // (currentCatalog, currentNamespace, path) triple in a single critical section. + cm.sessionFunctionKindsForUnqualifiedResolution() case None => CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 9a39e4ebdd272..a6f8e9b2f1841 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -132,15 +132,25 @@ class CatalogManager( } } - def setCurrentNamespace(namespace: Array[String]): Unit = synchronized { - if (isSessionCatalog(currentCatalog) && namespace.length == 1) { + def setCurrentNamespace(namespace: Array[String]): Unit = { + // SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the callbacks below. + // [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly synchronizes on + // [[SessionCatalog]], and concurrent unqualified function resolution acquires the + // [[SessionCatalog]] lock and then reaches into [[CatalogManager]] via + // [[sqlResolutionPathEntries]]; nesting the manager lock outside the catalog lock here + // would invert that order and deadlock. Snapshot the dispatch decision under the lock, + // run callbacks outside it, then publish the new namespace under the lock again. + val isSession = synchronized(isSessionCatalog(currentCatalog)) + if (isSession && namespace.length == 1) { v1SessionCatalog.setCurrentDatabaseWithNameCheck( namespace.head, _ => assertNamespaceExist(namespace)) } else { assertNamespaceExist(namespace) } - _currentNamespace = Some(namespace) + synchronized { + _currentNamespace = Some(namespace) + } } import CatalogManager.SessionPathEntry @@ -265,6 +275,42 @@ class CatalogManager( currentCatalog, currentNamespace, currentCatalog, currentNamespace) + /** + * Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] order used by + * unqualified function/table-function resolution. + * + * The kinds list is computed by reading the current catalog, current namespace, and the + * effective session path together so that a concurrent USE / SET PATH cannot return a + * torn snapshot (catalog from one observation, namespace from another). The `v1` + * current-database read (used only when `currentCatalog` is the session catalog and no + * explicit namespace has been published) is taken OUTSIDE this manager's intrinsic lock + * to avoid the SPARK-56939 lock-order inversion with [[SessionCatalog.synchronized]]: + * the rest of the snapshot then runs under a single CM critical section. + * + * Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]], + * [[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]]) + * MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this method. + */ + def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionCatalog.SessionFunctionKind] = { + // SPARK-56939: read v1's current database before taking the CM lock. We only need it + // if there is no explicit `_currentNamespace` AND the current catalog is the session + // catalog. The read might be unused; the alternative (reading it from inside + // synchronized) would re-introduce the deadlock cycle this helper is designed to avoid. + val v1CurrentDb = v1SessionCatalog.getCurrentDatabase + val pathEntries = synchronized { + val catName = currentCatalog.name() + val effectiveNs: Seq[String] = _currentNamespace.map(_.toSeq).getOrElse { + if (catName == SESSION_CATALOG_NAME) { + Seq(v1CurrentDb) + } else { + currentCatalog.defaultNamespace().toSeq + } + } + sqlResolutionPathEntries(catName, effectiveNs) + } + CatalogManager.systemFunctionKindsFromPath(pathEntries) + } + /** * True if `system.session` is on the SQL path. Only literal path entries can match: the * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] marker expands to @@ -330,15 +376,32 @@ class CatalogManager( catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG))) } - def setCurrentCatalog(catalogName: String): Unit = synchronized { - // `setCurrentCatalog` is noop if it doesn't switch to a different catalog. - if (currentCatalog.name() != catalogName) { - catalog(catalogName) - _currentCatalogName = Some(catalogName) - _currentNamespace = None + def setCurrentCatalog(catalogName: String): Unit = { + // SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting [[CatalogManager]]'s lock + // across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on + // [[SessionCatalog]]) to prevent a lock-order inversion with concurrent unqualified + // function resolution. + val needsSwitch = synchronized { + // `setCurrentCatalog` is noop if it doesn't switch to a different catalog. + if (currentCatalog.name() != catalogName) { + // Force-load the named catalog while holding the manager lock to keep the + // not-found error semantics; if loading fails, throw before mutating state. + catalog(catalogName) + true + } else { + false + } + } + if (needsSwitch) { // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that // when we switch back to session catalog, the current namespace definitely is ["default"]. + // Run this BEFORE publishing the new catalog name so that if a reader observes the new + // catalog, the v1 state is already consistent with it. v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) + synchronized { + _currentCatalogName = Some(catalogName) + _currentNamespace = None + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 245398a4694ec..057210582607e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession { } } + test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") { + // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]] + // held the manager's intrinsic lock while calling into + // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's + // intrinsic lock), while concurrent unqualified function resolution acquired the + // catalog's intrinsic lock and then reached back into the manager via + // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the + // session whenever a `USE SCHEMA` raced with any unqualified function reference. + // + // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the + // resolution-order setting, so this test exercises the default configuration. + sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1") + sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2") + try { + val budget = 200 + val iterations = new java.util.concurrent.atomic.AtomicInteger(0) + val barrier = new java.util.concurrent.CyclicBarrier(2) + val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]() + + val useThread = new Thread(() => { + try { + barrier.await() + var i = 0 + while (i < budget && errors.isEmpty) { + sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2") + i += 1 + } + } catch { + case t: Throwable => errors.add(t) + } + }, "SPARK-56939-use-schema") + + val lookupThread = new Thread(() => { + try { + barrier.await() + var i = 0 + while (i < budget && errors.isEmpty) { + // Unqualified `count(*)` exercises the kinds-order provider that resolves + // against the live PATH via [[CatalogManager]] -- the side of the cycle + // that previously acquired the catalog lock first and then the manager lock. + val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)") + .head().getLong(0) + assert(n == 3L, s"unexpected count: $n at iteration $i") + iterations.incrementAndGet() + i += 1 + } + } catch { + case t: Throwable => errors.add(t) + } + }, "SPARK-56939-lookup") + + useThread.start() + lookupThread.start() + + // Generous join: 30s is plenty for 200 cheap queries on either side and gives a + // clear failure signal if the implementation regresses into a deadlock. + val joinMillis = 30000L + useThread.join(joinMillis) + lookupThread.join(joinMillis) + + assert(!useThread.isAlive, + "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " + + "CatalogManager likely returned (SPARK-56939).") + assert(!lookupThread.isAlive, + "Lookup thread did not finish; lock-order inversion between SessionCatalog and " + + "CatalogManager likely returned (SPARK-56939).") + assert(errors.isEmpty, + s"Concurrent lookups raised unexpected errors: ${errors.toArray.mkString("; ")}") + assert(iterations.get() > 0, + "Lookup thread never completed a query; suspect contention or deadlock.") + } finally { + sql("USE SCHEMA default") + sql("DROP SCHEMA IF EXISTS spark_56939_s1 CASCADE") + sql("DROP SCHEMA IF EXISTS spark_56939_s2 CASCADE") + } + } + test("PATH enabled: concurrent SET PATH and unqualified lookups do not deadlock") { // SessionCatalog.lookupBuiltinOrTempFunction is intentionally NOT // synchronized on SessionCatalog because the path-driven kinds provider acquires From ecf913ab00d902d0ed7661f70de4927defc2ebe8 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 19 May 2026 18:10:12 +0200 Subject: [PATCH 2/3] [SPARK-56939][SQL][FOLLOWUP] Address review feedback Per @cloud-fan's review on PR #55977: - Clarify the locking contract on `CatalogManager.sessionFunctionKindsForUnqualifiedResolution`: the `v1CurrentDb` read is intentionally racy w.r.t. a concurrent `USE SCHEMA`, and `systemFunctionKindsFromPath` only retains `system.builtin` / `system.session` entries -- so a stale `v1CurrentDb` cannot affect the returned kinds list. Move that justification into the method doc. - Expand the `setCurrentCatalog` and `setCurrentNamespace` comments to acknowledge the new concurrent torn-state windows the split-lock pattern admits (briefly invisible old namespace during a catalog switch; drifted `isSession` snapshot if a peer switches catalogs mid-call). Frame these explicitly as trade-offs against the deadlock alternative rather than unconditional improvements. - Apply the same split-lock pattern to `CatalogManager.reset()` so the locking contract is uniform across every CM mutator that calls back into `v1SessionCatalog`. `reset` is `private[sql]` and test-only today, but keeping the contract symmetric prevents future test helpers from reintroducing the SPARK-56939 cycle. - Document `currentPathString` as the only remaining intentional `CatalogManager -> SessionCatalog` nest in `CatalogManager`. It is safe today because the SPARK-56939 fix removed every reverse ordering; future changes that introduce a new SC->CM ordering must take this nest into account. - Refresh the doc comments on `SessionCatalog.lookupBuiltinOrTempTableFunction` and `lookupFunctionInfo` so they describe the post-SPARK-56939 invariant (catalog lock must never be held while reaching into `CatalogManager` from a function-resolution path) instead of citing the now-removed `setCurrentNamespace` counter-party. - Extend the regression test with a third thread that toggles between the session catalog and a registered v2 catalog, exercising the `setCurrentCatalog` arm symmetrically with the `setCurrentNamespace` arm. The `useSchemaThread` tolerates `NoSuchNamespaceException`: when the catalog thread happens to switch to the v2 testcat first, the v1 schemas don't exist there; that is an expected interleaving outcome, not a deadlock symptom. - Minor wording: "likely returned" -> "likely regressed" in the deadlock assertion messages. ### How was this patch tested? - `build/sbt 'sql/testOnly *SetPathSuite'` -- 60/60 pass (the extended regression test completes in ~5s). - `build/sbt 'catalyst/testOnly *SessionCatalogSuite *CatalogManagerSuite *FunctionResolution*'` -- 119/119 pass. - `build/sbt 'sql/testOnly *SQLFunctionSuite *SQLViewSuite'` -- 70/70 pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor / Claude Opus 4.7 --- .../sql/catalyst/catalog/SessionCatalog.scala | 17 +++-- .../connector/catalog/CatalogManager.scala | 74 +++++++++++++----- .../org/apache/spark/sql/SetPathSuite.scala | 76 ++++++++++++++----- 3 files changed, 126 insertions(+), 41 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 7a04664359a68..9e5a2176612cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -2571,11 +2571,13 @@ class SessionCatalog( * Resolution order follows the configured path (e.g. builtin then session). */ def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = { - // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution order may call - // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]]), which can - // synchronize on the manager; another - // thread can hold that lock and call into this catalog (e.g. via `setCurrentNamespace`), - // which would deadlock if this method also synchronized on `this`. + // Intentionally not `synchronized` on this [[SessionCatalog]]: resolution order may call + // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]] via + // [[sessionFunctionKindsInResolutionOrder]]), which synchronizes on the manager. The + // SPARK-56939 fix removed the reverse `CatalogManager -> SessionCatalog` nest from the + // `USE`-style mutators that previously closed the deadlock cycle; keeping this method + // un-synchronized preserves the `SessionCatalog -> CatalogManager` direction as the + // single allowed ordering, so the invariant survives future regressions. lookupFunctionWithShadowing(name, tableFunctionRegistry, checkBuiltinOperators = false) } @@ -2730,8 +2732,9 @@ class SessionCatalog( def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = { // Intentionally not `synchronized` on this [[SessionCatalog]] (see // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp resolution uses - // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must not run under - // this catalog's intrinsic lock. + // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]], and SPARK-56939 + // requires this catalog's intrinsic lock to NEVER be held when reaching into + // [[CatalogManager]] from a function-resolution path. if (name.database.isEmpty) { lookupBuiltinOrTempFunction(name.funcName) .orElse(lookupBuiltinOrTempTableFunction(name.funcName)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index a6f8e9b2f1841..fc754be3d9c1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -140,6 +140,14 @@ class CatalogManager( // [[sqlResolutionPathEntries]]; nesting the manager lock outside the catalog lock here // would invert that order and deadlock. Snapshot the dispatch decision under the lock, // run callbacks outside it, then publish the new namespace under the lock again. + // + // Concurrency trade-off versus the pre-SPARK-56939 atomic version: the `isSession` + // snapshot can drift if a concurrent [[setCurrentCatalog]] switches to a v2 catalog + // between this read and the v1 callback below -- the callback would still touch + // `v1.currentDb` even though the active catalog is no longer the session catalog. A + // later switch back to the session catalog resets `v1.currentDb` to `default` (see + // [[setCurrentCatalog]]), so long-term state remains consistent; only the intermediate + // observation is novel. Acceptable trade-off against the deadlock alternative. val isSession = synchronized(isSessionCatalog(currentCatalog)) if (isSession && namespace.length == 1) { v1SessionCatalog.setCurrentDatabaseWithNameCheck( @@ -231,6 +239,15 @@ class CatalogManager( * When PATH is enabled and a session path is in effect (stored or via * [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls back to the legacy * resolutionSearchPath. + * + * SPARK-56939 note: this is currently the only intentional `CatalogManager.synchronized -> + * SessionCatalog.synchronized` nest left in this class. The transitive call into + * [[v1SessionCatalog.getCurrentDatabase]] happens via [[currentNamespace]], which fetches + * the v1 current database under the CM lock. It is safe today because no code path holds + * [[SessionCatalog]]'s intrinsic lock while waiting on [[CatalogManager]]'s -- the + * SPARK-56939 fix removed every such SC->CM ordering. Any future change that introduces a + * new SC->CM ordering must avoid resurrecting the deadlock by also taking + * `currentPathString` (or any other CM->SC nest) into account. */ def currentPathString: String = synchronized { import CatalogV2Implicits._ @@ -279,23 +296,29 @@ class CatalogManager( * Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] order used by * unqualified function/table-function resolution. * - * The kinds list is computed by reading the current catalog, current namespace, and the - * effective session path together so that a concurrent USE / SET PATH cannot return a - * torn snapshot (catalog from one observation, namespace from another). The `v1` - * current-database read (used only when `currentCatalog` is the session catalog and no - * explicit namespace has been published) is taken OUTSIDE this manager's intrinsic lock - * to avoid the SPARK-56939 lock-order inversion with [[SessionCatalog.synchronized]]: - * the rest of the snapshot then runs under a single CM critical section. + * The `(currentCatalog, _currentNamespace, sessionPath)` triple is read together inside a + * single CM critical section so a concurrent `USE` / `SET PATH` cannot return a torn + * snapshot for those three fields (e.g. catalog from one observation, explicit namespace + * from another). + * + * The `v1SessionCatalog.getCurrentDatabase` read needed for the default-namespace fallback + * is taken OUTSIDE the CM lock and is therefore intentionally racy w.r.t. a concurrent + * `USE SCHEMA`. That staleness is harmless for this helper's output: this method consumes + * `effectiveNs` only to expand `CURRENT_SCHEMA` markers in the SQL path, and + * [[CatalogManager.systemFunctionKindsFromPath]] only retains literal `system.builtin` / + * `system.session` entries from the resolved path -- it never inspects any + * `(catalog, namespace)` derived from `v1`. So if `v1CurrentDb` lags by one `USE SCHEMA`, + * a `CURRENT_SCHEMA` entry might briefly resolve to the previous database, but the kinds + * list (the only thing returned here) is unaffected. Lifting the read inside the CM lock + * would re-introduce the SPARK-56939 lock-order inversion this helper exists to avoid. * * Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]], * [[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]]) * MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this method. */ def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionCatalog.SessionFunctionKind] = { - // SPARK-56939: read v1's current database before taking the CM lock. We only need it - // if there is no explicit `_currentNamespace` AND the current catalog is the session - // catalog. The read might be unused; the alternative (reading it from inside - // synchronized) would re-introduce the deadlock cycle this helper is designed to avoid. + // SPARK-56939: read v1's current database before taking the CM lock; see the method + // doc for why the resulting staleness is harmless for the kinds list. val v1CurrentDb = v1SessionCatalog.getCurrentDatabase val pathEntries = synchronized { val catName = currentCatalog.name() @@ -397,6 +420,15 @@ class CatalogManager( // when we switch back to session catalog, the current namespace definitely is ["default"]. // Run this BEFORE publishing the new catalog name so that if a reader observes the new // catalog, the v1 state is already consistent with it. + // + // Concurrency trade-off versus the pre-SPARK-56939 atomic version: between this v1 write + // and the publish below, a concurrent reader of `currentNamespace` sees + // `(oldCatalog, v1.currentDb = default)`. When the old catalog is the session catalog + // (the common case for `USE CATALOG`), the user's previous namespace is briefly invisible + // to that reader until the new name is published. The opposite torn observation + // (`newCatalog`, stale `v1.currentDb`) is avoided by this ordering. This trade-off + // (transient invisibility instead of transient inconsistency, exchanged for breaking the + // deadlock cycle) is accepted; the long-term post-switch state is the same as before. v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) synchronized { _currentCatalogName = Some(catalogName) @@ -411,12 +443,20 @@ class CatalogManager( } // Clear all the registered catalogs. Only used in tests. - private[sql] def reset(): Unit = synchronized { - catalogs.clear() - _currentNamespace = None - _currentCatalogName = None - _sessionPath = None - confDefaultPathCache.set(None) + // + // SPARK-56939: apply the same split-lock pattern as [[setCurrentNamespace]] / + // [[setCurrentCatalog]] so the locking contract is uniform across every CM mutator that + // calls back into [[v1SessionCatalog]]. Test-only callers don't race against unqualified + // function resolution today, but keeping the contract symmetric prevents future test + // helpers (e.g. session reset in a concurrent harness) from reintroducing the cycle. + private[sql] def reset(): Unit = { + synchronized { + catalogs.clear() + _currentNamespace = None + _currentCatalogName = None + _sessionPath = None + confDefaultPathCache.set(None) + } v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 057210582607e..238b52ab7cd93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException +import org.apache.spark.sql.connector.catalog.InMemoryCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, LongType} @@ -977,31 +979,46 @@ class SetPathSuite extends SharedSparkSession { } } - test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") { + test("SPARK-56939: concurrent USE SCHEMA / USE CATALOG and unqualified function lookups " + + "do not deadlock") { // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]] - // held the manager's intrinsic lock while calling into - // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's - // intrinsic lock), while concurrent unqualified function resolution acquired the - // catalog's intrinsic lock and then reached back into the manager via + // (driven by `USE SCHEMA`) and [[CatalogManager.setCurrentCatalog]] (driven by + // `USE CATALOG`) both held the manager's intrinsic lock while calling into + // [[SessionCatalog.setCurrentDatabase*]] (which takes the catalog's intrinsic lock), + // while concurrent unqualified function resolution acquired the catalog's intrinsic lock + // and then reached back into the manager via // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the - // session whenever a `USE SCHEMA` raced with any unqualified function reference. + // session whenever a `USE`-style command raced with any unqualified function reference. // - // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the - // resolution-order setting, so this test exercises the default configuration. + // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the resolution-order setting, + // so this test exercises the default configuration. Both `setCurrentNamespace` and + // `setCurrentCatalog` were rewritten with the same split-lock pattern, so the test + // exercises both arms symmetrically: one thread toggles `USE SCHEMA`, another toggles + // `USE CATALOG` between the session catalog and a registered v2 catalog. + val v2Catalog = "spark_56939_testcat" + spark.conf.set(s"spark.sql.catalog.$v2Catalog", classOf[InMemoryCatalog].getName) sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1") sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2") try { val budget = 200 val iterations = new java.util.concurrent.atomic.AtomicInteger(0) - val barrier = new java.util.concurrent.CyclicBarrier(2) + val barrier = new java.util.concurrent.CyclicBarrier(3) val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]() - val useThread = new Thread(() => { + val useSchemaThread = new Thread(() => { try { barrier.await() var i = 0 while (i < budget && errors.isEmpty) { - sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2") + try { + sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2") + } catch { + // A concurrent `USE` from `useCatalogThread` may switch the current catalog + // to the v2 testcat, where these schemas don't exist; the resulting + // SCHEMA_NOT_FOUND is an expected interleaving and is unrelated to the + // deadlock this test guards against. + case _: NoSuchNamespaceException => () + } i += 1 } } catch { @@ -1009,6 +1026,24 @@ class SetPathSuite extends SharedSparkSession { } }, "SPARK-56939-use-schema") + val useCatalogThread = new Thread(() => { + try { + barrier.await() + var i = 0 + while (i < budget && errors.isEmpty) { + // Toggle between the session catalog and a v2 catalog so each iteration + // exercises `setCurrentCatalog` -- the arm that previously held the manager + // lock across `v1SessionCatalog.setCurrentDatabase(default)`. The grammar + // accepts `USE identifierReference`; a single identifier resolves to a + // catalog when one is registered under that name. + sql(if ((i % 2) == 0) s"USE $v2Catalog" else "USE spark_catalog") + i += 1 + } + } catch { + case t: Throwable => errors.add(t) + } + }, "SPARK-56939-use-catalog") + val lookupThread = new Thread(() => { try { barrier.await() @@ -1028,29 +1063,36 @@ class SetPathSuite extends SharedSparkSession { } }, "SPARK-56939-lookup") - useThread.start() + useSchemaThread.start() + useCatalogThread.start() lookupThread.start() - // Generous join: 30s is plenty for 200 cheap queries on either side and gives a + // Generous join: 30s is plenty for 200 cheap queries per thread and gives a // clear failure signal if the implementation regresses into a deadlock. val joinMillis = 30000L - useThread.join(joinMillis) + useSchemaThread.join(joinMillis) + useCatalogThread.join(joinMillis) lookupThread.join(joinMillis) - assert(!useThread.isAlive, + assert(!useSchemaThread.isAlive, "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " + - "CatalogManager likely returned (SPARK-56939).") + "CatalogManager likely regressed (SPARK-56939).") + assert(!useCatalogThread.isAlive, + "USE CATALOG thread did not finish; lock-order inversion between SessionCatalog and " + + "CatalogManager likely regressed (SPARK-56939).") assert(!lookupThread.isAlive, "Lookup thread did not finish; lock-order inversion between SessionCatalog and " + - "CatalogManager likely returned (SPARK-56939).") + "CatalogManager likely regressed (SPARK-56939).") assert(errors.isEmpty, s"Concurrent lookups raised unexpected errors: ${errors.toArray.mkString("; ")}") assert(iterations.get() > 0, "Lookup thread never completed a query; suspect contention or deadlock.") } finally { + sql("USE spark_catalog") sql("USE SCHEMA default") sql("DROP SCHEMA IF EXISTS spark_56939_s1 CASCADE") sql("DROP SCHEMA IF EXISTS spark_56939_s2 CASCADE") + spark.conf.unset(s"spark.sql.catalog.$v2Catalog") } } From f504667f46a21b02719d6e2194560a8a6e5aac46 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Wed, 20 May 2026 11:39:05 +0200 Subject: [PATCH 3/3] [SPARK-56939][SQL][FOLLOWUP] Address review nits in CatalogManager comments Comment-only follow-up to PR #55977 addressing three non-blocking nits from @cloud-fan's re-review: - `setCurrentNamespace`: the prior comment documented only the v1-side `isSession`-snapshot drift. Restructured into two labeled cases so the second drift mode (sticky CM-side publish-overwrite race: a concurrent `setCurrentCatalog` completes between the v1 callback returning and this method's publish, leaving `_currentNamespace` published under a different `_currentCatalogName` than the one observed at snapshot time) is also acknowledged. Framed as last-writer-wins for racing `USE` commands. - `currentPathString` doc: inverted the clauses so "must take ... into account to avoid resurrecting the deadlock" parses unambiguously (previously "must avoid resurrecting the deadlock by also taking ... into account" was ambiguous w.r.t. what "by also taking" attaches to). - `sessionFunctionKindsForUnqualifiedResolution` doc: "Lifting the read inside the CM lock" -> "Moving the read inside the CM lock" -- "lifting" implies moving up/out, but the intent here is moving the read into the locked region. No behavioral change; comment-only. `build/sbt catalyst/compile` is clean. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor / Claude Opus 4.7 --- .../connector/catalog/CatalogManager.scala | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index fc754be3d9c1b..3aad52dbd1d01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -141,13 +141,27 @@ class CatalogManager( // would invert that order and deadlock. Snapshot the dispatch decision under the lock, // run callbacks outside it, then publish the new namespace under the lock again. // - // Concurrency trade-off versus the pre-SPARK-56939 atomic version: the `isSession` - // snapshot can drift if a concurrent [[setCurrentCatalog]] switches to a v2 catalog - // between this read and the v1 callback below -- the callback would still touch - // `v1.currentDb` even though the active catalog is no longer the session catalog. A - // later switch back to the session catalog resets `v1.currentDb` to `default` (see - // [[setCurrentCatalog]]), so long-term state remains consistent; only the intermediate - // observation is novel. Acceptable trade-off against the deadlock alternative. + // Concurrency trade-offs versus the pre-SPARK-56939 atomic version (v1-side and + // CM-side drift modes): + // + // (a) v1-side drift. The `isSession` snapshot can drift if a concurrent + // [[setCurrentCatalog]] switches to a v2 catalog between this read and the v1 + // callback below -- the callback would still touch `v1.currentDb` even though + // the active catalog is no longer the session catalog. A later switch back to + // the session catalog resets `v1.currentDb` to `default` (see + // [[setCurrentCatalog]]), so long-term state remains consistent; only the + // intermediate observation is novel. + // + // (b) CM-side publish-overwrite drift (sticky). Between the v1 callback returning + // and the publish below, a concurrent [[setCurrentCatalog]] can complete fully + // -- switching `_currentCatalogName` to (say) a v2 catalog and clearing + // `_currentNamespace = None` -- before this method's publish overwrites that + // with `Some(namespace)`. End state: `_currentNamespace = Some(namespace)` is + // published under a different `_currentCatalogName` than the one observed when + // [[isSession]] was snapshotted at the top. Unlike (a) there is no analogous + // auto-recovery; the mismatch sticks until the next `USE`. This is still + // last-writer-wins for two racing `USE` commands, which is the conventional + // expectation, so it is accepted as a trade-off against the deadlock alternative. val isSession = synchronized(isSessionCatalog(currentCatalog)) if (isSession && namespace.length == 1) { v1SessionCatalog.setCurrentDatabaseWithNameCheck( @@ -246,8 +260,8 @@ class CatalogManager( * the v1 current database under the CM lock. It is safe today because no code path holds * [[SessionCatalog]]'s intrinsic lock while waiting on [[CatalogManager]]'s -- the * SPARK-56939 fix removed every such SC->CM ordering. Any future change that introduces a - * new SC->CM ordering must avoid resurrecting the deadlock by also taking - * `currentPathString` (or any other CM->SC nest) into account. + * new SC->CM ordering must take `currentPathString` (or any other CM->SC nest) into + * account to avoid resurrecting the deadlock. */ def currentPathString: String = synchronized { import CatalogV2Implicits._ @@ -309,7 +323,7 @@ class CatalogManager( * `system.session` entries from the resolved path -- it never inspects any * `(catalog, namespace)` derived from `v1`. So if `v1CurrentDb` lags by one `USE SCHEMA`, * a `CURRENT_SCHEMA` entry might briefly resolve to the previous database, but the kinds - * list (the only thing returned here) is unaffected. Lifting the read inside the CM lock + * list (the only thing returned here) is unaffected. Moving the read inside the CM lock * would re-introduce the SPARK-56939 lock-order inversion this helper exists to avoid. * * Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]],