4 changes: 2 additions & 2 deletions packages/bigframes/bigframes/session/__init__.py
@@ -80,7 +80,7 @@
from bigframes import version
from bigframes.core import blocks, utils
from bigframes.core.logging import log_adapter
from bigframes.session import bigquery_session, bq_caching_executor, executor
from bigframes.session import bigquery_session, executor, proxy_executor

# Avoid circular imports.
if typing.TYPE_CHECKING:
@@ -316,7 +316,7 @@ def __init__(
if not self._strictly_ordered:
labels["bigframes-mode"] = "unordered"

self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
self._executor: executor.Executor = proxy_executor.DualCompilerProxyExecutor(
bqclient=self._clients_provider.bqclient,
bqstoragereadclient=self._clients_provider.bqstoragereadclient,
loader=self._loader,
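
The session now constructs a DualCompilerProxyExecutor instead of a BigQueryCachingExecutor directly. The proxy's implementation is not part of this diff, but the pieces added below (the compiler_name and cache constructor parameters, ExecutionSpec.add_labels, and the removal of _compile_with_fallback) suggest the sqlglot-to-ibis fallback now lives behind it. A minimal sketch under that assumption; the class internals, keyword plumbing, and warning text are illustrative, not the actual implementation:

# Hypothetical sketch: proxy_executor.DualCompilerProxyExecutor is not shown
# in this diff, so this only illustrates one plausible delegation pattern.
import warnings

import google.cloud.exceptions

from bigframes.session import bq_caching_executor, executor


class DualCompilerProxyExecutor(executor.Executor):
    """Tries the sqlglot-backed executor first, then falls back to ibis."""

    def __init__(self, **kwargs):
        self._primary = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="sqlglot", **kwargs
        )
        # Sharing one ExecutionCache lets both executors reuse results.
        self._fallback = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="ibis", cache=self._primary.cache, **kwargs
        )

    def execute(self, array_value, execution_spec):
        try:
            return self._primary.execute(
                array_value,
                execution_spec.add_labels({"bigframes-compiler": "sqlglot"}),
            )
        except google.cloud.exceptions.BadRequest as e:
            warnings.warn(f"sqlglot compilation failed, retrying with ibis: {e}")
            return self._fallback.execute(
                array_value,
                execution_spec.add_labels({"bigframes-compiler": "ibis"}),
            )

This would also explain the new cache constructor parameter: the old in-place default gave every executor a private cache, while an injected one can be shared across both compiler paths.
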
214 changes: 90 additions & 124 deletions packages/bigframes/bigframes/session/bq_caching_executor.py
@@ -17,15 +17,12 @@
import concurrent.futures
import math
import threading
import uuid
import warnings
from typing import Literal, Mapping, Optional, Sequence, Tuple

import google.api_core.exceptions
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table
import google.cloud.bigquery_storage_v1
import google.cloud.exceptions
from google.cloud import bigquery

import bigframes
@@ -84,10 +81,14 @@ def __init__(
enable_polars_execution: bool = False,
publisher: bigframes.core.events.Publisher,
labels: Mapping[str, str] = {},
compiler_name: Literal["ibis", "sqlglot"] = "ibis",
cache: Optional[execution_cache.ExecutionCache] = None,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache()
self.cache: execution_cache.ExecutionCache = (
cache or execution_cache.ExecutionCache()
)
self.metrics = metrics
self.loader = loader
self.bqstoragereadclient = bqstoragereadclient
@@ -111,6 +112,7 @@ def __init__(
polars_executor.PolarsExecutor(),
)
self._upload_lock = threading.Lock()
self._compiler_name = compiler_name

def to_sql(
self,
@@ -127,7 +129,10 @@ def to_sql(
else array_value.node
)
node = self._substitute_large_local_sources(node)
compiled = self._compile(node, ordered=ordered)
compiled = compile.compile_sql(
compile.CompileRequest(node, sort_rows=ordered),
compiler_name=self._compiler_name,
)
return compiled.sql

def execute(
@@ -158,7 +163,11 @@ def execute(
"Ordering and peeking not supported for gbq export"
)
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
result = self._export_gbq(array_value, execution_spec.destination_spec)
result = self._export_gbq(
array_value,
execution_spec.destination_spec,
extra_labels=dict(execution_spec.labels),
)
self._publisher.publish(
bigframes.core.events.ExecutionFinished(
result=result,
@@ -174,6 +183,7 @@
if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec)
else None,
must_create_table=not execution_spec.promise_under_10gb,
extra_labels=dict(execution_spec.labels),
)
# post steps: export
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
@@ -233,7 +243,10 @@ def _maybe_find_existing_table(
return None

def _export_gbq(
self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec
self,
array_value: bigframes.core.ArrayValue,
spec: ex_spec.TableOutputSpec,
extra_labels: Mapping[str, str] = {},
) -> executor.ExecuteResult:
"""
Export the ArrayValue to an existing BigQuery table.
@@ -243,55 +256,48 @@ def _export_gbq(
# validate destination table
existing_table = self._maybe_find_existing_table(spec)

def run_with_compiler(compiler_name, compiler_id=None):
compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
sql = compiled.sql
compiled = compile.compile_sql(
compile.CompileRequest(plan, sort_rows=False),
compiler_name=self._compiler_name,
)
sql = compiled.sql

if (existing_table is not None) and _is_schema_match(
existing_table.schema, array_value.schema
):
# b/409086472: Uses DML for table appends and replacements to avoid
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
# https://cloud.google.com/bigquery/quotas#standard_tables
job_config = bigquery.QueryJobConfig()

ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
if spec.if_exists == "append":
sql = sg_sql.to_sql(
sg_sql.insert(ir.expr.as_select_all(), spec.table)
)
else: # for "replace"
assert spec.if_exists == "replace"
sql = sg_sql.to_sql(
sg_sql.replace(ir.expr.as_select_all(), spec.table)
)
else:
dispositions = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
"append": bigquery.WriteDisposition.WRITE_APPEND,
}
job_config = bigquery.QueryJobConfig(
write_disposition=dispositions[spec.if_exists],
destination=spec.table,
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
)
if (existing_table is not None) and _is_schema_match(
existing_table.schema, array_value.schema
):
# b/409086472: Uses DML for table appends and replacements to avoid
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
# https://cloud.google.com/bigquery/quotas#standard_tables
job_config = bigquery.QueryJobConfig()

# Attach data type usage to the job labels
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
job_config.labels["bigframes-compiler"] = (
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
)
# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
iterator, job = self._run_execute_query(
sql=sql,
job_config=job_config,
session=array_value.session,
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
if spec.if_exists == "append":
sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
else: # for "replace"
assert spec.if_exists == "replace"
sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
else:
dispositions = {
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
"append": bigquery.WriteDisposition.WRITE_APPEND,
}
job_config = bigquery.QueryJobConfig(
write_disposition=dispositions[spec.if_exists],
destination=spec.table,
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
)
return iterator, job

iterator, job = self._compile_with_fallback(run_with_compiler)
# Attach data type usage to the job labels
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.
iterator, job = self._run_execute_query(
sql=sql,
job_config=job_config,
session=array_value.session,
extra_labels=extra_labels,
)

has_special_dtype_col = any(
t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -359,6 +365,7 @@ def _run_execute_query(
job_config: Optional[bq_job.QueryJobConfig] = None,
query_with_job: bool = True,
session=None,
extra_labels: Mapping[str, str] = {},
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
@@ -369,8 +376,9 @@
bigframes.options.compute.maximum_bytes_billed
)

if self._labels:
if self._labels or extra_labels:
job_config.labels.update(self._labels)
job_config.labels.update(extra_labels)

try:
# Trick the type checker into thinking we got a literal.
@@ -420,43 +428,6 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
self.prepare_plan(array_value.node)
)

def _compile(
self,
node: nodes.BigFrameNode,
*,
ordered: bool = False,
peek: Optional[int] = None,
materialize_all_order_keys: bool = False,
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
) -> compile.CompileResult:
return compile.compile_sql(
compile.CompileRequest(
node,
sort_rows=ordered,
peek_count=peek,
materialize_all_order_keys=materialize_all_order_keys,
),
compiler_name=compiler_name,
)

def _compile_with_fallback(self, run_fn):
compiler_option = bigframes.options.experiments.sql_compiler
if compiler_option == "legacy":
return run_fn("ibis")
elif compiler_option == "experimental":
return run_fn("sqlglot")
else: # stable
compiler_id = f"{uuid.uuid1().hex[:12]}"
try:
return run_fn("sqlglot", compiler_id=compiler_id)
except google.cloud.exceptions.BadRequest as e:
msg = bfe.format_message(
f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
f"Falling back to ibis. Details: {e.message}"
)
warnings.warn(msg, category=UserWarning)
return run_fn("ibis", compiler_id=compiler_id)

def prepare_plan(
self,
plan: nodes.BigFrameNode,
@@ -547,8 +518,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool:
max_complexity=QUERY_COMPLEXITY_LIMIT,
cache=self.cache,
# Heuristic: subtree_complexity * (copies of subtree)^2
heuristic=lambda complexity, count: math.log(complexity)
+ 2 * math.log(count),
heuristic=lambda complexity, count: (
math.log(complexity) + 2 * math.log(count)
),
)
if selection is None:
# No good subtrees to cache, just return original tree
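
A note on the heuristic above: since log(complexity) + 2 * log(count) = log(complexity * count**2), the selection ranks each candidate subtree by its complexity times the square of its copy count, exactly as the comment says; the log form only rescales scores and leaves the ranking unchanged. A quick check:

import math

complexity, count = 10_000, 3
score = math.log(complexity) + 2 * math.log(count)
assert math.isclose(score, math.log(complexity * count**2))
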
@@ -621,6 +593,7 @@ def _execute_plan_gbq(
peek: Optional[int] = None,
cache_spec: Optional[ex_spec.CacheSpec] = None,
must_create_table: bool = True,
extra_labels: Mapping[str, str] = {},
) -> executor.ExecuteResult:
"""Just execute whatever plan as is, without further caching or decomposition."""
# TODO(swast): plumb through the api_name of the user-facing api that
@@ -651,43 +624,36 @@
]
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]

def run_with_compiler(compiler_name, compiler_id=None):
compiled = self._compile(
compiled = compile.compile_sql(
compile.CompileRequest(
plan,
ordered=ordered,
peek=peek,
sort_rows=ordered,
peek_count=peek,
materialize_all_order_keys=(cache_spec is not None),
compiler_name=compiler_name,
)
# might have more columns than og schema, for hidden ordering columns
compiled_schema = compiled.sql_schema

destination_table: Optional[bigquery.TableReference] = None
),
compiler_name=self._compiler_name,
)
# might have more columns than og schema, for hidden ordering columns
compiled_schema = compiled.sql_schema

job_config = bigquery.QueryJobConfig()
if create_table:
destination_table = self.storage_manager.create_temp_table(
compiled_schema, cluster_cols
)
job_config.destination = destination_table
destination_table: Optional[bigquery.TableReference] = None

# Attach data type usage to the job labels
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
job_config.labels["bigframes-compiler"] = (
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
job_config = bigquery.QueryJobConfig()
if create_table:
destination_table = self.storage_manager.create_temp_table(
compiled_schema, cluster_cols
)
iterator, query_job = self._run_execute_query(
sql=compiled.sql,
job_config=job_config,
query_with_job=(destination_table is not None),
session=plan.session,
)
return iterator, query_job, compiled

iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)

# might have more columns than og schema, for hidden ordering columns
compiled_schema = compiled.sql_schema
job_config.destination = destination_table

# Attach data type usage to the job labels
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
iterator, query_job = self._run_execute_query(
sql=compiled.sql,
job_config=job_config,
query_with_job=(destination_table is not None),
session=plan.session,
extra_labels=extra_labels,
)

# we could actually cache even when caching is not explicitly requested, but being conservative for now
result_bq_data = None
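
With _compile_with_fallback and the per-call compiler choice gone, each BigQueryCachingExecutor instance is pinned to a single compiler, and label plumbing moves to the caller: ExecutionSpec.labels arrives as a tuple of pairs, is converted with dict(execution_spec.labels), and is merged into the job config after the session-level labels, so per-execution labels win on key collisions. A small standalone illustration of that merge order, using a plain dict in place of QueryJobConfig.labels:

# Mirrors the update order in _run_execute_query: session labels first,
# then per-execution extras, so the extras override on duplicate keys.
session_labels = {"bigframes-mode": "unordered", "team": "analytics"}
extra_labels = {"bigframes-compiler": "sqlglot", "team": "etl"}

job_labels: dict = {}
job_labels.update(session_labels)  # job_config.labels.update(self._labels)
job_labels.update(extra_labels)    # job_config.labels.update(extra_labels)

assert job_labels["team"] == "etl"

(The "team" keys are made up here to show the collision behavior; "bigframes-mode" and "bigframes-compiler" are real label keys from this codebase.)
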
7 changes: 6 additions & 1 deletion packages/bigframes/bigframes/session/execution_spec.py
@@ -15,7 +15,7 @@
from __future__ import annotations

import dataclasses
from typing import Literal, Optional, Union
from typing import Literal, Mapping, Optional, Union

from google.cloud import bigquery

@@ -30,6 +30,11 @@ class ExecutionSpec:
# This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur
promise_under_10gb: bool = False

labels: tuple[tuple[str, str], ...] = ()

def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec:
return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))


# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes
# that will be cached only when a super-tree is executed
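
Storing labels as a tuple of (key, value) pairs rather than a dict keeps the spec an immutable value (and hashable, assuming the dataclass is frozen), and add_labels returns a new spec via dataclasses.replace instead of mutating in place. A trimmed standalone version showing the intended usage; only the fields this diff touches are included, and the "bigframes-api" label key is illustrative:

from __future__ import annotations

import dataclasses
from typing import Mapping


@dataclasses.dataclass(frozen=True)  # frozen=True is an assumption here
class ExecutionSpec:
    promise_under_10gb: bool = False
    labels: tuple[tuple[str, str], ...] = ()

    def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec:
        # Tuple concatenation builds a new spec; the original is untouched.
        return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))


spec = ExecutionSpec(promise_under_10gb=True)
labeled = spec.add_labels({"bigframes-api": "to_gbq"})

assert spec.labels == ()
assert dict(labeled.labels) == {"bigframes-api": "to_gbq"}
assert labeled.promise_under_10gb  # other fields carry over via replace()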