From 4dbce1f1abb10cad62a2b8a6b0fc279ff7154ec2 Mon Sep 17 00:00:00 2001 From: eugenegujing Date: Wed, 1 Jul 2026 15:45:24 -0700 Subject: [PATCH 1/5] fix(python-worker): coerce integral floats to int for INT/LONG fields Problem: pandas-based Python operators (e.g. Sort via TableOperator) build a DataFrame from input tuples. When an INT/LONG column contains nulls, pandas promotes the whole column to float64 because an int column cannot hold NaN, so 119 becomes 119.0. On output, the worker's strict schema validation in Tuple.finalize() then fails with "TypeError: Unmatched type for field 'weight', expected AttributeType.INT, got 119.0 () instead." This crashed every workflow whose CSV had an integer column with at least one missing value, forcing users to manually insert a Type Casting operator. Fix (Tuple.cast_to_schema only; validate_schema unchanged): when the target type is INT or LONG and the value is a float (including np.float64) with a zero fractional part, cast it back to int, but only when the result is provably the original integer: - INT window: Arrow int32 capacity [-2^31, 2^31 - 1]. int32 values are always exactly representable in float64, so capacity is the only constraint. - LONG window: the float64 exact-integer range [-(2^53) + 1, 2^53 - 1] instead of int64 capacity. Above 2^53 float64 rounds, so the received float may already be a corrupted rendition of the original integer; coercing it would turn a loud validation error into silent data corruption. - The endpoint 2^53 is excluded because it is ambiguous: 2^53 + 1 also rounds to float 2^53. - The range check compares the converted int, not floats, to avoid rounding at the endpoints. - Non-integral, infinite, and out-of-window floats are left untouched so validate_schema() still rejects them: lossy coercion must never happen silently. - An out-of-window integral float additionally logs an actionable warning suggesting a cast to STRING or DOUBLE (or LONG for large integers in an INT field). 
Also fixes a pre-existing stale-variable bug exposed by restructuring the if-chain: a NaN destined for a BINARY field was first set to None and then re-pickled from the stale local variable, producing pickled-NaN bytes instead of None. NaN in a BINARY field now correctly finalizes to None (guarded by a dedicated test). Tests: 34 new cases in test_tuple.py: - coercion cases incl. int32 and float64-exact-window boundaries and np.float64 - rejection of non-integral / infinite / out-of-window floats, plus the out-of-window warning - NaN/None handling; DOUBLE and STRING fields stay untouched - coercion pinned into cast_to_schema rather than validate_schema - an integration-style test reproducing the full pipeline (Table.from_tuple_likes -> float64 promotion -> as_tuples -> finalize) Fixes #5935 --- amber/src/main/python/core/models/tuple.py | 45 ++++- .../src/test/python/core/models/test_tuple.py | 155 +++++++++++++++++- 2 files changed, 194 insertions(+), 6 deletions(-) diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py index 1493ec00333..73a9b48c573 100644 --- a/amber/src/main/python/core/models/tuple.py +++ b/amber/src/main/python/core/models/tuple.py @@ -34,6 +34,17 @@ from .schema.field import Field from .schema.schema import Schema +# Ranges within which an integral float can be safely cast back to int. +# INT is bounded by Arrow int32 capacity. LONG is bounded by the float64 +# exact-integer window rather than int64 capacity: above 2**53 float64 +# rounds, so the received float may already be a corrupted rendition of +# the original integer. The endpoint 2**53 itself is excluded because it +# is ambiguous (2**53 + 1 also rounds to float 2**53). 
+INTEGRAL_TYPE_RANGES = { + AttributeType.INT: (-(2**31), 2**31 - 1), + AttributeType.LONG: (-(2**53) + 1, 2**53 - 1), +} + @runtime_checkable class TupleLike(Protocol): @@ -303,9 +314,10 @@ def cast_to_schema(self, schema: Schema) -> None: """ Safely cast each field value to match the target schema. If failed, the value will stay not changed. - This current conducts two kinds of casts: + This currently conducts three kinds of casts: 1. cast NaN to None; - 2. cast any object to bytes (using pickle). + 2. cast integral floats to int for INT/LONG fields; + 3. cast any object to bytes (using pickle). :param schema: The target Schema that describes the target AttributeType to cast. :return: @@ -317,10 +329,33 @@ def cast_to_schema(self, schema: Schema) -> None: # convert NaN to None to support null value conversion if checknull(field_value): self[field_name] = None - - if field_value is not None: + elif field_value is not None: field_type = schema.get_attr_type(field_name) - if field_type == AttributeType.BINARY and not isinstance( + if ( + field_type in INTEGRAL_TYPE_RANGES + and isinstance(field_value, float) + and field_value.is_integer() + ): + # pandas promotes an int column holding nulls to + # float64 (119 -> 119.0), so convert integral floats + # destined for INT/LONG back to int — but only within + # the safe range above; out-of-range floats are left + # unchanged so validation still fails. Compare on the + # int result to avoid float rounding at the endpoints. + min_value, max_value = INTEGRAL_TYPE_RANGES[field_type] + int_value = int(field_value) + if min_value <= int_value <= max_value: + self[field_name] = int_value + else: + logger.warning( + f"Field '{field_name}': integral float " + f"{field_value} is outside the safely coercible " + f"range of {field_type}; leaving it unchanged " + f"(schema validation will fail). Consider " + f"casting the column to STRING or DOUBLE (or " + f"LONG for large integers in an INT field)." 
+ ) + elif field_type == AttributeType.BINARY and not isinstance( field_value, bytes ): self[field_name] = b"pickle " + pickle.dumps(field_value) diff --git a/amber/src/test/python/core/models/test_tuple.py b/amber/src/test/python/core/models/test_tuple.py index f786e88a19c..c95d42f3e7d 100644 --- a/amber/src/test/python/core/models/test_tuple.py +++ b/amber/src/test/python/core/models/test_tuple.py @@ -21,8 +21,9 @@ import pytest import numpy as np from copy import deepcopy +from loguru import logger as loguru_logger -from core.models import Tuple, ArrowTableTupleProvider +from core.models import Table, Tuple, ArrowTableTupleProvider from core.models.schema.schema import Schema @@ -152,6 +153,158 @@ def test_finalize_tuple(self): assert isinstance(tuple_["scores"], bytes) assert tuple_["height"] is None + # Pandas-based operators (e.g. TableOperator via Table.from_tuple_likes) + # promote an int column containing nulls to float64, so an INT field can + # arrive at finalize() as 119.0. finalize() must coerce such integral + # floats back to int when they fit the target type's range, while still + # rejecting non-integral, infinite, and out-of-range floats. 
+ + @pytest.mark.parametrize( + "raw_value, expected", + [ + (119.0, 119), + (-3.0, -3), + (-0.0, 0), + # int32 boundaries are exactly representable as float64 + (2147483647.0, 2**31 - 1), + (-2147483648.0, -(2**31)), + # np.float64 subclasses float and must be coerced the same way + (np.float64(119.0), 119), + ], + ) + def test_finalize_coerces_integral_float_to_int(self, raw_value, expected): + tuple_ = Tuple({"weight": raw_value}) + tuple_.finalize(Schema(raw_schema={"weight": "INTEGER"})) + assert tuple_["weight"] == expected + assert type(tuple_["weight"]) is int + + @pytest.mark.parametrize( + "raw_value, expected", + [ + (3000000000.0, 3000000000), + # np.float64 subclasses float and must be coerced the same way + (np.float64(3000000000.0), 3000000000), + # boundaries of the float64 exact-integer window: every integer + # in [-(2**53) + 1, 2**53 - 1] maps to a unique float64 + (float(2**53 - 1), 2**53 - 1), + (float(-(2**53) + 1), -(2**53) + 1), + ], + ) + def test_finalize_coerces_integral_float_to_long(self, raw_value, expected): + tuple_ = Tuple({"count": raw_value}) + tuple_.finalize(Schema(raw_schema={"count": "LONG"})) + assert tuple_["count"] == expected + assert type(tuple_["count"]) is int + + def test_finalize_tuples_from_pandas_promoted_int_column(self): + # Mirrors the real pipeline: pandas promotes the INT column to + # float64 inside Table.from_tuple_likes because of the null row + # (119 -> 119.0, None -> NaN). finalize() must restore the int + # and map NaN back to None. 
+ table = Table([{"weight": 119}, {"weight": None}]) + assert table["weight"].dtype == "float64" + schema = Schema(raw_schema={"weight": "INTEGER"}) + finalized = [] + for tuple_ in table.as_tuples(): + tuple_.finalize(schema) + finalized.append(tuple_) + assert finalized[0]["weight"] == 119 + assert type(finalized[0]["weight"]) is int + assert finalized[1]["weight"] is None + + @pytest.mark.parametrize( + "null_value", + [None, float("nan"), np.float64("nan")], + ids=["none", "nan", "np-nan"], + ) + def test_finalize_keeps_null_int_field_as_none(self, null_value): + tuple_ = Tuple({"weight": null_value}) + tuple_.finalize(Schema(raw_schema={"weight": "INTEGER"})) + assert tuple_["weight"] is None + + @pytest.mark.parametrize( + "attr_type, raw_value", + [ + # non-integral floats must never be silently truncated + ("INTEGER", 119.5), + ("LONG", 3.5), + ("INTEGER", float("inf")), + ("INTEGER", float("-inf")), + # integral floats outside the target range must not be coerced + # into ints that would overflow Arrow int32 + ("INTEGER", 3e9), + ("INTEGER", 2147483648.0), # int32 max + 1 + ("INTEGER", -2147483649.0), # int32 min - 1 + # for LONG, floats beyond the float64 exact-integer window must + # be rejected even though they fit int64: float64 rounds above + # 2**53, so the received float may already be a corrupted + # rendition of the original integer. The endpoint 2**53 itself + # is ambiguous (2**53 + 1 also rounds to float 2**53). 
+ ("LONG", float(2**53)), + ("LONG", float(-(2**53))), + ("LONG", -9223372036854775808.0), # -(2**63), fits int64 + ("LONG", 9223372036854775808.0), # 2**63, above long max + ("LONG", 1e20), + # coercion only applies to INT/LONG targets + ("STRING", 119.0), + ], + ) + def test_finalize_rejects_uncoercible_float(self, attr_type, raw_value): + tuple_ = Tuple({"weight": raw_value}) + with pytest.raises(TypeError, match="Unmatched type"): + tuple_.finalize(Schema(raw_schema={"weight": attr_type})) + + def test_cast_to_schema_warns_on_out_of_range_integral_float(self): + # An integral float outside the coercible window must be left + # unchanged, and a guidance warning emitted: the follow-up + # validation error alone would not explain the pandas float64 + # promotion or how to work around it. + messages = [] + handler_id = loguru_logger.add(messages.append, level="WARNING") + try: + tuple_ = Tuple({"weight": 3e9}) + tuple_.cast_to_schema(Schema(raw_schema={"weight": "INTEGER"})) + finally: + loguru_logger.remove(handler_id) + assert tuple_["weight"] == 3e9 + assert type(tuple_["weight"]) is float + assert any("outside the safely coercible range" in str(m) for m in messages) + + @pytest.mark.parametrize("raw_value", [0.5, 2.0]) + def test_finalize_leaves_double_field_untouched(self, raw_value): + tuple_ = Tuple({"ratio": raw_value}) + tuple_.finalize(Schema(raw_schema={"ratio": "DOUBLE"})) + assert tuple_["ratio"] == raw_value + assert type(tuple_["ratio"]) is float + + def test_finalize_keeps_plain_int_unchanged(self): + tuple_ = Tuple({"weight": 119}) + tuple_.finalize(Schema(raw_schema={"weight": "INTEGER"})) + assert tuple_["weight"] == 119 + assert type(tuple_["weight"]) is int + + def test_cast_to_schema_coerces_integral_float(self): + # The coercion must live in cast_to_schema(), not validate_schema() + tuple_ = Tuple({"weight": 119.0}) + tuple_.cast_to_schema(Schema(raw_schema={"weight": "INTEGER"})) + assert tuple_["weight"] == 119 + assert 
type(tuple_["weight"]) is int + + def test_validate_schema_still_rejects_integral_float(self): + # validate_schema() alone must stay strict: coercing there instead + # of in cast_to_schema() would let unfinalized floats slip through + tuple_ = Tuple({"weight": 119.0}) + with pytest.raises(TypeError, match="Unmatched type"): + tuple_.validate_schema(Schema(raw_schema={"weight": "INTEGER"})) + + def test_finalize_maps_nan_in_binary_field_to_none(self): + # NaN in a BINARY field must become None, not a pickled NaN. + # Guards the cast_to_schema() branch structure: after the NaN->None + # conversion, the stale pre-conversion value must not be re-pickled. + tuple_ = Tuple({"payload": float("nan")}) + tuple_.finalize(Schema(raw_schema={"payload": "BINARY"})) + assert tuple_["payload"] is None + def test_hash(self): schema = Schema( raw_schema={ From 38415b152abb6e0bad489163f3e5446e90faa994 Mon Sep 17 00:00:00 2001 From: eugenegujing Date: Wed, 1 Jul 2026 16:41:16 -0700 Subject: [PATCH 2/5] ci: re-trigger checks (flaky ReconfigurationIntegrationSpec on macOS) Co-Authored-By: Claude Fable 5 From d345bee31f45c0c94940a41fb5844d2d6a9c76de Mon Sep 17 00:00:00 2001 From: eugenegujing Date: Fri, 3 Jul 2026 01:50:03 -0700 Subject: [PATCH 3/5] refactor(python-worker): address review comments on #6053 - test_tuple.py: use loguru's `logger` directly instead of aliasing it. - Move INTEGRAL_TYPE_RANGES next to the other per-AttributeType maps in models/schema/attribute_type.py; import it into tuple.py. 
--- .../python/core/models/schema/attribute_type.py | 11 +++++++++++ amber/src/main/python/core/models/tuple.py | 17 +++++------------ amber/src/test/python/core/models/test_tuple.py | 6 +++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 24d0745f41e..666ee69ede4 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -99,3 +99,14 @@ class AttributeType(Enum): datetime.datetime: AttributeType.TIMESTAMP, largebinary: AttributeType.LARGE_BINARY, } + +# Signed value ranges within which an integral float can be safely cast back +# to int. INT is bounded by Arrow int32 capacity. LONG is bounded by the +# float64 exact-integer window rather than int64 capacity: above 2**53 float64 +# rounds, so the received float may already be a corrupted rendition of the +# original integer. The endpoint 2**53 itself is excluded because it is +# ambiguous (2**53 + 1 also rounds to float 2**53). +INTEGRAL_TYPE_RANGES = { + AttributeType.INT: (-(2**31), 2**31 - 1), + AttributeType.LONG: (-(2**53) + 1, 2**53 - 1), +} diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py index 73a9b48c573..1b2db2f198c 100644 --- a/amber/src/main/python/core/models/tuple.py +++ b/amber/src/main/python/core/models/tuple.py @@ -30,21 +30,14 @@ from typing_extensions import Protocol, runtime_checkable from core.models.type.large_binary import largebinary -from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType +from .schema.attribute_type import ( + INTEGRAL_TYPE_RANGES, + TO_PYOBJECT_MAPPING, + AttributeType, +) from .schema.field import Field from .schema.schema import Schema -# Ranges within which an integral float can be safely cast back to int. -# INT is bounded by Arrow int32 capacity. 
LONG is bounded by the float64 -# exact-integer window rather than int64 capacity: above 2**53 float64 -# rounds, so the received float may already be a corrupted rendition of -# the original integer. The endpoint 2**53 itself is excluded because it -# is ambiguous (2**53 + 1 also rounds to float 2**53). -INTEGRAL_TYPE_RANGES = { - AttributeType.INT: (-(2**31), 2**31 - 1), - AttributeType.LONG: (-(2**53) + 1, 2**53 - 1), -} - @runtime_checkable class TupleLike(Protocol): diff --git a/amber/src/test/python/core/models/test_tuple.py b/amber/src/test/python/core/models/test_tuple.py index c95d42f3e7d..3d61fb10f57 100644 --- a/amber/src/test/python/core/models/test_tuple.py +++ b/amber/src/test/python/core/models/test_tuple.py @@ -21,7 +21,7 @@ import pytest import numpy as np from copy import deepcopy -from loguru import logger as loguru_logger +from loguru import logger from core.models import Table, Tuple, ArrowTableTupleProvider from core.models.schema.schema import Schema @@ -260,12 +260,12 @@ def test_cast_to_schema_warns_on_out_of_range_integral_float(self): # validation error alone would not explain the pandas float64 # promotion or how to work around it. 
messages = [] - handler_id = loguru_logger.add(messages.append, level="WARNING") + handler_id = logger.add(messages.append, level="WARNING") try: tuple_ = Tuple({"weight": 3e9}) tuple_.cast_to_schema(Schema(raw_schema={"weight": "INTEGER"})) finally: - loguru_logger.remove(handler_id) + logger.remove(handler_id) assert tuple_["weight"] == 3e9 assert type(tuple_["weight"]) is float assert any("outside the safely coercible range" in str(m) for m in messages) From 7a5811b964fb16f8f902ebee8a4fffe68312b485 Mon Sep 17 00:00:00 2001 From: eugenegujing Date: Fri, 3 Jul 2026 01:59:31 -0700 Subject: [PATCH 4/5] refactor(python-worker): address review comments on #6053 - tuple.py: note the pandas version (2.2.3) in the float64-promotion comment --- amber/src/main/python/core/models/tuple.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py index 1b2db2f198c..4b5a2e0ee9c 100644 --- a/amber/src/main/python/core/models/tuple.py +++ b/amber/src/main/python/core/models/tuple.py @@ -329,7 +329,7 @@ def cast_to_schema(self, schema: Schema) -> None: and isinstance(field_value, float) and field_value.is_integer() ): - # pandas promotes an int column holding nulls to + # pandas 2.2.3 promotes an int column holding nulls to # float64 (119 -> 119.0), so convert integral floats # destined for INT/LONG back to int — but only within # the safe range above; out-of-range floats are left From cc3fe9703ce3d542dc46e7692490339ff8009315 Mon Sep 17 00:00:00 2001 From: eugenegujing Date: Fri, 3 Jul 2026 12:13:09 -0700 Subject: [PATCH 5/5] ci: re-trigger checks