diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py index bf634c003a07..b236c7e3d5ac 100644 --- a/sdks/python/apache_beam/transforms/stats_test.py +++ b/sdks/python/apache_beam/transforms/stats_test.py @@ -264,6 +264,10 @@ def _approx_quantile_generator(size, num_of_quantiles, absoluteError): quantiles.append(size - 1) return quantiles + @staticmethod + def _sum_and_second(x): + return (sum(x), x[1]) + def test_quantiles_globaly(self): with TestPipeline() as p: pc = p | Create(list(range(101))) @@ -490,22 +494,32 @@ def test_batched_quantiles(self): 3, input_batched=True)) with_key = ( pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally( - 3, key=sum, input_batched=True)) + 3, + key=ApproximateQuantilesTest._sum_and_second, + input_batched=True)) key_with_reversed = ( pc | 'Globally with key and reversed' >> beam.ApproximateQuantiles.Globally( - 3, key=sum, reverse=True, input_batched=True)) + 3, + key=ApproximateQuantilesTest._sum_and_second, + reverse=True, + input_batched=True)) assert_that( globally, equal_to([[(0.0, 500), (49.9, 1), (99.9, 499)]]), label='checkGlobally') + # When key is present, both (72.5, 225) and (22.5, 275) produce the exact same + # sum (297.5). If we just use key=sum, tie-breaking is sensitive to bundle merging + # order and shared class-level jitter state, leading to flaky test failures. + # With the secondary key (defined in _sum_and_second), we can break ties + # deterministically. assert_that( with_key, equal_to([[(50.0, 0), (72.5, 225), (99.9, 499)]]), label='checkGloballyWithKey') assert_that( key_with_reversed, - equal_to([[(99.9, 499), (72.5, 225), (50.0, 0)]]), + equal_to([[(99.9, 499), (22.5, 275), (50.0, 0)]]), label='checkGloballyWithKeyAndReversed') def test_batched_weighted_quantiles(self):