Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ index a9f69ab28a1..760ea0e9565 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 433b4741979..07148eee480 100644
index 433b4741979..c9e0fd49604 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
Expand All @@ -371,7 +371,26 @@ index 433b4741979..07148eee480 100644
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
@@ -969,7 +970,8 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

- test("Window spill with more than the inMemoryThreshold and spillThreshold") {
+ test("Window spill with more than the inMemoryThreshold and spillThreshold",
+ IgnoreComet("Comet does not support spilling")) {
val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
val window = Window.partitionBy($"key").orderBy($"value")

@@ -981,7 +983,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

- test("SPARK-21258: complex object in combination with spilling") {
+ test("SPARK-21258: complex object in combination with spilling", IgnoreComet("Comet does not support spilling")) {
// Make sure we trigger the spilling path.
withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
@@ -1186,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}

def isShuffleExecByRequirement(
Expand All @@ -385,7 +404,7 @@ index 433b4741979..07148eee480 100644
case _ => false
}

@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
@@ -1212,13 +1216,15 @@ class DataFrameWindowFunctionsSuite extends QueryTest
val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
w.child.exists {
Expand All @@ -394,6 +413,15 @@ index 433b4741979..07148eee480 100644
case _ => false
}
case _ => false
}

- assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
+ // Ignore shuffleByRequirement assertion as it refers to original Spark node names,
+ // which not relevant to Comet
+ // assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
}
}

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index daef11ae4d6..9f3cc9181f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Expand Down Expand Up @@ -1969,7 +1997,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 104b4e416cd..b8af360fa14 100644
index 104b4e416cd..4adb273170a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
Expand Down Expand Up @@ -2882,7 +2910,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..99bc018008a 100644
index dd55fcfe42c..cd18a23d4de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
Expand Down Expand Up @@ -2948,7 +2976,7 @@ index dd55fcfe42c..99bc018008a 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase
@@ -434,6 +469,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
Expand All @@ -2958,7 +2986,7 @@ index dd55fcfe42c..99bc018008a 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..a5ea58146ad 100644
index ed2e309fa07..25b798d2c1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
Expand Down Expand Up @@ -3071,7 +3099,7 @@ index a902cb3a69e..800a3acbe99 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..97dab2a3506 100644
index 07361cfdce9..4fdbcd18656 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -55,25 +55,41 @@ object TestHive
Expand Down
59 changes: 38 additions & 21 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ index 7ee18df3756..d09f70e5d99 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index a1d5d579338..c201d39cc78 100644
index a1d5d579338..75d9f484c97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
Expand All @@ -352,7 +352,26 @@ index a1d5d579338..c201d39cc78 100644
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
@@ -970,7 +971,8 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

- test("Window spill with more than the inMemoryThreshold and spillThreshold") {
+ test("Window spill with more than the inMemoryThreshold and spillThreshold",
+ IgnoreComet("Comet does not support spilling")) {
val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value")
val window = Window.partitionBy($"key").orderBy($"value")

@@ -982,7 +984,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}
}

- test("SPARK-21258: complex object in combination with spilling") {
+ test("SPARK-21258: complex object in combination with spilling", IgnoreComet("Comet does not support spilling")) {
// Make sure we trigger the spilling path.
withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
@@ -1187,10 +1189,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}

def isShuffleExecByRequirement(
Expand All @@ -366,7 +385,7 @@ index a1d5d579338..c201d39cc78 100644
case _ => false
}

@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
@@ -1213,13 +1217,15 @@ class DataFrameWindowFunctionsSuite extends QueryTest
val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
w.child.exists {
Expand All @@ -375,6 +394,15 @@ index a1d5d579338..c201d39cc78 100644
case _ => false
}
case _ => false
}

- assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
+ // Ignore shuffleByRequirement assertion as it refers to original Spark node names,
+ // which not relevant to Comet
+ // assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
}
}

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c4fb4fa943c..a04b23870a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Expand Down Expand Up @@ -1444,7 +1472,7 @@ index 5a413c77754..207b66e1d7b 100644

import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2f8e401e743..dbcf3171946 100644
index 2f8e401e743..0bedd3a11bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -27,9 +27,11 @@ import org.scalatest.time.SpanSugar._
Expand Down Expand Up @@ -1850,14 +1878,12 @@ index 2f8e401e743..dbcf3171946 100644
}.size == (if (firstAccess) 2 else 0))
assert(collect(initialExecutedPlan) {
case i: InMemoryTableScanLike => i
@@ -2980,7 +3023,9 @@ class AdaptiveQueryExecSuite
@@ -2980,7 +3023,7 @@ class AdaptiveQueryExecSuite

val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec])
- assert(plan.finalPhysicalPlan.isInstanceOf[WindowExec])
+ assert(
+ plan.finalPhysicalPlan.isInstanceOf[WindowExec] ||
+ plan.finalPhysicalPlan.find(_.isInstanceOf[CometWindowExec]).nonEmpty)
+ assert(plan.finalPhysicalPlan.isInstanceOf[WindowExec] || plan.finalPhysicalPlan.isInstanceOf[CometWindowExec])
plan.inputPlan.output.zip(plan.finalPhysicalPlan.output).foreach { case (o1, o2) =>
assert(o1.semanticEquals(o2), "Different output column order after AQE optimization")
}
Expand Down Expand Up @@ -1958,7 +1984,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..20d7ef7b1bc 100644
index 8e88049f51e..097c518a19a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
Expand Down Expand Up @@ -2834,7 +2860,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index e937173a590..7d20538bc68 100644
index e937173a590..4d8783e9054 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
Expand Down Expand Up @@ -2900,17 +2926,8 @@ index e937173a590..7d20538bc68 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..a5ea58146ad 100644
index ed2e309fa07..25b798d2c1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
Expand Down Expand Up @@ -3023,7 +3040,7 @@ index 6160c3e5f6c..0956d7d9edc 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d646f40b3e..5babe505301 100644
index 1d646f40b3e..df108c17c42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -53,25 +53,41 @@ object TestHive
Expand Down
Loading
Loading