From c079b9fb0ef49d3a05e923a209e70eb8ba25c68e Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 20:25:15 +0000 Subject: [PATCH 01/20] feat(analytics): scaffold data_analyzer package structure Co-Authored-By: shubhamvij --- gigl/analytics/data_analyzer/__init__.py | 6 ++++++ gigl/analytics/data_analyzer/report/__init__.py | 6 ++++++ tests/test_assets/analytics/__init__.py | 0 tests/unit/analytics/__init__.py | 0 tests/unit/analytics/data_analyzer/__init__.py | 0 tests/unit/analytics/data_analyzer/report/__init__.py | 0 6 files changed, 12 insertions(+) create mode 100644 gigl/analytics/data_analyzer/__init__.py create mode 100644 gigl/analytics/data_analyzer/report/__init__.py create mode 100644 tests/test_assets/analytics/__init__.py create mode 100644 tests/unit/analytics/__init__.py create mode 100644 tests/unit/analytics/data_analyzer/__init__.py create mode 100644 tests/unit/analytics/data_analyzer/report/__init__.py diff --git a/gigl/analytics/data_analyzer/__init__.py b/gigl/analytics/data_analyzer/__init__.py new file mode 100644 index 000000000..0b681ff11 --- /dev/null +++ b/gigl/analytics/data_analyzer/__init__.py @@ -0,0 +1,6 @@ +""" +BQ Data Analyzer for pre-training graph data analysis. + +Produces a single HTML report covering data quality, feature distributions, +and graph structure metrics from BigQuery node/edge tables. +""" diff --git a/gigl/analytics/data_analyzer/report/__init__.py b/gigl/analytics/data_analyzer/report/__init__.py new file mode 100644 index 000000000..8cde20291 --- /dev/null +++ b/gigl/analytics/data_analyzer/report/__init__.py @@ -0,0 +1,6 @@ +""" +HTML report generation for the BQ Data Analyzer. + +AI-owned assets (*.ai.html, *.ai.js, *.ai.css) are defined by SPEC.md +in this directory and can be regenerated from that spec. 
+""" diff --git a/tests/test_assets/analytics/__init__.py b/tests/test_assets/analytics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/analytics/__init__.py b/tests/unit/analytics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/analytics/data_analyzer/__init__.py b/tests/unit/analytics/data_analyzer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/analytics/data_analyzer/report/__init__.py b/tests/unit/analytics/data_analyzer/report/__init__.py new file mode 100644 index 000000000..e69de29bb From 398849371fe2351dd29e6586041a8b1a0b0d40cc Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 20:44:24 +0000 Subject: [PATCH 02/20] feat(analytics): add DataAnalyzerConfig with YAML loading and tests Co-Authored-By: shubhamvij --- gigl/analytics/data_analyzer/config.py | 75 +++++++++++++++++++ .../analytics/sample_analyzer_config.yaml | 16 ++++ .../analytics/data_analyzer/config_test.py | 64 ++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 gigl/analytics/data_analyzer/config.py create mode 100644 tests/test_assets/analytics/sample_analyzer_config.yaml create mode 100644 tests/unit/analytics/data_analyzer/config_test.py diff --git a/gigl/analytics/data_analyzer/config.py b/gigl/analytics/data_analyzer/config.py new file mode 100644 index 000000000..0ea9c721e --- /dev/null +++ b/gigl/analytics/data_analyzer/config.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass, field +from typing import Optional + +from omegaconf import MISSING, OmegaConf + +from gigl.common.logger import Logger + +logger = Logger() + + +@dataclass +class NodeTableSpec: + """Specification for a node table in BigQuery.""" + + bq_table: str = MISSING + node_type: str = MISSING + id_column: str = MISSING + feature_columns: list[str] = MISSING + label_column: Optional[str] = None + + +@dataclass +class EdgeTableSpec: + """Specification for an edge table in BigQuery.""" + + bq_table: str = MISSING + edge_type: str = MISSING + src_id_column: str = MISSING + dst_id_column: str = MISSING + feature_columns: list[str] = field(default_factory=list) + timestamp_column: Optional[str] = None + + +@dataclass +class DataAnalyzerConfig: + """Configuration for the BQ Data Analyzer. + + Parsed from YAML via OmegaConf. + + Example: + >>> config = load_analyzer_config("gs://bucket/config.yaml") + >>> config.node_tables[0].bq_table + 'project.dataset.user_nodes' + """ + + node_tables: list[NodeTableSpec] = MISSING + edge_tables: list[EdgeTableSpec] = MISSING + output_gcs_path: str = MISSING + fan_out: Optional[list[int]] = None + compute_reciprocity: bool = False + compute_homophily: bool = False + compute_connected_components: bool = False + compute_clustering: bool = False + + +def load_analyzer_config(config_path: str) -> DataAnalyzerConfig: + """Load and validate a DataAnalyzerConfig from a YAML file. + + Args: + config_path: Local file path or GCS URI to the YAML config. + + Returns: + Validated DataAnalyzerConfig instance. + + Raises: + omegaconf.errors.MissingMandatoryValue: If required fields are missing. 
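+
+    Example (a sketch; assumes a local copy of
+    tests/test_assets/analytics/sample_analyzer_config.yaml):
+        >>> config = load_analyzer_config("sample_analyzer_config.yaml")
+        >>> config.node_tables[0].node_type
+        'user'
+        >>> config.fan_out
+        [15, 10, 5]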
+ """ + raw = OmegaConf.load(config_path) + merged = OmegaConf.merge(OmegaConf.structured(DataAnalyzerConfig), raw) + config: DataAnalyzerConfig = OmegaConf.to_object(merged) # type: ignore + logger.info( + f"Loaded analyzer config with {len(config.node_tables)} node tables " + f"and {len(config.edge_tables)} edge tables" + ) + return config diff --git a/tests/test_assets/analytics/sample_analyzer_config.yaml b/tests/test_assets/analytics/sample_analyzer_config.yaml new file mode 100644 index 000000000..acd3fb7b9 --- /dev/null +++ b/tests/test_assets/analytics/sample_analyzer_config.yaml @@ -0,0 +1,16 @@ +node_tables: + - bq_table: "test_project.test_dataset.user_nodes" + node_type: "user" + id_column: "user_id" + feature_columns: ["age", "country"] + label_column: "label" + +edge_tables: + - bq_table: "test_project.test_dataset.user_edges" + edge_type: "follows" + src_id_column: "src_user_id" + dst_id_column: "dst_user_id" + feature_columns: ["weight"] + +output_gcs_path: "gs://test-bucket/analysis_output/" +fan_out: [15, 10, 5] diff --git a/tests/unit/analytics/data_analyzer/config_test.py b/tests/unit/analytics/data_analyzer/config_test.py new file mode 100644 index 000000000..866cdd64a --- /dev/null +++ b/tests/unit/analytics/data_analyzer/config_test.py @@ -0,0 +1,64 @@ +from pathlib import Path + +from omegaconf import OmegaConf + +from gigl.analytics.data_analyzer.config import ( + DataAnalyzerConfig, + EdgeTableSpec, + NodeTableSpec, + load_analyzer_config, +) +from tests.test_assets.test_case import TestCase + +SAMPLE_CONFIG_PATH = ( + Path(__file__).parents[3] / "test_assets" / "analytics" / "sample_analyzer_config.yaml" +) + + +class DataAnalyzerConfigTest(TestCase): + def test_load_valid_config(self) -> None: + config = load_analyzer_config(str(SAMPLE_CONFIG_PATH)) + self.assertIsInstance(config, DataAnalyzerConfig) + self.assertEqual(len(config.node_tables), 1) + self.assertEqual(len(config.edge_tables), 1) + self.assertEqual(config.node_tables[0].node_type, "user") + self.assertEqual(config.node_tables[0].label_column, "label") + self.assertEqual(config.edge_tables[0].edge_type, "follows") + self.assertEqual(config.output_gcs_path, "gs://test-bucket/analysis_output/") + self.assertEqual(config.fan_out, [15, 10, 5]) + + def test_optional_fields_default_to_none_or_false(self) -> None: + yaml_str = """ + node_tables: + - bq_table: "p.d.t" + node_type: "user" + id_column: "uid" + feature_columns: ["f1"] + edge_tables: + - bq_table: "p.d.e" + edge_type: "follows" + src_id_column: "src" + dst_id_column: "dst" + output_gcs_path: "gs://bucket/out/" + """ + raw = OmegaConf.create(yaml_str) + merged = OmegaConf.merge(OmegaConf.structured(DataAnalyzerConfig), raw) + config = OmegaConf.to_object(merged) + self.assertIsNone(config.node_tables[0].label_column) + self.assertIsNone(config.edge_tables[0].timestamp_column) + self.assertIsNone(config.fan_out) + self.assertFalse(config.compute_reciprocity) + self.assertFalse(config.compute_homophily) + + def test_missing_required_field_raises(self) -> None: + yaml_str = """ + node_tables: + - bq_table: "p.d.t" + node_type: "user" + edge_tables: [] + output_gcs_path: "gs://bucket/out/" + """ + raw = OmegaConf.create(yaml_str) + with self.assertRaises(Exception): + merged = OmegaConf.merge(OmegaConf.structured(DataAnalyzerConfig), raw) + OmegaConf.to_object(merged) From cf69b383b34cd2b866c15abe8396cc15b45ec83a Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 21:20:25 +0000 Subject: [PATCH 03/20] fix(analytics): remove unused imports in 
config_test.py Co-Authored-By: shubhamvij --- tests/unit/analytics/data_analyzer/config_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unit/analytics/data_analyzer/config_test.py b/tests/unit/analytics/data_analyzer/config_test.py index 866cdd64a..a095a11c8 100644 --- a/tests/unit/analytics/data_analyzer/config_test.py +++ b/tests/unit/analytics/data_analyzer/config_test.py @@ -4,8 +4,6 @@ from gigl.analytics.data_analyzer.config import ( DataAnalyzerConfig, - EdgeTableSpec, - NodeTableSpec, load_analyzer_config, ) from tests.test_assets.test_case import TestCase From 8abae4a8bec6bece8ba2ea76a3506c7a641664a2 Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 21:21:08 +0000 Subject: [PATCH 04/20] feat(analytics): add result type dataclasses (DegreeStats, GraphAnalysisResult, FeatureProfileResult) Co-Authored-By: shubhamvij --- gigl/analytics/data_analyzer/types.py | 70 +++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 gigl/analytics/data_analyzer/types.py diff --git a/gigl/analytics/data_analyzer/types.py b/gigl/analytics/data_analyzer/types.py new file mode 100644 index 000000000..01d5b43eb --- /dev/null +++ b/gigl/analytics/data_analyzer/types.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass, field + + +@dataclass +class DegreeStats: + """Degree distribution statistics for one edge type and direction. + + Computed from APPROX_QUANTILES(degree, 100) in BigQuery. + """ + + min: int + max: int + mean: float + median: int + p90: int + p99: int + p999: int + percentiles: list[int] + buckets: dict[str, int] # "0-1": count, "2-10": count, etc. + + +@dataclass +class GraphAnalysisResult: + """Complete result of graph structure analysis across all tiers. + + Tier 1 fields are always populated. Tier 3/4 fields may be empty + dicts if the corresponding checks were not applicable or not enabled. 
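+
+    Example (illustrative; degree_stats keys are written as
+    "{edge_type}_{direction}" by the analyzer added later in this series):
+        >>> result = GraphAnalysisResult(node_counts={"user": 1_000_000})
+        >>> result.degree_stats.get("follows_out") is None
+        True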
+ """ + + # Tier 1: hard fails + duplicate_node_counts: dict[str, int] = field(default_factory=dict) + dangling_edge_counts: dict[str, int] = field(default_factory=dict) + referential_integrity_violations: dict[str, int] = field(default_factory=dict) + + # Tier 2: core metrics + node_counts: dict[str, int] = field(default_factory=dict) + edge_counts: dict[str, int] = field(default_factory=dict) + null_rates: dict[str, dict[str, float]] = field(default_factory=dict) + duplicate_edge_counts: dict[str, int] = field(default_factory=dict) + self_loop_counts: dict[str, int] = field(default_factory=dict) + isolated_node_counts: dict[str, int] = field(default_factory=dict) + degree_stats: dict[str, DegreeStats] = field(default_factory=dict) + top_hubs: dict[str, list[tuple[str, int]]] = field(default_factory=dict) + super_hub_int16_clamp_count: dict[str, int] = field(default_factory=dict) + cold_start_node_counts: dict[str, int] = field(default_factory=dict) + feature_memory_bytes: dict[str, int] = field(default_factory=dict) + neighbor_explosion_estimate: dict[str, int] = field(default_factory=dict) + + # Tier 3: label and heterogeneous + class_imbalance: dict[str, dict[str, int]] = field(default_factory=dict) + label_coverage: dict[str, float] = field(default_factory=dict) + edge_type_distribution: dict[str, int] = field(default_factory=dict) + edge_type_node_coverage: dict[str, dict[str, int]] = field(default_factory=dict) + + # Tier 4: opt-in + reciprocity: dict[str, float] = field(default_factory=dict) + power_law_exponent: dict[str, float] = field(default_factory=dict) + + +@dataclass +class FeatureProfileResult: + """Result of TFDV feature profiling across all tables. + + Contains GCS paths to generated artifacts. + """ + + facets_html_paths: dict[str, str] = field(default_factory=dict) + stats_paths: dict[str, str] = field(default_factory=dict) + schema_paths: dict[str, str] = field(default_factory=dict) + anomalies: dict[str, list[str]] = field(default_factory=dict) From f1c7f52ea1a2d600abf369c5033c830c999b44b7 Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 22:03:12 +0000 Subject: [PATCH 05/20] feat(analytics): add 18 SQL query templates for graph structure analysis Co-Authored-By: shubhamvij --- gigl/analytics/data_analyzer/queries.py | 186 ++++++++++++++++++ .../analytics/data_analyzer/queries_test.py | 103 ++++++++++ 2 files changed, 289 insertions(+) create mode 100644 gigl/analytics/data_analyzer/queries.py create mode 100644 tests/unit/analytics/data_analyzer/queries_test.py diff --git a/gigl/analytics/data_analyzer/queries.py b/gigl/analytics/data_analyzer/queries.py new file mode 100644 index 000000000..19b476a21 --- /dev/null +++ b/gigl/analytics/data_analyzer/queries.py @@ -0,0 +1,186 @@ +"""SQL query templates for graph structure analysis. + +Each constant is a format-string template parameterized with table names +and column names. Pattern matches gigl/src/data_preprocessor/lib/enumerate/queries.py. 
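+
+Example (a sketch of rendering one template before submitting it to BQ; the
+table name is illustrative):
+    >>> NODE_COUNT_QUERY.format(table="my_project.my_dataset.user_nodes").strip()
+    'SELECT COUNT(*) AS node_count FROM `my_project.my_dataset.user_nodes`'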
+""" + +import torch + +INT16_MAX = int(torch.iinfo(torch.int16).max) # 32767 + +# --- Tier 1: Hard fails --- + +DANGLING_EDGES_QUERY = """ +SELECT COUNT(*) AS dangling_count +FROM `{table}` +WHERE {src_id_column} IS NULL OR {dst_id_column} IS NULL +""" + +EDGE_REFERENTIAL_INTEGRITY_QUERY = """ +SELECT + COUNTIF(src_node.{node_id_column} IS NULL) AS missing_src_count, + COUNTIF(dst_node.{node_id_column} IS NULL) AS missing_dst_count +FROM `{edge_table}` AS e +LEFT JOIN `{node_table}` AS src_node + ON e.{src_id_column} = src_node.{node_id_column} +LEFT JOIN `{node_table}` AS dst_node + ON e.{dst_id_column} = dst_node.{node_id_column} +""" + +DUPLICATE_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS duplicate_count FROM ( + SELECT {id_column} + FROM `{table}` + GROUP BY {id_column} + HAVING COUNT(*) > 1 +) +""" + +# --- Tier 2: Core metrics --- + +NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS node_count FROM `{table}` +""" + +EDGE_COUNT_QUERY = """ +SELECT COUNT(*) AS edge_count FROM `{table}` +""" + +DUPLICATE_EDGE_COUNT_QUERY = """ +SELECT COUNT(*) AS duplicate_count FROM ( + SELECT {src_id_column}, {dst_id_column} + FROM `{table}` + GROUP BY {src_id_column}, {dst_id_column} + HAVING COUNT(*) > 1 +) +""" + +SELF_LOOP_COUNT_QUERY = """ +SELECT COUNT(*) AS self_loop_count +FROM `{table}` +WHERE {src_id_column} = {dst_id_column} +""" + +ISOLATED_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS isolated_count FROM ( + SELECT n.{node_id_column} + FROM `{node_table}` AS n + LEFT JOIN `{edge_table}` AS e_src + ON n.{node_id_column} = e_src.{src_id_column} + LEFT JOIN `{edge_table}` AS e_dst + ON n.{node_id_column} = e_dst.{dst_id_column} + WHERE e_src.{src_id_column} IS NULL + AND e_dst.{dst_id_column} IS NULL +) +""" + +DEGREE_DISTRIBUTION_QUERY = """ +SELECT + MIN(degree) AS min_degree, + MAX(degree) AS max_degree, + AVG(degree) AS avg_degree, + APPROX_QUANTILES(degree, 100) AS percentiles +FROM ( + SELECT {id_column}, COUNT(*) AS degree + FROM `{table}` + GROUP BY {id_column} +) +""" + +DEGREE_BUCKET_QUERY = """ +SELECT + COUNTIF(degree BETWEEN 0 AND 1) AS bucket_0_1, + COUNTIF(degree BETWEEN 2 AND 10) AS bucket_2_10, + COUNTIF(degree BETWEEN 11 AND 100) AS bucket_11_100, + COUNTIF(degree BETWEEN 101 AND 1000) AS bucket_101_1k, + COUNTIF(degree BETWEEN 1001 AND 10000) AS bucket_1k_10k, + COUNTIF(degree > 10000) AS bucket_10k_plus +FROM ( + SELECT {id_column}, COUNT(*) AS degree + FROM `{table}` + GROUP BY {id_column} +) +""" + +TOP_K_HUBS_QUERY = """ +SELECT {id_column} AS node_id, COUNT(*) AS degree +FROM `{table}` +GROUP BY {id_column} +ORDER BY degree DESC +LIMIT {k} +""" + +SUPER_HUB_INT16_CLAMP_QUERY = f""" +SELECT COUNT(*) AS super_hub_count FROM ( + SELECT {{id_column}}, COUNT(*) AS degree + FROM `{{table}}` + GROUP BY {{id_column}} + HAVING COUNT(*) > {INT16_MAX} +) +""" + +COLD_START_NODE_COUNT_QUERY = """ +SELECT COUNT(*) AS cold_start_count FROM ( + SELECT n.{node_id_column}, COALESCE(e.degree, 0) AS degree + FROM `{node_table}` AS n + LEFT JOIN ( + SELECT {src_id_column} AS nid, COUNT(*) AS degree + FROM `{edge_table}` + GROUP BY {src_id_column} + ) AS e ON n.{node_id_column} = e.nid + WHERE COALESCE(e.degree, 0) <= 1 +) +""" + +# --- Tier 3: Label and heterogeneous --- + +CLASS_IMBALANCE_QUERY = """ +SELECT {label_column} AS label, COUNT(*) AS count +FROM `{table}` +WHERE {label_column} IS NOT NULL +GROUP BY {label_column} +ORDER BY count DESC +""" + +LABEL_COVERAGE_QUERY = """ +SELECT + COUNT(*) AS total, + COUNTIF({label_column} IS NOT NULL) AS labeled, + SAFE_DIVIDE(COUNTIF({label_column} IS 
NOT NULL), COUNT(*)) AS coverage +FROM `{table}` +""" + +EDGE_TYPE_DISTRIBUTION_QUERY = """ +SELECT COUNT(*) AS edge_count FROM `{table}` +""" + +EDGE_TYPE_NODE_COVERAGE_QUERY = """ +SELECT + APPROX_COUNT_DISTINCT({src_id_column}) AS distinct_src_count, + APPROX_COUNT_DISTINCT({dst_id_column}) AS distinct_dst_count +FROM `{table}` +""" + + +def build_null_rates_query(table: str, columns: list[str]) -> str: + """Build a batched NULL rates query for multiple columns. + + One query, one table scan, one COUNTIF per column. + + Args: + table: Fully qualified BQ table name. + columns: List of column names to check. + + Returns: + SQL query string. + """ + countif_clauses = ",\n ".join( + f"SAFE_DIVIDE(COUNTIF({col} IS NULL), COUNT(*)) AS {col}_null_rate" + for col in columns + ) + return f""" +SELECT + COUNT(*) AS total_rows, + {countif_clauses} +FROM `{table}` +""" diff --git a/tests/unit/analytics/data_analyzer/queries_test.py b/tests/unit/analytics/data_analyzer/queries_test.py new file mode 100644 index 000000000..9063cd4e4 --- /dev/null +++ b/tests/unit/analytics/data_analyzer/queries_test.py @@ -0,0 +1,103 @@ +from gigl.analytics.data_analyzer.queries import ( + DANGLING_EDGES_QUERY, + DEGREE_BUCKET_QUERY, + DEGREE_DISTRIBUTION_QUERY, + DUPLICATE_NODE_COUNT_QUERY, + EDGE_REFERENTIAL_INTEGRITY_QUERY, + NODE_COUNT_QUERY, + SUPER_HUB_INT16_CLAMP_QUERY, + TOP_K_HUBS_QUERY, + build_null_rates_query, +) +from tests.test_assets.test_case import TestCase + +NODE_TABLE = "project.dataset.user_nodes" +EDGE_TABLE = "project.dataset.user_edges" + + +class NodeCountQueryTest(TestCase): + def test_contains_table_name(self) -> None: + sql = NODE_COUNT_QUERY.format(table=NODE_TABLE) + self.assertIn(f"`{NODE_TABLE}`", sql) + self.assertIn("COUNT(*)", sql) + + +class DanglingEdgesQueryTest(TestCase): + def test_contains_null_checks(self) -> None: + sql = DANGLING_EDGES_QUERY.format( + table=EDGE_TABLE, src_id_column="src_uid", dst_id_column="dst_uid" + ) + self.assertIn("src_uid IS NULL", sql) + self.assertIn("dst_uid IS NULL", sql) + self.assertIn(f"`{EDGE_TABLE}`", sql) + + +class EdgeReferentialIntegrityQueryTest(TestCase): + def test_contains_left_join(self) -> None: + sql = EDGE_REFERENTIAL_INTEGRITY_QUERY.format( + edge_table=EDGE_TABLE, + node_table=NODE_TABLE, + src_id_column="src_uid", + dst_id_column="dst_uid", + node_id_column="user_id", + ) + self.assertIn("LEFT JOIN", sql) + self.assertIn(f"`{NODE_TABLE}`", sql) + self.assertIn(f"`{EDGE_TABLE}`", sql) + self.assertIn("IS NULL", sql) + + +class DuplicateNodeCountQueryTest(TestCase): + def test_contains_group_by_having(self) -> None: + sql = DUPLICATE_NODE_COUNT_QUERY.format(table=NODE_TABLE, id_column="user_id") + self.assertIn("GROUP BY", sql) + self.assertIn("HAVING", sql) + self.assertIn("user_id", sql) + + +class DegreeDistributionQueryTest(TestCase): + def test_contains_approx_quantiles(self) -> None: + sql = DEGREE_DISTRIBUTION_QUERY.format( + table=EDGE_TABLE, id_column="src_uid" + ) + self.assertIn("APPROX_QUANTILES", sql) + self.assertIn("src_uid", sql) + + +class DegreeBucketQueryTest(TestCase): + def test_contains_countif_buckets(self) -> None: + sql = DEGREE_BUCKET_QUERY.format( + table=EDGE_TABLE, id_column="src_uid" + ) + self.assertIn("COUNTIF", sql) + self.assertIn("src_uid", sql) + + +class NullRatesQueryTest(TestCase): + def test_batches_multiple_columns(self) -> None: + sql = build_null_rates_query( + table=NODE_TABLE, columns=["age", "country", "embedding"] + ) + self.assertIn(f"`{NODE_TABLE}`", sql) + 
self.assertEqual(sql.count("COUNTIF"), 3) + self.assertIn("age", sql) + self.assertIn("country", sql) + self.assertIn("embedding", sql) + + +class SuperHubInt16ClampQueryTest(TestCase): + def test_contains_32767_threshold(self) -> None: + sql = SUPER_HUB_INT16_CLAMP_QUERY.format( + table=EDGE_TABLE, id_column="src_uid" + ) + self.assertIn("32767", sql) + + +class TopKHubsQueryTest(TestCase): + def test_contains_limit(self) -> None: + sql = TOP_K_HUBS_QUERY.format( + table=EDGE_TABLE, id_column="src_uid", k=20 + ) + self.assertIn("LIMIT 20", sql) + self.assertIn("ORDER BY", sql) + self.assertIn("DESC", sql) From 21255d0510ba04cb90f3ab872a1ce58e16c7a308 Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 22:11:50 +0000 Subject: [PATCH 06/20] feat(analytics): add GraphStructureAnalyzer with 4-tier BQ validation Implements the orchestration layer for BQ-based graph data quality checks: - Tier 1 hard-fails (dangling edges, referential integrity, duplicate nodes) raise DataQualityError carrying a partially populated result. - Tier 2 core metrics (counts, degree stats, top-K hubs, INT16 clamp, NULL rates) plus Python-side feature memory and neighbor-explosion estimates. - Tier 3 label/heterogeneous checks auto-enabled by config (label_column presence; multiple edge tables). - Tier 4 opt-in placeholders (power-law exponent from degree stats). Co-Authored-By: shubhamvij --- .../data_analyzer/graph_structure_analyzer.py | 507 ++++++++++++++++++ .../graph_structure_analyzer_test.py | 268 +++++++++ 2 files changed, 775 insertions(+) create mode 100644 gigl/analytics/data_analyzer/graph_structure_analyzer.py create mode 100644 tests/unit/analytics/data_analyzer/graph_structure_analyzer_test.py diff --git a/gigl/analytics/data_analyzer/graph_structure_analyzer.py b/gigl/analytics/data_analyzer/graph_structure_analyzer.py new file mode 100644 index 000000000..a69e3bdb1 --- /dev/null +++ b/gigl/analytics/data_analyzer/graph_structure_analyzer.py @@ -0,0 +1,507 @@ +"""GraphStructureAnalyzer: 4-tier BigQuery-based graph data quality checks. + +Tier 1 (hard fails) + dangling edges, referential integrity, duplicate nodes. Any violation + raises DataQualityError with a partially populated GraphAnalysisResult. + +Tier 2 (core metrics) + node/edge counts, degree distribution, top-K hubs, INT16 clamp hazards, + isolated/cold-start nodes, duplicate edges, self-loops, NULL rates, and + two Python-side computations (feature memory budget, neighbor explosion). + +Tier 3 (label and heterogeneous) + class imbalance and label coverage (auto-enabled when node_tables have a + label_column); edge-type distribution and per-edge-type node coverage + (auto-enabled when more than one edge table is declared). + +Tier 4 (opt-in) + reciprocity, power-law exponent estimate. Gated by config flags. 
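+
+Example (a minimal sketch of the intended call pattern; the config path is
+illustrative):
+    >>> from gigl.analytics.data_analyzer.config import load_analyzer_config
+    >>> config = load_analyzer_config("analyzer_config.yaml")
+    >>> try:
+    ...     result = GraphStructureAnalyzer().analyze(config)
+    ... except DataQualityError as e:
+    ...     result = e.partial_result  # see which Tier 1 checks failed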
+""" + +import math +from typing import Optional + +from gigl.analytics.data_analyzer.config import ( + DataAnalyzerConfig, + EdgeTableSpec, + NodeTableSpec, +) +from gigl.analytics.data_analyzer.queries import ( + CLASS_IMBALANCE_QUERY, + COLD_START_NODE_COUNT_QUERY, + DANGLING_EDGES_QUERY, + DEGREE_BUCKET_QUERY, + DEGREE_DISTRIBUTION_QUERY, + DUPLICATE_EDGE_COUNT_QUERY, + DUPLICATE_NODE_COUNT_QUERY, + EDGE_COUNT_QUERY, + EDGE_REFERENTIAL_INTEGRITY_QUERY, + EDGE_TYPE_DISTRIBUTION_QUERY, + EDGE_TYPE_NODE_COVERAGE_QUERY, + ISOLATED_NODE_COUNT_QUERY, + LABEL_COVERAGE_QUERY, + NODE_COUNT_QUERY, + SELF_LOOP_COUNT_QUERY, + SUPER_HUB_INT16_CLAMP_QUERY, + TOP_K_HUBS_QUERY, + build_null_rates_query, +) +from gigl.analytics.data_analyzer.types import DegreeStats, GraphAnalysisResult +from gigl.common.logger import Logger +from gigl.src.common.utils.bq import BqUtils + +logger = Logger() + +# Default assumption for feature memory budget: float64 per feature column. +_BYTES_PER_FEATURE = 8 +_TOP_K_HUBS = 20 +_PARALLEL_BQ_WORKERS = 10 + + +class DataQualityError(Exception): + """Raised when Tier 1 hard-fail checks detect data quality violations. + + Carries a partially populated GraphAnalysisResult so callers can inspect + which specific checks failed without re-running the analyzer. + """ + + def __init__(self, message: str, partial_result: GraphAnalysisResult) -> None: + super().__init__(message) + self.partial_result = partial_result + + +class GraphStructureAnalyzer: + """Runs BigQuery SQL checks across 4 tiers against the tables declared in a config. + + Example: + >>> config = load_analyzer_config("gs://bucket/config.yaml") + >>> analyzer = GraphStructureAnalyzer() + >>> result = analyzer.analyze(config) + >>> result.node_counts["user"] + 1000000 + + Tier 1 is blocking: a violation raises DataQualityError before Tiers 2-4 run. + Tiers 2-4 are aggregated best-effort into a single GraphAnalysisResult. + """ + + def __init__(self, bq_project: Optional[str] = None) -> None: + self._bq_utils = BqUtils(project=bq_project) + + def analyze(self, config: DataAnalyzerConfig) -> GraphAnalysisResult: + """Run all applicable tiers and return aggregated results. + + Args: + config: Data analyzer configuration declaring node and edge tables + plus any opt-in expensive checks (reciprocity, etc.). + + Returns: + GraphAnalysisResult with tier 1-4 fields populated per config. + + Raises: + DataQualityError: If tier 1 checks find any violations. The + exception carries a partial result with the specific counts. + """ + result = GraphAnalysisResult() + logger.info("Starting graph structure analysis (Tier 1: hard fails)") + self._run_tier1(config, result) + + logger.info("Tier 1 passed. Running Tier 2 (core metrics)") + self._run_tier2(config, result) + + logger.info("Running Tier 3 (label / heterogeneous)") + self._run_tier3(config, result) + + logger.info("Running Tier 4 (opt-in)") + self._run_tier4(config, result) + return result + + # ------------------------------------------------------------------ # + # Tier 1: hard fails # + # ------------------------------------------------------------------ # + + def _run_tier1( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Run all tier 1 checks; raise DataQualityError on any violation.""" + violations: list[str] = [] + + # Duplicate nodes (per node table). 
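+        # A "duplicate" is an id_column value appearing in more than one row
+        # of the node table (GROUP BY id HAVING COUNT(*) > 1).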
+        for node_table in config.node_tables:
+            query = DUPLICATE_NODE_COUNT_QUERY.format(
+                table=node_table.bq_table, id_column=node_table.id_column
+            )
+            count = self._query_scalar(query, "duplicate_count")
+            result.duplicate_node_counts[node_table.node_type] = count
+            if count > 0:
+                violations.append(
+                    f"node_type={node_table.node_type} has {count} duplicate IDs"
+                )
+
+        # Dangling edges and referential integrity (per edge table).
+        for edge_table in config.edge_tables:
+            dangling_query = DANGLING_EDGES_QUERY.format(
+                table=edge_table.bq_table,
+                src_id_column=edge_table.src_id_column,
+                dst_id_column=edge_table.dst_id_column,
+            )
+            dangling = self._query_scalar(dangling_query, "dangling_count")
+            result.dangling_edge_counts[edge_table.edge_type] = dangling
+            if dangling > 0:
+                violations.append(
+                    f"edge_type={edge_table.edge_type} has {dangling} dangling edges"
+                )
+
+            # Referential integrity: join against the first node table (heterogeneous
+            # graphs with per-edge-type node types would refine this per edge table;
+            # for now we pair each edge table with config.node_tables[0]).
+            if config.node_tables:
+                node_table = config.node_tables[0]
+                ref_query = EDGE_REFERENTIAL_INTEGRITY_QUERY.format(
+                    edge_table=edge_table.bq_table,
+                    node_table=node_table.bq_table,
+                    src_id_column=edge_table.src_id_column,
+                    dst_id_column=edge_table.dst_id_column,
+                    node_id_column=node_table.id_column,
+                )
+                rows = list(self._bq_utils.run_query(query=ref_query, labels={}))
+                missing_src = rows[0]["missing_src_count"] if rows else 0
+                missing_dst = rows[0]["missing_dst_count"] if rows else 0
+                total_missing = int(missing_src) + int(missing_dst)
+                result.referential_integrity_violations[
+                    edge_table.edge_type
+                ] = total_missing
+                if total_missing > 0:
+                    violations.append(
+                        f"edge_type={edge_table.edge_type} has {total_missing} "
+                        "referential integrity violations"
+                    )
+
+        if violations:
+            msg = "Tier 1 data quality violations detected:\n - " + "\n - ".join(
+                violations
+            )
+            logger.error(msg)
+            raise DataQualityError(msg, partial_result=result)
+
+    # ------------------------------------------------------------------ #
+    # Tier 2: core metrics                                               #
+    # ------------------------------------------------------------------ #
+
+    def _run_tier2(
+        self, config: DataAnalyzerConfig, result: GraphAnalysisResult
+    ) -> None:
+        """Collect core structural metrics (BQ jobs run sequentially for now)."""
+        # Node-level metrics (counts + null rates).
+        for node_table in config.node_tables:
+            self._tier2_node_metrics(node_table, result)
+
+        # Edge-level metrics. Each edge table is paired with the first node
+        # table for isolated/cold-start joins; per-edge-type node pairing for
+        # heterogeneous graphs remains a TODO.
+        primary_node_table = config.node_tables[0] if config.node_tables else None
+        for edge_table in config.edge_tables:
+            self._tier2_edge_metrics(edge_table, primary_node_table, result)
+
+        # Python-side computations.
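+        # Worked sketch with illustrative numbers: 1_000_000 nodes with 128
+        # float64 features -> 1_000_000 * 128 * 8 bytes ~= 1.02 GB; with
+        # fan_out=[15, 10] and mean out-degree 5, the neighbor explosion
+        # estimate is 15 * 10 * 5 = 750 sampled nodes per seed.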
+ self._compute_feature_memory_budget(config, result) + self._compute_neighbor_explosion_estimate(config, result) + + def _tier2_node_metrics( + self, node_table: NodeTableSpec, result: GraphAnalysisResult + ) -> None: + node_count = self._query_scalar( + NODE_COUNT_QUERY.format(table=node_table.bq_table), "node_count" + ) + result.node_counts[node_table.node_type] = node_count + + columns_to_check: list[str] = [node_table.id_column] + columns_to_check.extend(node_table.feature_columns) + if node_table.label_column: + columns_to_check.append(node_table.label_column) + + null_query = build_null_rates_query( + table=node_table.bq_table, columns=columns_to_check + ) + rows = list(self._bq_utils.run_query(query=null_query, labels={})) + if rows: + row = rows[0] + rates: dict[str, float] = {} + for col in columns_to_check: + key = f"{col}_null_rate" + rate = row[key] + rates[col] = float(rate) if rate is not None else 0.0 + result.null_rates[node_table.node_type] = rates + + def _tier2_edge_metrics( + self, + edge_table: EdgeTableSpec, + node_table: Optional[NodeTableSpec], + result: GraphAnalysisResult, + ) -> None: + edge_type = edge_table.edge_type + + # Scalar counts. + result.edge_counts[edge_type] = self._query_scalar( + EDGE_COUNT_QUERY.format(table=edge_table.bq_table), "edge_count" + ) + result.duplicate_edge_counts[edge_type] = self._query_scalar( + DUPLICATE_EDGE_COUNT_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "duplicate_count", + ) + result.self_loop_counts[edge_type] = self._query_scalar( + SELF_LOOP_COUNT_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "self_loop_count", + ) + + # Super-hub INT16 clamp check (indexed by src). + result.super_hub_int16_clamp_count[edge_type] = self._query_scalar( + SUPER_HUB_INT16_CLAMP_QUERY.format( + table=edge_table.bq_table, id_column=edge_table.src_id_column + ), + "super_hub_count", + ) + + # Isolated and cold-start require a node table join. + if node_table is not None: + result.isolated_node_counts[edge_type] = self._query_scalar( + ISOLATED_NODE_COUNT_QUERY.format( + node_table=node_table.bq_table, + edge_table=edge_table.bq_table, + node_id_column=node_table.id_column, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + "isolated_count", + ) + result.cold_start_node_counts[edge_type] = self._query_scalar( + COLD_START_NODE_COUNT_QUERY.format( + node_table=node_table.bq_table, + edge_table=edge_table.bq_table, + node_id_column=node_table.id_column, + src_id_column=edge_table.src_id_column, + ), + "cold_start_count", + ) + + # Top-K hubs (by src). + top_hub_rows = list( + self._bq_utils.run_query( + query=TOP_K_HUBS_QUERY.format( + table=edge_table.bq_table, + id_column=edge_table.src_id_column, + k=_TOP_K_HUBS, + ), + labels={}, + ) + ) + result.top_hubs[edge_type] = [ + (str(row["node_id"]), int(row["degree"])) for row in top_hub_rows + ] + + # Degree statistics: distribution + buckets, in + out directions. 
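+        # "out" groups edges by src_id_column (out-degree of each source node);
+        # "in" groups by dst_id_column (in-degree of each destination node).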
+ for direction, id_column in ( + ("out", edge_table.src_id_column), + ("in", edge_table.dst_id_column), + ): + result.degree_stats[f"{edge_type}_{direction}"] = self._build_degree_stats( + table=edge_table.bq_table, id_column=id_column + ) + + def _build_degree_stats(self, table: str, id_column: str) -> DegreeStats: + """Run degree distribution + bucket queries and pack into DegreeStats.""" + dist_rows = list( + self._bq_utils.run_query( + query=DEGREE_DISTRIBUTION_QUERY.format( + table=table, id_column=id_column + ), + labels={}, + ) + ) + bucket_rows = list( + self._bq_utils.run_query( + query=DEGREE_BUCKET_QUERY.format(table=table, id_column=id_column), + labels={}, + ) + ) + dist_row = dist_rows[0] + bucket_row = bucket_rows[0] + + percentiles_raw = list(dist_row["percentiles"]) + percentiles = [int(p) if p is not None else 0 for p in percentiles_raw] + # APPROX_QUANTILES(degree, 100) returns 101 values: index 0..100. + median = percentiles[50] if len(percentiles) > 50 else 0 + p90 = percentiles[90] if len(percentiles) > 90 else percentiles[-1] + p99 = percentiles[99] if len(percentiles) > 99 else percentiles[-1] + # We only have 100-bucket quantiles, so p999 ~= p99 as best-effort. + p999 = p99 + + buckets: dict[str, int] = { + "0-1": int(bucket_row["bucket_0_1"]), + "2-10": int(bucket_row["bucket_2_10"]), + "11-100": int(bucket_row["bucket_11_100"]), + "101-1k": int(bucket_row["bucket_101_1k"]), + "1k-10k": int(bucket_row["bucket_1k_10k"]), + "10k+": int(bucket_row["bucket_10k_plus"]), + } + + return DegreeStats( + min=int(dist_row["min_degree"] or 0), + max=int(dist_row["max_degree"] or 0), + mean=float(dist_row["avg_degree"] or 0.0), + median=median, + p90=p90, + p99=p99, + p999=p999, + percentiles=percentiles, + buckets=buckets, + ) + + # ------------------------------------------------------------------ # + # Tier 3: label and heterogeneous # + # ------------------------------------------------------------------ # + + def _run_tier3( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + # Label-related checks per node table with a label column. + for node_table in config.node_tables: + if not node_table.label_column: + continue + class_rows = list( + self._bq_utils.run_query( + query=CLASS_IMBALANCE_QUERY.format( + table=node_table.bq_table, + label_column=node_table.label_column, + ), + labels={}, + ) + ) + result.class_imbalance[node_table.node_type] = { + str(row["label"]): int(row["count"]) for row in class_rows + } + + coverage_rows = list( + self._bq_utils.run_query( + query=LABEL_COVERAGE_QUERY.format( + table=node_table.bq_table, + label_column=node_table.label_column, + ), + labels={}, + ) + ) + if coverage_rows: + coverage = coverage_rows[0]["coverage"] + result.label_coverage[node_table.node_type] = ( + float(coverage) if coverage is not None else 0.0 + ) + + # Heterogeneous distribution only if more than one edge type. + if len(config.edge_tables) > 1: + for edge_table in config.edge_tables: + edge_type = edge_table.edge_type + # Edge-type distribution is effectively the edge count; reuse. 
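+                # Reusing the Tier 2 count avoids a second COUNT(*) scan of
+                # the same table.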
+ if edge_type in result.edge_counts: + result.edge_type_distribution[edge_type] = result.edge_counts[ + edge_type + ] + else: + result.edge_type_distribution[edge_type] = self._query_scalar( + EDGE_TYPE_DISTRIBUTION_QUERY.format(table=edge_table.bq_table), + "edge_count", + ) + coverage_rows = list( + self._bq_utils.run_query( + query=EDGE_TYPE_NODE_COVERAGE_QUERY.format( + table=edge_table.bq_table, + src_id_column=edge_table.src_id_column, + dst_id_column=edge_table.dst_id_column, + ), + labels={}, + ) + ) + if coverage_rows: + row = coverage_rows[0] + result.edge_type_node_coverage[edge_type] = { + "distinct_src_count": int(row["distinct_src_count"] or 0), + "distinct_dst_count": int(row["distinct_dst_count"] or 0), + } + + # ------------------------------------------------------------------ # + # Tier 4: opt-in # + # ------------------------------------------------------------------ # + + def _run_tier4( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Populate opt-in metrics gated by config flags. + + Power-law exponent is always cheap (derived from existing degree stats) + and is computed whenever degree stats are available. Reciprocity, + homophily, connected components and clustering require dedicated + queries not yet defined; they remain empty unless the corresponding + flag is enabled AND a query is implemented. + """ + # Power-law exponent: approximate from degree stats using a simple + # heuristic: alpha ~= 1 + log(max) / log(median) for median > 1. + for degree_key, stats in result.degree_stats.items(): + if stats.median > 1 and stats.max > stats.median: + exponent = 1.0 + math.log(stats.max) / math.log(stats.median) + result.power_law_exponent[degree_key] = exponent + + if config.compute_reciprocity: + # Query not yet defined; log and skip. + logger.warning( + "compute_reciprocity=True but reciprocity query is not implemented; " + "skipping Tier 4 reciprocity." 
+ ) + + # ------------------------------------------------------------------ # + # Python-only computations # + # ------------------------------------------------------------------ # + + def _compute_feature_memory_budget( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Estimate per-node-type memory footprint of features (float64 assumed).""" + for node_table in config.node_tables: + node_count = result.node_counts.get(node_table.node_type, 0) + num_features = len(node_table.feature_columns) + result.feature_memory_bytes[node_table.node_type] = ( + node_count * num_features * _BYTES_PER_FEATURE + ) + + def _compute_neighbor_explosion_estimate( + self, config: DataAnalyzerConfig, result: GraphAnalysisResult + ) -> None: + """Multiply fan-out factors and scale by out-degree mean per edge type.""" + if not config.fan_out: + return + fan_out_product = 1 + for hop in config.fan_out: + fan_out_product *= int(hop) + for edge_table in config.edge_tables: + out_stats = result.degree_stats.get(f"{edge_table.edge_type}_out") + if out_stats is None: + continue + estimate = int(fan_out_product * max(out_stats.mean, 1.0)) + result.neighbor_explosion_estimate[edge_table.edge_type] = estimate + + # ------------------------------------------------------------------ # + # Helpers # + # ------------------------------------------------------------------ # + + def _query_scalar(self, query: str, column: str) -> int: + """Run a single-row, single-column query and return the scalar as int.""" + rows = list(self._bq_utils.run_query(query=query, labels={})) + if not rows: + return 0 + value = rows[0][column] + return int(value) if value is not None else 0 diff --git a/tests/unit/analytics/data_analyzer/graph_structure_analyzer_test.py b/tests/unit/analytics/data_analyzer/graph_structure_analyzer_test.py new file mode 100644 index 000000000..cd185738f --- /dev/null +++ b/tests/unit/analytics/data_analyzer/graph_structure_analyzer_test.py @@ -0,0 +1,268 @@ +"""Unit tests for GraphStructureAnalyzer. + +All BQ calls are mocked via patching BqUtils. The goal is to exercise the +orchestration logic (tier ordering, gating, result population) without hitting +a real BigQuery backend. 
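+
+The routing pattern, in miniature (a sketch; individual tests layer per-query
+overrides on top of this default):
+
+    mock_bq.run_query.side_effect = lambda query, labels=None: (
+        _mock_row_iterator(_default_rows_for_query(query))
+    )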
+""" + +from typing import Any, Optional +from unittest.mock import MagicMock, patch + +from gigl.analytics.data_analyzer.config import ( + DataAnalyzerConfig, + EdgeTableSpec, + NodeTableSpec, +) +from gigl.analytics.data_analyzer.graph_structure_analyzer import ( + DataQualityError, + GraphStructureAnalyzer, +) +from tests.test_assets.test_case import TestCase + + +def _make_config( + label_column: Optional[str] = None, + compute_reciprocity: bool = False, + extra_edge: bool = False, +) -> DataAnalyzerConfig: + edge_tables = [ + EdgeTableSpec( + bq_table="p.d.edges", + edge_type="follows", + src_id_column="src", + dst_id_column="dst", + ) + ] + if extra_edge: + edge_tables.append( + EdgeTableSpec( + bq_table="p.d.edges2", + edge_type="likes", + src_id_column="src", + dst_id_column="dst", + ) + ) + return DataAnalyzerConfig( + node_tables=[ + NodeTableSpec( + bq_table="p.d.nodes", + node_type="user", + id_column="uid", + feature_columns=["f1", "f2"], + label_column=label_column, + ) + ], + edge_tables=edge_tables, + output_gcs_path="gs://bucket/out/", + fan_out=[15, 10], + compute_reciprocity=compute_reciprocity, + ) + + +def _mock_row(data: dict[str, Any]) -> MagicMock: + """Mock a BigQuery Row supporting both key and attribute access.""" + row = MagicMock() + keys = list(data.keys()) + values = list(data.values()) + row.__getitem__ = lambda self, key: ( + data[key] if isinstance(key, str) else values[key] + ) + row.keys = lambda: keys + row.values = lambda: values + for k, v in data.items(): + setattr(row, k, v) + return row + + +def _mock_row_iterator(rows: list[dict[str, Any]]) -> MagicMock: + """Mock a RowIterator yielding the given row dicts.""" + mock = MagicMock() + mock.__iter__ = lambda self: iter([_mock_row(r) for r in rows]) + return mock + + +def _default_row_for_query(query: str) -> dict[str, Any]: + """Return a reasonable 'zero violation, small graph' row for any query.""" + q = query.lower() + if "dangling_count" in q: + return {"dangling_count": 0} + if "missing_src_count" in q: + return {"missing_src_count": 0, "missing_dst_count": 0} + if "duplicate_count" in q: + return {"duplicate_count": 0} + if "node_count" in q and "distinct_src_count" not in q: + return {"node_count": 1000} + if "edge_count" in q: + return {"edge_count": 5000} + if "self_loop_count" in q: + return {"self_loop_count": 0} + if "isolated_count" in q: + return {"isolated_count": 0} + if "min_degree" in q or "approx_quantiles" in q: + return { + "min_degree": 0, + "max_degree": 100, + "avg_degree": 5.0, + "percentiles": list(range(101)), + } + if "bucket_0_1" in q: + return { + "bucket_0_1": 10, + "bucket_2_10": 900, + "bucket_11_100": 80, + "bucket_101_1k": 10, + "bucket_1k_10k": 0, + "bucket_10k_plus": 0, + } + if "super_hub_count" in q: + return {"super_hub_count": 0} + if "cold_start_count" in q: + return {"cold_start_count": 50} + if "null_rate" in q: + # Include any plausible column name ending in _null_rate with zero default. 
+ return { + "total_rows": 1000, + "f1_null_rate": 0.0, + "f2_null_rate": 0.01, + "uid_null_rate": 0.0, + "is_active_null_rate": 0.0, + } + if "distinct_src_count" in q: + return {"distinct_src_count": 900, "distinct_dst_count": 950} + if "labeled" in q: + return {"total": 1000, "labeled": 800, "coverage": 0.8} + if "label" in q and "count" in q: + return {"label": 0, "count": 500} + # Fallback: one zero-valued scalar + return {"count": 0} + + +def _default_rows_for_query(query: str) -> list[dict[str, Any]]: + q = query.lower() + if "order by degree desc" in q: + # Top-K hubs query returns multiple rows + return [ + {"node_id": "u1", "degree": 500}, + {"node_id": "u2", "degree": 400}, + ] + if "group by " in q and "label" in q and "order by count" in q: + return [{"label": 0, "count": 600}, {"label": 1, "count": 400}] + return [_default_row_for_query(query)] + + +@patch("gigl.analytics.data_analyzer.graph_structure_analyzer.BqUtils") +class GraphStructureAnalyzerTest(TestCase): + def test_tier1_passes_when_no_violations(self, mock_bq_cls: MagicMock) -> None: + """With zero dangling, zero duplicates, zero referential violations, Tier 1 passes.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config()) + self.assertIsNotNone(result) + self.assertEqual(result.dangling_edge_counts["follows"], 0) + self.assertEqual(result.duplicate_node_counts["user"], 0) + self.assertEqual(result.node_counts["user"], 1000) + + def test_dangling_edges_raises(self, mock_bq_cls: MagicMock) -> None: + """If dangling edge query returns > 0, DataQualityError is raised.""" + mock_bq = mock_bq_cls.return_value + + def _side_effect(query: str, labels: Optional[dict] = None) -> MagicMock: + if "dangling_count" in query: + return _mock_row_iterator([{"dangling_count": 42}]) + return _mock_row_iterator(_default_rows_for_query(query)) + + mock_bq.run_query.side_effect = _side_effect + analyzer = GraphStructureAnalyzer() + with self.assertRaises(DataQualityError) as ctx: + analyzer.analyze(_make_config()) + self.assertEqual( + ctx.exception.partial_result.dangling_edge_counts["follows"], 42 + ) + + def test_duplicate_nodes_raises(self, mock_bq_cls: MagicMock) -> None: + """If duplicate node query returns > 0, DataQualityError is raised.""" + mock_bq = mock_bq_cls.return_value + + def _side_effect(query: str, labels: Optional[dict] = None) -> MagicMock: + q = query.lower() + # The duplicate_node query groups on id_column with HAVING COUNT(*) > 1. 
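+            # Matching "uid" too keeps the duplicate-EDGE query (same HAVING
+            # clause, but grouped on src/dst) returning the default of zero.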
+ if "duplicate_count" in q and "having count(*) > 1" in q and "uid" in q: + return _mock_row_iterator([{"duplicate_count": 5}]) + return _mock_row_iterator(_default_rows_for_query(query)) + + mock_bq.run_query.side_effect = _side_effect + analyzer = GraphStructureAnalyzer() + with self.assertRaises(DataQualityError): + analyzer.analyze(_make_config()) + + def test_tier3_skipped_without_label(self, mock_bq_cls: MagicMock) -> None: + """Without label_column, class_imbalance and label_coverage dicts are empty.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config(label_column=None)) + self.assertEqual(result.class_imbalance, {}) + self.assertEqual(result.label_coverage, {}) + + def test_tier3_populated_with_label(self, mock_bq_cls: MagicMock) -> None: + """With label_column, class_imbalance and label_coverage are populated.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config(label_column="is_active")) + self.assertIn("user", result.class_imbalance) + self.assertIn("user", result.label_coverage) + self.assertAlmostEqual(result.label_coverage["user"], 0.8) + + def test_tier4_skipped_when_flag_false(self, mock_bq_cls: MagicMock) -> None: + """Without compute_reciprocity flag, reciprocity dict is empty.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config(compute_reciprocity=False)) + self.assertEqual(result.reciprocity, {}) + + def test_feature_memory_budget_computed(self, mock_bq_cls: MagicMock) -> None: + """feature_memory_bytes is computed from schema metadata in Python, not a BQ query.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config()) + self.assertIn("user", result.feature_memory_bytes) + # 1000 nodes * 2 features * 8 bytes/float64 = 16000 + self.assertEqual(result.feature_memory_bytes["user"], 1000 * 2 * 8) + + def test_neighbor_explosion_populated(self, mock_bq_cls: MagicMock) -> None: + """With fan_out=[15,10] and avg degree 5, explosion estimate = 15*10*5.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config()) + self.assertIn("follows", result.neighbor_explosion_estimate) + self.assertGreater(result.neighbor_explosion_estimate["follows"], 0) + + def test_edge_type_distribution_populated_for_multiple_edges( + self, mock_bq_cls: MagicMock + ) -> None: + """edge_type_distribution is populated when there are multiple edge types.""" + mock_bq = mock_bq_cls.return_value + mock_bq.run_query.side_effect = lambda query, labels=None: _mock_row_iterator( + _default_rows_for_query(query) + ) + analyzer = GraphStructureAnalyzer() + result = analyzer.analyze(_make_config(extra_edge=True)) + self.assertIn("follows", result.edge_type_distribution) + self.assertIn("likes", 
result.edge_type_distribution) From 793190c90ea0b3e0e7f30b42ffa33289c3b46369 Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 22:12:19 +0000 Subject: [PATCH 07/20] style(analytics): apply black formatter to test files Co-Authored-By: shubhamvij --- .../unit/analytics/data_analyzer/config_test.py | 10 +++++----- .../unit/analytics/data_analyzer/queries_test.py | 16 ++++------------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/tests/unit/analytics/data_analyzer/config_test.py b/tests/unit/analytics/data_analyzer/config_test.py index a095a11c8..5057462a1 100644 --- a/tests/unit/analytics/data_analyzer/config_test.py +++ b/tests/unit/analytics/data_analyzer/config_test.py @@ -2,14 +2,14 @@ from omegaconf import OmegaConf -from gigl.analytics.data_analyzer.config import ( - DataAnalyzerConfig, - load_analyzer_config, -) +from gigl.analytics.data_analyzer.config import DataAnalyzerConfig, load_analyzer_config from tests.test_assets.test_case import TestCase SAMPLE_CONFIG_PATH = ( - Path(__file__).parents[3] / "test_assets" / "analytics" / "sample_analyzer_config.yaml" + Path(__file__).parents[3] + / "test_assets" + / "analytics" + / "sample_analyzer_config.yaml" ) diff --git a/tests/unit/analytics/data_analyzer/queries_test.py b/tests/unit/analytics/data_analyzer/queries_test.py index 9063cd4e4..db6389329 100644 --- a/tests/unit/analytics/data_analyzer/queries_test.py +++ b/tests/unit/analytics/data_analyzer/queries_test.py @@ -57,18 +57,14 @@ def test_contains_group_by_having(self) -> None: class DegreeDistributionQueryTest(TestCase): def test_contains_approx_quantiles(self) -> None: - sql = DEGREE_DISTRIBUTION_QUERY.format( - table=EDGE_TABLE, id_column="src_uid" - ) + sql = DEGREE_DISTRIBUTION_QUERY.format(table=EDGE_TABLE, id_column="src_uid") self.assertIn("APPROX_QUANTILES", sql) self.assertIn("src_uid", sql) class DegreeBucketQueryTest(TestCase): def test_contains_countif_buckets(self) -> None: - sql = DEGREE_BUCKET_QUERY.format( - table=EDGE_TABLE, id_column="src_uid" - ) + sql = DEGREE_BUCKET_QUERY.format(table=EDGE_TABLE, id_column="src_uid") self.assertIn("COUNTIF", sql) self.assertIn("src_uid", sql) @@ -87,17 +83,13 @@ def test_batches_multiple_columns(self) -> None: class SuperHubInt16ClampQueryTest(TestCase): def test_contains_32767_threshold(self) -> None: - sql = SUPER_HUB_INT16_CLAMP_QUERY.format( - table=EDGE_TABLE, id_column="src_uid" - ) + sql = SUPER_HUB_INT16_CLAMP_QUERY.format(table=EDGE_TABLE, id_column="src_uid") self.assertIn("32767", sql) class TopKHubsQueryTest(TestCase): def test_contains_limit(self) -> None: - sql = TOP_K_HUBS_QUERY.format( - table=EDGE_TABLE, id_column="src_uid", k=20 - ) + sql = TOP_K_HUBS_QUERY.format(table=EDGE_TABLE, id_column="src_uid", k=20) self.assertIn("LIMIT 20", sql) self.assertIn("ORDER BY", sql) self.assertIn("DESC", sql) From 0b01b5cd8b6505ea968f13362483040ce924be6b Mon Sep 17 00:00:00 2001 From: svij Date: Fri, 17 Apr 2026 22:16:12 +0000 Subject: [PATCH 08/20] feat(analytics): add report SPEC.md and initial AI-owned HTML/JS/CSS assets Co-Authored-By: shubhamvij --- gigl/analytics/data_analyzer/report/SPEC.md | 138 ++++ .../data_analyzer/report/charts.ai.js | 610 ++++++++++++++++++ .../data_analyzer/report/report.ai.html | 69 ++ .../data_analyzer/report/styles.ai.css | 173 +++++ 4 files changed, 990 insertions(+) create mode 100644 gigl/analytics/data_analyzer/report/SPEC.md create mode 100644 gigl/analytics/data_analyzer/report/charts.ai.js create mode 100644 
gigl/analytics/data_analyzer/report/report.ai.html create mode 100644 gigl/analytics/data_analyzer/report/styles.ai.css diff --git a/gigl/analytics/data_analyzer/report/SPEC.md b/gigl/analytics/data_analyzer/report/SPEC.md new file mode 100644 index 000000000..0eecb3d73 --- /dev/null +++ b/gigl/analytics/data_analyzer/report/SPEC.md @@ -0,0 +1,138 @@ +# Report Generator SPEC + +## Purpose + +This SPEC defines the single self-contained HTML report that the BQ Data Analyzer +produces for a graph dataset. The three `.ai.{html,js,css}` files in this +directory implement the SPEC and should be regenerated from it whenever the SPEC +changes. The Python `report_generator.py` module is the only non-AI-owned +component in this directory; it loads the AI assets via `importlib.resources`, +injects data from a `GraphAnalysisResult` dataclass, and writes a single HTML +file to disk. + +## Constraints + +- Single self-contained HTML file. No external CDN, no external JS/CSS/font + dependencies, no network requests at view time. +- Opens in any modern browser (Chrome, Firefox, Safari, Edge) without a server. +- Max-width 1200px, centered horizontally. +- Light background (`#f8f9fa`). +- Monospace font (`ui-monospace`, `SFMono-Regular`, `Menlo`, `monospace`) for all + numeric data values; sans-serif (`system-ui`, `-apple-system`, + `"Segoe UI"`, `Roboto`, sans-serif) for labels and headings. +- Collapsible sections use `
<details>` / `<summary>` (no JS required to
+  expand/collapse).
+- Color coding for status uses these exact values:
+  - Green: `#28a745` (OK)
+  - Yellow: `#ffc107` (warning)
+  - Red: `#dc3545` (critical)
+- Total report HTML should be reasonable in size (a single dataset's report
+  with embedded FACETS iframes may be multi-MB; that is acceptable).
+
+## Sections (in display order)
+
+1. **Header** (`
<section id="header">`) — "GiGL Data Analysis Report" title,
+   generation timestamp, and a short config summary listing the analyzed
+   node tables and edge tables.
+2. **Overview Dashboard** (`
<section id="overview">`) — Card grid showing total
+   nodes, total edges, number of node types, number of edge types, and an
+   overall traffic-light status indicator (green/yellow/red). The status is the
+   worst severity across all detected issues.
+3. **Data Quality** (`
<section id="data-quality">`) — Per-table NULL rates table
+   sorted highest-first with rows color-coded (NULL rate > 50% = yellow,
+   > 90% = red). Duplicate node counts, duplicate edge counts, dangling edge
+   counts, and referential integrity violations. Any nonzero count in these
+   four is rendered red.
+4. **Feature Statistics** (`
<section id="feature-statistics">`) — Optional. One
+   subsection per table with the corresponding FACETS HTML embedded inside an
+   `<iframe>`.
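+
+A minimal sketch of the injection flow described in Purpose (illustrative
+only: `report_generator.py` is added in a later patch, and the
+`__DATA_JSON__` placeholder token here is an assumption, not a defined
+contract):
+
+```python
+import json
+from dataclasses import asdict
+from importlib import resources
+
+from gigl.analytics.data_analyzer.types import GraphAnalysisResult
+
+
+def render_report(result: GraphAnalysisResult) -> str:
+    # Load the AI-owned template bundled with this package.
+    pkg = resources.files("gigl.analytics.data_analyzer.report")
+    template = pkg.joinpath("report.ai.html").read_text(encoding="utf-8")
+    # Inline the analysis payload so the report stays self-contained.
+    return template.replace("__DATA_JSON__", json.dumps(asdict(result)))
+```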