Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,14 @@ class AttributeType(Enum):
datetime.datetime: AttributeType.TIMESTAMP,
largebinary: AttributeType.LARGE_BINARY,
}

# Inclusive (lowest, highest) bounds, keyed by target attribute type, inside
# which an integral float may be safely converted back to int.
#   * INT is limited by what an Arrow int32 can hold.
#   * LONG is limited by float64's exact-integer window, not by int64
#     capacity: past 2**53 float64 starts rounding, so a float that arrives
#     here may already be a corrupted rendition of the original integer.
#     The endpoint 2**53 is left out as well because it is ambiguous
#     (2**53 + 1 also rounds to float 2**53).
INTEGRAL_TYPE_RANGES = {
    AttributeType.INT: (-(1 << 31), (1 << 31) - 1),
    AttributeType.LONG: (1 - (1 << 53), (1 << 53) - 1),
}
40 changes: 34 additions & 6 deletions amber/src/main/python/core/models/tuple.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from typing_extensions import Protocol, runtime_checkable

from core.models.type.large_binary import largebinary
from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType
from .schema.attribute_type import (
INTEGRAL_TYPE_RANGES,
TO_PYOBJECT_MAPPING,
AttributeType,
)
from .schema.field import Field
from .schema.schema import Schema

Expand Down Expand Up @@ -303,9 +307,10 @@ def cast_to_schema(self, schema: Schema) -> None:
"""
Safely cast each field value to match the target schema.
If a cast fails, the value is left unchanged.
This current conducts two kinds of casts:
This currently conducts three kinds of casts:
1. cast NaN to None;
2. cast any object to bytes (using pickle).
2. cast integral floats to int for INT/LONG fields;
3. cast any object to bytes (using pickle).
:param schema: The target Schema that describes the target AttributeType to
cast.
:return:
Expand All @@ -317,10 +322,33 @@ def cast_to_schema(self, schema: Schema) -> None:
# convert NaN to None to support null value conversion
if checknull(field_value):
self[field_name] = None

if field_value is not None:
elif field_value is not None:
field_type = schema.get_attr_type(field_name)
if field_type == AttributeType.BINARY and not isinstance(
if (
field_type in INTEGRAL_TYPE_RANGES
and isinstance(field_value, float)
and field_value.is_integer()
):
# pandas 2.2.3 promotes an int column holding nulls to
# float64 (119 -> 119.0), so convert integral floats
# destined for INT/LONG back to int — but only within
# the safe range above; out-of-range floats are left
# unchanged so validation still fails. Compare on the
# int result to avoid float rounding at the endpoints.
min_value, max_value = INTEGRAL_TYPE_RANGES[field_type]
int_value = int(field_value)
if min_value <= int_value <= max_value:
self[field_name] = int_value
else:
logger.warning(
f"Field '{field_name}': integral float "
f"{field_value} is outside the safely coercible "
f"range of {field_type}; leaving it unchanged "
f"(schema validation will fail). Consider "
f"casting the column to STRING or DOUBLE (or "
f"LONG for large integers in an INT field)."
)
elif field_type == AttributeType.BINARY and not isinstance(
field_value, bytes
):
self[field_name] = b"pickle " + pickle.dumps(field_value)
Expand Down
155 changes: 154 additions & 1 deletion amber/src/test/python/core/models/test_tuple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import pytest
import numpy as np
from copy import deepcopy
from loguru import logger

from core.models import Tuple, ArrowTableTupleProvider
from core.models import Table, Tuple, ArrowTableTupleProvider
from core.models.schema.schema import Schema


Expand Down Expand Up @@ -152,6 +153,158 @@ def test_finalize_tuple(self):
assert isinstance(tuple_["scores"], bytes)
assert tuple_["height"] is None

# Pandas-based operators (e.g. TableOperator via Table.from_tuple_likes)
# promote an int column containing nulls to float64, so an INT field can
# arrive at finalize() as 119.0. finalize() must coerce such integral
# floats back to int when they fit the target type's range, while still
# rejecting non-integral, infinite, and out-of-range floats.

@pytest.mark.parametrize(
    "raw_value, expected",
    [
        (119.0, 119),
        (-3.0, -3),
        (-0.0, 0),
        # both int32 limits have an exact float64 representation
        (2147483647.0, 2**31 - 1),
        (-2147483648.0, -(2**31)),
        # np.float64 is a float subclass, so it must take the same path
        (np.float64(119.0), 119),
    ],
)
def test_finalize_coerces_integral_float_to_int(self, raw_value, expected):
    """finalize() must turn an integral float in an INTEGER field into int."""
    int_schema = Schema(raw_schema={"weight": "INTEGER"})
    record = Tuple({"weight": raw_value})
    record.finalize(int_schema)

    coerced = record["weight"]
    # Exact type check on purpose: the result has to be a plain Python int.
    assert type(coerced) is int
    assert coerced == expected

@pytest.mark.parametrize(
    "raw_value, expected",
    [
        (3000000000.0, 3000000000),
        # np.float64 is a float subclass, so it must take the same path
        (np.float64(3000000000.0), 3000000000),
        # edges of the float64 exact-integer window: each integer in
        # [-(2**53) + 1, 2**53 - 1] corresponds to exactly one float64
        (float(2**53 - 1), 2**53 - 1),
        (float(-(2**53) + 1), -(2**53) + 1),
    ],
)
def test_finalize_coerces_integral_float_to_long(self, raw_value, expected):
    """finalize() must turn an integral float in a LONG field into int."""
    long_schema = Schema(raw_schema={"count": "LONG"})
    record = Tuple({"count": raw_value})
    record.finalize(long_schema)

    coerced = record["count"]
    # Exact type check on purpose: the result has to be a plain Python int.
    assert type(coerced) is int
    assert coerced == expected

def test_finalize_tuples_from_pandas_promoted_int_column(self):
    """Finalize tuples read from a Table whose INT column holds a null.

    Reproduces the real pipeline: because of the null row the column is
    float64 inside the Table (119 -> 119.0, None -> NaN). After
    finalize() the first row must hold an int again and the null row
    must hold None.
    """
    int_schema = Schema(raw_schema={"weight": "INTEGER"})
    promoted = Table([{"weight": 119}, {"weight": None}])
    # Precondition: the column really was promoted to float64.
    assert promoted["weight"].dtype == "float64"

    rows = []
    for row in promoted.as_tuples():
        row.finalize(int_schema)
        rows.append(row)

    restored = rows[0]["weight"]
    assert restored == 119
    assert type(restored) is int
    assert rows[1]["weight"] is None

@pytest.mark.parametrize(
    "null_value",
    [None, float("nan"), np.float64("nan")],
    ids=["none", "nan", "np-nan"],
)
def test_finalize_keeps_null_int_field_as_none(self, null_value):
    """Every null flavour in an INTEGER field must finalize to None."""
    int_schema = Schema(raw_schema={"weight": "INTEGER"})
    record = Tuple({"weight": null_value})
    record.finalize(int_schema)
    assert record["weight"] is None

@pytest.mark.parametrize(
    "attr_type, raw_value",
    [
        # floats with a fractional part must never be silently truncated
        ("INTEGER", 119.5),
        ("LONG", 3.5),
        ("INTEGER", float("inf")),
        ("INTEGER", float("-inf")),
        # integral floats beyond the target range must not become ints
        # that would overflow Arrow int32
        ("INTEGER", 3e9),
        ("INTEGER", 2147483648.0),  # int32 max + 1
        ("INTEGER", -2147483649.0),  # int32 min - 1
        # LONG: anything past the float64 exact-integer window must be
        # rejected even if it fits int64, because float64 rounds above
        # 2**53 and the float received may already be a corrupted
        # rendition of the original integer. 2**53 itself is ambiguous
        # (2**53 + 1 also rounds to float 2**53).
        ("LONG", float(2**53)),
        ("LONG", float(-(2**53))),
        ("LONG", -9223372036854775808.0),  # -(2**63), fits int64
        ("LONG", 9223372036854775808.0),  # 2**63, above long max
        ("LONG", 1e20),
        # only INT/LONG targets are subject to coercion
        ("STRING", 119.0),
    ],
)
def test_finalize_rejects_uncoercible_float(self, attr_type, raw_value):
    """finalize() must raise TypeError for floats it may not coerce."""
    record = Tuple({"weight": raw_value})
    with pytest.raises(TypeError, match="Unmatched type"):
        target_schema = Schema(raw_schema={"weight": attr_type})
        record.finalize(target_schema)

def test_cast_to_schema_warns_on_out_of_range_integral_float(self):
    """Out-of-range integral floats stay untouched and trigger a warning.

    The warning matters because the validation error that follows would
    not, on its own, explain the pandas float64 promotion or suggest a
    workaround.
    """
    captured = []
    # Temporary sink that collects WARNING-and-above log records.
    sink_id = logger.add(captured.append, level="WARNING")
    try:
        record = Tuple({"weight": 3e9})
        record.cast_to_schema(Schema(raw_schema={"weight": "INTEGER"}))
    finally:
        # Always detach the sink so other tests are not affected.
        logger.remove(sink_id)

    untouched = record["weight"]
    assert untouched == 3e9
    assert type(untouched) is float

    rendered = [str(entry) for entry in captured]
    assert any("outside the safely coercible range" in text for text in rendered)

@pytest.mark.parametrize("raw_value", [0.5, 2.0])
def test_finalize_leaves_double_field_untouched(self, raw_value):
    """A float in a DOUBLE field stays a float, even when it is integral."""
    double_schema = Schema(raw_schema={"ratio": "DOUBLE"})
    record = Tuple({"ratio": raw_value})
    record.finalize(double_schema)

    kept = record["ratio"]
    assert type(kept) is float
    assert kept == raw_value

def test_finalize_keeps_plain_int_unchanged(self):
    """A value that is already an int passes through finalize() as is."""
    int_schema = Schema(raw_schema={"weight": "INTEGER"})
    record = Tuple({"weight": 119})
    record.finalize(int_schema)

    kept = record["weight"]
    assert type(kept) is int
    assert kept == 119

def test_cast_to_schema_coerces_integral_float(self):
    """cast_to_schema() on its own must already perform the coercion.

    The conversion has to live in cast_to_schema(), not in
    validate_schema().
    """
    int_schema = Schema(raw_schema={"weight": "INTEGER"})
    record = Tuple({"weight": 119.0})
    record.cast_to_schema(int_schema)

    coerced = record["weight"]
    assert type(coerced) is int
    assert coerced == 119

def test_validate_schema_still_rejects_integral_float(self):
    """validate_schema() alone must remain strict about integral floats.

    If the coercion happened there instead of in cast_to_schema(),
    floats that were never finalized could slip through.
    """
    record = Tuple({"weight": 119.0})
    with pytest.raises(TypeError, match="Unmatched type"):
        int_schema = Schema(raw_schema={"weight": "INTEGER"})
        record.validate_schema(int_schema)

def test_finalize_maps_nan_in_binary_field_to_none(self):
    """NaN in a BINARY field must end up as None, not as a pickled NaN.

    Protects the branch layout of cast_to_schema(): once NaN has been
    converted to None, the stale pre-conversion value must not be
    pickled again.
    """
    binary_schema = Schema(raw_schema={"payload": "BINARY"})
    record = Tuple({"payload": float("nan")})
    record.finalize(binary_schema)
    assert record["payload"] is None

def test_hash(self):
schema = Schema(
raw_schema={
Expand Down
Loading