diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 174eae89c6..d0a55e9fb6 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -364,6 +364,7 @@ jobs: org.apache.spark.sql.CometCollationSuite org.apache.comet.CometFuzzAggregateSuite org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite + org.apache.spark.sql.CometSparkInternalFunctionsSuite - name: "expressions" value: | org.apache.comet.CometExpressionSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 2294e00363..c35fc9db86 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -180,6 +180,7 @@ jobs: org.apache.spark.sql.CometCollationSuite org.apache.comet.CometFuzzAggregateSuite org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite + org.apache.spark.sql.CometSparkInternalFunctionsSuite - name: "expressions" value: | org.apache.comet.CometExpressionSuite diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 143048fb44..75b6600ba2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -255,7 +255,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[ToCharacter] -> CometToCharacter, classOf[ToNumber] -> CometToNumber, classOf[TryToNumber] -> CometTryToNumber, - classOf[Mask] -> CometMask) + classOf[Mask] -> CometMask, + classOf[Empty2Null] -> CometEmpty2Null) base ++ sparkVersionSpecificStringExpressions } diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 0dc092eb27..f5c0c82284 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -19,7 +19,7 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.{Attribute, BitLength, Cast, Concat, ConcatWs, Elt, Expression, FindInSet, FormatNumber, FormatString, GetJsonObject, If, InitCap, IsNull, Left, Length, Levenshtein, Like, Literal, Lower, Mask, OctetLength, Overlay, RegExpExtract, RegExpExtractAll, RegExpInStr, RegExpReplace, Right, RLike, SoundEx, StringLocate, StringLPad, StringRepeat, StringReplace, StringRPad, StringSplit, StringTranslate, Substring, SubstringIndex, ToCharacter, ToNumber, TryToNumber, UnBase64, Upper} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BitLength, Cast, Concat, ConcatWs, Elt, Empty2Null, Expression, FindInSet, FormatNumber, FormatString, GetJsonObject, If, InitCap, IsNull, Left, Length, Levenshtein, Like, Literal, Lower, Mask, OctetLength, Overlay, RegExpExtract, RegExpExtractAll, RegExpInStr, RegExpReplace, Right, RLike, SoundEx, StringLocate, StringLPad, StringRepeat, StringReplace, StringRPad, StringSplit, StringTranslate, Substring, SubstringIndex, ToCharacter, ToNumber, TryToNumber, UnBase64, Upper} import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -668,3 +668,7 @@ object CometToNumber extends CometCodegenDispatch[ToNumber] object CometTryToNumber extends CometCodegenDispatch[TryToNumber] object CometMask extends CometCodegenDispatch[Mask] + +// A internal function that converts the empty string to null for partition values. +// This function should be only used in V1Writes. +object CometEmpty2Null extends CometCodegenDispatch[Empty2Null] diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f6795b91a3..0a33a50450 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -21,8 +21,9 @@ package org.apache.comet.parquet import java.io.File -import scala.util.Random +import scala.util.{Random, Using} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} @@ -37,6 +38,27 @@ class CometParquetWriterSuite extends CometTestBase { import testImplicits._ + test("partitioned write with empty string partition value") { + withTempPath { path => + Seq(("", 1), ("a", 2)) + .toDF("part", "value") + .write + .partitionBy("part") + .parquet(path.toString) + Using(FileSystem.get(spark.sparkContext.hadoopConfiguration)) { fs => + val partitions = fs + .listStatus(new Path(path.toString)) + .filter(_.isDirectory) + .map(_.getPath.getName) + .sorted + assert(partitions.contains("part=a")) + assert(!partitions.contains("part=")) + assert(partitions.count(_.startsWith("part=__HIVE_DEFAULT_PARTITION__")) == 1) + } + checkAnswer(spark.read.parquet(path.toString), Row(1, null) :: Row(2, "a") :: Nil) + } + } + test("basic parquet write") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala new file mode 100644 index 0000000000..6f64574739 --- /dev/null +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.Empty2Null +import org.apache.spark.sql.functions.col + +class CometSparkInternalFunctionsSuite extends CometTestBase { + + test("empty2null is offloaded to Comet") { + withParquetTable(Seq("", "a", null, "b").map(Tuple1(_)), "tbl") { + val df = spark + .sql("select _1 from tbl") + .select(new Column(Empty2Null(col("_1").expr)).as("p")) + checkSparkAnswerAndOperator(df) + } + } +} diff --git a/spark/src/test/spark-4.x/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala new file mode 100644 index 0000000000..b8b107f2e8 --- /dev/null +++ b/spark/src/test/spark-4.x/org/apache/spark/sql/CometSparkInternalFunctionsSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.Empty2Null +import org.apache.spark.sql.classic.ExpressionUtils +import org.apache.spark.sql.functions._ + +class CometSparkInternalFunctionsSuite extends CometTestBase { + + test("empty2null is offloaded to Comet") { + withParquetTable(Seq("", "a", null, "b").map(Tuple1(_)), "tbl") { + val df = sql("select _1 from tbl") + .select(ExpressionUtils.column(Empty2Null(ExpressionUtils.expression(col("_1")))).as("p")) + checkSparkAnswerAndOperator(df) + } + } +}