[FLINK-35180][python] Fix embedded (thread-mode) type converters #27927
polsinello wants to merge 1 commit into apache:release-1.19
Conversation
…temporal, numeric, and nested types
Pemja auto-converts Python datetime/date/time to java.sql.Timestamp/Date/Time,
but Flink's ExternalSerializer and DataFormatConverters.RowConverter expect the
modern java.time.* bridge classes (LocalDateTime, LocalDate, LocalTime, Instant).
In thread mode this mismatch causes ClassCastException at serialization boundaries.
Process mode is unaffected because its Beam-based runnerOutputTypeSerializer
resolves to the correct java.time.* serializers via LegacyTypeInfoDataTypeConverter.
This patch:
- Adds bridge-aware DataConverters for all temporal types on both the
Table API and DataStream paths, replacing IdentityDataConverter where
it silently passed java.sql.* through.
- TimeLocalTimeDataConverter.toExternalImpl returns LocalTime as-is
instead of java.sql.Time (second-precision only; silently drops
microseconds for TIME(3)+). DataStreamLocalTimeConverter mirrors this
on the Python side via explicit JLocalTime.of, preserving sub-second
precision across the JNI boundary.
- Fixes row/tuple buffer reuse that caused silent data corruption in
ARRAY<ROW<>>.
- Widens numeric DataConverter generics from Long/Double to Number to
avoid bridge-method ClassCastException under pemja coercion, and adds
null guards in toInternalImpl / toExternalImpl for Byte/Short/Int/Long.
- Caches the original Java ExternalTypeInfo to prevent lossy DECIMAL
precision round-trips through legacy TypeInfo conversion; adds
__getstate__ / __setstate__ so ExternalTypeInfo survives pickling
(the cached Java handle is dropped and rebuilt lazily on first access).
- Null-safety in temporal to_external paths.
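The `__getstate__` / `__setstate__` pattern from the ExternalTypeInfo bullet can be sketched in plain Python. This is a hypothetical stand-in, not PyFlink's actual class: the string returned by `_get_java_type_info` stands in for the Java handle that would really be rebuilt through the JVM gateway.

```python
import pickle

class ExternalTypeInfo:
    """Hypothetical sketch (not PyFlink's actual class) of dropping an
    unpicklable cached Java handle and rebuilding it lazily."""

    def __init__(self, data_type):
        self._data_type = data_type
        self._j_type_info = None  # stands in for the cached Java handle

    def _get_java_type_info(self):
        if self._j_type_info is None:
            # PyFlink would rebuild the Java ExternalTypeInfo through the
            # JVM gateway here; a string stands in for the Java object.
            self._j_type_info = "ExternalTypeInfo<%s>" % self._data_type
        return self._j_type_info

    def __getstate__(self):
        # Java handles cannot be pickled: drop the cache, keep the rest.
        state = self.__dict__.copy()
        state["_j_type_info"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

t = ExternalTypeInfo("DECIMAL(38, 18)")
t._get_java_type_info()                      # populate the cache
clone = pickle.loads(pickle.dumps(t))        # cache dropped in transit
print(clone._j_type_info)                    # None
print(clone._get_java_type_info())           # rebuilt lazily on first access
```

The point of the pattern: serialization never sees the foreign handle, and correctness doesn't depend on when (or whether) the handle was cached before pickling.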
dianfu left a comment
@polsinello Thanks a lot for the PR! Could you resubmit the PR targeting release-1.20? The current target, release-1.19, is no longer maintained; release-1.20 is the latest version and the LTS release, which will still be maintained in the 1.x series.
For the PR itself, it's very good. I just left a few minor comments.
/**
 * Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert
 * Java Double to internal Float.
 * Accepts both Double (from pemja float→Double) and Long (from pemja int→Long)
The description seems confusing for me. What's the difference between IntDataConverter and FloatDataConverter?
You're right — the docstrings were copy-pasted and didn't reflect the widened Number generic, which made the four narrowing converters look interchangeable. I rewrote them so each class now leads with its narrow target type.
@Override
public DataConverter visit(DateType dateType) {
-    return new IdentityDataConverter<>(DataFormatConverters.DateConverter.INSTANCE);
+    return new DateLocalDateDataConverter();
Could use DateLocalDateDataConverter.INSTANCE?
Done — switched to DateLocalDateDataConverter.INSTANCE at the visit(DateType) call site; the INSTANCE was already defined.
public static final class TimestampInstantDataConverter
        extends DataConverter<TimestampData, java.time.Instant, Object> {

    private final int precision;
Could be removed since it's not used
Dropped the field + the this.precision = precision assignment; kept the constructor parameter since it still flows to the inner converter.
public static final class TimestampLocalDateTimeDataConverter
        extends DataConverter<TimestampData, java.time.LocalDateTime, Object> {

    private final int precision;
Could be removed since it's not used
Same as above — dropped the redundant local precision field; the inner DataFormatConverters.InstantConverter already stores its own copy.
@Override
public DataConverter visit(TimeType timeType) {
    return TimeDataConverter.INSTANCE;
TimeDataConverter is not used any more. We can remove it in this PR.
    delta = value - _dt.datetime(1970, 1, 1)
else:
    delta = value - _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc)
epoch_s = int(delta.total_seconds())
Review comments from AI:
converters.py line ~375. int() truncates towards zero, which gives incorrect epoch_s/nano pairs for negative timestamps (before 1970):
epoch_s = int(delta.total_seconds())
nano = (
delta.microseconds + (delta.total_seconds() - epoch_s) * 1_000_000
) * 1000
Example: for a datetime representing -99.3s from epoch:
- delta.total_seconds() = -99.3
- epoch_s = int(-99.3) = -99 (truncates towards zero, should be -100)
- nano computes to 400,000,000 instead of 700,000,000
- Result: Instant.ofEpochSecond(-99, 400000000) = -98.6s (wrong, should be -99.3s)
Fix: use int(delta.total_seconds() // 1) or math.floor(), or better yet, compute via integer microseconds:
total_us = int(delta / _dt.timedelta(microseconds=1))
epoch_s, us_rem = divmod(total_us, 1_000_000)
return JInstant.ofEpochSecond(epoch_s, us_rem * 1000)
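To see why the integer-microsecond route is correct for pre-epoch timestamps, here is a standalone sketch. `JInstant.ofEpochSecond` is replaced by returning the `(seconds, nanos)` pair directly, so it runs without a JVM; the helper name is illustrative, not PyFlink code.

```python
import datetime as _dt

def to_epoch_nanos(value):
    """Split a datetime into (epoch seconds, nanos-of-second) via integer
    microseconds. divmod floor-divides, so pre-epoch values split correctly:
    -99.3 s becomes (-100, 700_000_000), not (-99, 400_000_000)."""
    epoch = _dt.datetime(1970, 1, 1, tzinfo=value.tzinfo)
    delta = value - epoch
    total_us = delta // _dt.timedelta(microseconds=1)  # exact integer microseconds
    epoch_s, us_rem = divmod(total_us, 1_000_000)      # floor division, 0 <= us_rem
    return epoch_s, us_rem * 1000

# 99.3 s before the epoch:
before = _dt.datetime(1970, 1, 1) - _dt.timedelta(seconds=99, microseconds=300_000)
print(to_epoch_nanos(before))  # (-100, 700000000)
```

Because `timedelta // timedelta` is exact integer division, this also avoids the float rounding inherent in `total_seconds()` for large timestamps.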
good catch, fixed by computing via integer microseconds with divmod, which floor-divides correctly for negative values:
total_us = delta // _dt.timedelta(microseconds=1)
epoch_s, us_rem = divmod(total_us, 1_000_000)
return JInstant.ofEpochSecond(epoch_s, us_rem * 1000)

if value is None:
    return None

return list(super(ArrayDataConverter, self).to_internal(value))
What's the purpose of this change? (tuple -> list)
The asymmetry is driven by pemja's Python↔Java type-coercion table:

- to_internal (Java → Python UDF arg) returns list to match process mode's Beam-based path — UDFs see mutable sequences in both modes, so code like arr.append(x) / arr[i] = v doesn't break only in thread mode. Before this change, thread mode returned a tuple, which created a subtle UDF-visible divergence between modes.
- to_external (Python return → Java) stays tuple because pemja coerces tuple → Object[] (what the downstream Java ArrayDataConverter expects), whereas list → ArrayList, which would break ArrayDataConverter.toExternalImpl's typed-array element loop.

I've added a class-level docstring on ArrayDataConverter documenting the asymmetry.
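The asymmetry can be sketched in plain Python. Both classes below are hypothetical stand-ins, not PyFlink's actual converters, and pemja's tuple → Object[] coercion happens outside this code:

```python
class ArrayDataConverter:
    """Sketch of the list/tuple asymmetry (hypothetical, not PyFlink's class).

    to_internal returns a mutable list so thread-mode UDFs see the same
    sequence type as process mode; to_external returns a tuple because
    pemja coerces tuple -> Object[] but list -> ArrayList."""

    def __init__(self, element_converter):
        self._element_converter = element_converter

    def to_internal(self, value):
        # Java -> Python UDF argument: a mutable list
        if value is None:
            return None
        return [self._element_converter.to_internal(v) for v in value]

    def to_external(self, value):
        # Python return value -> Java: a tuple, for pemja's Object[] coercion
        if value is None:
            return None
        return tuple(self._element_converter.to_external(v) for v in value)


class IdentityConverter:
    def to_internal(self, value):
        return value

    def to_external(self, value):
        return value


conv = ArrayDataConverter(IdentityConverter())
print(conv.to_internal((1, 2, 3)))   # [1, 2, 3] -- mutable for the UDF
print(conv.to_external([1, 2, 3]))   # (1, 2, 3) -- coercible to Object[]
```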
@dianfu thanks a lot for your review. I've addressed every comment, let me know if this is OK for you. I also opened a PR for release-1.20 with the new fix from your comments. Happy to close this one in favor of the 1.20 PR — whichever you prefer for keeping the history.

@polsinello Thanks! Let's close this one and discuss on #27999.
What is the purpose of the change
In embedded (thread) mode, Pemja's JNI bridge auto-converts Python datetime/date/time objects to java.sql.Timestamp/Date/Time, but Flink's ExternalSerializer and DataFormatConverters.RowConverter expect the modern java.time.* bridge classes (LocalDateTime, LocalDate, LocalTime, Instant). This mismatch causes ClassCastException at serialization boundaries for any UDF that returns or receives temporal types. Process mode is unaffected because its Beam-based runnerOutputTypeSerializer resolves to the correct java.time.* serializers via LegacyTypeInfoDataTypeConverter.

This patch fixes several related type-conversion bugs in the embedded Python execution path so that all Flink-supported types work correctly in thread mode.
Brief change log
- Adds bridge-aware DataConverter implementations for all temporal types (TIMESTAMP, DATE, TIME, TIMESTAMP_LTZ) on both the Table API (PythonTypeUtils in flink-table-runtime) and DataStream (PythonTypeUtils in flink-streaming-java) paths
- Replaces IdentityDataConverter for temporal types where it silently passed java.sql.* objects through without conversion
- Fixes ROW/TUPLE buffer reuse in ArrayDataConverter that caused silent data corruption in ARRAY<ROW<...>> results
- Widens numeric DataConverter generics from Long/Double to Number to avoid bridge-method ClassCastException when Pemja returns Integer/Float for small values
- Caches the original Java ExternalTypeInfo in the Python ExternalTypeInfo wrapper to prevent lossy DECIMAL precision round-trips through legacy TypeInfo conversion

Verifying this change
This change is covered by existing tests — the PyFlink Table API and DataStream test suites exercise all affected type paths.
Does this pull request potentially affect one of the following parts:
- APIs annotated with @Public(Evolving): no
- The runtime per-record code paths (performance sensitive): yes (one java.time.* factory call per value)

Documentation