@property
def enable_python_transpiler(self) -> bool:
    """Return the current value of the experimental Python transpiler flag."""
    return self._enable_python_transpiler


@enable_python_transpiler.setter
def enable_python_transpiler(self, value: bool):
    """Store the flag, warning first whenever a truthy value is assigned.

    Args:
        value: New flag value. A truthy value triggers a
            ``PythonTranspilerPreviewWarning`` before it is stored.
    """
    if not value:
        # Falsy values are stored silently.
        self._enable_python_transpiler = value
        return

    # Warn before storing: if the warning is escalated to an error the flag
    # stays unchanged, exactly as in the single-branch form.
    warnings.warn(
        bfe.format_message(
            "Python transpiler is an unstable, experimental feature, and not yet fully "
            "validated, use at your own risk."
        ),
        category=bfe.PythonTranspilerPreviewWarning,
    )
    self._enable_python_transpiler = value
def py_to_expression(func: Callable) -> expression.Expression:
    """
    Best-effort conversion of a Python function into an expression.

    The function's bytecode is first compiled into an intermediate Python
    expression tree, which is then passed through ``resolve_py_exprs``.
    Whatever those two steps raise for an unsupported function propagates to
    the caller unchanged.

    Args:
        func: The Python callable to convert.

    Returns:
        The resolved expression.
    """
    return py_exprs.resolve_py_exprs(_compile_bytecode_to_py_expr(func))
Bottom-up.""" def resolve_expr_if_call(expression: Expression) -> Expression: @@ -325,10 +330,15 @@ def resolve_attrs(expression: Expression) -> Expression: if isinstance(expression.input, Module): # resolves things like Math.pi return PyObject(getattr(expression.input.module, expression.attr)) - if not unpack_mode and isinstance( - expression.input, UnboundVariableExpression + # TODO: Resolve some series methods + if ( + series_arg is not None + and series_attrs is not None + and isinstance(expression.input, UnboundVariableExpression) + and expression.input.id == series_arg + and expression.attr in series_attrs ): - return UnboundVariableExpression(expression.attr) + return deref(series_attrs[expression.attr]) return expression def resolve_pyobjs(expression: Expression) -> Expression: diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index f5fc7bdfc6b1..28ac0006d300 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -4692,13 +4692,17 @@ def _prepare_export( return array_value, id_overrides def map(self, func, na_action: Optional[str] = None) -> DataFrame: - if not isinstance(func, bigframes.functions.Udf): + from bigframes._config import options + + if not isinstance(func, bigframes.functions.Udf) and not ( + options.experiments.enable_python_transpiler and callable(func) + ): raise TypeError("the first argument must be callable") if na_action not in {None, "ignore"}: raise ValueError(f"na_action={na_action} not supported") - expr = ops.func_to_op(func).as_expr(ex.free_var("input")) + expr = ops.func_to_expr(func).apply(ex.free_var("input")) if na_action == "ignore": # True case, predicate, False case expr = ops.where_op.as_expr( @@ -4718,11 +4722,25 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning) - if not isinstance(func, bigframes.functions.Udf): + from bigframes._config 
class TranspilationError(RuntimeError):
    """Raised when a Python function cannot be transpiled to a BigFrames Expression."""
class PythonTranspilerPreviewWarning(PreviewWarning):
    """Warning category signalling that the Python Transpiler is a preview feature."""
# String tags for the five parameter kinds distinguished by ``inspect``.
ArgKind = typing.Literal[
    "positional_only",
    "positional_or_keyword",
    "keyword_only",
    "var_positional",
    "var_keyword",
]

# Translates ``inspect.Parameter.kind`` values into the ``ArgKind`` tags above.
_ARGKIND_MAP: dict[inspect._ParameterKind, ArgKind] = {
    inspect.Parameter.POSITIONAL_ONLY: "positional_only",
    inspect.Parameter.POSITIONAL_OR_KEYWORD: "positional_or_keyword",
    inspect.Parameter.VAR_POSITIONAL: "var_positional",
    inspect.Parameter.KEYWORD_ONLY: "keyword_only",
    inspect.Parameter.VAR_KEYWORD: "var_keyword",
}


@dataclasses.dataclass(frozen=True)
class ArgumentSpec:
    """
    Information about a single argument to a function
    """

    # Parameter name; also the name of the free variable it binds.
    name: str
    # ``inspect.Parameter.empty`` means "no default".
    default_value: typing.Any
    argkind: ArgKind

    @property
    def is_positional(self) -> bool:
        """True if the argument may be supplied positionally."""
        return self.argkind in ("positional_only", "positional_or_keyword")

    @property
    def is_keyword(self) -> bool:
        """True if the argument may be supplied by keyword."""
        return self.argkind in ("keyword_only", "positional_or_keyword")

    @property
    def is_var_positional(self) -> bool:
        """True for a ``*args``-style parameter."""
        return self.argkind == "var_positional"

    @property
    def is_var_keyword(self) -> bool:
        """True for a ``**kwargs``-style parameter."""
        return self.argkind == "var_keyword"

    @property
    def is_varargs(self) -> bool:
        """Alias of ``is_var_positional``."""
        return self.is_var_positional


@dataclasses.dataclass(frozen=True)
class CallableExpression:
    """
    Encodes a calling convention and an expression to bind arguments to.
    """

    # Expression whose free variables are named after ``arg_specs``.
    expr: "ex.Expression"
    arg_specs: typing.Sequence[ArgumentSpec]

    @classmethod
    def from_callable(cls, func: typing.Callable) -> "CallableExpression":
        """
        Transpile a Python callable and capture its signature.

        Args:
            func: The callable to transpile.

        Returns:
            A CallableExpression with one ArgumentSpec per signature parameter.

        Raises:
            TranspilationError: If converting the function body fails; the
                original exception is chained as the cause.
        """
        sig = inspect.signature(func)
        arg_specs = [
            ArgumentSpec(
                name=name,
                default_value=param.default,
                argkind=_ARGKIND_MAP[param.kind],
            )
            for name, param in sig.parameters.items()
        ]

        from bigframes.core.bytecode import py_to_expression

        try:
            expr = py_to_expression(func)
        # Named ``err`` (not ``ex``) so the module-level alias for
        # bigframes.core.expression is not shadowed inside this function.
        except Exception as err:
            raise TranspilationError(f"Failed to transpile function {func}") from err
        return cls(expr=expr, arg_specs=arg_specs)

    def apply(self, *args, **kwargs) -> "ex.Expression":
        """
        Apply the arguments to the expression.

        All args are expected to be column references, or scalars.
        """
        return self.bind_partial(*args, _offset=0, **kwargs).expr

    def bind_partial(
        self,
        *args,
        _offset: int = 0,
        **kwargs,
    ) -> "CallableExpression":
        """
        Bind arguments to the parameters from ``_offset`` onward.

        Every parameter at index ``_offset`` or later must be satisfied by a
        positional value, a keyword value, or its default. Parameters before
        ``_offset`` stay unbound and become the ``arg_specs`` of the result.

        Args:
            *args: Positional values, given in order to the parameters that
                accept positional arguments.
            _offset: Index in ``arg_specs`` of the first parameter to bind.
            **kwargs: Values for parameters that accept keyword arguments.

        Returns:
            A new CallableExpression holding the bound expression and the
            specs that precede ``_offset``.

        Raises:
            TypeError: On an unexpected keyword, a duplicated value, a missing
                required argument, or too many positional arguments.
            NotImplementedError: If a ``*args`` or ``**kwargs`` parameter is
                among the parameters being bound.
        """
        allowed_params = self.arg_specs[_offset:]
        # Only keyword-capable parameters can be targeted by name; this also
        # rejects positional-only parameters passed as keywords.
        keyword_names = {spec.name for spec in allowed_params if spec.is_keyword}
        positional_capacity = sum(1 for spec in allowed_params if spec.is_positional)

        # Validate unexpected keyword arguments
        for key in kwargs:
            if key not in keyword_names:
                raise TypeError(f"got an unexpected keyword argument '{key}'")

        def to_expr(val):
            # Expressions pass through untouched; plain values become constants.
            if isinstance(val, ex.Expression):
                return val
            return ex.const(val)

        bindings: dict[typing.Hashable, ex.Expression] = {}
        pos_idx = 0
        for spec in allowed_params:
            # Both variadic kinds are unsupported. Checking only *args would
            # let **kwargs fall through to a misleading "missing" error.
            if spec.is_var_positional or spec.is_var_keyword:
                raise NotImplementedError(
                    "varargs in compiled python functions is not supported"
                )

            # Keyword-only parameters never consume a positional value.
            if spec.is_positional and pos_idx < len(args):
                if spec.name in kwargs:
                    raise TypeError(
                        f"got multiple values for keyword argument '{spec.name}'"
                    )
                bindings[spec.name] = to_expr(args[pos_idx])
                pos_idx += 1
            elif spec.is_keyword and spec.name in kwargs:
                bindings[spec.name] = to_expr(kwargs[spec.name])
            elif spec.default_value is not inspect.Parameter.empty:
                bindings[spec.name] = to_expr(spec.default_value)
            else:
                raise TypeError(f"missing required argument: '{spec.name}'")

        if pos_idx < len(args):
            raise TypeError(
                f"too many positional arguments: expected {positional_capacity}, got {len(args)}"
            )

        new_expr = self.expr.bind_variables(bindings, allow_partial_bindings=True)
        remaining_specs = list(self.arg_specs[:_offset])
        return CallableExpression(expr=new_expr, arg_specs=remaining_specs)


def func_to_expr(op) -> CallableExpression:
    """
    Convert various bigframes, python functions into bigframes CallableExpression.

    Args:
        op: A ``Udf`` object, or, when the experimental Python transpiler
            option is enabled, any Python callable.

    Returns:
        A CallableExpression. For a ``Udf`` it wraps the matching remote
        function op over one free variable per declared input.

    Raises:
        TypeError: If ``op`` is neither a supported ``Udf`` nor a callable
            with the transpiler enabled.
    """
    if isinstance(op, Udf):
        bq_op: base_ops.NaryOp
        if isinstance(op.udf_def, BigqueryUdf):
            bq_op = remote_function_ops.RemoteFunctionOp(function_def=op.udf_def)
        elif isinstance(op.udf_def, PythonUdf):
            bq_op = remote_function_ops.PythonUdfOp(function_def=op.udf_def)
        else:
            raise TypeError(f"Unsupported UDF definition: {op.udf_def}")

        udf_inputs = op.udf_def.signature.inputs
        inputs_expr = tuple(ex.free_var(arg.name) for arg in udf_inputs)
        expr = ex.OpExpression(bq_op, inputs_expr)

        arg_specs = [
            ArgumentSpec(
                name=arg.name,
                default_value=inspect.Parameter.empty,
                # Udf specs don't have concept of positional only or keyword only yet,
                # so default to positional_or_keyword.
                argkind="positional_or_keyword",
            )
            for arg in udf_inputs
        ]
        return CallableExpression(expr=expr, arg_specs=arg_specs)

    elif options.experiments.enable_python_transpiler and callable(op):
        return CallableExpression.from_callable(op)

    else:
        raise TypeError(f"Unsupported function type: {op}")
 bf_op = python_ops.python_callable_to_op(func) if bf_op and isinstance(bf_op, ops.UnaryOp): return self._apply_unary_op(bf_op) - # It is neither a remote function nor a managed function. - # Then it must be a vectorized function that applies to the Series - # as a whole. if by_row: + from bigframes._config import options + + enable_transpile = options.experiments.enable_python_transpiler + return self._apply_by_row( + func, args=args, transpile_enabled=enable_transpile + ) + try: + return func(self) # type: ignore + except Exception as ex: + # This could happen if any of the operators in func is not + # supported on a Series. Let's guide the customer to use a + # bigquery function instead + if hasattr(ex, "message"): + ex.message += f"\n{_bigquery_function_recommendation_message}" + raise + + def _apply_by_row( + self, + func: typing.Callable, + args: typing.Tuple = (), + transpile_enabled: bool = False, + ) -> Series: + """ + Apply callable or deployed udf row-wise on the series. + """ + if not callable(func): + raise ValueError( + "Expected a callable function. If you meant to use a BigQuery function, please wrap it with bigframes.pandas.udf(...)" + ) + try: + expr = ops.func_to_expr(func) + # We get this message even if transpiler could have in theory translated it. + except Exception: raise ValueError( - "You have passed a function as-is. If your intention is to " + "You have passed a function as-is. If your intention is to " "apply this function in a vectorized way (i.e. to the " "entire Series as a whole, and you are sure that it " "performs only the operations that are implemented for a " @@ -2097,15 +2120,12 @@ def apply( "or `bigframes.pandas.remote_function` before passing." ) - try: - return func(self) # type: ignore - except Exception as ex: - # This could happen if any of the operators in func is not - # supported on a Series.
Let's guide the customer to use a - # bigquery function instead - if hasattr(ex, "message"): - ex.message += f"\n{_bigquery_function_recommendation_message}" - raise + result_series = self._apply_callable_expr(expr, args) + # TODO(jialuo): Investigate why `_apply_nary_op` drops the series + # `name`. Manually reassigning it here as a temporary fix. + result_series.name = self.name + + return result_series def combine( self, @@ -2119,8 +2139,12 @@ def combine( " are supported." ) - if isinstance(func, bigframes.functions.Udf): - result_series = self._apply_nary_op(ops.func_to_op(func), (other,)) + from bigframes._config import options + + if isinstance(func, bigframes.functions.Udf) or ( + options.experiments.enable_python_transpiler and callable(func) + ): + result_series = self._apply_callable_expr(ops.func_to_expr(func), (other,)) if hasattr(other, "name") and other.name != self._name: # type: ignore result_series.name = None else: @@ -2490,7 +2514,10 @@ def map( map_df = map_df.set_index("keys") elif callable(arg): # This is for remote function and managed funtion. 
def _apply_callable_expr(
    self,
    callable_expr: bigframes.operations.to_op.CallableExpression,
    others: Sequence[typing.Union[Series, scalars.Scalar]],
    ignore_self=False,
):
    """Project a CallableExpression over this series and ``others``.

    The inputs are aligned with ``_align_n`` (scalars are not cast), the
    aligned values are passed positionally to ``callable_expr.apply``, and the
    resulting expression is projected as a new column.

    Args:
        callable_expr: Expression plus calling convention to apply.
        others: Additional series or scalars supplied as arguments.
        ignore_self: Forwarded unchanged to ``_align_n``.

    Returns:
        A Series holding only the projected column, with its label set to None.
    """
    aligned_inputs, aligned_block = self._align_n(
        others, ignore_self=ignore_self, cast_scalars=False
    )
    bound_expr = callable_expr.apply(*aligned_inputs)
    projected_block, new_col_id = aligned_block.project_expr(bound_expr)
    unlabeled = projected_block.select_column(new_col_id).with_column_labels([None])
    return Series(unlabeled)
diff --git a/packages/bigframes/tests/unit/core/test_bytecode.py b/packages/bigframes/tests/unit/core/test_bytecode.py index e718d252c601..036e3f00e8fa 100644 --- a/packages/bigframes/tests/unit/core/test_bytecode.py +++ b/packages/bigframes/tests/unit/core/test_bytecode.py @@ -18,73 +18,64 @@ import bigframes.core.expression as ex import bigframes.operations as ops -from bigframes.core.bytecode import dis_to_expr +from bigframes.core.bytecode import py_to_expression -def test_dis_to_expr_simple_arithmetic(): - func = lambda row: row.x + 1 - expr = dis_to_expr(func, unpack_mode=False) +def test_py_to_expression_simple_arithmetic(): + func = lambda x: x + 1 + expr = py_to_expression(func) assert expr is not None expected = ops.add_op.as_expr(ex.free_var("x"), ex.const(1)) assert expr == expected -def test_dis_to_expr_unpack_mode(): - func = lambda col1, col2: col1 * col2 - expr = dis_to_expr(func, unpack_mode=True) - assert expr is not None - - expected = ops.mul_op.as_expr(ex.free_var("col1"), ex.free_var("col2")) - assert expr == expected - - -def test_dis_to_expr_math_function(): - func = lambda row: math.sin(row.x) - expr = dis_to_expr(func, unpack_mode=False) +def test_py_to_expression_math_function(): + func = lambda x: math.sin(x) + expr = py_to_expression(func) assert expr is not None expected = ops.numeric_ops.sin_op.as_expr(ex.free_var("x")) assert expr == expected -def test_dis_to_expr_negation(): - func = lambda row: -row.x - expr = dis_to_expr(func, unpack_mode=False) +def test_py_to_expression_negation(): + func = lambda x: -x + expr = py_to_expression(func) assert expr is not None expected = ops.numeric_ops.neg_op.as_expr(ex.free_var("x")) assert expr == expected -def test_dis_to_expr_comparison(): - func = lambda row: row.x == row.y - expr = dis_to_expr(func, unpack_mode=False) +def test_py_to_expression_comparison(): + func = lambda x, y: x == y + expr = py_to_expression(func) assert expr is not None expected = 
ops.comparison_ops.eq_op.as_expr(ex.free_var("x"), ex.free_var("y")) assert expr == expected -def test_dis_to_expr_unsupported(): +def test_py_to_expression_unsupported(): # Control flow or unsupported structures should return None - def func_with_loop(row): + def func_with_loop(x): res = 0 - for val in range(int(row.x)): + for val in range(int(x)): res += val return res with pytest.raises(ValueError): - dis_to_expr(func_with_loop, unpack_mode=False) + py_to_expression(func_with_loop) global_none_val = None -def test_dis_to_expr_global_none(): +def test_py_to_expression_global_none(): # Test resolving a global variable explicitly set to None - func = lambda row: row.x == global_none_val - expr = dis_to_expr(func, unpack_mode=False) + func = lambda x: x == global_none_val + expr = py_to_expression(func) assert expr is not None expected = ops.comparison_ops.eq_op.as_expr(ex.free_var("x"), ex.const(None)) diff --git a/packages/bigframes/tests/unit/test_py_udf.py b/packages/bigframes/tests/unit/test_py_udf.py new file mode 100644 index 000000000000..ad491f6a7393 --- /dev/null +++ b/packages/bigframes/tests/unit/test_py_udf.py @@ -0,0 +1,243 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pathlib +from typing import Generator + +import pandas as pd +import pandas.testing +import pytest + +import bigframes +import bigframes.pandas as bpd +from bigframes.testing.utils import ( + assert_frame_equal, + assert_series_equal, + convert_pandas_dtypes, +) + +pytest.importorskip("polars") +pytest.importorskip("pandas", minversion="2.0.0") + +CURRENT_DIR = pathlib.Path(__file__).parent +DATA_DIR = CURRENT_DIR.parent / "data" + + +@pytest.fixture(scope="module", autouse=True) +def session() -> Generator[bigframes.Session, None, None]: + import bigframes.core.global_session + from bigframes.testing import polars_session + + with bpd.option_context("experiments.enable_python_transpiler", True): + session = polars_session.TestSession() + with bigframes.core.global_session._GlobalSessionContext(session): + yield session + + +@pytest.fixture(scope="module") +def scalars_pandas_df_index() -> pd.DataFrame: + """pd.DataFrame pointing at test data.""" + + df = pd.read_json( + DATA_DIR / "scalars.jsonl", + lines=True, + ) + convert_pandas_dtypes(df, bytes_col=True) + + df = df.set_index("rowindex", drop=False) + df.index.name = None + return df.set_index("rowindex").sort_index() + + +@pytest.fixture(scope="module") +def scalars_df_index( + session: bigframes.Session, scalars_pandas_df_index +) -> bpd.DataFrame: + return session.read_pandas(scalars_pandas_df_index) + + +@pytest.fixture(scope="module") +def scalars_dfs( + scalars_df_index, + scalars_pandas_df_index, +): + return scalars_df_index, scalars_pandas_df_index + + +def test_dataframe_map_transpile( + scalars_df_index, + scalars_pandas_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(input): + return input * 3 + 12 + + bf_result = scalars_df_index[columns].map(foo, na_action="ignore").to_pandas() + + pd_result = ( + scalars_pandas_df_index[columns].map(foo, na_action="ignore").astype("Int64") + ) + + assert_frame_equal(bf_result, pd_result) + + +def test_dataframe_apply_axis_1_transpile( 
+ scalars_df_index, + scalars_pandas_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(input): + return input.int64_too + input.int64_col + + bf_result = scalars_df_index[columns].apply(foo, axis=1).to_pandas() + + pd_result = scalars_pandas_df_index[columns].apply(foo, axis=1).astype("Int64") + + assert_series_equal(bf_result, pd_result) + + +def test_series_combine_transpile( + scalars_df_index, + scalars_pandas_df_index, +): + def which_smaller(left, right): + return (left * right) + 3 + + bf_result = ( + scalars_df_index["int64_too"] + .combine(scalars_df_index["int64_col"], which_smaller) + .to_pandas() + ) + + pd_result = scalars_pandas_df_index["int64_too"].combine( + scalars_pandas_df_index["int64_col"], which_smaller + ) + + assert_series_equal(bf_result, pd_result) + + +def test_dataframe_apply_axis_1_transpile_with_defaults( + scalars_df_index, + scalars_pandas_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(input, x=10, y=5): + return input.int64_too + input.int64_col + x + y + + bf_result = scalars_df_index[columns].apply(foo, axis=1).to_pandas() + pd_result = scalars_pandas_df_index[columns].apply(foo, axis=1).astype("Int64") + + assert_series_equal(bf_result, pd_result) + + +def test_dataframe_apply_axis_1_transpile_with_args( + scalars_df_index, + scalars_pandas_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(input, x, y=5): + return input.int64_too + input.int64_col + x + y + + bf_result = ( + scalars_df_index[columns].apply(foo, axis=1, args=(12,), y=20).to_pandas() + ) + pd_result = ( + scalars_pandas_df_index[columns] + .apply(foo, axis=1, args=(12,), y=20) + .astype("Int64") + ) + + assert_series_equal(bf_result, pd_result) + + +def test_dataframe_apply_axis_1_transpile_invalid_bindings( + scalars_df_index, +): + columns = ["int64_too", "int64_col"] + + def foo(input, x, y=5): + return input.int64_too + input.int64_col + x + y + + # 1. 
Unexpected keyword argument + with pytest.raises(TypeError, match="unexpected keyword argument 'z'"): + scalars_df_index[columns].apply(foo, axis=1, args=(10,), z=20) + + # 2. Multiple values for keyword argument 'x' + with pytest.raises(TypeError, match="multiple values for argument 'x'"): + scalars_df_index[columns].apply(foo, axis=1, args=(10,), x=20) + + # 3. Too many positional arguments + with pytest.raises(TypeError, match="too many positional arguments"): + scalars_df_index[columns].apply(foo, axis=1, args=(10, 20, 30)) + + # 4. Missing required argument 'x' + with pytest.raises(TypeError, match="missing a required argument: 'x'"): + scalars_df_index[columns].apply(foo, axis=1) + + +def test_series_apply_transpile( + scalars_df_index, + scalars_pandas_df_index, +): + def foo(x, y=10): + return x * 2 + y + + bf_result = scalars_df_index["int64_col"].apply(foo, args=(5,)).to_pandas() + pd_result = ( + scalars_pandas_df_index["int64_col"].apply(foo, args=(5,)).astype("Int64") + ) + + assert_series_equal(bf_result, pd_result) + + +def test_series_apply_transpile_invalid_bindings( + scalars_df_index, +): + def foo(x, y): + return x + y + + # Too many positional args: foo takes 2 args (x, y), we pass self and 2 more args (total 3 positional) + with pytest.raises( + TypeError, match="too many positional arguments: expected 2, got 3" + ): + scalars_df_index["int64_col"].apply(foo, args=(10, 20)) + + # Missing required argument: foo takes 2 args, we only pass self (so y is missing) + with pytest.raises(TypeError, match="missing required argument: 'y'"): + scalars_df_index["int64_col"].apply(foo) + + +def test_transpilation_unsupported_ops_raise( + scalars_df_index, +): + def foo_with_if(x): + if x > 0: + return x + return -x + + with pytest.raises(ValueError): + scalars_df_index["int64_col"].apply(foo_with_if) + + def foo_with_loop(x): + total = 0 + for i in range(x): + total += i + return total + + with pytest.raises(ValueError): + 
 scalars_df_index["int64_col"].apply(foo_with_loop) diff --git a/packages/bigframes/third_party/bigframes_vendored/pandas/core/frame.py b/packages/bigframes/third_party/bigframes_vendored/pandas/core/frame.py index f016cab47ae3..f68c6a40b088 100644 --- a/packages/bigframes/third_party/bigframes_vendored/pandas/core/frame.py +++ b/packages/bigframes/third_party/bigframes_vendored/pandas/core/frame.py @@ -4470,6 +4470,22 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: [7 rows x 2 columns] + With experimental Python Transpiler enabled, you can use some lambda functions without + deploying them as remote functions. + + >>> bpd.options.experiments.enable_python_transpiler = True + >>> df_minutes.map(lambda minutes: minutes / 60) + system_minutes user_minutes + 0 0.0 0.0 + 1 0.5 0.25 + 2 1.0 1.25 + 3 1.5 + 4 1.5 0.1 + 5 2.0 + 6 + + [7 rows x 2 columns] + Args: func (function): Python function wrapped by ``remote_function`` decorator, @@ -5053,6 +5069,15 @@ def apply(self, func, *, axis=0, args=(), **kwargs): 1 3.8 dtype: Float64 + With experimental Python Transpiler enabled, you can use some lambda functions without + deploying them as remote functions: + + >>> bpd.options.experiments.enable_python_transpiler = True + >>> df.apply(lambda row: 1 + row.col1 + row.col2/row.col3, axis=1) + 0 2.6 + 1 3.8 + dtype: Float64 + Args: func (function): Function to apply to each column or row.
To apply to each row diff --git a/packages/bigframes/third_party/bigframes_vendored/pandas/core/series.py b/packages/bigframes/third_party/bigframes_vendored/pandas/core/series.py index c116ed640122..fc3062c87776 100644 --- a/packages/bigframes/third_party/bigframes_vendored/pandas/core/series.py +++ b/packages/bigframes/third_party/bigframes_vendored/pandas/core/series.py @@ -5631,6 +5631,17 @@ def map( 3 rAbbIt dtype: string + With experimental Python Transpiler enabled, you can use some lambda functions without + deploying them as remote functions: + + >>> bpd.options.experiments.enable_python_transpiler = True + >>> s.map(lambda val: val + "fish") + 0 catfish + 1 dogfish + 2 + 3 rabbitfish + dtype: string + Args: arg (function, Mapping, Series): remote function, collections.abc.Mapping subclass or Series