diff --git a/gigl/analytics/README.md b/gigl/analytics/README.md new file mode 100644 index 000000000..25ee7b3e5 --- /dev/null +++ b/gigl/analytics/README.md @@ -0,0 +1,188 @@ +# GiGL Analytics + +Pre-training graph data validation and analysis tooling. Use this module before committing to a GNN training run to +catch data quality and structural issues that silently degrade model quality. + +Two subpackages: + +- [`data_analyzer/`](data_analyzer/) — end-to-end `DataAnalyzer` that runs BigQuery checks and produces a single + self-contained HTML report. **Start here.** +- [`graph_validation/`](graph_validation/) — lightweight standalone validators (currently: `BQGraphValidator` for + dangling-edge checks). Use when you only need one check and not the full report. + +## Quickstart + +**Prerequisites.** Follow the [GiGL installation guide](../../docs/user_guide/getting_started/installation.md) so that +`uv` and GiGL's Python dependencies are available. Then authenticate to BigQuery: + +```bash +gcloud auth application-default login +``` + +**1. Write a YAML config.** Save as `my_analyzer_config.yaml`: + +```yaml +node_tables: + - bq_table: "your-project.your_dataset.user_nodes" + node_type: "user" + id_column: "user_id" + feature_columns: ["age", "country"] # optional; [] or omit if the node has no features + # label_column: "label" # optional; enables Tier 3 label checks + +edge_tables: + - bq_table: "your-project.your_dataset.user_edges" + edge_type: "follows" + src_id_column: "src_user_id" + dst_id_column: "dst_user_id" + +# Where to write the HTML report. Local path for quick iteration, or a gs:// URI. +output_gcs_path: "/tmp/my_analysis/" + +# Optional: sizing for the neighbor-explosion estimate (fan-out per GNN layer). +fan_out: [15, 10, 5] +``` + +**2. Run the analyzer.** + +```bash +uv run python -m gigl.analytics.data_analyzer \ + --analyzer_config_uri my_analyzer_config.yaml +``` + +**3. 
Open the report.** When the run completes: + +``` +[INFO] Report written to /tmp/my_analysis/report.html +``` + +Open the file in any browser. No server, no external dependencies, fully offline. + +## What it checks + +The analyzer organizes checks into four tiers. Tiers 1 and 2 always run; Tier 3 auto-enables when your config supports +it; Tier 4 is opt-in. + +| Tier | When | What it checks | +| ---------------------------- | ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **1. Hard fails** | Always | Dangling edges (NULL src/dst), referential integrity (edges pointing to nodes not in the node table), duplicate nodes. Raises `DataQualityError` — the report still renders to show partial results. | +| **2. Core metrics** | Always | Node/edge counts, degree distribution (in/out) with percentiles, degree buckets, top-K hubs, super-hub int16 clamp count, cold-start node count, self-loops, duplicate edges, NULL rates per column, feature memory budget estimate, neighbor-explosion estimate (requires `fan_out`). | +| **3. Label + heterogeneous** | Auto when `label_column` is set on any node table, or when multiple edge types exist | Class imbalance, label coverage, edge type distribution, per-edge-type node coverage. | +| **4. Advanced** | Opt-in via config flags | Power-law exponent (implemented as a degree-stats approximation). Reciprocity, homophily, connected components, clustering coefficient are **not yet implemented** — the flags are accepted but currently no-op. | + +The thresholds below come from a review of production GNN papers (PinSage, BLADE, LiGNN, TwHIN, AliGraph, GraphSMOTE, +Beyond Homophily, Feature Propagation, and others). 
See the inline citations in the threshold table for what each paper
contributes.

## Interpreting the report

The report color-codes every numeric finding. Summary of the most important thresholds:

| Metric | Green | Yellow | Red | What to do when yellow/red |
| -------------------------------------------------------- | ----- | ---------- | ------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Dangling edges / referential integrity / duplicate nodes | 0 | — | any > 0 | Fix the input tables. Training will fail or silently corrupt otherwise. |
| Feature missing rate | < 10% | 10–90% | > 90% | Plan an imputation strategy; above ~95% the Feature Propagation phase transition (Rossi et al., ICLR 2022) sets in and GNNs stop recovering signal reliably. |
| Isolated node fraction | < 1% | 1–5% | > 5% | Filter isolated nodes or densify (LiGNN, KDD 2024) for cold-start cohorts. |
| Cold-start fraction (degree 0–1) | < 5% | 5–10% | > 10% | Candidates for graph densification; also flag for special handling at serving time. |
| Super-hub int16 clamp (degree > 32,767) | 0 | — | any > 0 | GiGL silently truncates super-hub degrees in `gigl/distributed/utils/degree.py`. Either cap the hub's edges upstream or plan to address the clamp. |
| Degree p99 / median | < 50 | 50–100 | > 100 | Use importance sampling (PinSage, KDD 2018) or degree-adaptive neighborhoods (BLADE, WSDM 2023) — degree skew is the single biggest lever in production GNNs. |
| Class imbalance ratio | < 1:5 | 1:5 – 1:10 | > 1:10 | Message passing amplifies label imbalance 2–3× in representation space (GraphSMOTE, WSDM 2021). Consider resampling or GraphSMOTE-style synthetic nodes. |
| Edge homophily (Tier 4, future) | > 0.7 | 0.3 – 0.7 | < 0.3 | Standard GCN/GAT fail at low h (Zhu et al., NeurIPS 2020). Consider H2GCN-style architectures; below h ≈ 0.2 a plain MLP often wins.
| + +## Advanced config + +Optional YAML keys beyond the minimal quickstart: + +```yaml +# Enable Tier 3 class-imbalance + label-coverage checks for a node type: +node_tables: + - bq_table: ... + label_column: "label" + +# Neighbor explosion estimation — the fan-out per GNN layer you plan to train with: +fan_out: [15, 10, 5] + +# Tier 4 opt-in flags. Default false. +# NOTE: Only `compute_reciprocity` is wired into the analyzer today and it logs a +# warning rather than computing a result. The other three flags are placeholders +# for future work (see "Scope and limitations" below). +compute_reciprocity: true +compute_homophily: true +compute_connected_components: true +compute_clustering: true + +# Per-edge-type timestamp hint. NOTE: accepted by the config schema but not yet +# consumed by any Tier 4 query (temporal freshness check is planned). +edge_tables: + - bq_table: ... + timestamp_column: "created_at" +``` + +## Python API + +The CLI wraps a regular class. Call from your own code when you want programmatic access to the `GraphAnalysisResult`: + +```python +from gigl.analytics.data_analyzer import DataAnalyzer +from gigl.analytics.data_analyzer.config import load_analyzer_config + +config = load_analyzer_config("my_analyzer_config.yaml") +analyzer = DataAnalyzer() +report_path = analyzer.run(config=config) +# report_path points to the written report.html (local path or gs:// URI) +``` + +The underlying `GraphStructureAnalyzer` is also callable directly if you want the raw result dataclass and no HTML: + +```python +from gigl.analytics.data_analyzer.graph_structure_analyzer import GraphStructureAnalyzer + +result = GraphStructureAnalyzer().analyze(config) +print(result.degree_stats) +``` + +See a rendered report example at +[`tests/test_assets/analytics/golden_report.html`](../../tests/test_assets/analytics/golden_report.html) to preview the +output format before authenticating to BQ. 
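If you want to sanity-check a config before spending any BigQuery quota, the two Python-side Tier 2 estimates reduce to simple arithmetic: the feature memory budget assumes 8 bytes (float64) per feature value, and the neighbor-explosion estimate grows with the cumulative product of the per-layer `fan_out`. A minimal sketch under those stated assumptions; the helper names below are illustrative, not the module's API:

```python
# Illustrative sketch of the two Python-side Tier 2 estimates. Helper names
# are hypothetical; the real logic lives in graph_structure_analyzer.py.

BYTES_PER_FEATURE = 8  # analyzer default: float64 per feature column


def feature_memory_bytes(num_nodes: int, num_feature_columns: int) -> int:
    """Rough budget for holding all node features in memory."""
    return num_nodes * num_feature_columns * BYTES_PER_FEATURE


def neighbor_explosion(fan_out: list[int]) -> int:
    """Expected nodes touched per seed: the seed itself plus the cumulative
    product of per-layer fan-outs (15 + 15*10 + 15*10*5 for [15, 10, 5])."""
    total, layer_size = 1, 1
    for fan in fan_out:
        layer_size *= fan
        total += layer_size
    return total


print(feature_memory_bytes(1_000_000, 2))  # 16 MB of float64 features
print(neighbor_explosion([15, 10, 5]))     # 916 nodes per seed
```

A fan-out of `[15, 10, 5]` touching ~916 nodes per seed is why the report flags degree skew so aggressively: a single super-hub in layer one multiplies everything downstream.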
+ +## graph_validation + +One-off validators for the subset of cases where the full analyzer is overkill. Today the only check is dangling-edge +detection: + +```python +from gigl.analytics.graph_validation import BQGraphValidator + +has_dangling = BQGraphValidator.does_edge_table_have_dangling_edges( + edge_table="your-project.your_dataset.user_edges", + src_node_column_name="src_user_id", + dst_node_column_name="dst_user_id", +) +``` + +The `DataAnalyzer` runs this check (and many more) as part of Tier 1, so prefer the full analyzer unless you +specifically need a one-line gate (e.g., inside an Airflow task or a preprocessing job). This subpackage is the intended +home for additional standalone validators in the future. + +## Scope and limitations + +Current implementation status: + +- **FeatureProfiler is a stub.** The class is wired in but the TFDV/Dataflow pipeline that would produce FACETS HTML per + table is deferred to a follow-up PR. Calling it today logs a warning and returns an empty `FeatureProfileResult`. The + main report is fully functional without it. +- **Tier 4 checks are partial.** Power-law exponent is computed as a degree-stats approximation. Reciprocity, homophily, + connected components, and clustering coefficient config flags are accepted but currently no-op. The `timestamp_column` + edge field is accepted but no temporal-freshness query runs yet. +- **Heterogeneous graphs: referential integrity caveat.** For each edge table, the referential-integrity check joins + against `config.node_tables[0]`. On heterogeneous graphs where different edges reference different node types, the + current implementation will under-report integrity violations — fix is tracked for a follow-up. +- **GCS upload** works via `GcsUtils.upload_from_string` when `output_gcs_path` is a `gs://` URI, and falls back to + local filesystem write otherwise. 
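The output-path behavior in the last bullet follows a simple contract: trailing slashes are stripped, `report.html` is appended, and the `gs://` prefix alone selects the upload path. A sketch of that contract (`resolve_report_path` is an illustrative helper, not part of the module):

```python
def resolve_report_path(output_gcs_path: str) -> tuple[str, bool]:
    """Mirror the documented fallback: (full report path, is_gcs_upload).

    A gs:// prefix selects the GcsUtils upload path; anything else is
    treated as a local directory that will be created if missing.
    """
    trimmed = output_gcs_path.rstrip("/")
    return f"{trimmed}/report.html", trimmed.startswith("gs://")


print(resolve_report_path("gs://bucket/run1/"))  # ('gs://bucket/run1/report.html', True)
print(resolve_report_path("/tmp/my_analysis"))   # ('/tmp/my_analysis/report.html', False)
```

In particular, `output_gcs_path` is always treated as a directory, never a file name: pass `gs://bucket/run1/`, not `gs://bucket/run1/report.html`.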
+ +## Related documents + +Within this module: + +- [`data_analyzer/report/PRD.md`](data_analyzer/report/PRD.md) — product intent for the HTML report (AI-owned) +- [`data_analyzer/report/SPEC.md`](data_analyzer/report/SPEC.md) — technical contract for the AI-owned HTML/JS/CSS + assets diff --git a/gigl/analytics/data_analyzer/__init__.py b/gigl/analytics/data_analyzer/__init__.py new file mode 100644 index 000000000..45304dacc --- /dev/null +++ b/gigl/analytics/data_analyzer/__init__.py @@ -0,0 +1,10 @@ +""" +BQ Data Analyzer for pre-training graph data analysis. + +Produces a single HTML report covering data quality, feature distributions, +and graph structure metrics from BigQuery node/edge tables. +""" + +from gigl.analytics.data_analyzer.data_analyzer import DataAnalyzer + +__all__ = ["DataAnalyzer"] diff --git a/gigl/analytics/data_analyzer/__main__.py b/gigl/analytics/data_analyzer/__main__.py new file mode 100644 index 000000000..693551d33 --- /dev/null +++ b/gigl/analytics/data_analyzer/__main__.py @@ -0,0 +1,6 @@ +"""Entry point for running the BQ Data Analyzer as a module: python -m gigl.analytics.data_analyzer.""" + +from gigl.analytics.data_analyzer.data_analyzer import main + +if __name__ == "__main__": + main() diff --git a/gigl/analytics/data_analyzer/config.py b/gigl/analytics/data_analyzer/config.py new file mode 100644 index 000000000..c892edb0f --- /dev/null +++ b/gigl/analytics/data_analyzer/config.py @@ -0,0 +1,177 @@ +import re +from dataclasses import dataclass, field +from typing import Optional + +from omegaconf import MISSING, OmegaConf + +from gigl.common.logger import Logger + +logger = Logger() + +# BigQuery identifier regexes used to reject configs that would be interpolated +# directly into SQL. See https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical +# for the allowed grammar. Tables are of the form project.dataset.table; +# columns are simple unquoted identifiers. 
+_BQ_TABLE_REGEX = re.compile(r"^[A-Za-z0-9_.\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_$\-]+$") +_BQ_COLUMN_REGEX = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +def _validate_bq_table(name: str, field_label: str) -> None: + if not _BQ_TABLE_REGEX.fullmatch(name): + raise ValueError( + f"{field_label}={name!r} is not a valid BigQuery table reference. " + f"Expected project.dataset.table with no backticks, whitespace, or quotes." + ) + + +def _validate_bq_column(name: str, field_label: str) -> None: + if not _BQ_COLUMN_REGEX.fullmatch(name): + raise ValueError( + f"{field_label}={name!r} is not a valid BigQuery column identifier. " + f"Expected [A-Za-z_][A-Za-z0-9_]* with no backticks, whitespace, or quotes." + ) + + +@dataclass +class NodeTableSpec: + """Specification for a node table in BigQuery.""" + + bq_table: str = MISSING + node_type: str = MISSING + id_column: str = MISSING + feature_columns: list[str] = field(default_factory=list) + label_column: Optional[str] = None + + +@dataclass +class EdgeTableSpec: + """Specification for an edge table in BigQuery. + + For heterogeneous graphs (more than one node table), src_node_type and + dst_node_type must be set to the node_type of the matching node table. + For homogeneous graphs (single node table) they default to that node_type. + """ + + bq_table: str = MISSING + edge_type: str = MISSING + src_id_column: str = MISSING + dst_id_column: str = MISSING + src_node_type: Optional[str] = None + dst_node_type: Optional[str] = None + feature_columns: list[str] = field(default_factory=list) + timestamp_column: Optional[str] = None + + +@dataclass +class DataAnalyzerConfig: + """Configuration for the BQ Data Analyzer. + + Parsed from YAML via OmegaConf. 
+ + Example: + >>> config = load_analyzer_config("gs://bucket/config.yaml") + >>> config.node_tables[0].bq_table + 'project.dataset.user_nodes' + """ + + node_tables: list[NodeTableSpec] = MISSING + edge_tables: list[EdgeTableSpec] = MISSING + output_gcs_path: str = MISSING + fan_out: Optional[list[int]] = None + compute_reciprocity: bool = False + compute_homophily: bool = False + compute_connected_components: bool = False + compute_clustering: bool = False + + +def _validate_and_backfill(config: DataAnalyzerConfig) -> None: + """Run identifier validation and backfill default node-type references. + + - Every bq_table must match project.dataset.table. + - Every id_column / src_id_column / dst_id_column / feature_column / + label_column / timestamp_column must be a bare BQ identifier. + - For homogeneous configs, an edge table with no src_node_type / + dst_node_type inherits the single node table's node_type. + - For heterogeneous configs, every edge table must explicitly declare + src_node_type and dst_node_type, and both must resolve to a known + node_type. 
+ """ + known_node_types = {nt.node_type for nt in config.node_tables} + single_node_type: Optional[str] = ( + next(iter(known_node_types)) if len(config.node_tables) == 1 else None + ) + + for node_table in config.node_tables: + _validate_bq_table(node_table.bq_table, "node_tables.bq_table") + _validate_bq_column(node_table.id_column, "node_tables.id_column") + for col in node_table.feature_columns: + _validate_bq_column(col, "node_tables.feature_columns") + if node_table.label_column is not None: + _validate_bq_column(node_table.label_column, "node_tables.label_column") + + for edge_table in config.edge_tables: + _validate_bq_table(edge_table.bq_table, "edge_tables.bq_table") + _validate_bq_column(edge_table.src_id_column, "edge_tables.src_id_column") + _validate_bq_column(edge_table.dst_id_column, "edge_tables.dst_id_column") + for col in edge_table.feature_columns: + _validate_bq_column(col, "edge_tables.feature_columns") + if edge_table.timestamp_column is not None: + _validate_bq_column( + edge_table.timestamp_column, "edge_tables.timestamp_column" + ) + + if edge_table.src_node_type is None: + if single_node_type is not None: + edge_table.src_node_type = single_node_type + else: + raise ValueError( + f"edge_type={edge_table.edge_type}: src_node_type is required " + f"when there are multiple node tables" + ) + if edge_table.dst_node_type is None: + if single_node_type is not None: + edge_table.dst_node_type = single_node_type + else: + raise ValueError( + f"edge_type={edge_table.edge_type}: dst_node_type is required " + f"when there are multiple node tables" + ) + if edge_table.src_node_type not in known_node_types: + raise ValueError( + f"edge_type={edge_table.edge_type}: src_node_type=" + f"{edge_table.src_node_type!r} is not a declared node_type. 
" + f"Known: {sorted(known_node_types)}" + ) + if edge_table.dst_node_type not in known_node_types: + raise ValueError( + f"edge_type={edge_table.edge_type}: dst_node_type=" + f"{edge_table.dst_node_type!r} is not a declared node_type. " + f"Known: {sorted(known_node_types)}" + ) + + +def load_analyzer_config(config_path: str) -> DataAnalyzerConfig: + """Load and validate a DataAnalyzerConfig from a YAML file. + + Args: + config_path: Local file path or GCS URI to the YAML config. + + Returns: + Validated DataAnalyzerConfig instance with node-type references + backfilled on edge tables. + + Raises: + omegaconf.errors.MissingMandatoryValue: If required fields are missing. + ValueError: If any bq_table or column name is not a valid BigQuery + identifier, or if a heterogeneous config is missing a required + src_node_type / dst_node_type. + """ + raw = OmegaConf.load(config_path) + merged = OmegaConf.merge(OmegaConf.structured(DataAnalyzerConfig), raw) + config: DataAnalyzerConfig = OmegaConf.to_object(merged) # type: ignore + _validate_and_backfill(config) + logger.info( + f"Loaded analyzer config with {len(config.node_tables)} node tables " + f"and {len(config.edge_tables)} edge tables" + ) + return config diff --git a/gigl/analytics/data_analyzer/data_analyzer.py b/gigl/analytics/data_analyzer/data_analyzer.py new file mode 100644 index 000000000..f8062fa56 --- /dev/null +++ b/gigl/analytics/data_analyzer/data_analyzer.py @@ -0,0 +1,137 @@ +"""Main orchestrator and CLI entry point for the BQ Data Analyzer.""" +import argparse +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +from gigl.analytics.data_analyzer.config import DataAnalyzerConfig, load_analyzer_config +from gigl.analytics.data_analyzer.feature_profiler import FeatureProfiler +from gigl.analytics.data_analyzer.graph_structure_analyzer import ( + DataQualityError, + GraphStructureAnalyzer, +) +from 
gigl.analytics.data_analyzer.report.report_generator import generate_report +from gigl.analytics.data_analyzer.types import FeatureProfileResult, GraphAnalysisResult +from gigl.common import GcsUri, Uri, UriFactory +from gigl.common.logger import Logger +from gigl.common.utils.gcs import GcsUtils + +logger = Logger() + + +def _write_report(html: str, output_gcs_path: str) -> str: + """Write the HTML report to a GCS URI or local path. + + Args: + html: Rendered HTML string. + output_gcs_path: Output directory. If it starts with ``gs://`` the + report is uploaded via ``GcsUtils``. Otherwise it is written to + the local filesystem (the directory is created if missing). + + Returns: + The full path to the written ``report.html`` file. + """ + trimmed = output_gcs_path.rstrip("/") + report_path = f"{trimmed}/report.html" + if trimmed.startswith("gs://"): + GcsUtils().upload_from_string(GcsUri(report_path), html) + else: + local_path = Path(report_path).expanduser().resolve() + local_path.parent.mkdir(parents=True, exist_ok=True) + local_path.write_text(html) + report_path = str(local_path) + return report_path + + +class DataAnalyzer: + """Orchestrates graph structure analysis, feature profiling, and report generation. + + Example: + >>> from gigl.analytics.data_analyzer.config import load_analyzer_config + >>> config = load_analyzer_config("gs://bucket/config.yaml") + >>> analyzer = DataAnalyzer() + >>> report_path = analyzer.run(config=config) + """ + + def run( + self, + config: DataAnalyzerConfig, + resource_config_uri: Optional[Uri] = None, + ) -> str: + """Run the full analysis pipeline and write an HTML report. + + The report is written to ``{config.output_gcs_path}/report.html`` via + ``GcsUtils`` when the output path is a ``gs://`` URI, or to the local + filesystem otherwise (the parent directory is created if missing). + + Args: + config: Analyzer configuration. + resource_config_uri: Optional resource config for Dataflow sizing. 
+ + Returns: + The path to the written ``report.html`` (GCS URI or local path). + """ + structure_analyzer = GraphStructureAnalyzer() + feature_profiler = FeatureProfiler() + + with ThreadPoolExecutor(max_workers=2) as executor: + structure_future = executor.submit(structure_analyzer.analyze, config) + profile_future = executor.submit( + feature_profiler.profile, config, resource_config_uri + ) + + analysis_result: GraphAnalysisResult + try: + analysis_result = structure_future.result() + except DataQualityError as e: + logger.error(f"Tier 1 data quality failure: {e}") + analysis_result = e.partial_result + + profile_result: FeatureProfileResult + try: + profile_result = profile_future.result() + except Exception as e: + logger.exception(f"Feature profiler failed: {e}") + profile_result = FeatureProfileResult() + + html = generate_report( + analysis_result=analysis_result, + profile_result=profile_result, + config=config, + ) + + report_path = _write_report(html, config.output_gcs_path) + logger.info(f"Report written to {report_path}") + return report_path + + +def main() -> None: + """CLI entry point for the BQ Data Analyzer.""" + parser = argparse.ArgumentParser( + description="BQ Data Analyzer: analyze graph data in BigQuery before GNN training" + ) + parser.add_argument( + "--analyzer_config_uri", + required=True, + help="Path or GCS URI to the analyzer YAML config", + ) + parser.add_argument( + "--resource_config_uri", + required=False, + help="Path or GCS URI to the resource config for Dataflow sizing", + ) + args = parser.parse_args() + + config = load_analyzer_config(args.analyzer_config_uri) + resource_config_uri: Optional[Uri] = ( + UriFactory.create_uri(args.resource_config_uri) + if args.resource_config_uri + else None + ) + analyzer = DataAnalyzer() + report_path = analyzer.run(config=config, resource_config_uri=resource_config_uri) + logger.info(f"Report generated at: {report_path}") + + +if __name__ == "__main__": + main() diff --git 
a/gigl/analytics/data_analyzer/feature_profiler.py b/gigl/analytics/data_analyzer/feature_profiler.py new file mode 100644 index 000000000..e1227ac08 --- /dev/null +++ b/gigl/analytics/data_analyzer/feature_profiler.py @@ -0,0 +1,189 @@ +"""TFDV feature profiling via Beam/Dataflow. + +Launches one Dataflow pipeline per (node or edge) table that declares +``feature_columns`` in the analyzer config. Each pipeline reads the +selected columns from BigQuery, emits ``pa.RecordBatch`` batches, and +runs ``tfdv.GenerateStatistics`` to write a Facets HTML visualization +plus a TFDV stats TFRecord to GCS. + +Pipelines are launched concurrently using an internal +``ThreadPoolExecutor``; each worker blocks on +``p.run().wait_until_finish()`` for its table. Per-table exceptions are +logged and the failed table is omitted from the returned +``FeatureProfileResult`` - callers (and the HTML report) already handle +missing keys. +""" +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from typing import Optional + +import apache_beam as beam + +from gigl.analytics.data_analyzer.config import DataAnalyzerConfig +from gigl.analytics.data_analyzer.types import FeatureProfileResult +from gigl.common import Uri, UriFactory +from gigl.common.beam.tfdv_transforms import ( + BqTableToRecordBatch, + GenerateAndVisualizeStats, +) +from gigl.common.logger import Logger +from gigl.env.pipelines_config import get_resource_config +from gigl.src.common.constants.components import GiGLComponents +from gigl.src.common.types import AppliedTaskIdentifier +from gigl.src.common.utils.dataflow import init_beam_pipeline_options + +logger = Logger() + +_PARALLEL_DATAFLOW_WORKERS = 10 +_APPLIED_TASK_IDENTIFIER = AppliedTaskIdentifier("data-analyzer") + + +@dataclass(frozen=True) +class _ProfileTask: + """One profiling unit: all features of a single node or edge table. 
+ + ``kind`` is ``"node"`` or ``"edge"`` (singular) and is used to build + the GCS output path and the result key (``"node:user"``, etc.). + """ + + kind: str + type_name: str + bq_table: str + feature_columns: list[str] + + @property + def result_key(self) -> str: + return f"{self.kind}:{self.type_name}" + + +class FeatureProfiler: + """Runs TFDV feature profiling on BQ tables via Dataflow. + + Example: + >>> profiler = FeatureProfiler() + >>> result = profiler.profile(config, resource_config_uri=uri) + >>> result.facets_html_paths["node:user"] + 'gs://bucket/analyzer/feature_profiler/nodes/user/facets.html' + """ + + def profile( + self, + config: DataAnalyzerConfig, + resource_config_uri: Optional[Uri] = None, + ) -> FeatureProfileResult: + """Run TFDV profiling on all tables with declared feature columns. + + Launches one Dataflow pipeline per table concurrently. Tables with + no ``feature_columns`` are skipped. Per-table failures are logged + and omitted from the result. + + Args: + config: Analyzer configuration with node and edge table specs. + resource_config_uri: Resource config for Dataflow sizing. + Required - TFDV profiling needs Dataflow. + + Returns: + ``FeatureProfileResult`` with GCS paths keyed by + ``"node:{type}"`` / ``"edge:{type}"``. Empty if no tables + declared feature columns. + + Raises: + ValueError: If ``resource_config_uri`` is None. + """ + if resource_config_uri is None: + raise ValueError( + "FeatureProfiler requires a resource_config_uri for Dataflow sizing. " + "Pass --resource_config_uri when invoking the DataAnalyzer CLI." + ) + # Eagerly populate the process-global resource config so that + # `init_beam_pipeline_options` (called on worker threads below) + # can resolve it without args. 
+ get_resource_config(resource_config_uri=resource_config_uri) + + tasks = _collect_profile_tasks(config) + if not tasks: + logger.info("No tables declared feature_columns; returning empty result.") + return FeatureProfileResult() + + logger.info(f"Launching {len(tasks)} Dataflow feature-profile job(s).") + result = FeatureProfileResult() + with ThreadPoolExecutor(max_workers=_PARALLEL_DATAFLOW_WORKERS) as executor: + future_to_task = { + executor.submit( + self._run_single_pipeline, task, config.output_gcs_path + ): task + for task in tasks + } + for future in as_completed(future_to_task): + task = future_to_task[future] + try: + facets_uri, stats_uri = future.result() + result.facets_html_paths[task.result_key] = facets_uri + result.stats_paths[task.result_key] = stats_uri + except Exception as exc: + logger.exception( + f"Feature profiling failed for {task.result_key} " + f"(table={task.bq_table}): {exc}" + ) + return result + + def _run_single_pipeline( + self, task: _ProfileTask, output_gcs_path: str + ) -> tuple[str, str]: + """Build, run, and block on a single table's Dataflow pipeline. + + Returns the ``(facets_uri, stats_uri)`` strings on success. 
+ """ + base = f"{output_gcs_path.rstrip('/')}/feature_profiler/{task.kind}s/{task.type_name}" + facets_uri = UriFactory.create_uri(f"{base}/facets.html") + stats_uri = UriFactory.create_uri(f"{base}/stats.tfrecord") + + options = init_beam_pipeline_options( + applied_task_identifier=_APPLIED_TASK_IDENTIFIER, + job_name_suffix=f"profile-{task.kind}-{task.type_name}", + component=GiGLComponents.DataAnalyzer, + ) + with beam.Pipeline(options=options) as p: + _ = ( + p + | f"Read {task.result_key} from BQ" + >> BqTableToRecordBatch( + bq_table=task.bq_table, + feature_columns=task.feature_columns, + ) + | f"Generate TFDV stats for {task.result_key}" + >> GenerateAndVisualizeStats( + facets_report_uri=facets_uri, + stats_output_uri=stats_uri, + ) + ) + logger.info(f"Finished feature profiling for {task.result_key}.") + return facets_uri.uri, stats_uri.uri + + +def _collect_profile_tasks(config: DataAnalyzerConfig) -> list[_ProfileTask]: + """Flatten the analyzer config into one ``_ProfileTask`` per table that + has non-empty ``feature_columns``. Tables without features are skipped. + """ + tasks: list[_ProfileTask] = [] + for node_table in config.node_tables: + if node_table.feature_columns: + tasks.append( + _ProfileTask( + kind="node", + type_name=node_table.node_type, + bq_table=node_table.bq_table, + feature_columns=list(node_table.feature_columns), + ) + ) + for edge_table in config.edge_tables: + if edge_table.feature_columns: + tasks.append( + _ProfileTask( + kind="edge", + type_name=edge_table.edge_type, + bq_table=edge_table.bq_table, + feature_columns=list(edge_table.feature_columns), + ) + ) + return tasks diff --git a/gigl/analytics/data_analyzer/graph_structure_analyzer.py b/gigl/analytics/data_analyzer/graph_structure_analyzer.py new file mode 100644 index 000000000..b8a97086f --- /dev/null +++ b/gigl/analytics/data_analyzer/graph_structure_analyzer.py @@ -0,0 +1,561 @@ +"""GraphStructureAnalyzer: 4-tier BigQuery-based graph data quality checks. 
+ +Tier 1 (hard fails) + dangling edges, referential integrity, duplicate nodes. Any violation + raises DataQualityError with a partially populated GraphAnalysisResult. + +Tier 2 (core metrics) + node/edge counts, degree distribution, top-K hubs, INT16 clamp hazards, + isolated/cold-start nodes, duplicate edges, self-loops, NULL rates, and + two Python-side computations (feature memory budget, neighbor explosion). + +Tier 3 (label and heterogeneous) + class imbalance and label coverage (auto-enabled when node_tables have a + label_column); edge-type distribution and per-edge-type node coverage + (auto-enabled when more than one edge table is declared). + +Tier 4 (opt-in) + reciprocity, power-law exponent estimate. Gated by config flags. +""" + +import math +from concurrent.futures import ThreadPoolExecutor +from typing import Optional + +from gigl.analytics.data_analyzer.config import ( + DataAnalyzerConfig, + EdgeTableSpec, + NodeTableSpec, +) +from gigl.analytics.data_analyzer.queries import ( + CLASS_IMBALANCE_QUERY, + COLD_START_NODE_COUNT_QUERY, + DANGLING_EDGES_QUERY, + DEGREE_BUCKET_QUERY, + DEGREE_DISTRIBUTION_QUERY, + DUPLICATE_EDGE_COUNT_QUERY, + DUPLICATE_NODE_COUNT_QUERY, + EDGE_COUNT_QUERY, + EDGE_REFERENTIAL_INTEGRITY_QUERY, + EDGE_TYPE_DISTRIBUTION_QUERY, + EDGE_TYPE_NODE_COVERAGE_QUERY, + ISOLATED_NODE_COUNT_QUERY, + LABEL_COVERAGE_QUERY, + NODE_COUNT_QUERY, + SELF_LOOP_COUNT_QUERY, + SUPER_HUB_INT16_CLAMP_QUERY, + TOP_K_HUBS_QUERY, + build_null_rates_query, +) +from gigl.analytics.data_analyzer.types import DegreeStats, GraphAnalysisResult +from gigl.common.logger import Logger +from gigl.src.common.utils.bq import BqUtils + +logger = Logger() + +# Default assumption for feature memory budget: float64 per feature column. +_BYTES_PER_FEATURE = 8 +_TOP_K_HUBS = 20 +_PARALLEL_BQ_WORKERS = 10 + + +class DataQualityError(Exception): + """Raised when Tier 1 hard-fail checks detect data quality violations. 
+ + Carries a partially populated GraphAnalysisResult so callers can inspect + which specific checks failed without re-running the analyzer. + """ + + def __init__(self, message: str, partial_result: GraphAnalysisResult) -> None: + super().__init__(message) + self.partial_result = partial_result + + +class GraphStructureAnalyzer: + """Runs BigQuery SQL checks across 4 tiers against the tables declared in a config. + + Example: + >>> config = load_analyzer_config("gs://bucket/config.yaml") + >>> analyzer = GraphStructureAnalyzer() + >>> result = analyzer.analyze(config) + >>> result.node_counts["user"] + 1000000 + + Tier 1 is blocking: a violation raises DataQualityError before Tiers 2-4 run. + Tiers 2-4 are aggregated best-effort into a single GraphAnalysisResult. + """ + + def __init__(self, bq_project: Optional[str] = None) -> None: + self._bq_utils = BqUtils(project=bq_project) + + def analyze(self, config: DataAnalyzerConfig) -> GraphAnalysisResult: + """Run all applicable tiers and return aggregated results. + + Args: + config: Data analyzer configuration declaring node and edge tables + plus any opt-in expensive checks (reciprocity, etc.). + + Returns: + GraphAnalysisResult with tier 1-4 fields populated per config. + + Raises: + DataQualityError: If tier 1 checks find any violations. The + exception carries a partial result with the specific counts. + """ + result = GraphAnalysisResult() + logger.info("Starting graph structure analysis (Tier 1: hard fails)") + self._run_tier1(config, result) + + logger.info("Tier 1 passed. 
Running Tier 2 (core metrics)") + self._run_tier2(config, result) + + logger.info("Running Tier 3 (label / heterogeneous)") + self._run_tier3(config, result) + + logger.info("Running Tier 4 (opt-in)") + self._run_tier4(config, result) + return result + + # ------------------------------------------------------------------ # + # Tier 1: hard fails # + # ------------------------------------------------------------------ # + + def _run_tier1( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Run all tier 1 checks; raise DataQualityError on any violation.""" + violations: list[str] = [] + node_tables_by_type = {nt.node_type: nt for nt in config.node_tables} + + # Duplicate nodes (per node table). + for node_table in config.node_tables: + query = DUPLICATE_NODE_COUNT_QUERY.format( + table=node_table.bq_table, id_column=node_table.id_column + ) + count = self._query_scalar(query, "duplicate_count") + result.duplicate_node_counts[node_table.node_type] = count + if count > 0: + violations.append( + f"node_type={node_table.node_type} has {count} duplicate IDs" + ) + + # Dangling edges and referential integrity (per edge table). + for edge_table in config.edge_tables: + dangling_query = DANGLING_EDGES_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ) + dangling = self._query_scalar(dangling_query, "dangling_count") + result.dangling_edge_counts[edge_table.edge_type] = dangling + if dangling > 0: + violations.append( + f"edge_type={edge_table.edge_type} has {dangling} dangling edges" + ) + + # Referential integrity: src and dst can resolve to different node + # tables on heterogeneous graphs. `load_analyzer_config` guarantees + # src_node_type / dst_node_type are populated and known. 
+ if not config.node_tables: + continue + assert edge_table.src_node_type is not None, ( + f"edge_type={edge_table.edge_type} has no src_node_type; " + "load the config via load_analyzer_config to backfill it." + ) + assert edge_table.dst_node_type is not None, ( + f"edge_type={edge_table.edge_type} has no dst_node_type; " + "load the config via load_analyzer_config to backfill it." + ) + src_node_table = node_tables_by_type[edge_table.src_node_type] + dst_node_table = node_tables_by_type[edge_table.dst_node_type] + ref_query = EDGE_REFERENTIAL_INTEGRITY_QUERY.format( + edge_table=edge_table.bq_table, + src_node_table=src_node_table.bq_table, + dst_node_table=dst_node_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + src_node_id_column=src_node_table.id_column, + dst_node_id_column=dst_node_table.id_column, + ) + rows = list(self._bq_utils.run_query(query=ref_query, labels={})) + if len(rows) != 1: + raise RuntimeError( + f"Referential integrity query expected exactly 1 row; " + f"got {len(rows)}. 
Query: {ref_query.strip()[:200]}" + ) + missing_src = int(rows[0]["missing_src_count"] or 0) + missing_dst = int(rows[0]["missing_dst_count"] or 0) + total_missing = missing_src + missing_dst + result.referential_integrity_violations[ + edge_table.edge_type + ] = total_missing + if total_missing > 0: + violations.append( + f"edge_type={edge_table.edge_type} has {total_missing} " + "referential integrity violations" + ) + + if violations: + msg = "Tier 1 data quality violations detected:\n - " + "\n - ".join( + violations + ) + logger.error(msg) + raise DataQualityError(msg, partial_result=result) + + # ------------------------------------------------------------------ # + # Tier 2: core metrics # + # ------------------------------------------------------------------ # + + def _run_tier2( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Collect core structural metrics, fanning out BQ jobs in parallel. + + Edge-level metrics are computed from the src-side perspective: + isolated/cold-start joins pair each edge with its src_node_type's + table. Hetero dst-perspective coverage is exposed separately via + Tier 3 edge_type_node_coverage. + + BQ jobs are I/O-bound so ThreadPoolExecutor is used. Each worker + writes to distinct keys of the shared `result` dict (one key per + node_type / edge_type), so no lock is required under CPython's GIL. 
+ """ + node_tables_by_type = {nt.node_type: nt for nt in config.node_tables} + + with ThreadPoolExecutor(max_workers=_PARALLEL_BQ_WORKERS) as executor: + futures = [] + for node_table in config.node_tables: + futures.append( + executor.submit(self._tier2_node_metrics, node_table, result) + ) + for edge_table in config.edge_tables: + src_node_table = node_tables_by_type.get(edge_table.src_node_type or "") + futures.append( + executor.submit( + self._tier2_edge_metrics, edge_table, src_node_table, result + ) + ) + for future in futures: + future.result() # re-raise any exception + + # Python-side computations run after all BQ data is collected. + self._compute_feature_memory_budget(config, result) + self._compute_neighbor_explosion_estimate(config, result) + + def _tier2_node_metrics( + self, node_table: NodeTableSpec, result: GraphAnalysisResult + ) -> None: + node_count = self._query_scalar( + NODE_COUNT_QUERY.format(table=node_table.bq_table), "node_count" + ) + result.node_counts[node_table.node_type] = node_count + + columns_to_check: list[str] = [node_table.id_column] + columns_to_check.extend(node_table.feature_columns) + if node_table.label_column: + columns_to_check.append(node_table.label_column) + + null_query = build_null_rates_query( + table=node_table.bq_table, columns=columns_to_check + ) + rows = list(self._bq_utils.run_query(query=null_query, labels={})) + if rows: + row = rows[0] + rates: dict[str, float] = {} + for col in columns_to_check: + key = f"{col}_null_rate" + rate = row[key] + rates[col] = float(rate) if rate is not None else 0.0 + result.null_rates[node_table.node_type] = rates + + def _tier2_edge_metrics( + self, + edge_table: EdgeTableSpec, + node_table: Optional[NodeTableSpec], + result: GraphAnalysisResult, + ) -> None: + edge_type = edge_table.edge_type + + # Scalar counts. 
+ result.edge_counts[edge_type] = self._query_scalar( + EDGE_COUNT_QUERY.format(table=edge_table.bq_table), "edge_count" + ) + result.duplicate_edge_counts[edge_type] = self._query_scalar( + DUPLICATE_EDGE_COUNT_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "duplicate_count", + ) + result.self_loop_counts[edge_type] = self._query_scalar( + SELF_LOOP_COUNT_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "self_loop_count", + ) + + # Super-hub INT16 clamp check (indexed by src). + result.super_hub_int16_clamp_count[edge_type] = self._query_scalar( + SUPER_HUB_INT16_CLAMP_QUERY.format( + table=edge_table.bq_table, id_column=edge_table.src_id_column + ), + "super_hub_count", + ) + + # Isolated and cold-start require a node table join. + if node_table is not None: + result.isolated_node_counts[edge_type] = self._query_scalar( + ISOLATED_NODE_COUNT_QUERY.format( + node_table=node_table.bq_table, + edge_table=edge_table.bq_table, + node_id_column=node_table.id_column, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "isolated_count", + ) + result.cold_start_node_counts[edge_type] = self._query_scalar( + COLD_START_NODE_COUNT_QUERY.format( + node_table=node_table.bq_table, + edge_table=edge_table.bq_table, + node_id_column=node_table.id_column, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "cold_start_count", + ) + + # Top-K hubs (by src). + top_hub_rows = list( + self._bq_utils.run_query( + query=TOP_K_HUBS_QUERY.format( + table=edge_table.bq_table, + id_column=edge_table.src_id_column, + k=_TOP_K_HUBS, + ), + labels={}, + ) + ) + result.top_hubs[edge_type] = [ + (str(row["node_id"]), int(row["degree"])) for row in top_hub_rows + ] + + # Degree statistics: distribution + buckets, in + out directions. 
+        for direction, id_column in (
+            ("out", edge_table.src_id_column),
+            ("in", edge_table.dst_id_column),
+        ):
+            result.degree_stats[f"{edge_type}_{direction}"] = self._build_degree_stats(
+                table=edge_table.bq_table, id_column=id_column
+            )
+
+    def _build_degree_stats(self, table: str, id_column: str) -> DegreeStats:
+        """Run degree distribution + bucket queries and pack into DegreeStats."""
+        dist_rows = list(
+            self._bq_utils.run_query(
+                query=DEGREE_DISTRIBUTION_QUERY.format(
+                    table=table, id_column=id_column
+                ),
+                labels={},
+            )
+        )
+        bucket_rows = list(
+            self._bq_utils.run_query(
+                query=DEGREE_BUCKET_QUERY.format(table=table, id_column=id_column),
+                labels={},
+            )
+        )
+        dist_row = dist_rows[0]
+        bucket_row = bucket_rows[0]
+
+        # APPROX_QUANTILES returns NULL rather than an empty array when the
+        # table has no rows; fall back to [0] so the index lookups below are safe.
+        percentiles_raw = list(dist_row["percentiles"] or [])
+        percentiles = [int(p) if p is not None else 0 for p in percentiles_raw] or [0]
+        # APPROX_QUANTILES(degree, 100) returns 101 values: index 0..100.
+        median = percentiles[50] if len(percentiles) > 50 else 0
+        p90 = percentiles[90] if len(percentiles) > 90 else percentiles[-1]
+        p99 = percentiles[99] if len(percentiles) > 99 else percentiles[-1]
+        # We only have 100-bucket quantiles, so p999 ~= p99 as best-effort.
+        p999 = p99
+
+        # Bucket keys must match BUCKET_ORDER in report/charts.ai.js for the
+        # histogram to render correctly; keep uppercase K.
+ buckets: dict[str, int] = { + "0-1": int(bucket_row["bucket_0_1"]), + "2-10": int(bucket_row["bucket_2_10"]), + "11-100": int(bucket_row["bucket_11_100"]), + "101-1K": int(bucket_row["bucket_101_1k"]), + "1K-10K": int(bucket_row["bucket_1k_10k"]), + "10K+": int(bucket_row["bucket_10k_plus"]), + } + + return DegreeStats( + min=int(dist_row["min_degree"] or 0), + max=int(dist_row["max_degree"] or 0), + mean=float(dist_row["avg_degree"] or 0.0), + median=median, + p90=p90, + p99=p99, + p999=p999, + percentiles=percentiles, + buckets=buckets, + ) + + # ------------------------------------------------------------------ # + # Tier 3: label and heterogeneous # + # ------------------------------------------------------------------ # + + def _run_tier3( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + # Label-related checks per node table with a label column. + for node_table in config.node_tables: + if not node_table.label_column: + continue + class_rows = list( + self._bq_utils.run_query( + query=CLASS_IMBALANCE_QUERY.format( + table=node_table.bq_table, + label_column=node_table.label_column, + ), + labels={}, + ) + ) + result.class_imbalance[node_table.node_type] = { + str(row["label"]): int(row["count"]) for row in class_rows + } + + coverage_rows = list( + self._bq_utils.run_query( + query=LABEL_COVERAGE_QUERY.format( + table=node_table.bq_table, + label_column=node_table.label_column, + ), + labels={}, + ) + ) + if coverage_rows: + coverage = coverage_rows[0]["coverage"] + result.label_coverage[node_table.node_type] = ( + float(coverage) if coverage is not None else 0.0 + ) + + # Heterogeneous distribution only if more than one edge type. + if len(config.edge_tables) > 1: + for edge_table in config.edge_tables: + edge_type = edge_table.edge_type + # Edge-type distribution is effectively the edge count; reuse. 
+ if edge_type in result.edge_counts: + result.edge_type_distribution[edge_type] = result.edge_counts[ + edge_type + ] + else: + result.edge_type_distribution[edge_type] = self._query_scalar( + EDGE_TYPE_DISTRIBUTION_QUERY.format(table=edge_table.bq_table), + "edge_count", + ) + coverage_rows = list( + self._bq_utils.run_query( + query=EDGE_TYPE_NODE_COVERAGE_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + labels={}, + ) + ) + if coverage_rows: + row = coverage_rows[0] + result.edge_type_node_coverage[edge_type] = { + "distinct_src_count": int(row["distinct_src_count"] or 0), + "distinct_dst_count": int(row["distinct_dst_count"] or 0), + } + + # ------------------------------------------------------------------ # + # Tier 4: opt-in # + # ------------------------------------------------------------------ # + + def _run_tier4( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Populate opt-in metrics gated by config flags. + + Power-law exponent is always cheap (derived from existing degree stats) + and is computed whenever degree stats are available. Reciprocity, + homophily, connected components and clustering require dedicated + queries not yet defined; they remain empty unless the corresponding + flag is enabled AND a query is implemented. + """ + # Power-law exponent: approximate from degree stats using a simple + # heuristic: alpha ~= 1 + log(max) / log(median) for median > 1. + for degree_key, stats in result.degree_stats.items(): + if stats.median > 1 and stats.max > stats.median: + exponent = 1.0 + math.log(stats.max) / math.log(stats.median) + result.power_law_exponent[degree_key] = exponent + + if config.compute_reciprocity: + # Query not yet defined; log and skip. + logger.warning( + "compute_reciprocity=True but reciprocity query is not implemented; " + "skipping Tier 4 reciprocity." 
+ ) + + # ------------------------------------------------------------------ # + # Python-only computations # + # ------------------------------------------------------------------ # + + def _compute_feature_memory_budget( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Estimate per-node-type memory footprint of features (float64 assumed).""" + for node_table in config.node_tables: + node_count = result.node_counts.get(node_table.node_type, 0) + num_features = len(node_table.feature_columns) + result.feature_memory_bytes[node_table.node_type] = ( + node_count * num_features * _BYTES_PER_FEATURE + ) + + def _compute_neighbor_explosion_estimate( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Multiply fan-out factors and scale by out-degree mean per edge type.""" + if not config.fan_out: + return + fan_out_product = 1 + for hop in config.fan_out: + fan_out_product *= int(hop) + for edge_table in config.edge_tables: + out_stats = result.degree_stats.get(f"{edge_table.edge_type}_out") + if out_stats is None: + continue + estimate = int(fan_out_product * max(out_stats.mean, 1.0)) + result.neighbor_explosion_estimate[edge_table.edge_type] = estimate + + # ------------------------------------------------------------------ # + # Helpers # + # ------------------------------------------------------------------ # + + def _query_scalar(self, query: str, column: str) -> int: + """Run a single-row, single-column query and return the scalar as int. + + Scalar queries (COUNT, COUNTIF) must return exactly one row with a + non-NULL value for the requested column. Any deviation indicates a + driver, auth, or schema mismatch rather than legitimate data — raise + loudly instead of silently coercing to 0, which would let a broken run + pass through as a green-light result. 
+ """ + rows = list(self._bq_utils.run_query(query=query, labels={})) + if len(rows) != 1: + raise RuntimeError( + f"Scalar query expected exactly 1 row; got {len(rows)}. " + f"Query: {query.strip()[:200]}" + ) + value = rows[0][column] + if value is None: + raise RuntimeError( + f"Scalar query returned NULL for column '{column}'. " + f"Query: {query.strip()[:200]}" + ) + return int(value) diff --git a/gigl/analytics/data_analyzer/queries.py b/gigl/analytics/data_analyzer/queries.py new file mode 100644 index 000000000..3243d2727 --- /dev/null +++ b/gigl/analytics/data_analyzer/queries.py @@ -0,0 +1,189 @@ +"""SQL query templates for graph structure analysis. + +Each constant is a format-string template parameterized with table names +and column names. Pattern matches gigl/src/data_preprocessor/lib/enumerate/queries.py. +""" + +import torch + +INT16_MAX = int(torch.iinfo(torch.int16).max) # 32767 + +# --- Tier 1: Hard fails --- + +DANGLING_EDGES_QUERY = """ +SELECT COUNT(*) AS dangling_count +FROM `{table}` +WHERE {src_id_column} IS NULL OR {dst_id_column} IS NULL +""" + +EDGE_REFERENTIAL_INTEGRITY_QUERY = """ +SELECT + COUNTIF(src_node.{src_node_id_column} IS NULL) AS missing_src_count, + COUNTIF(dst_node.{dst_node_id_column} IS NULL) AS missing_dst_count +FROM `{edge_table}` AS e +LEFT JOIN `{src_node_table}` AS src_node + ON e.{src_id_column} = src_node.{src_node_id_column} +LEFT JOIN `{dst_node_table}` AS dst_node + ON e.{dst_id_column} = dst_node.{dst_node_id_column} +""" + +DUPLICATE_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS duplicate_count FROM ( + SELECT {id_column} + FROM `{table}` + GROUP BY {id_column} + HAVING COUNT(*) > 1 +) +""" + +# --- Tier 2: Core metrics --- + +NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS node_count FROM `{table}` +""" + +EDGE_COUNT_QUERY = """ +SELECT COUNT(*) AS edge_count FROM `{table}` +""" + +DUPLICATE_EDGE_COUNT_QUERY = """ +SELECT COUNT(*) AS duplicate_count FROM ( + SELECT {src_id_column}, {dst_id_column} + FROM `{table}` + 
GROUP BY {src_id_column}, {dst_id_column} + HAVING COUNT(*) > 1 +) +""" + +SELF_LOOP_COUNT_QUERY = """ +SELECT COUNT(*) AS self_loop_count +FROM `{table}` +WHERE {src_id_column} = {dst_id_column} +""" + +ISOLATED_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS isolated_count FROM ( + SELECT n.{node_id_column} + FROM `{node_table}` AS n + LEFT JOIN `{edge_table}` AS e_src + ON n.{node_id_column} = e_src.{src_id_column} + LEFT JOIN `{edge_table}` AS e_dst + ON n.{node_id_column} = e_dst.{dst_id_column} + WHERE e_src.{src_id_column} IS NULL + AND e_dst.{dst_id_column} IS NULL +) +""" + +DEGREE_DISTRIBUTION_QUERY = """ +SELECT + MIN(degree) AS min_degree, + MAX(degree) AS max_degree, + AVG(degree) AS avg_degree, + APPROX_QUANTILES(degree, 100) AS percentiles +FROM ( + SELECT {id_column}, COUNT(*) AS degree + FROM `{table}` + GROUP BY {id_column} +) +""" + +DEGREE_BUCKET_QUERY = """ +SELECT + COUNTIF(degree BETWEEN 0 AND 1) AS bucket_0_1, + COUNTIF(degree BETWEEN 2 AND 10) AS bucket_2_10, + COUNTIF(degree BETWEEN 11 AND 100) AS bucket_11_100, + COUNTIF(degree BETWEEN 101 AND 1000) AS bucket_101_1k, + COUNTIF(degree BETWEEN 1001 AND 10000) AS bucket_1k_10k, + COUNTIF(degree > 10000) AS bucket_10k_plus +FROM ( + SELECT {id_column}, COUNT(*) AS degree + FROM `{table}` + GROUP BY {id_column} +) +""" + +TOP_K_HUBS_QUERY = """ +SELECT {id_column} AS node_id, COUNT(*) AS degree +FROM `{table}` +GROUP BY {id_column} +ORDER BY degree DESC +LIMIT {k} +""" + +SUPER_HUB_INT16_CLAMP_QUERY = f""" +SELECT COUNT(*) AS super_hub_count FROM ( + SELECT {{id_column}}, COUNT(*) AS degree + FROM `{{table}}` + GROUP BY {{id_column}} + HAVING COUNT(*) > {INT16_MAX} +) +""" + +COLD_START_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS cold_start_count FROM ( + SELECT n.{node_id_column}, COALESCE(e.degree, 0) AS degree + FROM `{node_table}` AS n + LEFT JOIN ( + SELECT nid, COUNT(*) AS degree FROM ( + SELECT {src_id_column} AS nid FROM `{edge_table}` + UNION ALL + SELECT {dst_id_column} AS nid FROM 
`{edge_table}` + ) + GROUP BY nid + ) AS e ON n.{node_id_column} = e.nid + WHERE COALESCE(e.degree, 0) <= 1 +) +""" + +# --- Tier 3: Label and heterogeneous --- + +CLASS_IMBALANCE_QUERY = """ +SELECT {label_column} AS label, COUNT(*) AS count +FROM `{table}` +WHERE {label_column} IS NOT NULL +GROUP BY {label_column} +ORDER BY count DESC +""" + +LABEL_COVERAGE_QUERY = """ +SELECT + COUNT(*) AS total, + COUNTIF({label_column} IS NOT NULL) AS labeled, + SAFE_DIVIDE(COUNTIF({label_column} IS NOT NULL), COUNT(*)) AS coverage +FROM `{table}` +""" + +EDGE_TYPE_DISTRIBUTION_QUERY = """ +SELECT COUNT(*) AS edge_count FROM `{table}` +""" + +EDGE_TYPE_NODE_COVERAGE_QUERY = """ +SELECT + APPROX_COUNT_DISTINCT({src_id_column}) AS distinct_src_count, + APPROX_COUNT_DISTINCT({dst_id_column}) AS distinct_dst_count +FROM `{table}` +""" + + +def build_null_rates_query(table: str, columns: list[str]) -> str: + """Build a batched NULL rates query for multiple columns. + + One query, one table scan, one COUNTIF per column. + + Args: + table: Fully qualified BQ table name. + columns: List of column names to check. + + Returns: + SQL query string. + """ + countif_clauses = ",\n ".join( + f"SAFE_DIVIDE(COUNTIF({col} IS NULL), COUNT(*)) AS {col}_null_rate" + for col in columns + ) + return f""" +SELECT + COUNT(*) AS total_rows, + {countif_clauses} +FROM `{table}` +""" diff --git a/gigl/analytics/data_analyzer/report/PRD.md b/gigl/analytics/data_analyzer/report/PRD.md new file mode 100644 index 000000000..43f5fc1e9 --- /dev/null +++ b/gigl/analytics/data_analyzer/report/PRD.md @@ -0,0 +1,170 @@ +# PRD: BQ Data Analyzer HTML Report + +## Status + +**AI-owned.** An AI agent reads this PRD together with the sibling `SPEC.md` and regenerates `report.ai.html`, +`charts.ai.js`, and `styles.ai.css` when the product intent or technical contract changes. This PRD describes *why* and +*what*; `SPEC.md` describes *how*. 
+ +## Problem + +Before training a GNN on graph data in BigQuery, engineers need a fast way to see whether the data is healthy enough to +train on. Today they find out only after a Dataflow job crashes or a trainer produces a poor model, which costs days and +thousands of dollars per iteration. + +A review of 18 production GNN papers ([reference doc](../../../docs/plans/20260415-bq-data-analyzer-references.md)) +found that graph-specific data properties drive 30-230% model quality differences. None of these are caught by standard +tabular data quality tools. We need a report that surfaces these graph-specific issues in a form engineers can act on in +minutes, not days. + +## Users + +| Persona | Primary need | Frequency | +| ---------------------------------------- | ------------------------------------------------------------------------- | -------------------------- | +| **GNN engineer running an applied task** | Decide whether a new BQ dataset is trainable, and if not, what to fix | Per new dataset or refresh | +| **Applied task reviewer / tech lead** | Sanity-check a teammate's dataset choices before approving a training run | Per PR | +| **On-call engineer** | Triage why a training run degraded vs last week | Per incident | + +Out of scope: data scientists doing generic exploratory data analysis, product managers, non-technical stakeholders. + +## User Stories + +1. **As a GNN engineer**, I point the analyzer at a new BQ node/edge table pair and open the resulting HTML report. + Within 30 seconds of scrolling I know whether the dataset has any training-blocking issues (dangling edges, + referential integrity, duplicates). +2. **As a GNN engineer**, I inspect the degree distribution histogram for each edge type and decide whether my planned + fan-out is realistic or will cause neighbor explosion. +3. **As a reviewer**, I share the GCS link to the report in a PR comment. My teammate opens it in a browser without + installing anything. +4. 
**As an on-call engineer**, I run the analyzer on today's data and last week's data and diff the two reports to see
+   what changed.
+5. **As any of the above**, I collapse the sections I do not care about so the overview stays scannable.
+
+## Goals
+
+1. **Zero-setup viewing.** The report opens in any modern browser with no server, no CDN, no authentication beyond the
+   GCS link. Works offline once downloaded.
+2. **Action-oriented.** Every numeric finding is color-coded against a literature-derived threshold (green/yellow/red)
+   so the reader knows what to do about it.
+3. **Traceable.** Every color-coded threshold and every check cites the paper or codebase location that justifies it, so
+   readers can verify claims.
+4. **Portable.** A single `.html` file that can be shared in chat, stored indefinitely in GCS, and archived alongside
+   the training run it describes.
+5. **Graph-native.** Surfaces metrics that matter for GNNs specifically (degree distribution, super-hub int16 clamp,
+   cold-start fraction, homophily, neighbor explosion), not just generic tabular stats.
+6. **AI-regenerable.** The three `.ai.*` assets can be regenerated deterministically from this PRD plus `SPEC.md`
+   without human intervention on the HTML/JS/CSS.
+
+## Non-Goals
+
+- **Not a real-time monitoring dashboard.** Aegis covers that
+  ([Phase 2](../../../docs/plans/20260415-bq-data-analyzer.md#aegis-integration-phase-2)). This report is a
+  point-in-time snapshot.
+- **Not a BI tool.** No filtering, drill-down, or ad-hoc querying. The report is a rendered artifact, not an interactive
+  app.
+- **Not cross-dataset comparison.** Diffing reports is a user workflow (open two tabs), not a report feature.
+- **Not a model evaluation report.** This is about training data, not trained model performance.
+- **Not accessible (WCAG AA) in v1.** We document this gap and will address it if the report is used by users who need
+  it.
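Goal 2's color-coding reduces to a two-threshold mapping from a metric value to a traffic-light status. A minimal sketch — the function name is illustrative, the authoritative thresholds live in `SPEC.md`, and the 5% / 10% values are taken from FR-5's cold-start check, assuming 5% marks yellow and 10% marks red:

```python
def status_for(value: float, yellow_at: float, red_at: float) -> str:
    """Map a metric value onto the report's green/yellow/red scale.

    Illustrative sketch only: the real per-metric thresholds are
    defined in SPEC.md, not here.
    """
    if value >= red_at:
        return "red"
    if value >= yellow_at:
        return "yellow"
    return "green"


# FR-5 cold-start fraction, with the assumed 5% (yellow) / 10% (red) split:
print(status_for(0.07, yellow_at=0.05, red_at=0.10))  # prints: yellow
```

The mapping is deliberately monotone: a finding can only get worse as the metric grows, which keeps the single overview status light (FR-1) well-defined as the worst status across all checks.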
+ +## Functional Requirements + +Each requirement maps to a section of `SPEC.md` where the implementation contract lives. + +**FR-1: Overview at a glance.** The first screen (above the fold) shows total nodes, total edges, node/edge type counts, +and a single green/yellow/red status light summarizing the worst issue found. Rationale: engineers decide "do I need to +look deeper" in the first 5 seconds. + +**FR-2: Hard-fail visibility.** Dangling edges, referential integrity violations, and duplicate nodes render red +regardless of magnitude. These block training entirely. The report shows them prominently even if count is exactly one. +Rationale: [GiGL](../../../docs/plans/20260415-bq-data-analyzer-references.md#6-gigl), +[AliGraph (7.1)](../../../docs/plans/20260415-bq-data-analyzer-references.md#7-aligraph) — silent NaN propagation from +referential integrity violations is a production-documented failure mode. + +**FR-3: Degree distribution per edge type.** Inline SVG histogram using the six literature-aligned buckets: `0-1`, +`2-10`, `11-100`, `101-1K`, `1K-10K`, `10K+`. Separate in-degree and out-degree. Rationale: +[BLADE](../../../docs/plans/20260415-bq-data-analyzer-references.md#3-blade) showed 230% embedding improvement from +degree-adaptive neighborhoods; the reader needs to see which buckets dominate. + +**FR-4: Super-hub warning.** A red call-out appears when any node exceeds the GiGL int16 degree clamp (32,767). Include +the count and the affected edge type. Rationale: +[GiGL (6.2)](../../../docs/plans/20260415-bq-data-analyzer-references.md#6-gigl) — the clamp is silent in production and +corrupts PPR sampling probabilities. Users have no other way to discover this. + +**FR-5: Cold-start visibility.** Show the count and fraction of degree-0-1 nodes per type. Color-code the fraction +against the 5% / 10% threshold. 
Rationale: +[LiGNN (4.1)](../../../docs/plans/20260415-bq-data-analyzer-references.md#4-lignn) — +0.28% AUC from cold-start +densification; the reader decides whether densification is worth investigating. + +**FR-6: Optional Tier 3 visibility.** Class imbalance, label coverage, edge type distribution, and per-edge-type node +coverage are shown only when the input data supports them. Rationale: a report full of "not applicable" sections is +noise. + +**FR-7: Embedded FACETS.** When feature profiling is available, the FACETS HTML output is embedded inline via +`