[FLINK-39377][table] Add initial implementation of ProcessTableFunctionTestHarness#27928
autophagy wants to merge 3 commits into apache:master
…onTestHarness
This commit introduces an initial implementation of a test harness for PTFs, for use in unit tests that do not require a running Flink cluster.
The implementation supports setting up the harness by configuring various test parameters, such as fixtures for scalar arguments, data types for table arguments, and partition settings for table arguments with set semantics. On build, the harness validates types and structure and ensures that the test setup can handle the arguments defined on the PTF.
It supports PTFs that use scalar, set-semantic table, and row-semantic table arguments, as well as PTFs that have several of each. It supports the PASS_COLUMNS_THROUGH and OPTIONAL_PARTITION_BY traits. It currently does not support State or Context (so no timers). It also does not enforce some static argument traits such as SUPPORTS_UPDATES and REQUIRE_UPDATE_BEFORE.
spuru9
left a comment
Some nits from early review. PTAL
@spuru9 Thank you for the review! I've pushed up a couple of commits to address your comments, and thank you for the prompt to get me to sort out my IDE so it doesn't do that kind of inline import anymore 😅
| under the License.
| -->
|
| # Testing Process Table Functions
Can we move this to:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/ptfs/
I know this page is already long, but it is also very nice to have everything you need to know about PTFs on one page.
| # Testing Process Table Functions
|
| The `ProcessTableFunctionTestHarness` provides a lightweight unit testing framework for Process Table
| Functions (PTFs). It allows you to test PTF logic without starting a full Flink cluster. It is useful
PTFs can also run locally without a full Flink cluster. This is more about unit testing.
Suggested change:
- Functions (PTFs). It allows you to test PTF logic without starting a full Flink cluster. It is useful
+ Functions (PTFs). It allows you to unit test PTF logic. It is useful
| @Test
| void testPassthrough() throws Exception {
| try (ProcessTableFunctionTestHarness<Row> harness =
| ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
nit: use two spaces in examples to keep the page readable and lines short
| public class FilterPTF extends ProcessTableFunction<Row> {
| public void eval(
| @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
| @ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
For simplification, this is not necessary
Suggested change:
- @ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
+ int threshold) {
| {{< tab "Java" >}}
| ```java
| @DataTypeHint("ROW<value INT>")
| public class FilterPTF extends ProcessTableFunction<Row> {
nit: do we also want to update at least one of the examples to a non-Row type? Just to show that ProcessTableFunction<Integer> is also possible in this example and is automatically wrapped in the output.
| .withScalarArgument("b", 7)
| .build()) {
|
| harness.invoke(); // Use invoke() instead of processElement()
Maybe better naming?
Suggested change:
- harness.invoke(); // Use invoke() instead of processElement()
+ harness.process();
What is the error behavior if you call this method but there are actually table args?
| public int age;
| }
|
| @DataTypeHint("ROW<name STRING, age INT>")
Suggested change:
- @DataTypeHint("ROW<name STRING, age INT>")
+ // no annotation necessary, all types can be derived from class information
| void testPOJO() throws Exception {
| try (ProcessTableFunctionTestHarness<Customer> harness =
| ProcessTableFunctionTestHarness.ofClass(CustomerPTF.class)
| .withTableArgument("input", DataTypes.of(Customer.class))
Shouldn't this be unnecessary if the type is explicit in the function?
Suggested change:
- .withTableArgument("input", DataTypes.of(Customer.class))
+ .withTableArgument("input")
| public Optional<DataType> dataType = Optional.empty();
|
| public Optional<LogicalType> logicalType = Optional.empty();
|
| public Optional<Class<?>> expectedClass = Optional.empty();
There is a lot of unnecessary code. How about we call this TestHarnessDataTypeFactory and clean it up? We should inform the user if certain methods are not implemented. Also, dummyRaw and dummySerializer are not needed.
| import org.apache.flink.table.annotation.ArgumentTrait;
| import org.apache.flink.table.annotation.StateHint;
| import org.apache.flink.table.catalog.DataTypeFactory;
| import org.apache.flink.table.data.conversion.DataStructureConverter;
Important: The harness should be a lightweight Maven module that depends only on flink-table-common. Currently this module has a full planner and runtime dependency. How about we repurpose this module and potentially move TableAssertionTest to the planner module? After that we can simplify the dependencies for test-utils. This also means that DataStructureConverter is no longer available. For this, let's introduce a new module, flink-table/flink-table-type-utils. Introducing this module has been discussed a couple of times; we should do it now, ideally under a new Jira ticket that gets merged before this PR.
| for (int colIdx = 0; colIdx < expectedPartitionColumnCount; colIdx++) {
| String firstColName = first.partitionColumnNames[colIdx];
| String currentColName = current.partitionColumnNames[colIdx];
| DataType firstColType = extractPartitionColumnType(first, firstColName);
The StructuredType case needs to be considered here.
| */
| public Builder<OUT> withScalarArgument(String argumentName, Object value) {
| checkNotNull(argumentName, "argumentName must not be null");
| checkNotNull(value, "value must not be null");
Why are argumentName and value required to be non-null here, while eval inputs are allowed to be nullable?
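One possible way to address this question, sketched in plain Java rather than against the PR's code: keep the null check on the argument name but allow a null value, so that a test can pass a null fixture to a nullable eval parameter. The `ScalarArguments` class and its `with`, `isSet`, and `get` methods are hypothetical names used only for illustration, and the sketch assumes fixtures are stored in a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical holder for scalar fixtures; NOT part of the PR or of Flink.
final class ScalarArguments {
  // HashMap permits null values, so "set to null" and "not set" stay distinguishable.
  private final Map<String, Object> values = new HashMap<>();

  ScalarArguments with(String argumentName, Object value) {
    // The name identifies the argument and must always be present.
    Objects.requireNonNull(argumentName, "argumentName must not be null");
    // The value may be null to exercise nullable eval parameters.
    values.put(argumentName, value);
    return this;
  }

  boolean isSet(String argumentName) {
    return values.containsKey(argumentName);
  }

  Object get(String argumentName) {
    return values.get(argumentName);
  }
}
```

With this shape, `isSet` tells a missing fixture apart from one that was deliberately set to null, which a plain `get` could not do.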
What is the purpose of the change
This change introduces an initial implementation of a test harness for PTFs, according to FLIP-567, for use in unit tests that do not require a running Flink cluster.
At a basic level, the harness allows users to set up test conditions with a builder API, which on build performs both test and PTF validation, and provides an auto-closable harness (that manages open/close). With this harness, users are able to pipe input rows into their PTF, and observe the collected output.
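The lifecycle described above (configure with a builder, validate on build, manage open/close through an auto-closable harness, feed rows in, read collected output) can be sketched in plain Java. This is not the Flink API: `MiniHarness`, its `Builder`, and every method name below are hypothetical stand-ins chosen for illustration, and the function under test is modeled as a simple lambda instead of a PTF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for a test harness; NOT the ProcessTableFunctionTestHarness API.
final class MiniHarness<IN, OUT> implements AutoCloseable {
  private final BiConsumer<IN, Consumer<OUT>> function;
  private final List<OUT> output = new ArrayList<>();
  private boolean open = true;

  private MiniHarness(BiConsumer<IN, Consumer<OUT>> function) {
    this.function = function;
  }

  static <IN, OUT> Builder<IN, OUT> of(BiConsumer<IN, Consumer<OUT>> function) {
    return new Builder<>(function);
  }

  // Feeds one input row to the function and collects whatever it emits.
  void processElement(IN row) {
    if (!open) {
      throw new IllegalStateException("harness is closed");
    }
    function.accept(row, output::add);
  }

  // Returns a snapshot of everything emitted so far.
  List<OUT> getOutput() {
    return new ArrayList<>(output);
  }

  @Override
  public void close() {
    open = false;
  }

  static final class Builder<IN, OUT> {
    private final BiConsumer<IN, Consumer<OUT>> function;
    private boolean tableArgumentDeclared;

    private Builder(BiConsumer<IN, Consumer<OUT>> function) {
      this.function = function;
    }

    Builder<IN, OUT> withTableArgument(String name) {
      tableArgumentDeclared = true;
      return this;
    }

    // Validation happens here, before any row is processed.
    MiniHarness<IN, OUT> build() {
      if (!tableArgumentDeclared) {
        throw new IllegalStateException("no table argument configured");
      }
      return new MiniHarness<>(function);
    }
  }
}

final class MiniHarnessExample {
  public static void main(String[] args) {
    try (MiniHarness<Integer, Integer> harness =
        MiniHarness.<Integer, Integer>of(
                (value, out) -> {
                  if (value > 5) {
                    out.accept(value);
                  }
                })
            .withTableArgument("input")
            .build()) {
      harness.processElement(3);
      harness.processElement(8);
      System.out.println(harness.getOutput()); // prints [8]
    }
  }
}
```

Failing in `build()` rather than on the first processed row keeps setup mistakes separate from logic errors in the function under test.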
A motivating example:
It currently supports:
- PASS_COLUMNS_THROUGH and OPTIONAL_PARTITION_BY argument traits.
The harness currently does not support the following, which will be added in subsequent PRs:
- SUPPORTS_UPDATES, REQUIRE_UPDATE_BEFORE
The missing feature set has been documented, along with some quickstart examples.
Brief change log
(for example:)
- ProcessTableFunctionTestHarness
Verifying this change
This change added tests and can be verified as follows:
- ProcessTableFunctionTestHarnessTest.
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (yes)
Documentation