Skip to content

feat: accelerate Iceberg RewriteDataFiles reads via Comet native scan#4251

Open
jordepic wants to merge 2 commits intoapache:mainfrom
jordepic:main
Open

feat: accelerate Iceberg RewriteDataFiles reads via Comet native scan#4251
jordepic wants to merge 2 commits intoapache:mainfrom
jordepic:main

Conversation

@jordepic
Copy link
Copy Markdown

@jordepic jordepic commented May 6, 2026

Which issue does this PR close?

Closes #4250.

Rationale for this change

A large number of query resources are devoted across the industry to rewriting data files using spark procedures for iceberg tables. Using native code here where possible can significantly speed up this process!

What changes are included in this PR?

Detect spark scans (SparkStagedScan) that are created during the RewriteDataFilesSparkAction and replace them with comet scans. Extract their associated tasks and pass in the lack of filter (see SparkStagedScan line 50 in the apache iceberg project).

Note that some things are NOT included in this PR:
Datafusion-comet does not yet support writing iceberg data. Once it does, we can "comet-ify" this whole pipeline!

How are these changes tested?

We write two tests to inspect the spark plan associated with rewriting data files and ensure that the operators get replaced. Before this change is merged I can also try to run it locally and pick up some benchmarks for table compactions (on tables that are only data files, and those with delete files associated).

I have also added tests to ensure that the compaction works, not just ensuring operator replacement, but also runtime compaction correctness on both bin packing and sorting!

MOR delete materialisation is verified on spark-3.4/3.5/4.0; cancelled on 4.1/4.2 pending iceberg/iceberg#15238.

@jordepic jordepic force-pushed the main branch 3 times, most recently from 352861a to f1366a1 Compare May 7, 2026 02:18
@mbutrovich
Copy link
Copy Markdown
Contributor

Spark 3.4 failed for a reflection access. IIRC we use an older version of Iceberg there, so signatures might have changed. We might need different reflection logic for older versions of Iceberg. I had to do that somewhere else in the reflection class, but can't recall what right now.

@mbutrovich
Copy link
Copy Markdown
Contributor

Thanks @jordepic! I will take a proper pass through this today.

@mbutrovich mbutrovich self-requested a review May 7, 2026 14:59
@mbutrovich
Copy link
Copy Markdown
Contributor

mbutrovich commented May 7, 2026

Thanks for this, @jordepic! Nice find that SparkStagedScan is the entry point for RewriteDataFiles, and the listener-based plan capture in the test reads clean.

A few questions.

On the Spark 3.4 CI failure from before the force-push, do you still have the stack? I checked Iceberg 1.5.2 and the SparkScan / SparkPartitioningAwareScan hierarchy matches 3.5, so the three accessors in IcebergReflection should resolve the same way. My guess is the failure is in the test's own fluent-builder reflection against the Actions API rather than the new reflection helpers, but hard to say without the trace.

For test coverage, have you tried a rewrite over a table that already has MOR position deletes applied? The current suite exercises plain data files only, and the behavior I most want pinned down is that the flattened FileScanTasks carry their deletes() through the native path. A binPack over a table with one row deleted would be the direct assertion.

Smaller thing: the metadata-table guard at CometScanRule.scala:98-116 already rejects .position_deletes via suffix match, which I think is what keeps the new SparkStagedScan case safely scoped to data-file rewrites and away from RewritePositionDeleteFilesSparkAction. A short comment near the StagedScan match pointing at that upstream check would have saved me a detour on review.

@jordepic
Copy link
Copy Markdown
Author

jordepic commented May 7, 2026

On the Spark 3.4 CI failure from before the force-push, do you still have the stack? I checked Iceberg 1.5.2 and the SparkScan / SparkPartitioningAwareScan hierarchy matches 3.5, so the three accessors in IcebergReflection should resolve the same way. My guess is the failure is in the test's own fluent-builder reflection against the Actions API rather than the new reflection helpers, but hard to say without the trace.

in Iceberg 1.5.2 (used by the spark-3.4 profile), option(String, String) on the rewrite action chain is declared on the package-private BaseSparkAction class. My invokeMethod calls getMethod(...).invoke(...) directly. Since JDK 11+, you can't invoke a method whose declaring class is non-public from outside its package — even if the method itself is public — without setAccessible(true). In Iceberg 1.8.1+ (spark-3.5/4.x) the same method is declared on a public class, so getMethod(...).invoke(...) worked transparently.

Smaller thing: the metadata-table guard at CometScanRule.scala:98-116 already rejects .position_deletes via suffix match, which I think is what keeps the new SparkStagedScan case safely scoped to data-file rewrites and away from RewritePositionDeleteFilesSparkAction. A short comment near the StagedScan match pointing at that upstream check would have saved me a detour on review.

I wish I could say that I actually remembered to check this and maliciously omitted a comment to make your life harder, but the truth is that I was just negligent. Great catch! I'll add a comment, thanks Matt!

In response to your comment, I now have 5 tests.

  1. Bin packing
  2. Sorting
  3. Z ordering
  4. Bin packing with positional and equality deletes
  5. Ensuring that we do not convert RewritePositionDeleteFilesSparkAction

@mbutrovich
Copy link
Copy Markdown
Contributor

Review notes for #4251

Thanks for the revision, @jordepic. The deletes test is a strong end-to-end check (asserting sumDataFileRecords == 1 after rewrite catches the case where positional or equality deletes silently fail to apply).

A few things I'd love your take on.

Reflection fall-back in tasksFromTaskGroups

In IcebergReflection.scala, the new tasksFromTaskGroups wraps the outer taskGroups() lookup in findMethodInHierarchy, which returns None cleanly. But the inner per-group group.getClass.getMethod("tasks").invoke(group) isn't guarded, so any NoSuchMethodException or InvocationTargetException there would bubble out of getTasks instead of returning None and letting CometScanRule fall back to Spark via the existing "Failed to extract Iceberg metadata via reflection" path. Every sibling helper in this file (getTable, tasksFromTasksAccessor, getFilterExpressions, getExpectedSchema) honors that contract. Could we wrap the loop body so a future Iceberg reshuffle of BaseScanTaskGroup falls back rather than crashes the planner?

Method caching in the loop

Same function: group.getClass.getMethod("tasks") runs once per group, but every group is the same concrete class. validateIcebergFileScanTasks already caches its Method handles outside its iteration loop. Worth following the same pattern here to avoid N redundant lookups on large rewrite groups?

Where do the new class-name constants live?

SPARK_STAGED_SCAN_CLASS is a private val in IcebergReflection, and ICEBERG_SCAN_CLASSES plus isIcebergScanClass live on CometScanRule. The existing IcebergReflection.ClassNames block is already the home for Iceberg FQCNs, and the SparkBatchQueryScan literal at CometScanRule.scala:307 (pre-existing) is the same string. Would consolidating all three under ClassNames (and exposing isIcebergScanClass from IcebergReflection) help avoid drift?

The long comment on ICEBERG_SCAN_CLASSES

The comment explains that metadata scans are gated because getMetadataLocation returns None, but it's attached to the Set in CometScanRule rather than to getMetadataLocation where the gating actually happens. As a future reader I think I'd look for it at the gate. Reasonable to move it (or shorten it to a one-liner pointing at the real site)?

Plan assertions

assertReadsAreComet and assertOperator substring-match physicalPlanDescription. Two small concerns: assertOperator(plans, "CometExchange") will silently match CometColumnarExchange, and the captured string is a toString rather than a real plan tree. Would attaching a QueryExecutionListener and doing plan.collect { case _: CometIcebergNativeScanExec => } give us exact-class assertions? Same for "AppendData", which is a logical-plan node name appearing in the executedPlan toString by convention.

The RewritePositionDeleteFiles negative test is great as-is: asserting "no Comet operator at all" is a much sharper pin than substring-matching individual operators.

Spark 4.1 coverage

The deletes test cancels (rather than fails) on the iceberg-spark-runtime-4.0 NoSuchMethodError. Reasonable given the upstream issue, but it does mean delete coverage is effectively spark-3.5 / 4.0 only. Worth a line in the PR description so reviewers don't assume the deletes path is exercised on every profile?

Smaller things

  • tasksFromTaskGroups returns Some(flat.asInstanceOf[java.util.List[_]]). ArrayList[_] is already a java.util.List[_], so the cast looks unneeded. Declaring flat as ArrayList[AnyRef] would also drop the asInstanceOf[java.util.Collection[Any]] rewrap on the inner addAll.
  • icebergAvailable and withTempIcebergDir are flagged in the diff as duplicated from CometIcebergNativeSuite. withTempIcebergDir is also close to SQLTestUtils.withTempDir that CometTestBase already inherits. Worth lifting both into a shared trait?
  • captureSqlPlans is generic SparkListener plan recording with no Iceberg specifics. There are similar CometListenerBusUtils.waitUntilEmpty patterns in CometIcebergNativeSuite. Promote to CometTestBase?
  • runRewriteAction and runRewritePositionDeletes are near-duplicates. One runActionChain(actionMethod, configure, options) could cover both.
  • runRewriteTest has five callbacks with three defaulting to no-ops. A small RewriteTestCase case class might read a bit cleaner.
  • Several Class.forName(...) calls in the suite could go through IcebergReflection.loadClass, which already handles the context classloader.
  • mutable.ArrayBuffer writes in captureSqlPlans are synchronized but the .toSeq read after removeSparkListener isn't. CopyOnWriteArrayList or wrapping the read would close the gap.
  • numFiles = 4 and the >= 4 assertion are unrelated magic numbers. A single named constant would link them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Iceberg "Rewrite Data Files Procedure"

2 participants