feat: accelerate Iceberg RewriteDataFiles reads via Comet native scan #4251
jordepic wants to merge 2 commits into apache:main
Conversation
jordepic force-pushed from 352861a to f1366a1
---
Spark 3.4 failed due to a reflection access error. IIRC we use an older version of Iceberg there, so signatures might have changed. We might need different reflection logic for older Iceberg versions. I had to do that somewhere else in the reflection class, but can't recall where right now.
---
Thanks @jordepic! I will take a proper pass through this today. |
---
Thanks for this, @jordepic! Nice find that SparkStagedScan is the entry point for RewriteDataFiles, and the listener-based plan capture in the test reads cleanly. A few questions:

On the Spark 3.4 CI failure from before the force-push, do you still have the stack? I checked Iceberg 1.5.2 and the SparkScan / SparkPartitioningAwareScan hierarchy matches 3.5, so the three accessors in

For test coverage, have you tried a rewrite over a table that already has MOR position deletes applied? The current suite exercises plain data files only, and the behavior I most want pinned down is that the flattened FileScanTasks carry their

Smaller thing: the metadata-table guard at
In Iceberg 1.5.2 (used by the spark-3.4 profile), option(String, String) on the rewrite action chain is declared on the package-private BaseSparkAction class. My invokeMethod calls getMethod(...).invoke(...) directly. Since JDK 11+, you can't invoke a method whose declaring class is non-public from outside its package, even if the method itself is public, without setAccessible(true). In Iceberg 1.8.1+ (spark-3.5/4.x) the same method is declared on a public class, so getMethod(...).invoke(...) worked transparently.
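The failure mode above is easy to reproduce against the JDK itself: `Collections$UnmodifiableList` is a package-private class whose public `size()` is declared on another package-private class, mirroring the `BaseSparkAction` situation. The helper below is a hypothetical sketch (not the PR's actual `invokeMethod`) of one common fix: resolve the method against a *public* declaring interface or superclass before invoking, falling back to `setAccessible(true)` only as a last resort.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReflectionSketch {

    // Resolve `name` against a public declaring type so Method.invoke does
    // not hit IllegalAccessException when the runtime class is non-public.
    static Method findInvocable(Class<?> cls, String name, Class<?>... types)
            throws NoSuchMethodException {
        Method m = cls.getMethod(name, types);
        if (Modifier.isPublic(m.getDeclaringClass().getModifiers())) {
            return m; // invocable from any package as-is
        }
        // Prefer a public interface, then the superclass chain.
        for (Class<?> iface : cls.getInterfaces()) {
            try {
                return findInvocable(iface, name, types);
            } catch (NoSuchMethodException ignored) {
            }
        }
        if (cls.getSuperclass() != null) {
            try {
                return findInvocable(cls.getSuperclass(), name, types);
            } catch (NoSuchMethodException ignored) {
            }
        }
        // Last resort: suppress access checks (may need --add-opens under JPMS).
        m.setAccessible(true);
        return m;
    }

    public static void main(String[] args) throws Exception {
        // Collections$UnmodifiableList is package-private in java.util.
        List<Integer> wrapped = Collections.unmodifiableList(Arrays.asList(1, 2, 3));

        // Naive getMethod(...).invoke(...) fails from outside java.util:
        boolean naiveFails;
        try {
            wrapped.getClass().getMethod("size").invoke(wrapped);
            naiveFails = false;
        } catch (IllegalAccessException e) {
            naiveFails = true;
        }

        Method safe = findInvocable(wrapped.getClass(), "size");
        System.out.println(naiveFails + " " + safe.invoke(wrapped)); // prints: true 3
    }
}
```

Here `findInvocable` lands on `List.size()` (a public interface), so no `setAccessible` is needed at all for this case, which is also the module-system-friendly path.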
I wish I could say that I actually remembered to check this and maliciously omitted a comment to make your life harder, but the truth is that I was just negligent. Great catch! I'll add a comment, thanks Matt! In response to your comment, I now have 5 tests.
---
Review notes for #4251

Thanks for the revision, @jordepic. The deletes test is a strong end-to-end check (asserting

A few things I'd love your take on. Reflection fall-back in
---
Which issue does this PR close?
Closes #4250.
Rationale for this change
A large amount of query resources is devoted across the industry to rewriting data files for Iceberg tables using Spark procedures. Using native code here where possible can significantly speed up this process!
What changes are included in this PR?
Detect Spark scans (SparkStagedScan) that are created during the RewriteDataFilesSparkAction and replace them with Comet scans. Extract their associated tasks and pass along the absence of a filter (see SparkStagedScan line 50 in the Apache Iceberg project).

Note that some things are NOT included in this PR:
Datafusion-comet does not yet support writing iceberg data. Once it does, we can "comet-ify" this whole pipeline!
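The detect-and-replace step described above is essentially a recursive transform over the physical plan: match the staged-scan node, carry its tasks over unchanged, and emit a native scan in its place. The toy types below (Plan, StagedScan, NativeScan, Exchange) are hypothetical stand-ins for the Spark/Comet operators, just to illustrate the shape of the rule, not the actual Comet API.

```java
import java.util.List;

public class RewriteSketch {
    // Hypothetical stand-ins for physical plan nodes; the real rule matches
    // Iceberg's SparkStagedScan and emits a Comet native scan instead.
    interface Plan {}
    record StagedScan(List<String> tasks) implements Plan {}
    record NativeScan(List<String> tasks) implements Plan {}
    record Exchange(Plan child) implements Plan {}

    // Recursive transform: replace every staged scan, keep other nodes.
    static Plan rewrite(Plan plan) {
        if (plan instanceof StagedScan s) {
            // Tasks pass through unchanged; RewriteDataFiles plans no
            // runtime filter, so none is attached here either.
            return new NativeScan(s.tasks());
        }
        if (plan instanceof Exchange e) {
            return new Exchange(rewrite(e.child()));
        }
        return plan;
    }

    public static void main(String[] args) {
        Plan p = new Exchange(new StagedScan(List.of("a.parquet", "b.parquet")));
        Exchange out = (Exchange) rewrite(p);
        System.out.println(out.child() instanceof NativeScan); // prints: true
    }
}
```

The real rule has to run bottom-up for the same reason this sketch recurses into children first: the staged scan sits at the leaves, under exchanges and sorts added by the rewrite action.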
How are these changes tested?
We write two tests that inspect the Spark plan associated with rewriting data files and ensure that the operators get replaced. Before this change is merged I can also try to run it locally and pick up some benchmarks for table compactions (on tables containing only data files, and on tables with associated delete files).

I have also added tests to ensure that the compaction works: not just verifying operator replacement, but also runtime compaction correctness for both bin packing and sorting!
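The plan-inspection tests follow a simple listener pattern: register a listener before running the rewrite action, record each executed plan, then assert on the recorded strings. A minimal stand-alone sketch of that pattern follows; `Runner` is a hypothetical stand-in for Spark's listener registration, and the operator name "CometNativeScan" is an illustrative placeholder, not necessarily the exact string the suite matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PlanCaptureSketch {
    // Hypothetical event source; the real test registers a Spark listener
    // and inspects the plan of each completed query execution.
    static class Runner {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        void addListener(Consumer<String> l) { listeners.add(l); }
        void execute(String planString) {
            listeners.forEach(l -> l.accept(planString));
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        List<String> plans = new ArrayList<>();
        runner.addListener(plans::add); // capture every executed plan

        // Simulate the rewrite action running with Comet replacement enabled.
        runner.execute("CometNativeScan -> CometSort -> DataWritingCommand");

        // Mirrors the suite's check: the native operator appears in the plan.
        boolean replaced = plans.stream().anyMatch(s -> s.contains("CometNativeScan"));
        System.out.println(replaced); // prints: true
    }
}
```

Capturing plans through a listener, rather than calling `explain` on an intermediate DataFrame, is what lets the test observe the plan the action actually executed internally.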
MOR delete materialisation is verified on spark-3.4/3.5/4.0; cancelled on 4.1/4.2 pending iceberg/iceberg#15238.