Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
Expand Down
113 changes: 88 additions & 25 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,22 @@ object IcebergReflection extends Logging {
val PARTITION_SPEC = "org.apache.iceberg.PartitionSpec"
val PARTITION_FIELD = "org.apache.iceberg.PartitionField"
val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
}

/**
 * Fully-qualified class names of the SparkScan implementations that Comet treats as Iceberg data
 * scans.
 *
 * `SparkStagedScan` is listed even though it also serves reads of Iceberg metadata tables (e.g.
 * `POSITION_DELETES`); those reads are gated separately by `getMetadataLocation`, which returns
 * None for metadata-table instances.
 */
val ICEBERG_SCAN_CLASSES: Set[String] = {
  import ClassNames.{SPARK_BATCH_QUERY_SCAN, SPARK_STAGED_SCAN}
  Set(SPARK_BATCH_QUERY_SCAN, SPARK_STAGED_SCAN)
}

/** Returns true when `name` is one of the class names listed in `ICEBERG_SCAN_CLASSES`. */
def isIcebergScanClass(name: String): Boolean =
  ICEBERG_SCAN_CLASSES(name)

/**
* Iceberg content types.
*/
Expand Down Expand Up @@ -172,44 +186,87 @@ object IcebergReflection extends Logging {
}
}

// Resolved on first access rather than when the enclosing object is initialised.
private lazy val sparkStagedScanClass: Class[_] = {
  val stagedScanClassName = ClassNames.SPARK_STAGED_SCAN
  loadClass(stagedScanClassName)
}

/**
 * Returns true when `scan` is an instance of Iceberg's `SparkStagedScan`.
 *
 * If the `SparkStagedScan` class itself cannot be resolved, `scan` cannot be an instance of it,
 * so the answer is false rather than an exception. This keeps the reflective accessors that
 * branch on this predicate returning an Option instead of throwing for every scan type.
 * NOTE(review): assumes `loadClass` signals failure with a non-fatal exception such as
 * ClassNotFoundException - confirm against its definition.
 */
private def isStagedScan(scan: Any): Boolean =
  try {
    sparkStagedScanClass.isInstance(scan)
  } catch {
    case scala.util.control.NonFatal(_) => false
  }

/**
* Gets the tasks from a SparkScan.
*
* The tasks() method is protected in SparkScan, requiring reflection to access.
* Most Iceberg scans (e.g. SparkBatchQueryScan) inherit a `tasks()` accessor from
* SparkPartitioningAwareScan. SparkStagedScan extends SparkScan directly and only declares
* `taskGroups()`, so for staged scans we flatten the groups instead. Both methods are protected
* and require reflection.
*/
def getTasks(scan: Any): Option[java.util.List[_]] = {
try {
val tasksMethod = scan.getClass.getSuperclass
.getDeclaredMethod("tasks")
tasksMethod.setAccessible(true)
Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
} catch {
case e: Exception =>
def getTasks(scan: Any): Option[java.util.List[_]] =
if (isStagedScan(scan)) tasksFromTaskGroups(scan) else tasksFromTasksAccessor(scan)

private def tasksFromTasksAccessor(scan: Any): Option[java.util.List[_]] =
findMethodInHierarchy(scan.getClass, "tasks") match {
case Some(method) =>
Some(method.invoke(scan).asInstanceOf[java.util.List[_]])
case None =>
logError(
s"Iceberg reflection failure: Failed to get tasks from SparkScan: ${e.getMessage}")
"Iceberg reflection failure: Failed to get tasks from SparkScan: " +
s"tasks() not found on ${scan.getClass.getName}")
None
}

/**
 * Flattens every task group returned by the scan's `taskGroups()` accessor into one list of
 * tasks.
 *
 * Returns None (after logging) when `taskGroups()` cannot be found, or when invoking it or a
 * group's `tasks()` accessor fails for any non-fatal reason - including a null result or a null
 * group - so the caller sees a failed lookup rather than an exception or a silently empty task
 * list.
 */
private def tasksFromTaskGroups(scan: Any): Option[java.util.List[_]] =
  findMethodInHierarchy(scan.getClass, "taskGroups") match {
    case Some(method) =>
      try {
        val groups = method.invoke(scan).asInstanceOf[java.util.List[_]]
        val flat = new java.util.ArrayList[AnyRef]()
        // Task groups normally share one concrete class, so the `tasks()` lookup is cached and
        // only repeated when a group turns up that the cached method cannot be invoked on.
        var groupTasksMethod: java.lang.reflect.Method = null
        val it = groups.iterator()
        while (it.hasNext) {
          val group = it.next()
          if (groupTasksMethod == null ||
            !groupTasksMethod.getDeclaringClass.isInstance(group)) {
            groupTasksMethod = group.getClass.getMethod("tasks")
          }
          val groupTasks =
            groupTasksMethod.invoke(group).asInstanceOf[java.util.Collection[_ <: AnyRef]]
          flat.addAll(groupTasks)
        }
        Some(flat)
      } catch {
        case scala.util.control.NonFatal(e) =>
          // InvocationTargetException has a null message of its own; the useful detail is on
          // the exception thrown by the reflected method, so report that instead.
          val cause = e match {
            case ite: java.lang.reflect.InvocationTargetException if ite.getCause != null =>
              ite.getCause
            case other => other
          }
          logError(
            "Iceberg reflection failure: Failed to flatten tasks from SparkStagedScan: " +
              s"${cause.getClass.getName}: ${cause.getMessage}")
          None
      }
    case None =>
      logError(
        "Iceberg reflection failure: Failed to flatten tasks from SparkStagedScan: " +
          s"taskGroups() not found on ${scan.getClass.getName}")
      None
  }
}

/**
* Gets the filter expressions from a SparkScan.
*
* The filterExpressions() method is protected in SparkScan.
* `filterExpressions()` is declared on SparkPartitioningAwareScan but absent from plain
* SparkScan. SparkStagedScan (used by RewriteDataFiles) extends SparkScan directly and never
* pushes filters, so we short-circuit with an empty list rather than reflectively probing for a
* method we know isn't there.
*/
def getFilterExpressions(scan: Any): Option[java.util.List[_]] = {
try {
val filterExpressionsMethod = scan.getClass.getSuperclass.getSuperclass
.getDeclaredMethod("filterExpressions")
filterExpressionsMethod.setAccessible(true)
Some(filterExpressionsMethod.invoke(scan).asInstanceOf[java.util.List[_]])
} catch {
case e: Exception =>
logError(
"Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
s"${e.getMessage}")
None
def getFilterExpressions(scan: Any): Option[java.util.List[_]] =
if (isStagedScan(scan)) {
Some(java.util.Collections.emptyList[AnyRef]())
} else {
findMethodInHierarchy(scan.getClass, "filterExpressions") match {
case Some(method) =>
Some(method.invoke(scan).asInstanceOf[java.util.List[_]])
case None =>
logError(
"Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
s"filterExpressions() not found on ${scan.getClass.getName}")
None
}
}
}

/**
* Gets the Iceberg table format version.
Expand Down Expand Up @@ -350,6 +407,12 @@ object IcebergReflection extends Logging {
/**
* Gets the metadata file location from an Iceberg table.
*
* Returns None for Iceberg metadata-table instances (e.g. POSITION_DELETES, the table that
* `RewritePositionDeleteFiles` reads via `SparkStagedScan`). This is the gate that keeps Comet
* from accelerating metadata-table reads, which have a different schema from the parent data
* table and aren't supported by the iceberg-rust-driven native path. `CometScanRule` falls back
* to Spark when this returns None; `CometIcebergRewriteActionSuite` pins the behaviour.
*
* @param table
* The Iceberg table instance
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ case class CometScanRule(session: SparkSession)
withInfos(scanExec, fallbackReasons.toSet)
}

// Iceberg scan - detected by class name
case _
if scanExec.scan.getClass.getName ==
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
// Iceberg scan - detected by class name. SparkStagedScan covers reads issued by
// RewriteDataFiles (and similar maintenance actions) where the planner has already
// staged FileScanTasks via ScanTaskSetManager.
case _ if IcebergReflection.isIcebergScanClass(scanExec.scan.getClass.getName) =>
val fallbackReasons = new ListBuffer[String]()

// Native Iceberg scan requires both configs to be enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.comet

import java.io.File
import java.nio.file.Files

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -49,17 +48,8 @@ import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions}
class CometIcebergNativeSuite
extends CometTestBase
with RESTCatalogHelper
with AdaptiveSparkPlanHelper {

// Skip these tests if Iceberg is not available in classpath
/** True when the Iceberg `Catalog` class can be loaded, i.e. Iceberg is on the classpath. */
private def icebergAvailable: Boolean = {
  val probeClassName = "org.apache.iceberg.catalog.Catalog"
  try {
    Class.forName(probeClassName)
    true
  } catch {
    case _: ClassNotFoundException => false
  }
}
with AdaptiveSparkPlanHelper
with CometIcebergTestBase {

/** Collects all CometIcebergNativeScanExec nodes from a plan */
private def collectIcebergNativeScans(plan: SparkPlan): Seq[CometIcebergNativeScanExec] = {
Expand Down Expand Up @@ -2518,23 +2508,6 @@ class CometIcebergNativeSuite
}
}

// Helper to create temp directory
/**
 * Runs `f` against a freshly created temporary directory and removes that directory, together
 * with everything beneath it, once `f` returns or throws.
 *
 * @param f
 *   The body to run; it receives the temporary directory.
 */
def withTempIcebergDir(f: File => Unit): Unit = {
  def deleteRecursively(file: File): Unit = {
    if (file.isDirectory) {
      // listFiles() returns null when the directory cannot be read; skipping the children in
      // that case keeps an NPE in the finally block from masking a failure thrown by `f`.
      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    }
    file.delete()
  }

  val dir = Files.createTempDirectory("comet-iceberg-test").toFile
  try {
    f(dir)
  } finally {
    deleteRecursively(dir)
  }
}

test("runtime filtering - multiple DPP filters on two partition columns") {
assume(icebergAvailable, "Iceberg not available")
withTempIcebergDir { warehouseDir =>
Expand Down
Loading
Loading