Skip to content
Open
15 changes: 15 additions & 0 deletions packages/bigframes/bigframes/_config/experiment_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self):
self._semantic_operators: bool = False
self._ai_operators: bool = False
self._sql_compiler: Literal["legacy", "stable", "experimental"] = "stable"
self._enable_python_transpiler: bool = False

@property
def semantic_operators(self) -> bool:
Expand Down Expand Up @@ -166,3 +167,17 @@ def blob_display_height(self, value: Optional[int]):
warnings.warn(msg, category=bfe.ApiDeprecationWarning)

bigframes.options.display.blob_display_height = value

@property
def enable_python_transpiler(self) -> bool:
return self._enable_python_transpiler

@enable_python_transpiler.setter
def enable_python_transpiler(self, value: bool):
if value:
msg = bfe.format_message(
"Python transpiler is an unstable, experimental feature, and not yet fully "
"validated, use at your own risk."
)
warnings.warn(msg, category=bfe.PythonTranspilerPreviewWarning)
self._enable_python_transpiler = value
40 changes: 38 additions & 2 deletions packages/bigframes/bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,58 @@
from __future__ import annotations

import functools
import inspect
import typing
from typing import Optional, Sequence
from typing import Callable, Hashable, Optional, Sequence

import bigframes_vendored.constants as constants
import pandas as pd

import bigframes.constants
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.bytecode as bytecode
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering
import bigframes.core.window_spec as windows
import bigframes.dtypes as dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
from bigframes.core import agg_expressions
from bigframes.core import agg_expressions, py_expressions


def apply_to_block_rows(
func: Callable, block: blocks.Block, *args, **kwargs
) -> blocks.Block:
"""
Apply the given function to each row of the block.

The function is applied to each row of the block, and the result is returned
as a new block with the same index.
"""
expr = bytecode._compile_bytecode_to_py_expr(func)
sig = inspect.signature(func)

bindings: dict[Hashable, ex.Expression] = {}

bound_args = sig.bind(*(None, *args), **kwargs)
bound_args.apply_defaults()
bound_params = bound_args.arguments
for name, value in bound_params.items():
bindings[name] = ex.const(value)

expr = py_expressions.resolve_py_exprs(
expr,
series_arg=next(iter(sig.parameters.keys())),
series_attrs={
label: col_id
for label in block.column_labels
if (col_id := block.resolve_label_exact(label)) is not None
},
)
expr = expr.bind_variables(bindings)

return block.project_exprs([expr], labels=[None], drop=True)


def equals(block1: blocks.Block, block2: blocks.Block) -> bool:
Expand Down
7 changes: 2 additions & 5 deletions packages/bigframes/bigframes/core/bytecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,12 @@ def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
raise ValueError("No return value found")


def dis_to_expr(func: Callable, unpack_mode: bool = False) -> expression.Expression:
def py_to_expression(func: Callable) -> expression.Expression:
"""
Try to convert a python function to a BigQuery expression.

Unpack mode is whether SQL columns are addressed as attributes of a single
python argument (e.g. row.col1), or as separate arguments (e.g. col1).

This is "best effort" - if the function contains operations that cannot
be converted to BigQuery expressions, it will raise an Exception.
"""
py_expr = _compile_bytecode_to_py_expr(func)
return py_exprs.resolve_py_exprs(py_expr, unpack_mode=unpack_mode)
return py_exprs.resolve_py_exprs(py_expr)
20 changes: 15 additions & 5 deletions packages/bigframes/bigframes/core/py_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dataclasses
import itertools
from types import ModuleType
from typing import Callable, Hashable, Mapping, Tuple
from typing import Callable, Hashable, Mapping, Optional, Tuple

import bigframes.operations.python_op_maps as python_op_maps
from bigframes import dtypes
Expand All @@ -27,6 +27,7 @@
OpExpression,
UnboundVariableExpression,
const,
deref,
)
from bigframes.operations import NUMPY_TO_BINOP, NUMPY_TO_OP, generic_ops, numeric_ops

Expand Down Expand Up @@ -310,7 +311,11 @@ def bind_refs(


# TODO: Mode that resolves free variable attrs as columns
def resolve_py_exprs(expression: Expression, unpack_mode: bool = False) -> Expression:
def resolve_py_exprs(
expression: Expression,
series_arg: Optional[str] = None,
series_attrs: Mapping[Hashable, str] | None = None,
) -> Expression:
"""Replace all PyObject, attribute, call expressions. Bottom-up."""

def resolve_expr_if_call(expression: Expression) -> Expression:
Expand All @@ -325,10 +330,15 @@ def resolve_attrs(expression: Expression) -> Expression:
if isinstance(expression.input, Module):
# resolves things like Math.pi
return PyObject(getattr(expression.input.module, expression.attr))
if not unpack_mode and isinstance(
expression.input, UnboundVariableExpression
# TODO: Resolve some series methods
if (
series_arg is not None
and series_attrs is not None
and isinstance(expression.input, UnboundVariableExpression)
and expression.input.id == series_arg
and expression.attr in series_attrs
):
return UnboundVariableExpression(expression.attr)
return deref(series_attrs[expression.attr])
return expression

def resolve_pyobjs(expression: Expression) -> Expression:
Expand Down
36 changes: 30 additions & 6 deletions packages/bigframes/bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4692,13 +4692,17 @@ def _prepare_export(
return array_value, id_overrides

def map(self, func, na_action: Optional[str] = None) -> DataFrame:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to see an example in the docstrings, especially if this is compatible with the polars engine and thus would make for a relatively speedy flakeless doctest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added doctests

if not isinstance(func, bigframes.functions.Udf):
from bigframes._config import options

if not isinstance(func, bigframes.functions.Udf) and not (
options.experiments.enable_python_transpiler and callable(func)
):
raise TypeError("the first argument must be callable")

if na_action not in {None, "ignore"}:
raise ValueError(f"na_action={na_action} not supported")

expr = ops.func_to_op(func).as_expr(ex.free_var("input"))
expr = ops.func_to_expr(func).apply(ex.free_var("input"))
if na_action == "ignore":
# True case, predicate, False case
expr = ops.where_op.as_expr(
Expand All @@ -4718,11 +4722,25 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)
warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)

if not isinstance(func, bigframes.functions.Udf):
from bigframes._config import options

if not isinstance(func, bigframes.functions.Udf) and not (
options.experiments.enable_python_transpiler and callable(func)
):
raise ValueError(
"For axis=1 a BigFrames BigQuery function must be used."
)

if (
not isinstance(func, bigframes.functions.Udf)
and options.experiments.enable_python_transpiler
and callable(func)
):
result_block = block_ops.apply_to_block_rows(
func, self._block, *args, **kwargs
)
return bigframes.series.Series(result_block)

if func.udf_def.signature.is_row_processor:
# Early check whether the dataframe dtypes are currently supported
# in the bigquery function
Expand Down Expand Up @@ -4776,8 +4794,14 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)

# Apply the function
expr = ops.func_to_expr(func).expr
if not (
isinstance(expr, ex.OpExpression)
and isinstance(expr.op, ops.NaryOp)
):
raise TypeError(f"Expected OpExpression with NaryOp, got {expr}")
result_series = rows_as_json_series._apply_nary_op(
ops.func_to_op(func),
expr.op,
list(args),
)

Expand Down Expand Up @@ -4837,8 +4861,8 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):

series_list = [self[col] for col in self.columns]
op_list = series_list[1:] + list(args)
result_series = series_list[0]._apply_nary_op(
ops.func_to_op(func), op_list
result_series = series_list[0]._apply_callable_expr(
ops.func_to_expr(func), op_list
)
result_series.name = None

Expand Down
8 changes: 8 additions & 0 deletions packages/bigframes/bigframes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class MaximumResultRowsExceeded(RuntimeError):
"""Maximum number of rows in the result was exceeded."""


class TranspilationError(RuntimeError):
"""Failed to transpile a Python function to BigFrames Expression."""


class TimeTravelDisabledWarning(Warning):
"""A query was reattempted without time travel."""

Expand Down Expand Up @@ -126,6 +130,10 @@ class FunctionPackageVersionWarning(PreviewWarning):
"""


class PythonTranspilerPreviewWarning(PreviewWarning):
"""Python Transpiler is a preview feature."""


def format_message(message: str, fill: bool = True):
"""[Private] Formats a warning message.

Expand Down
4 changes: 2 additions & 2 deletions packages/bigframes/bigframes/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
timestamp_add_op,
timestamp_sub_op,
)
from bigframes.operations.to_op import func_to_op
from bigframes.operations.to_op import func_to_expr

__all__ = [
# Base ops
Expand Down Expand Up @@ -437,7 +437,7 @@
"AIScore",
"AISimilarity",
# Helper functions
"func_to_op",
"func_to_expr",
# Numpy ops mapping
"NUMPY_TO_BINOP",
"NUMPY_TO_OP",
Expand Down
Loading
Loading