From 8069b27c98e3ffd87281b760b8cc5139c5f4b8f5 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 16:29:17 -0700 Subject: [PATCH 1/8] [SQL] Cache SQL metrics in MergeRowsExec interpreted iterator Resolve SQLMetric references once per partition in MergeRowIterator instead of calling longMetric on every row. This avoids repeated metrics map lookups on the interpreted path; the codegen path is unchanged. --- .../datasources/v2/MergeRowsExec.scala | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 526ff843a1496..32bd02af53a7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -518,6 +518,19 @@ case class MergeRowsExec( private val notMatchedBySourceInstructions: Seq[InstructionExec]) extends Iterator[InternalRow] { + private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") + private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted") + private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted") + private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated") + private val numTargetRowsMatchedUpdated = + MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated") + private val numTargetRowsMatchedDeleted = + MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted") + private val numTargetRowsNotMatchedBySourceUpdated = + MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated") + private val numTargetRowsNotMatchedBySourceDeleted = + MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted") + var cachedExtraRow: InternalRow = _ override def 
hasNext: Boolean = cachedExtraRow != null || rowIterator.hasNext @@ -579,28 +592,27 @@ case class MergeRowsExec( null } - } - // For group based merge, copy is inserted if row matches no other case - private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") += 1 + private def incrementCopyMetric(): Unit = numTargetRowsCopied += 1 - private def incrementInsertMetric(): Unit = longMetric("numTargetRowsInserted") += 1 + private def incrementInsertMetric(): Unit = numTargetRowsInserted += 1 - private def incrementDeleteMetric(sourcePresent: Boolean): Unit = { - longMetric("numTargetRowsDeleted") += 1 - if (sourcePresent) { - longMetric("numTargetRowsMatchedDeleted") += 1 - } else { - longMetric("numTargetRowsNotMatchedBySourceDeleted") += 1 + private def incrementDeleteMetric(sourcePresent: Boolean): Unit = { + numTargetRowsDeleted += 1 + if (sourcePresent) { + numTargetRowsMatchedDeleted += 1 + } else { + numTargetRowsNotMatchedBySourceDeleted += 1 + } } - } - private def incrementUpdateMetric(sourcePresent: Boolean): Unit = { - longMetric("numTargetRowsUpdated") += 1 - if (sourcePresent) { - longMetric("numTargetRowsMatchedUpdated") += 1 - } else { - longMetric("numTargetRowsNotMatchedBySourceUpdated") += 1 + private def incrementUpdateMetric(sourcePresent: Boolean): Unit = { + numTargetRowsUpdated += 1 + if (sourcePresent) { + numTargetRowsMatchedUpdated += 1 + } else { + numTargetRowsNotMatchedBySourceUpdated += 1 + } } } } From 74abff84eb48f547abc52d7df6d2700e72b57d99 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 16:47:09 -0700 Subject: [PATCH 2/8] [SPARK-56933][SQL] Add comment and simplify metric refs in MergeRowsExec Document why metrics are cached per partition and use longMetric directly in MergeRowIterator instead of MergeRowsExec.this. 
--- .../datasources/v2/MergeRowsExec.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 32bd02af53a7d..89639661190b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -518,18 +518,20 @@ case class MergeRowsExec( private val notMatchedBySourceInstructions: Seq[InstructionExec]) extends Iterator[InternalRow] { - private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") - private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted") - private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted") - private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated") + // Resolve metrics once per partition; longMetric(name) does a map lookup on each call. + // See SPARK-56933. 
+ private val numTargetRowsCopied = longMetric("numTargetRowsCopied") + private val numTargetRowsInserted = longMetric("numTargetRowsInserted") + private val numTargetRowsDeleted = longMetric("numTargetRowsDeleted") + private val numTargetRowsUpdated = longMetric("numTargetRowsUpdated") private val numTargetRowsMatchedUpdated = - MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated") + longMetric("numTargetRowsMatchedUpdated") private val numTargetRowsMatchedDeleted = - MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted") + longMetric("numTargetRowsMatchedDeleted") private val numTargetRowsNotMatchedBySourceUpdated = - MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated") + longMetric("numTargetRowsNotMatchedBySourceUpdated") private val numTargetRowsNotMatchedBySourceDeleted = - MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted") + longMetric("numTargetRowsNotMatchedBySourceDeleted") var cachedExtraRow: InternalRow = _ From 1bcc5159ce81c033bff6227fff318ebe300032a5 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 17:26:46 -0700 Subject: [PATCH 3/8] [SPARK-56933][TEST] Add warm-up to MergeRowsExecBenchmark interpreted cases Use a dedicated benchmark helper with 15s JIT warm-up and a 15s timed window for whole-stage on/off cases, and support running matched-update-only in isolation for local A/B testing. 
--- .../benchmark/MergeRowsExecBenchmark.scala | 69 +++++++++++++++---- 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala index 8ddbca46b7396..5ba69ca04c482 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.benchmark +import scala.concurrent.duration._ + +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, IsNotNull, Literal} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral @@ -24,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.MergeRows import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Discard, Insert, Keep, Split, Update} import org.apache.spark.sql.classic.{ClassicConversions, Dataset} import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} /** @@ -34,6 +38,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType} * bin/spark-submit --class * --jars , * 2. build/sbt "sql/Test/runMain " + * Optional: pass "matched-update-only" to run a single case. * 3. generate result: * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " * Results will be written to "benchmarks/MergeRowsExecBenchmark-results.txt". 
@@ -101,6 +106,33 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Dataset.ofRows(spark, mergeRows) } + /** + * Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted + * (whole-stage off) results are more stable when comparing metric caching changes. + */ + private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } + + val benchmark = new Benchmark( + name, + cardinality, + minNumIters = 3, + warmupTime = 15.seconds, + minTime = 15.seconds, + output = output) + + benchmark.addCase(s"$name wholestage off") { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } + } + + benchmark.addCase(s"$name wholestage on") { _ => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } + } + + benchmark.run() + } + private def mergeMatchedUpdateOnly(): Unit = { val inputDF = createJoinOutput(N, matchedFraction = 1.0) val a = inputDF.queryExecution.analyzed.output @@ -110,7 +142,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions a(0), a(5), a(6), a(3) ))) - codegenBenchmark("merge - matched update only", N) { + mergeRowsCodegenBenchmark("merge - matched update only", N) { val df = buildMergeRowsDF(inputDF, matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -126,7 +158,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions a(4), a(5), a(6), a(7) ))) - codegenBenchmark("merge - not matched insert only", N) { + mergeRowsCodegenBenchmark("merge - not matched insert only", N) { val df = buildMergeRowsDF(inputDF, Seq.empty, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -144,7 +176,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark 
with ClassicConversions a(4), a(5), a(6), a(7) ))) - codegenBenchmark("merge - matched update + not matched insert", N) { + mergeRowsCodegenBenchmark("merge - matched update + not matched insert", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -156,7 +188,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions val matchedInstr = Seq(Discard(TrueLiteral)) - codegenBenchmark("merge - matched delete", N) { + mergeRowsCodegenBenchmark("merge - matched delete", N) { val df = buildMergeRowsDF(inputDF, matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -177,7 +209,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Keep(Insert, GreaterThan(a(5), Literal(500)), Seq(a(4), a(5), a(6), a(7))) ) - codegenBenchmark("merge - conditional clauses", N) { + mergeRowsCodegenBenchmark("merge - conditional clauses", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -199,7 +231,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions ))) val notMatchedBySourceInstr = Seq(Discard(TrueLiteral)) - codegenBenchmark("merge - matched + not matched + not matched by source", N) { + mergeRowsCodegenBenchmark("merge - matched + not matched + not matched by source", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr, notMatchedBySourceInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -216,7 +248,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Seq(a(0), a(5), a(6), a(3)) )) - codegenBenchmark("merge - split update (delete + insert)", N) { + mergeRowsCodegenBenchmark("merge - split update (delete + insert)", N) { val df = buildMergeRowsDF(inputDF, 
matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -225,13 +257,22 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("MergeRowsExec Codegen Benchmark") { - mergeMatchedUpdateOnly() - mergeNotMatchedInsertOnly() - mergeMatchedAndNotMatched() - mergeMatchedDelete() - mergeConditionalClauses() - mergeAllThreeClauses() - mergeSplitUpdate() + if (mainArgs.isEmpty) { + mergeMatchedUpdateOnly() + mergeNotMatchedInsertOnly() + mergeMatchedAndNotMatched() + mergeMatchedDelete() + mergeConditionalClauses() + mergeAllThreeClauses() + mergeSplitUpdate() + } else { + mainArgs.foreach { + case "matched-update-only" => mergeMatchedUpdateOnly() + case other => + throw new IllegalArgumentException( + s"Unknown benchmark case: $other (supported: matched-update-only)") + } + } } } } From 623a0cdd1c3ecfb6d5fd919486eff52cf2978751 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 17:30:13 -0700 Subject: [PATCH 4/8] [SPARK-56933][SQL] Use explicit outer reference for cached metrics in MergeRowsExec Use MergeRowsExec.this.longMetric in MergeRowIterator so metric resolution clearly refers to the outer operator instance. --- .../execution/datasources/v2/MergeRowsExec.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 89639661190b9..0879b92d676a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -520,18 +520,18 @@ case class MergeRowsExec( // Resolve metrics once per partition; longMetric(name) does a map lookup on each call. // See SPARK-56933. 
- private val numTargetRowsCopied = longMetric("numTargetRowsCopied") - private val numTargetRowsInserted = longMetric("numTargetRowsInserted") - private val numTargetRowsDeleted = longMetric("numTargetRowsDeleted") - private val numTargetRowsUpdated = longMetric("numTargetRowsUpdated") + private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") + private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted") + private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted") + private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated") private val numTargetRowsMatchedUpdated = - longMetric("numTargetRowsMatchedUpdated") + MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated") private val numTargetRowsMatchedDeleted = - longMetric("numTargetRowsMatchedDeleted") + MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted") private val numTargetRowsNotMatchedBySourceUpdated = - longMetric("numTargetRowsNotMatchedBySourceUpdated") + MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated") private val numTargetRowsNotMatchedBySourceDeleted = - longMetric("numTargetRowsNotMatchedBySourceDeleted") + MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted") var cachedExtraRow: InternalRow = _ From 6593c1239c8bcc441d01e104e4733ae7ba1450a2 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 17:35:57 -0700 Subject: [PATCH 5/8] [SPARK-56933][TEST] Restore MergeRowsExecBenchmark suite entry point Run all benchmark cases from runBenchmarkSuite without main-args filtering. 
--- .../benchmark/MergeRowsExecBenchmark.scala | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala index 5ba69ca04c482..3d6e6bc067070 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType} * bin/spark-submit --class * --jars , * 2. build/sbt "sql/Test/runMain " - * Optional: pass "matched-update-only" to run a single case. * 3. generate result: * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " * Results will be written to "benchmarks/MergeRowsExecBenchmark-results.txt". @@ -257,22 +256,13 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("MergeRowsExec Codegen Benchmark") { - if (mainArgs.isEmpty) { - mergeMatchedUpdateOnly() - mergeNotMatchedInsertOnly() - mergeMatchedAndNotMatched() - mergeMatchedDelete() - mergeConditionalClauses() - mergeAllThreeClauses() - mergeSplitUpdate() - } else { - mainArgs.foreach { - case "matched-update-only" => mergeMatchedUpdateOnly() - case other => - throw new IllegalArgumentException( - s"Unknown benchmark case: $other (supported: matched-update-only)") - } - } + mergeMatchedUpdateOnly() + mergeNotMatchedInsertOnly() + mergeMatchedAndNotMatched() + mergeMatchedDelete() + mergeConditionalClauses() + mergeAllThreeClauses() + mergeSplitUpdate() } } } From 5d7f2cd098be519532ec16c3431f23a0b49f7b18 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 19 May 2026 16:32:11 -0700 Subject: [PATCH 6/8] [SPARK-56933][SQL] Use lazy val for cached metrics in MergeRowsExec 
Resolve each SQLMetric on first use per partition and call longMetric() directly in MergeRowIterator per review feedback. --- .../datasources/v2/MergeRowsExec.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 0879b92d676a9..178e028006d96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -518,20 +518,18 @@ case class MergeRowsExec( private val notMatchedBySourceInstructions: Seq[InstructionExec]) extends Iterator[InternalRow] { - // Resolve metrics once per partition; longMetric(name) does a map lookup on each call. - // See SPARK-56933. - private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") - private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted") - private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted") - private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated") - private val numTargetRowsMatchedUpdated = - MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated") - private val numTargetRowsMatchedDeleted = - MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted") - private val numTargetRowsNotMatchedBySourceUpdated = - MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated") - private val numTargetRowsNotMatchedBySourceDeleted = - MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted") + // Resolve each metric at most once per partition, on first use; longMetric(name) is a map + // lookup. See SPARK-56933. 
+ private lazy val numTargetRowsCopied = longMetric("numTargetRowsCopied") + private lazy val numTargetRowsInserted = longMetric("numTargetRowsInserted") + private lazy val numTargetRowsDeleted = longMetric("numTargetRowsDeleted") + private lazy val numTargetRowsUpdated = longMetric("numTargetRowsUpdated") + private lazy val numTargetRowsMatchedUpdated = longMetric("numTargetRowsMatchedUpdated") + private lazy val numTargetRowsMatchedDeleted = longMetric("numTargetRowsMatchedDeleted") + private lazy val numTargetRowsNotMatchedBySourceUpdated = + longMetric("numTargetRowsNotMatchedBySourceUpdated") + private lazy val numTargetRowsNotMatchedBySourceDeleted = + longMetric("numTargetRowsNotMatchedBySourceDeleted") var cachedExtraRow: InternalRow = _ From 625856aef5f838f0cdd5a888590b61a35ece8dab Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 19 May 2026 16:32:13 -0700 Subject: [PATCH 7/8] [SPARK-56933][TEST] Extend codegenBenchmark timing for MergeRowsExecBenchmark Add warmupTime, minTime, minNumIters, and per-case wholestageOffNumIters / wholestageOnNumIters parameters to codegenBenchmark. Use a 7s warm-up and a 7s timed window in MergeRowsExecBenchmark through a thin mergeRowsBenchmark wrapper around codegenBenchmark, replacing the dedicated Benchmark-building helper and its redundant un-timed pre-runs.
--- .../benchmark/MergeRowsExecBenchmark.scala | 55 +++++++------------ .../benchmark/SqlBasedBenchmark.scala | 33 +++++++++-- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala index 3d6e6bc067070..0fcac326d923d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.benchmark import scala.concurrent.duration._ -import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, IsNotNull, Literal} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral @@ -27,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.MergeRows import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Discard, Insert, Keep, Split, Update} import org.apache.spark.sql.classic.{ClassicConversions, Dataset} import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} /** @@ -47,6 +45,18 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions private val N = 20 << 20 + /** Longer warm-up and timed window for stable interpreted (whole-stage off) results. */ + private def mergeRowsBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + codegenBenchmark( + name, + cardinality, + warmupTime = 7.seconds, + minTime = 7.seconds, + minNumIters = 3, + wholestageOffNumIters = 0, + wholestageOnNumIters = 0)(f) + } + /** * Creates a DataFrame simulating the join output from a MERGE operation. 
* @@ -105,33 +115,6 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Dataset.ofRows(spark, mergeRows) } - /** - * Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted - * (whole-stage off) results are more stable when comparing metric caching changes. - */ - private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } - - val benchmark = new Benchmark( - name, - cardinality, - minNumIters = 3, - warmupTime = 15.seconds, - minTime = 15.seconds, - output = output) - - benchmark.addCase(s"$name wholestage off") { _ => - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } - } - - benchmark.addCase(s"$name wholestage on") { _ => - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } - } - - benchmark.run() - } - private def mergeMatchedUpdateOnly(): Unit = { val inputDF = createJoinOutput(N, matchedFraction = 1.0) val a = inputDF.queryExecution.analyzed.output @@ -141,7 +124,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions a(0), a(5), a(6), a(3) ))) - mergeRowsCodegenBenchmark("merge - matched update only", N) { + mergeRowsBenchmark("merge - matched update only", N) { val df = buildMergeRowsDF(inputDF, matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -157,7 +140,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions a(4), a(5), a(6), a(7) ))) - mergeRowsCodegenBenchmark("merge - not matched insert only", N) { + mergeRowsBenchmark("merge - not matched insert only", N) { val df = buildMergeRowsDF(inputDF, Seq.empty, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -175,7 +158,7 @@ object MergeRowsExecBenchmark extends 
SqlBasedBenchmark with ClassicConversions a(4), a(5), a(6), a(7) ))) - mergeRowsCodegenBenchmark("merge - matched update + not matched insert", N) { + mergeRowsBenchmark("merge - matched update + not matched insert", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -187,7 +170,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions val matchedInstr = Seq(Discard(TrueLiteral)) - mergeRowsCodegenBenchmark("merge - matched delete", N) { + mergeRowsBenchmark("merge - matched delete", N) { val df = buildMergeRowsDF(inputDF, matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -208,7 +191,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Keep(Insert, GreaterThan(a(5), Literal(500)), Seq(a(4), a(5), a(6), a(7))) ) - mergeRowsCodegenBenchmark("merge - conditional clauses", N) { + mergeRowsBenchmark("merge - conditional clauses", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -230,7 +213,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions ))) val notMatchedBySourceInstr = Seq(Discard(TrueLiteral)) - mergeRowsCodegenBenchmark("merge - matched + not matched + not matched by source", N) { + mergeRowsBenchmark("merge - matched + not matched + not matched by source", N) { val df = buildMergeRowsDF(inputDF, matchedInstr, notMatchedInstr, notMatchedBySourceInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() @@ -247,7 +230,7 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark with ClassicConversions Seq(a(0), a(5), a(6), a(3)) )) - mergeRowsCodegenBenchmark("merge - split update (delete + insert)", N) { + mergeRowsBenchmark("merge - split update (delete + insert)", N) { val df = 
buildMergeRowsDF(inputDF, matchedInstr) assert(df.queryExecution.sparkPlan.exists(_.isInstanceOf[MergeRowsExec])) df.noop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala index 78d6b01580355..724f2e798d056 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.benchmark +import scala.concurrent.duration._ + import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.internal.config.UI.UI_ENABLED @@ -46,17 +48,38 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper { .getOrCreate() } - /** Runs function `f` with whole stage codegen on and off. */ - final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { - val benchmark = new Benchmark(name, cardinality, output = output) + /** + * Runs function `f` with whole stage codegen on and off. + * + * @param warmupTime JIT warm-up duration per case before timed iterations. + * @param minTime minimum total timed duration per case when `numIters` is 0. + * @param wholestageOffNumIters if non-zero, run exactly this many timed iterations + * (wholestage off); otherwise use `minNumIters` and `minTime`. + * @param wholestageOnNumIters same for wholestage on. 
+ */ + final def codegenBenchmark( + name: String, + cardinality: Long, + warmupTime: FiniteDuration = 2.seconds, + minTime: FiniteDuration = 2.seconds, + minNumIters: Int = 2, + wholestageOffNumIters: Int = 2, + wholestageOnNumIters: Int = 5)(f: => Unit): Unit = { + val benchmark = new Benchmark( + name, + cardinality, + minNumIters = minNumIters, + warmupTime = warmupTime, + minTime = minTime, + output = output) - benchmark.addCase(s"$name wholestage off", numIters = 2) { _ => + benchmark.addCase(s"$name wholestage off", numIters = wholestageOffNumIters) { _ => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } } - benchmark.addCase(s"$name wholestage on", numIters = 5) { _ => + benchmark.addCase(s"$name wholestage on", numIters = wholestageOnNumIters) { _ => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } From 2ccdf0df2cb207d08727b2841ad7bb30e823203c Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 20 May 2026 13:06:52 -0700 Subject: [PATCH 8/8] [SPARK-56933][TEST] Align codegenBenchmark params with Benchmark Reorder minNumIters, warmupTime, and minTime to match Benchmark constructor order per review. Clarify scaladoc to reference only codegenBenchmark args. --- .../sql/execution/benchmark/SqlBasedBenchmark.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala index 724f2e798d056..6c60721599bbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala @@ -51,18 +51,22 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper { /** * Runs function `f` with whole stage codegen on and off. 
* + * @param minNumIters minimum timed iterations per case when the corresponding + * `wholestageOffNumIters` or `wholestageOnNumIters` is zero. * @param warmupTime JIT warm-up duration per case before timed iterations. - * @param minTime minimum total timed duration per case when `numIters` is 0. + * @param minTime minimum total timed duration per case when the corresponding + * `wholestageOffNumIters` or `wholestageOnNumIters` is zero. * @param wholestageOffNumIters if non-zero, run exactly this many timed iterations - * (wholestage off); otherwise use `minNumIters` and `minTime`. - * @param wholestageOnNumIters same for wholestage on. + * for the wholestage-off case; otherwise use `minNumIters` and `minTime`. + * @param wholestageOnNumIters if non-zero, run exactly this many timed iterations + * for the wholestage-on case; otherwise use `minNumIters` and `minTime`. */ final def codegenBenchmark( name: String, cardinality: Long, + minNumIters: Int = 2, warmupTime: FiniteDuration = 2.seconds, minTime: FiniteDuration = 2.seconds, - minNumIters: Int = 2, wholestageOffNumIters: Int = 2, wholestageOnNumIters: Int = 5)(f: => Unit): Unit = { val benchmark = new Benchmark(