Skip to content

[SPARK-56918][CORE] Add ManagedConsumer SPI for shrinkable external storage memory#55953

Open
baibaichen wants to merge 1 commit into
apache:masterfrom
baibaichen:managed-consumer
Open

[SPARK-56918][CORE] Add ManagedConsumer SPI for shrinkable external storage memory#55953
baibaichen wants to merge 1 commit into
apache:masterfrom
baibaichen:managed-consumer

Conversation

@baibaichen
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Today Spark's storage memory pool is implicitly owned by BlockManager via MemoryStore. There is no mechanism for an external cache (e.g., Velox AsyncDataCache via Gluten, or other per-executor native columnar caches) to:

  1. reserve bytes from the storage pool,
  2. be asked to release bytes under pressure, or
  3. grow into transient free capacity when storage is idle.

The existing UnmanagedMemoryConsumer SPI (SPARK-53001) is informational-only — Spark subtracts the reported usage from effectiveMaxMemory but cannot ask the consumer to release, and the consumer cannot ask Spark for more.

This PR adds the ManagedConsumer SPI and wires it into UnifiedMemoryManager.

New SPI (core/src/main/scala/org/apache/spark/memory/ManagedConsumer.scala)

trait ManagedConsumer {
  def name: String                                 // registry key; unique within JVM
  def memoryMode: MemoryMode = MemoryMode.OFF_HEAP
  def getShrinkableMemoryBytes: Long
  def shrink(numBytes: Long): Long   // returns bytes actually released
}

name IS the registry key — there is no separate Id type. Names must be unique within the JVM (ON_HEAP and OFF_HEAP share one namespace); registering a different instance under an already-used name throws IllegalArgumentException. memoryMode defaults to OFF_HEAP, matching the canonical use case (native off-heap caches sharing spark.memory.offHeap.size with Spark's MemoryStore).

MemoryManager additions

  • registerManagedConsumer(c) / unregisterManagedConsumer(c)
  • shrinkExternal(requested, mode, exclude): Long — generic orchestrator.
    Framework-owned pool accounting: after shrink() returns released, the framework deducts exactly released bytes from the storage pool via pool.releaseMemory(). Consumers MUST NOT call releaseStorageMemory themselves on the freed amount.

UnifiedMemoryManager wiring

Acquire site Order
acquireStorageMemory(blockId, ...) (BlockManager path) borrow-from-execution → shrinkExternal → LRU eviction
acquireStorageMemory(self: ManagedConsumer, ...) (new overload) borrow-from-execution → shrinkExternal(exclude = self) → LRU eviction
maybeGrowExecutionPool borrow back from storage, capped by memoryReclaimableFromStorage before shrinkExternal — this protects the storage region from being indirectly enlarged beyond storageRegionSize

Self-exclusion by reference equality prevents a consumer from being asked to shrink in service of its own grow request.

New configs (default-off, opt-in)

Key Default Purpose
spark.memory.managedConsumer.enabled false Gate the entire SPI
spark.memory.managedConsumer.shrinkWarnThresholdMs 100 Log a WARN when a single shrink() call exceeds this threshold

JIRA: SPARK-56918

Why are the changes needed?

A fourth category of executor off-heap memory is emerging and has no home today: shrinkable external caches that are per-executor singletons, serving the whole executor.

Category Owner Spark accounting Releasable on demand?
Execution Spark task allocations Tracked, arbitrated Yes (spill)
Storage Spark RDD cache / broadcast (MemoryStore) Tracked, arbitrated Yes (evict)
Unmanaged RocksDB state store (SPARK-53001) Reported only (pull-mode poll) No — informational
Shrinkable external cache (this PR) Per-executor singleton (e.g., Velox AsyncDataCache) Tracked, arbitrated Yes (evict cold pages)

For these consumers both existing options are wrong:

  • Treating them as storage memory via MemoryStore doesn't work — MemoryStore is bound to BlockManager, SerializerManager, and BlockEvictionHandler, which is exactly the limitation SPARK-48694 called out.
  • Treating them as unmanaged memory via UnmanagedMemoryConsumer is informational only — Spark cannot ask them to release when storage pressure rises, and they cannot ask Spark for more when storage is idle.

The result today is static partitioning of off-heap between Spark storage and the external cache, defeating the purpose of unified memory management.

Does this PR introduce any user-facing change?

No. The SPI is opt-in via spark.memory.managedConsumer.enabled (default false). Spark ships only the SPI; no in-tree implementation. Existing BlockManager / MemoryStore / UnmanagedMemoryConsumer behavior is unchanged when the gate is off.

How was this patch tested?

New unit tests (all in core/src/test/scala/org/apache/spark/memory/):

  • ManagedConsumerSuite — 5 trait-level cases covering the SPI shape, name override for logs, independence from UnmanagedMemoryConsumer, the empty default of getShrinkableConsumers on non-Unified backends, and consumerLogName fallback for blank name.
  • UnifiedMemoryManagerSuite — 26 new integration cases covering:
    • borrow-from-execution → shrinkExternal → LRU eviction ordering
    • storage-region protection in maybeGrowExecutionPool
    • self-exclusion on the acquireStorageMemory(self) overload
    • framework-owned pool accounting (consumer over-/under-reporting)
    • idempotent registration; rejection of different-instance / null / empty name; stale name-collider unregister is a safe no-op
    • multi-consumer round-robin fairness
    • shrinkWarnThresholdMs log emission
    • require() guards on the acquireStorageMemory(self) overload — null self, mismatched memoryMode, negative numBytes
    • cross-SPI WARN when the same object is registered as both ManagedConsumer and UnmanagedMemoryConsumer

Run:

build/sbt 'core/testOnly *ManagedConsumerSuite *UnifiedMemoryManagerSuite *MemoryManagerSuite'

All 67 tests pass (55 Scala + 12 Java).

Was this patch authored or co-authored using generative AI tooling?

Yes, Generated-by: GitHub Copilot.

@baibaichen
Copy link
Copy Markdown
Contributor Author

@JoshRosen @dongjoon-hyun would you please help me to review this PR?

…torage memory

### What changes were proposed in this pull request?

Add a push-mode SPI so external caches (e.g. Velox AsyncDataCache via
Gluten) can share Spark's storage memory pool: reserve bytes, be asked
to release on pressure, and grow into transient free capacity. The
existing `UnmanagedMemoryConsumer` (SPARK-53001) is informational-only
and does not fit caches that shrink on demand.

- `trait ManagedConsumer { name, memoryMode (default OFF_HEAP),
  getShrinkableMemoryBytes, shrink(numBytes): Long }`. `name` is the
  registry key, JVM-unique across ON_HEAP and OFF_HEAP.
- `MemoryManager.{register,unregister}ManagedConsumer`; re-registering
  a different instance under an existing name throws.
- `MemoryManager.shrinkExternal(requested, mode, exclude)` orchestrates
  shrinks and owns pool accounting: the framework deducts the returned
  bytes from the storage pool, so consumers MUST NOT call
  `releaseStorageMemory` themselves.
- `UnifiedMemoryManager.acquireStorageMemory` (existing `BlockId`
  overload and new `(self: ManagedConsumer, ...)` overload) invokes
  `shrinkExternal` AFTER borrow-from-execution and BEFORE LRU eviction;
  self-exclusion by reference equality prevents asking a consumer to
  shrink for its own grow.
- `maybeGrowExecutionPool` also invokes `shrinkExternal`, capping
  `targetReclaim` by `memoryReclaimableFromStorage` BEFORE shrink so
  shrunk bytes cannot enlarge the protected storage region.

New configs (opt-in, default-off):
- `spark.memory.managedConsumer.enabled` (default false)
- `spark.memory.managedConsumer.shrinkWarnThresholdMs` (default 100)

### Why are the changes needed?

See SPARK-56918. Without this SPI, operators must statically partition
off-heap memory between Spark and external caches, defeating unified
memory management.

### Does this PR introduce _any_ user-facing change?

No. SPI only, opt-in, no in-tree implementation.

### How was this patch tested?

New unit tests under `core/src/test/scala/org/apache/spark/memory/`:

- `ManagedConsumerSuite`: 5 trait-level cases covering SPI shape,
  `name` override for logs, independence from `UnmanagedMemoryConsumer`,
  empty default of `getShrinkableConsumers` on non-Unified backends,
  and `consumerLogName` fallback for blank `name`.
- `UnifiedMemoryManagerSuite`: 26 new integration cases covering
  borrow-then-shrink ordering, storage-region protection, self-
  exclusion, framework-owned pool accounting, idempotent registration,
  name-collision and null/empty rejection, stale-collider safe
  unregister, multi-consumer fairness, `shrinkWarnThresholdMs` log
  emission, `require()` guards on the `acquireStorageMemory(self)`
  overload, and cross-SPI WARN when an object is registered as both
  `ManagedConsumer` and `UnmanagedMemoryConsumer`.

Run: `build/sbt 'core/testOnly *ManagedConsumerSuite \
*UnifiedMemoryManagerSuite *MemoryManagerSuite'` — 67 pass
(55 Scala + 12 Java).

### Was this patch authored or co-authored using generative AI tooling?

Yes. Generated-by: GitHub Copilot.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* and partial release are fine; negative return is a contract violation. Exceptions
* are caught and treated as 0-byte release.
*/
trait ManagedConsumer {
Copy link
Copy Markdown
Member

@zhztheplayer zhztheplayer May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the trait name clear? Existing MemoryConsumer is already managed by Spark.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — "managed vs unmanaged" is a weak axis here (everything in Spark is managed in some sense). Better to name on the differentiating capability:

Trait Layer Core verb
MemoryConsumer (Java) task spill()
UnmanagedMemoryConsumer executor getMemBytesUsed
this PR executor shrink()

Proposal: rename to ShrinkableMemoryConsumer. Matches the SPI's defining verb (shrink()), and forms a clean three-way distinction from the two existing traits — no overlap with either. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more piece of evidence: "shrinkable" is already a first-class concept in the API surface — getShrinkableMemoryBytes is the cheap snapshot the framework uses to skip consumers with nothing to give back. A consumer that always returns 0 is effectively un-shrinkable. So the trait name ShrinkableMemoryConsumer just makes explicit what the method names already imply.

*/
def acquireMemoryForManagedConsumer(numBytes: Long): Boolean = lock.synchronized {
acquireMemoryInternal(None, numBytes, math.max(0L, numBytes - memoryFree))
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the API have to be bridged with storage memory pool?

IIUC Gluten demands a global memory area in the UMM that is not accounted to particular tasks. Would it be simpler to start from the unmanaged memory API added in #51778?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the look! Two points:

1. UnmanagedMemoryConsumer is not real-time — wrong shape for caches.

UMC is purely pull-mode: getMemBytesUsed: Long is polled every spark.memory.unmanagedMemoryPollingInterval (default 0s = disabled) and subtracted from effectiveMaxMemory. Readings lag real usage, and there is no callback to ask the consumer to release. External caches need to evict synchronously under pressure and grow synchronously when storage is idle — interval-bounded staleness can't deliver that.

2. The storage pool is the master ledger — must bridge into it.

MemoryStore, execution borrow-from-storage, and maybeGrowExecutionPool all arbitrate against storage pool accounting. To share spark.memory.offHeap.size with MemoryStore and participate in the same borrow/spill/evict ordering, an external cache has to be visible in the pool, not "outside it" via UMC subtraction.

On "global, not per-task" — agreed, that's what this SPI delivers: ManagedConsumer is registered on UnifiedMemoryManager (executor singleton), no per-task accounting.

WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants