[FLINK-39062][table] Add APPLY_WATERMARK built-in function for flexible watermark assignment#27984
Draft
featzhang wants to merge 3 commits intoapache:masterfrom
Draft
[FLINK-39062][table] Add APPLY_WATERMARK built-in function for flexible watermark assignment#27984featzhang wants to merge 3 commits intoapache:masterfrom
featzhang wants to merge 3 commits intoapache:masterfrom
Conversation
…le watermark assignment This commit implements the APPLY_WATERMARK built-in function as proposed in FLIP-XXX, enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL. **Motivation:** Currently, Flink SQL requires watermarks to be defined in DDL, which limits flexibility when: - Working with catalog tables without DDL modification permissions - Using views or complex subqueries - Dynamically adjusting watermark strategies **Solution:** Introduce `APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr)` function that: - Accepts any table expression (base table, view, or subquery) - Assigns or overrides watermark on the specified rowtime column - Validates column existence and TIMESTAMP/TIMESTAMP_LTZ types **Implementation:** 1. **SQL Layer** (`SqlApplyWatermarkFunction.java`): - Registers as built-in function in `FlinkSqlOperatorTable` - Implements DESCRIPTOR pattern for column specification - Performs compile-time validation (column existence, type checking) - Returns table type with updated time attributes 2. **Logical Planning** (`FlinkLogicalWatermarkAssigner.scala`, `LogicalApplyWatermarkRule.java`): - Extends existing `FlinkLogicalWatermarkAssigner` node - Converts SQL APPLY_WATERMARK calls to logical watermark nodes - Preserves watermark semantics from DDL-based assignments 3. **Physical Planning** (`StreamPhysicalWatermarkAssigner.scala`): - Updates physical watermark assigner for SQL function path - Integrates with existing `StreamExecWatermarkAssigner` infrastructure - Maintains compatibility with DDL-defined watermarks 4. **Testing** (`ApplyWatermarkFunctionTest.java`): - Unit tests for function registration and validation - Type checking tests for TIMESTAMP/TIMESTAMP_LTZ - Column existence validation tests **Example Usage:** ```sql -- Apply watermark to a catalog table SELECT * FROM APPLY_WATERMARK( orders, DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND ); -- Override watermark on a view SELECT * FROM APPLY_WATERMARK( recent_orders_view, DESCRIPTOR(event_time), event_time - INTERVAL '10' SECOND ); ``` **Design Discussion:** https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v **Related Issues:** - Closes FLINK-39062 **Author:** FeatZhang <featzhang@apache.org>
Collaborator
84c7406 to
385b1e5
Compare
…est: pass TableConfig to streamTestUtil()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This PR implements the
APPLY_WATERMARKbuilt-in function as proposed in the community discussion thread, enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL.Motivation:
Currently, Flink SQL requires watermarks to be defined in DDL (
CREATE TABLE ... WITH WATERMARK FOR ...), which limits flexibility when:Solution:
Introduce
APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr)function that:Brief change log
SqlApplyWatermarkFunctionas a new built-in SQL functionLogicalApplyWatermarkRuleto convert SQL function calls to logical plan nodesFlinkLogicalWatermarkAssignerto support SQL function pathStreamPhysicalWatermarkAssignerto integrate with existing watermark infrastructureVerifying this change
This change added tests and can be verified as follows:
ApplyWatermarkFunctionTestfor function registration and operand validationExample Usage
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no (internal planner API only)Documentation
Discussion Thread
https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v