
[FLINK-39377][table] Add initial implementation of ProcessTableFunctionTestHarness #27928

Open
autophagy wants to merge 3 commits into apache:master from autophagy:FLINK-39377

Conversation

@autophagy (Contributor) commented Apr 14, 2026

What is the purpose of the change

This change introduces an initial implementation of a test harness for PTFs, according to FLIP-567, for use in unit tests that do not require a running Flink cluster.

At a basic level, the harness allows users to set up test conditions with a builder API, which, on build, validates both the test setup and the PTF, and provides an auto-closable harness (that manages open/close). With this harness, users can pipe input rows into their PTF and observe the collected output.

A motivating example:

@DataTypeHint("ROW<value INT>")
public class FilterPTF extends ProcessTableFunction<Row> {
    public void eval(
            @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
            @ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
        int value = input.getFieldAs("value");
        if (value > threshold) {
            collect(Row.of(value));
        }
    }
}

@Test
void testFilter() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
                    .withScalarArgument("threshold", 50)  // Configure scalar value
                    .build()) {

        harness.processElement(Row.of(30));
        harness.processElement(Row.of(70));

        List<Row> output = harness.getOutput();
        assertThat(output).containsExactly(Row.of(70));
    }
}

It currently supports:

  • Row-semantic tables (ROW_SEMANTIC_TABLE)
  • Set-semantic tables (SET_SEMANTIC_TABLE) with partition-by configuration
  • Scalar arguments (including scalar-only PTFs via invoke())
  • Multi-table PTFs with per-table element routing
  • Inline type annotations via @ArgumentHint(type = ...)
  • PTFs that use Row and structured types as their input and output types
  • PASS_COLUMNS_THROUGH and OPTIONAL_PARTITION_BY argument traits.
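Since a scalar-only PTF has no table input, the harness exposes `invoke()` rather than `processElement()`. A minimal sketch, assuming the builder API from the example above (`SumPTF` and `testScalarOnly` are illustrative names, not part of this PR):

```java
// Hypothetical scalar-only PTF: no table arguments, only scalars.
@DataTypeHint("ROW<sum INT>")
public static class SumPTF extends ProcessTableFunction<Row> {
    public void eval(
            @ArgumentHint(ArgumentTrait.SCALAR) int a,
            @ArgumentHint(ArgumentTrait.SCALAR) int b) {
        collect(Row.of(a + b));
    }
}

@Test
void testScalarOnly() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(SumPTF.class)
                    .withScalarArgument("a", 3)
                    .withScalarArgument("b", 7)
                    .build()) {
        // There is no row to pipe in, so invoke() triggers a single eval() call.
        harness.invoke();
        assertThat(harness.getOutput()).containsExactly(Row.of(10));
    }
}
```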

The harness currently does not support the following, which will be added in subsequent PRs:

  • State with @StateHint
  • Timers (and the use of Context in general)
  • Update traits like SUPPORTS_UPDATES, REQUIRE_UPDATE_BEFORE

The missing feature set has been documented, along with some quickstart examples.

Brief change log


  • Added ProcessTableFunctionTestHarness
  • Added tests to validate behaviour of PTF test harness
  • Added initial user facing documentation

Verifying this change

This change added tests and can be verified as follows:

  • Added behaviour tests via ProcessTableFunctionTestHarnessTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

@flinkbot (Collaborator) commented Apr 14, 2026

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure — re-run the last Azure build

@autophagy autophagy marked this pull request as ready for review April 15, 2026 07:13
…onTestHarness

This commit introduces an initial implementation of a test harness for PTFs, for use
in unit tests that do not require a running Flink cluster.

The implementation supports setting up the harness by configuring various test
parameters, like fixtures for scalar arguments, data types for table arguments, and
partition settings for table arguments with set semantics.

On build, the harness performs type and structure validation, and ensures the
test setup can handle the arguments defined on the PTF.

It supports PTFs that use scalar, set-semantic table, and row-semantic table arguments,
as well as PTFs that have multiple of each. It supports the PASS_COLUMNS_THROUGH and
OPTIONAL_PARTITION_BY traits.

It currently does not support state or Context (so no timers). It also does not enforce
some static argument traits like SUPPORTS_UPDATES and REQUIRE_UPDATE_BEFORE.
@spuru9 (Contributor) left a comment:

Some nits from early review. PTAL

Comment thread docs/content/docs/dev/table/testing/ptf_test_harness.md
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Apr 15, 2026
@autophagy (Contributor, Author):

@spuru9 Thank you for the review! I've pushed up a couple of commits to address your comments, and thank you for the prompt to get me to sort out my IDE so it doesn't do that kind of inline import anymore 😅

under the License.
-->

# Testing Process Table Functions
Contributor

Can we move this to:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/ptfs/

I know this page is already long, but it is also very nice to have everything you need to know around PTFs in one page.

# Testing Process Table Functions

The `ProcessTableFunctionTestHarness` provides a lightweight unit testing framework for Process Table
Functions (PTFs). It allows you to test PTF logic without starting a full Flink cluster. It is useful
Contributor

PTF can also run locally without a full Flink cluster. This is more for unit testing.

Suggested change
Functions (PTFs). It allows you to test PTF logic without starting a full Flink cluster. It is useful
Functions (PTFs). It allows you to unit test PTF logic. It is useful

@Test
void testPassthrough() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
Contributor

nit: use two spaces in examples to keep the page readable and lines short

public class FilterPTF extends ProcessTableFunction<Row> {
    public void eval(
            @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
            @ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
Contributor

For simplification, this is not necessary

Suggested change
@ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
int threshold) {

{{< tab "Java" >}}
```java
@DataTypeHint("ROW<value INT>")
public class FilterPTF extends ProcessTableFunction<Row> {
Contributor

nit: do we also want to update at least one of the examples to non-Row. Just to show that also ProcessTableFunction<Integer> in this example is possible and is auto wrapped in the output.

.withScalarArgument("b", 7)
.build()) {

harness.invoke(); // Use invoke() instead of processElement()
Contributor

Maybe better naming?

Suggested change
harness.invoke(); // Use invoke() instead of processElement()
harness.process();

What is the error behavior if you call this method but there are actually table args?

public int age;
}

@DataTypeHint("ROW<name STRING, age INT>")
Contributor

Suggested change
@DataTypeHint("ROW<name STRING, age INT>")
// no annotation necessary, all types can be derived from class information

void testPOJO() throws Exception {
    try (ProcessTableFunctionTestHarness<Customer> harness =
            ProcessTableFunctionTestHarness.ofClass(CustomerPTF.class)
                    .withTableArgument("input", DataTypes.of(Customer.class))
Contributor

Should this not be unnecessary if the type is explicit in the function?

Suggested change
.withTableArgument("input", DataTypes.of(Customer.class))
.withTableArgument("input")

Comment on lines +45 to +49
public Optional<DataType> dataType = Optional.empty();

public Optional<LogicalType> logicalType = Optional.empty();

public Optional<Class<?>> expectedClass = Optional.empty();
Contributor

There is a lot of unnecessary code. How about we call this TestHarnessDataTypeFactory and clean it up. We should inform the user if certain methods are not implemented. Also dummyRaw and dummySerializer are not needed.

import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.data.conversion.DataStructureConverter;
Contributor

Important: The harness should be a light-weight Maven module that only depends on flink-table-common. Currently this module has a full planner and runtime dependency. How about we repurpose this module and potentially move TableAssertionTest to the planner module. After that we can simplify the deps for test-utils. This also means that DataStructureConverter is not available anymore. For this, let's introduce a new module flink-table/flink-table-type-utils. Introducing this module was under discussion a couple of times, we should do it now. Ideally under a new Jira ticket that gets merged before this PR.

for (int colIdx = 0; colIdx < expectedPartitionColumnCount; colIdx++) {
    String firstColName = first.partitionColumnNames[colIdx];
    String currentColName = current.partitionColumnNames[colIdx];
    DataType firstColType = extractPartitionColumnType(first, firstColName);
Contributor

The StructuredType case also needs to be considered here.

 */
public Builder<OUT> withScalarArgument(String argumentName, Object value) {
    checkNotNull(argumentName, "argumentName must not be null");
    checkNotNull(value, "value must not be null");
Contributor

Why is it restricted here that argumentName and value cannot be null, while eval input is allowed to be nullable?
