Skip to content

[FLINK-39062][table] Add APPLY_WATERMARK built-in function for flexible watermark assignment#27984

Draft
featzhang wants to merge 3 commits intoapache:masterfrom
featzhang:feature/FLINK-39062-clean
Draft

[FLINK-39062][table] Add APPLY_WATERMARK built-in function for flexible watermark assignment#27984
featzhang wants to merge 3 commits intoapache:masterfrom
featzhang:feature/FLINK-39062-clean

Conversation

@featzhang
Copy link
Copy Markdown
Member

What is the purpose of the change

This PR implements the APPLY_WATERMARK built-in function as proposed in the community discussion thread, enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL.

Motivation:
Currently, Flink SQL requires watermarks to be defined in DDL (CREATE TABLE ... WITH WATERMARK FOR ...), which limits flexibility when:

  • Working with catalog tables without DDL modification permissions
  • Using views or complex subqueries where DDL is not applicable
  • Dynamically adjusting watermark strategies without schema changes

Solution:
Introduce APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr) function that:

  • Accepts any table expression (base table, view, or subquery)
  • Assigns or overrides watermark on the specified rowtime column
  • Validates column existence and TIMESTAMP/TIMESTAMP_LTZ types at compile time

Brief change log

  • Add SqlApplyWatermarkFunction as a new built-in SQL function
  • Add LogicalApplyWatermarkRule to convert SQL function calls to logical plan nodes
  • Extend FlinkLogicalWatermarkAssigner to support SQL function path
  • Update StreamPhysicalWatermarkAssigner to integrate with existing watermark infrastructure
  • Add unit tests for function registration and validation

Verifying this change

This change added tests and can be verified as follows:

  • Added ApplyWatermarkFunctionTest for function registration and operand validation
  • Existing watermark-related tests still pass (DDL-based watermarks remain unchanged)
  • Manual verification with example queries (see below)

Example Usage

-- Apply watermark to a catalog table
SELECT * FROM APPLY_WATERMARK(
  orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '5' SECOND
);

-- Override watermark on a view
CREATE VIEW recent_orders AS SELECT * FROM orders WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY;

SELECT * FROM APPLY_WATERMARK(
  recent_orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '10' SECOND
);

-- Use with subquery
SELECT * FROM APPLY_WATERMARK(
  (SELECT * FROM orders WHERE amount > 100),
  DESCRIPTOR(order_time),
  order_time - INTERVAL '3' SECOND
);

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (internal planner API only)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented yet (will add docs in follow-up PR after initial review)

Discussion Thread

https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v

…le watermark assignment

This commit implements the APPLY_WATERMARK built-in function as proposed in FLIP-XXX,
enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL.

**Motivation:**
Currently, Flink SQL requires watermarks to be defined in DDL, which limits flexibility when:
- Working with catalog tables without DDL modification permissions
- Using views or complex subqueries
- Dynamically adjusting watermark strategies

**Solution:**
Introduce `APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr)` function that:
- Accepts any table expression (base table, view, or subquery)
- Assigns or overrides watermark on the specified rowtime column
- Validates column existence and TIMESTAMP/TIMESTAMP_LTZ types

**Implementation:**

1. **SQL Layer** (`SqlApplyWatermarkFunction.java`):
   - Registers as built-in function in `FlinkSqlOperatorTable`
   - Implements DESCRIPTOR pattern for column specification
   - Performs compile-time validation (column existence, type checking)
   - Returns table type with updated time attributes

2. **Logical Planning** (`FlinkLogicalWatermarkAssigner.scala`, `LogicalApplyWatermarkRule.java`):
   - Extends existing `FlinkLogicalWatermarkAssigner` node
   - Converts SQL APPLY_WATERMARK calls to logical watermark nodes
   - Preserves watermark semantics from DDL-based assignments

3. **Physical Planning** (`StreamPhysicalWatermarkAssigner.scala`):
   - Updates physical watermark assigner for SQL function path
   - Integrates with existing `StreamExecWatermarkAssigner` infrastructure
   - Maintains compatibility with DDL-defined watermarks

4. **Testing** (`ApplyWatermarkFunctionTest.java`):
   - Unit tests for function registration and validation
   - Type checking tests for TIMESTAMP/TIMESTAMP_LTZ
   - Column existence validation tests

**Example Usage:**
```sql
-- Apply watermark to a catalog table
SELECT * FROM APPLY_WATERMARK(
  orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '5' SECOND
);

-- Override watermark on a view
SELECT * FROM APPLY_WATERMARK(
  recent_orders_view,
  DESCRIPTOR(event_time),
  event_time - INTERVAL '10' SECOND
);
```

**Design Discussion:**
https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v

**Related Issues:**
- Closes FLINK-39062

**Author:** FeatZhang <featzhang@apache.org>
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 21, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@featzhang featzhang force-pushed the feature/FLINK-39062-clean branch from 84c7406 to 385b1e5 Compare April 21, 2026 13:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants