fix(python-worker): coerce integral floats to int for INT/LONG fields#6053
fix(python-worker): coerce integral floats to int for INT/LONG fields#6053eugenegujing wants to merge 2 commits into
Conversation
Problem: pandas-based Python operators (e.g. Sort via TableOperator) build a DataFrame from input tuples. When an INT/LONG column contains nulls, pandas promotes the whole column to float64 because an int column cannot hold NaN, so 119 becomes 119.0. On output, the worker's strict schema validation in Tuple.finalize() then fails with "TypeError: Unmatched type for field 'weight', expected AttributeType.INT, got 119.0 (<class 'float'>) instead." This crashed every workflow whose CSV had an integer column with at least one missing value, forcing users to manually insert a Type Casting operator. Fix (Tuple.cast_to_schema only; validate_schema unchanged): when the target type is INT or LONG and the value is a float (including np.float64) with a zero fractional part, cast it back to int, but only when the result is provably the original integer: - INT window: Arrow int32 capacity [-2^31, 2^31 - 1]. int32 values are always exactly representable in float64, so capacity is the only constraint. - LONG window: the float64 exact-integer range [-(2^53) + 1, 2^53 - 1] instead of int64 capacity. Above 2^53 float64 rounds, so the received float may already be a corrupted rendition of the original integer; coercing it would turn a loud validation error into silent data corruption. - The endpoint 2^53 is excluded because it is ambiguous: 2^53 + 1 also rounds to float 2^53. - The range check compares the converted int, not floats, to avoid rounding at the endpoints. - Non-integral, infinite, and out-of-window floats are left untouched so validate_schema() still rejects them: lossy coercion must never happen silently. - An out-of-window integral float additionally logs an actionable warning suggesting a cast to STRING or DOUBLE (or LONG for large integers in an INT field). Also fixes a pre-existing stale-variable bug exposed by restructuring the if-chain: a NaN destined for a BINARY field was first set to None and then re-pickled from the stale local variable, producing pickled-NaN bytes instead of None. NaN in a BINARY field now correctly finalizes to None (guarded by a dedicated test). Tests: 34 new cases in test_tuple.py: - coercion cases incl. int32 and float64-exact-window boundaries and np.float64 - rejection of non-integral / infinite / out-of-window floats, plus the out-of-window warning - NaN/None handling; DOUBLE and STRING fields stay untouched - coercion pinned into cast_to_schema rather than validate_schema - an integration-style test reproducing the full pipeline (Table.from_tuple_likes -> float64 promotion -> as_tuples -> finalize) Fixes apache#5935
Automated Reviewer SuggestionsBased on the
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #6053 +/- ##
============================================
+ Coverage 56.62% 56.76% +0.13%
Complexity 3019 3019
============================================
Files 1122 1122
Lines 43200 43367 +167
Branches 4648 4648
============================================
+ Hits 24464 24618 +154
- Misses 17299 17312 +13
Partials 1437 1437
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 417 | 0.254 | 22,320/31,971/31,971 us | 🔴 -6.3% / 🔴 +116.3% |
| 🟢 | bs=100 sw=10 sl=64 | 932 | 0.569 | 106,462/125,400/125,400 us | 🟢 -18.5% / 🔴 +17.3% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,109 | 0.677 | 896,534/950,287/950,287 us | ⚪ within ±5% / 🟢 -9.8% |
Baseline details
Latest main 878eb8a from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 417 tuples/sec | 443 tuples/sec | 786.27 tuples/sec | -5.9% | -47.0% |
| bs=10 sw=10 sl=64 | MB/s | 0.254 MB/s | 0.271 MB/s | 0.48 MB/s | -6.3% | -47.1% |
| bs=10 sw=10 sl=64 | p50 | 22,320 us | 22,679 us | 12,495 us | -1.6% | +78.6% |
| bs=10 sw=10 sl=64 | p95 | 31,971 us | 33,075 us | 14,784 us | -3.3% | +116.3% |
| bs=10 sw=10 sl=64 | p99 | 31,971 us | 33,075 us | 18,468 us | -3.3% | +73.1% |
| bs=100 sw=10 sl=64 | throughput | 932 tuples/sec | 959 tuples/sec | 991.49 tuples/sec | -2.8% | -6.0% |
| bs=100 sw=10 sl=64 | MB/s | 0.569 MB/s | 0.585 MB/s | 0.605 MB/s | -2.7% | -6.0% |
| bs=100 sw=10 sl=64 | p50 | 106,462 us | 102,376 us | 100,929 us | +4.0% | +5.5% |
| bs=100 sw=10 sl=64 | p95 | 125,400 us | 153,844 us | 106,894 us | -18.5% | +17.3% |
| bs=100 sw=10 sl=64 | p99 | 125,400 us | 153,844 us | 114,085 us | -18.5% | +9.9% |
| bs=1000 sw=10 sl=64 | throughput | 1,109 tuples/sec | 1,106 tuples/sec | 1,023 tuples/sec | +0.3% | +8.4% |
| bs=1000 sw=10 sl=64 | MB/s | 0.677 MB/s | 0.675 MB/s | 0.624 MB/s | +0.3% | +8.4% |
| bs=1000 sw=10 sl=64 | p50 | 896,534 us | 907,054 us | 983,835 us | -1.2% | -8.9% |
| bs=1000 sw=10 sl=64 | p95 | 950,287 us | 942,998 us | 1,023,777 us | +0.8% | -7.2% |
| bs=1000 sw=10 sl=64 | p99 | 950,287 us | 942,998 us | 1,053,883 us | +0.8% | -9.8% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,480.10,200,128000,417,0.254,22319.72,31970.83,31970.83
1,100,10,64,20,2145.05,2000,1280000,932,0.569,106461.90,125399.66,125399.66
2,1000,10,64,20,18027.17,20000,12800000,1109,0.677,896534.23,950286.80,950286.80Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
/request-review @aicam |
What changes were proposed in this PR?
Problem. Pandas-based Python operators (e.g. Sort via
TableOperator) build a DataFrame from input tuples. When an INT/LONG column contains nulls, pandas promotes the whole column to float64 because an int column cannot hold NaN, so119becomes119.0. On output, the worker's strict schema validation inTuple.finalize()then fails withTypeError: Unmatched type for field 'weight', expected AttributeType.INT, got 119.0 (<class 'float'>) instead.This crashed every workflow whose CSV had an integer column with at least one missing value, and the only workaround was manually inserting a Type Casting operator for each affected column.Why fix it in the Python worker (option (a) of the issue). The CSV schema inference is correct (an all-integer column with nulls is INTEGER; the JVM side handles null ints fine), and a UI per-column override (option (c)) would not remove the crash. The type contract is broken by pandas at the Python-worker boundary, so the fix belongs at that boundary's single chokepoint:
Tuple.cast_to_schema(), which already performs safe casts (NaN -> None, object -> pickled bytes) right beforevalidate_schema().The fix (in
Tuple.cast_to_schema()only;validate_schema()is unchanged): when the target type is INT or LONG and the value is a float (includingnp.float64) with a zero fractional part, cast it back to int — but only when the result is provably the original integer:[-2^31, 2^31 - 1]. int32 values are always exactly representable in float64, so capacity is the only constraint.[-(2^53) + 1, 2^53 - 1]instead of int64 capacity. Above 2^53, float64 rounds, so the received float may already be a corrupted rendition of the original integer; coercing it would turn a loud validation error into silent data corruption. The endpoint 2^53 itself is excluded because it is ambiguous (2^53 + 1also rounds to float2^53).validate_schema()still rejects them: lossy coercion must never happen silently. An out-of-window integral float additionally logs an actionable warning suggesting a cast to STRING or DOUBLE (or LONG for large integers in an INT field).Deliberate behavior change for reviewers to note. Restructuring the if-chain in
cast_to_schema()also fixes a pre-existing stale-variable bug: a NaN destined for a BINARY field was first set to None and then re-pickled from the stale local variable, producing pickled-NaN bytes instead of None. NaN in a BINARY field now correctly finalizes to None (guarded by a dedicated test).The changed logic in
core/models/tuple.py. A new module-level constant defines the safely coercible window per integral type:cast_to_schema()'s per-field loop is restructured from two independentifs into mutually exclusive branches (null handling / integral-float coercion / BINARY pickling), which both hosts the new coercion and eliminates the stale-variable read described above:The outer per-field
try/except(keep the value unchanged if a cast fails, continue with the next field) is preserved, andvalidate_schema()is untouched, so anything the coercion deliberately skips still fails validation loudly.Any related issues, documentation, discussions?
Fixes #5935
How was this PR tested?
TDD: the tests were written first and confirmed to reproduce the crash (red), then the fix turned them green.
amber/src/test/python/core/models/test_tuple.py(59 total in the file, all passing): coercion cases including the int32 and float64-exact-window boundaries andnp.float64; rejection of non-integral / infinite / out-of-window floats; the out-of-window warning; NaN/None handling; DOUBLE and STRING fields staying untouched; tests pinning the coercion intocast_to_schemarather thanvalidate_schema; and an integration-style test reproducing the full pipeline (Table.from_tuple_likes-> float64 promotion ->as_tuples->finalize).cd amber && pytest -m "not integration"— all pass.ruff checkandruff format --checkclean on both changed files.sbt "scalafixAll --check"andsbt scalafmtCheckAllpass.AMBER_TEST_FILTER=skip-integration sbt test: the full suite passes — 0 failed, 0 aborted (WorkflowCore 1570, amber 1076, all other service modules green). Run against a clean iceberg catalog, matching how CI provisions one per run.Table->finalizecode path the worker uses.Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Fable 5)