Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ trait StateTransferFunc
new Type(value = classOf[PythonUDFOpDescV2], name = "PythonUDFV2"),
new Type(value = classOf[PythonUDFSourceOpDescV2], name = "PythonUDFSourceV2"),
new Type(value = classOf[DualInputPortsPythonUDFOpDescV2], name = "DualInputPortsPythonUDFV2"),
new Type(value = classOf[MySQLSourceOpDesc], name = "MySQLSource"),
new Type(
value = classOf[MySQLSourceOpDesc],
name = "MySQLSource"
), // Retained non-executable for legacy-workflow deserialization only.
new Type(value = classOf[PostgreSQLSourceOpDesc], name = "PostgreSQLSource"),
new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"),
new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@

package org.apache.texera.amber.operator.source.sql.mysql

import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc}
import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp}
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import org.apache.texera.amber.operator.source.sql.SQLSourceOpDesc
import org.apache.texera.amber.operator.source.sql.mysql.MySQLConnUtil.connect
import org.apache.texera.amber.util.JSONUtils.objectMapper

import java.sql.{Connection, SQLException}

@deprecated("MySQL source operator is no longer executable.", "1.1.0-incubating")
class MySQLSourceOpDesc extends SQLSourceOpDesc {
Expand All @@ -36,21 +31,7 @@ class MySQLSourceOpDesc extends SQLSourceOpDesc {
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp =
PhysicalOp
.sourcePhysicalOp(
workflowId,
executionId,
this.operatorIdentifier,
OpExecWithClassName(
"org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpExec",
objectMapper.writeValueAsString(this)
)
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(
SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
)
throw new UnsupportedOperationException("MySQL Source operator is no longer executable.")

override def operatorInfo: OperatorInfo =
OperatorInfo(
Expand All @@ -61,9 +42,6 @@ class MySQLSourceOpDesc extends SQLSourceOpDesc {
outputPorts = List(OutputPort())
)

@throws[SQLException]
override def establishConn: Connection = connect(host, port, database, username, password)

override def updatePort(): Unit = port = if (port.trim().equals("default")) "3306" else port

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.texera.amber.operator.source.sql.mysql

import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.core.workflow.WorkflowContext.{
DEFAULT_EXECUTION_ID,
DEFAULT_WORKFLOW_ID
Expand Down Expand Up @@ -65,16 +64,10 @@ class MySQLSourceOpDescSpec extends AnyFlatSpec with Matchers {
}

"MySQLSourceOpDesc.getPhysicalOp" should
"wire the MySQL exec as a source op with no input port and one output port" in {
"throw because the operator is no longer executable" in {
val d = new MySQLSourceOpDesc
val physical = d.getPhysicalOp(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID)
physical.opExecInitInfo match {
case OpExecWithClassName(className, _) =>
className shouldBe "org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpExec"
case other => fail(s"expected OpExecWithClassName, got $other")
}
physical.inputPorts.keySet shouldBe empty
physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet
an[UnsupportedOperationException] should be thrownBy
d.getPhysicalOp(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID)
Comment on lines 66 to +70
}

"MySQLSourceOpDesc" should "round-trip its config fields through the polymorphic base" in {
Expand Down
Loading