From e3271104328f2191c41202b5bd05521db745eeaf Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 1 Jul 2026 15:42:34 -0700 Subject: [PATCH 1/2] refactor to remove unnecessary code for mysql source retention --- .../texera/amber/operator/LogicalOp.scala | 2 +- .../source/sql/mysql/MySQLConnUtil.scala | 44 --------- .../source/sql/mysql/MySQLSourceOpDesc.scala | 26 +----- .../source/sql/mysql/MySQLSourceOpExec.scala | 72 --------------- .../source/sql/mysql/MySQLConnUtilSpec.scala | 90 ------------------- .../sql/mysql/MySQLSourceOpDescSpec.scala | 13 +-- 6 files changed, 6 insertions(+), 241 deletions(-) delete mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtil.scala delete mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpExec.scala delete mode 100644 common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtilSpec.scala diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 55e241ecaf3..56094945640 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -215,7 +215,7 @@ trait StateTransferFunc new Type(value = classOf[PythonUDFOpDescV2], name = "PythonUDFV2"), new Type(value = classOf[PythonUDFSourceOpDescV2], name = "PythonUDFSourceV2"), new Type(value = classOf[DualInputPortsPythonUDFOpDescV2], name = "DualInputPortsPythonUDFV2"), - new Type(value = classOf[MySQLSourceOpDesc], name = "MySQLSource"), + new Type(value = classOf[MySQLSourceOpDesc], name = "MySQLSource"), // Retained non-executable for legacy-workflow deserialization only. new Type(value = classOf[PostgreSQLSourceOpDesc], name = "PostgreSQLSource"), new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"), new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtil.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtil.scala deleted file mode 100644 index 120f5dfe518..00000000000 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtil.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.amber.operator.source.sql.mysql - -import java.sql.{Connection, DriverManager, SQLException} - -object MySQLConnUtil { - - // Builds the JDBC URL. - private[mysql] def buildJdbcUrl(host: String, port: String, database: String): String = - "jdbc:mysql://" + host + ":" + port + "/" + database + "?autoReconnect=true&useSSL=true" - - @throws[SQLException] - def connect( - host: String, - port: String, - database: String, - username: String, - password: String - ): Connection = { - val url = buildJdbcUrl(host, port, database) - val connection = DriverManager.getConnection(url, username, password) - // set to readonly to improve efficiency - connection.setReadOnly(true) - connection - } -} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDesc.scala index bc9e1f9e66c..9c1c0d38f2d 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDesc.scala @@ -19,15 +19,10 @@ package org.apache.texera.amber.operator.source.sql.mysql -import org.apache.texera.amber.core.executor.OpExecWithClassName import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc} +import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp} import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} import org.apache.texera.amber.operator.source.sql.SQLSourceOpDesc -import org.apache.texera.amber.operator.source.sql.mysql.MySQLConnUtil.connect -import org.apache.texera.amber.util.JSONUtils.objectMapper - -import java.sql.{Connection, SQLException} @deprecated("MySQL source operator is no longer executable.", "1.1.0-incubating") class MySQLSourceOpDesc extends SQLSourceOpDesc { @@ -36,21 +31,7 @@ class MySQLSourceOpDesc extends SQLSourceOpDesc { workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalOp = - PhysicalOp - .sourcePhysicalOp( - workflowId, - executionId, - this.operatorIdentifier, - OpExecWithClassName( - "org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpExec", - objectMapper.writeValueAsString(this) - ) - ) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPropagateSchema( - SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) - ) + throw new UnsupportedOperationException("MySQL Source operator is no longer executable.") override def operatorInfo: OperatorInfo = OperatorInfo( @@ -61,9 +42,6 @@ class MySQLSourceOpDesc extends SQLSourceOpDesc { outputPorts = List(OutputPort()) ) - @throws[SQLException] - override def establishConn: Connection = connect(host, port, database, username, password) - override def updatePort(): Unit = port = if (port.trim().equals("default")) "3306" else port } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpExec.scala deleted file mode 100644 index bd494a940b1..00000000000 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpExec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.amber.operator.source.sql.mysql - -import org.apache.texera.amber.core.tuple.AttributeType -import org.apache.texera.amber.operator.source.sql.SQLSourceOpExec -import org.apache.texera.amber.operator.source.sql.mysql.MySQLConnUtil.connect -import org.apache.texera.amber.util.JSONUtils.objectMapper - -import java.sql._ - -@deprecated("MySQL source operator is no longer executable.", "1.1.0-incubating") -class MySQLSourceOpExec private[mysql] ( - descString: String -) extends SQLSourceOpExec(descString) { - override val desc: MySQLSourceOpDesc = - objectMapper.readValue(descString, classOf[MySQLSourceOpDesc]) - schema = desc.sourceSchema() - val FETCH_TABLE_NAMES_SQL = - "SELECT table_name FROM information_schema.tables WHERE table_schema = ?;" - - @throws[SQLException] - override def establishConn(): Connection = - connect(desc.host, desc.port, desc.database, desc.username, desc.password) - - @throws[RuntimeException] - override def addFilterConditions(queryBuilder: StringBuilder): Unit = { - val keywordSearchByColumn = desc.keywordSearchByColumn.orNull - if ( - desc.keywordSearch.getOrElse(false) && keywordSearchByColumn != null && desc.keywords != null - ) { - val columnType = schema.getAttribute(keywordSearchByColumn).getType - - if (columnType == AttributeType.STRING) - // in sql prepared statement, column name cannot be inserted using PreparedStatement.setString either - queryBuilder ++= " AND MATCH(" + keywordSearchByColumn + ") AGAINST (? IN BOOLEAN MODE)" - else - throw new RuntimeException("Can't do keyword search on type " + columnType.toString) - } - } - - @throws[SQLException] - override protected def loadTableNames(): Unit = { - val preparedStatement = connection.prepareStatement(FETCH_TABLE_NAMES_SQL) - preparedStatement.setString(1, desc.database) - val resultSet = preparedStatement.executeQuery - while ({ - resultSet.next - }) { - tableNames += resultSet.getString(1) - } - resultSet.close() - preparedStatement.close() - } -} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtilSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtilSpec.scala deleted file mode 100644 index 99f3f384453..00000000000 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLConnUtilSpec.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.amber.operator.source.sql.mysql - -import org.scalatest.flatspec.AnyFlatSpec - -class MySQLConnUtilSpec extends AnyFlatSpec { - - // Strategy: same as PostgreSQLConnUtilSpec. Pin the JDBC URL composition - // (the only application-logic in this util) without a real DB. - - // --------------------------------------------------------------------------- - // URL composition - host/port/database - // --------------------------------------------------------------------------- - - "MySQLConnUtil.buildJdbcUrl" should - "build a URL of the form jdbc:mysql://{host}:{port}/{database}?..." in { - assert( - MySQLConnUtil - .buildJdbcUrl("host-m", "3306", "db-m") - .startsWith("jdbc:mysql://host-m:3306/db-m") - ) - } - - it should "interpolate distinct host/port/database values into the URL" in { - assert( - MySQLConnUtil - .buildJdbcUrl("host-1", "3306", "db-1") - .startsWith("jdbc:mysql://host-1:3306/db-1") - ) - assert( - MySQLConnUtil - .buildJdbcUrl("host-2", "33060", "db-2") - .startsWith("jdbc:mysql://host-2:33060/db-2") - ) - } - - it should "place host BEFORE port" in { - val url = MySQLConnUtil.buildJdbcUrl("a", "1", "x") - assert(url.contains("//a:1/")) - assert(!url.contains("//1:a/")) - } - - // --------------------------------------------------------------------------- - // Query parameters - autoReconnect=true and useSSL=true must be present - // --------------------------------------------------------------------------- - - it should "include the `autoReconnect=true` query parameter" in { - val url = MySQLConnUtil.buildJdbcUrl("h", "3306", "db") - assert(url.contains("autoReconnect=true"), s"URL must include autoReconnect=true, got: $url") - } - - it should "include the `useSSL=true` query parameter (TLS contract)" in { - val url = MySQLConnUtil.buildJdbcUrl("h", "3306", "db") - assert(url.contains("useSSL=true"), s"URL must include useSSL=true (TLS), got: $url") - } - - it should "use the canonical `?...&...` separator pattern" in { - assert( - MySQLConnUtil.buildJdbcUrl( - "h", - "3306", - "db" - ) == "jdbc:mysql://h:3306/db?autoReconnect=true&useSSL=true" - ) - } - - it should "use the `mysql` JDBC subprotocol (not e.g. `postgresql`)" in { - val url = MySQLConnUtil.buildJdbcUrl("h", "3306", "db") - assert(url.startsWith("jdbc:mysql://")) - assert(!url.contains("jdbc:postgresql:")) - } -} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDescSpec.scala index cde2c636800..315fa7925b7 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDescSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/sql/mysql/MySQLSourceOpDescSpec.scala @@ -19,7 +19,6 @@ package org.apache.texera.amber.operator.source.sql.mysql -import org.apache.texera.amber.core.executor.OpExecWithClassName import org.apache.texera.amber.core.workflow.WorkflowContext.{ DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID @@ -65,16 +64,10 @@ class MySQLSourceOpDescSpec extends AnyFlatSpec with Matchers { } "MySQLSourceOpDesc.getPhysicalOp" should - "wire the MySQL exec as a source op with no input port and one output port" in { + "throw because the operator is no longer executable" in { val d = new MySQLSourceOpDesc - val physical = d.getPhysicalOp(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID) - physical.opExecInitInfo match { - case OpExecWithClassName(className, _) => - className shouldBe "org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpExec" - case other => fail(s"expected OpExecWithClassName, got $other") - } - physical.inputPorts.keySet shouldBe empty - physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + an[UnsupportedOperationException] should be thrownBy + d.getPhysicalOp(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID) } "MySQLSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { From d4f7ff6f3e1e6bf05e98c5adbe1062785ed4566b Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 1 Jul 2026 15:51:28 -0700 Subject: [PATCH 2/2] lint --- .../scala/org/apache/texera/amber/operator/LogicalOp.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 56094945640..61cdd47b664 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -215,7 +215,10 @@ trait StateTransferFunc new Type(value = classOf[PythonUDFOpDescV2], name = "PythonUDFV2"), new Type(value = classOf[PythonUDFSourceOpDescV2], name = "PythonUDFSourceV2"), new Type(value = classOf[DualInputPortsPythonUDFOpDescV2], name = "DualInputPortsPythonUDFV2"), - new Type(value = classOf[MySQLSourceOpDesc], name = "MySQLSource"), // Retained non-executable for legacy-workflow deserialization only. + new Type( + value = classOf[MySQLSourceOpDesc], + name = "MySQLSource" + ), // Retained non-executable for legacy-workflow deserialization only. new Type(value = classOf[PostgreSQLSourceOpDesc], name = "PostgreSQLSource"), new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"), new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),