diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index d285d83059653..f0e4bedba174e 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1595,7 +1595,7 @@ def load_plan(self, plan_reference: PlanReference) -> CompiledPlan: """ return CompiledPlan( j_compiled_plan=self._j_tenv.loadPlan(plan_reference._j_plan_reference), - t_env=self._j_tenv + t_env=self ) def compile_plan_sql(self, stmt: str) -> CompiledPlan: @@ -1621,7 +1621,7 @@ def compile_plan_sql(self, stmt: str) -> CompiledPlan: .. versionadded:: 2.1.0 """ - return CompiledPlan(j_compiled_plan=self._j_tenv.compilePlanSql(stmt), t_env=self._j_tenv) + return CompiledPlan(j_compiled_plan=self._j_tenv.compilePlanSql(stmt), t_env=self) def execute_plan(self, plan_reference: PlanReference) -> TableResult: """ diff --git a/flink-python/pyflink/table/tests/test_compiled_plan.py b/flink-python/pyflink/table/tests/test_compiled_plan.py index fe0675e5939bf..287cda3e36bb5 100644 --- a/flink-python/pyflink/table/tests/test_compiled_plan.py +++ b/flink-python/pyflink/table/tests/test_compiled_plan.py @@ -17,6 +17,7 @@ ################################################################################ import os.path import re +import uuid from pathlib import Path from pyflink.table import Schema, DataTypes, TableDescriptor, PlanReference @@ -26,6 +27,10 @@ JSON_PLAN_DIR = os.path.join(THIS_DIR, "jsonplan") +def generate_random_table_name(): + return "Table{0}".format(str(uuid.uuid1()).replace("-", "_")) + + def _replace_flink_version(plan: str) -> str: """ Ignore the value for the Flink Version in the compiled plan. @@ -97,6 +102,47 @@ def test_write_load_compiled_plan(self): compiled_plan.as_json_string(), compiled_plan_from_string.as_json_string() ) + def test_load_plan_execute(self): + schema = Schema.new_builder().column("f0", DataTypes.STRING()).build() + table = self.t_env.from_descriptor( + TableDescriptor.for_connector("datagen") + .option("number-of-rows", "5") + .schema(schema) + .build() + ) + self.t_env.create_temporary_table( + "Sink", + TableDescriptor.for_connector("blackhole").schema(schema).build(), + ) + + compiled_plan = table.insert_into("Sink").compile_plan() + plan_path = Path(self.tempdir + "/plan.json") + compiled_plan.write_to_file(plan_path) + + loaded_plan = self.t_env.load_plan(PlanReference.from_file(plan_path)) + loaded_plan.execute().wait() + + def test_compile_plan_sql_execute(self): + source_table = generate_random_table_name() + sink_table = generate_random_table_name() + + src = f""" + CREATE TABLE {source_table} (a BIGINT, b INT, c VARCHAR) + WITH ('connector' = 'datagen', 'number-of-rows' = '5') + """ + self.t_env.execute_sql(src) + + sink = f""" + CREATE TABLE {sink_table} (a BIGINT, b INT, c VARCHAR) + WITH ('connector' = 'blackhole') + """ + self.t_env.execute_sql(sink) + + compiled_plan = self.t_env.compile_plan_sql( + f"INSERT INTO {sink_table} SELECT * FROM {source_table}" + ) + compiled_plan.execute().wait() + if __name__ == "__main__": import unittest