Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions gigl/analytics/data_analyzer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
BQ Data Analyzer for pre-training graph data analysis.

Produces a single HTML report covering data quality, feature distributions,
and graph structure metrics from BigQuery node/edge tables.
"""

from gigl.analytics.data_analyzer.data_analyzer import DataAnalyzer

__all__ = ["DataAnalyzer"]
6 changes: 6 additions & 0 deletions gigl/analytics/data_analyzer/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Entry point for running the BQ Data Analyzer as a module: python -m gigl.analytics.data_analyzer."""

from gigl.analytics.data_analyzer.data_analyzer import main

if __name__ == "__main__":
main()
75 changes: 75 additions & 0 deletions gigl/analytics/data_analyzer/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from dataclasses import dataclass, field
from typing import Optional

from omegaconf import MISSING, OmegaConf

from gigl.common.logger import Logger

logger = Logger()


@dataclass
class NodeTableSpec:
    """Specification for a node table in BigQuery.

    All fields except ``label_column`` default to OmegaConf ``MISSING`` and
    must therefore be supplied in the YAML config.
    """

    bq_table: str = MISSING  # Fully-qualified table name, e.g. "project.dataset.user_nodes".
    node_type: str = MISSING  # Logical node type this table provides.
    id_column: str = MISSING  # Column holding the unique node identifier.
    feature_columns: list[str] = MISSING  # Columns to treat as node features.
    label_column: Optional[str] = None  # Optional column holding node labels.


@dataclass
class EdgeTableSpec:
    """Specification for an edge table in BigQuery.

    ``bq_table``, ``edge_type`` and both id columns default to OmegaConf
    ``MISSING`` and must be supplied in the YAML config; feature and
    timestamp columns are optional.
    """

    bq_table: str = MISSING  # Fully-qualified table name, e.g. "project.dataset.edges".
    edge_type: str = MISSING  # Logical edge type this table provides.
    src_id_column: str = MISSING  # Column holding the source node id.
    dst_id_column: str = MISSING  # Column holding the destination node id.
    feature_columns: list[str] = field(default_factory=list)  # Optional edge feature columns.
    timestamp_column: Optional[str] = None  # Optional timestamp column on edges.


@dataclass
class DataAnalyzerConfig:
    """Configuration for the BQ Data Analyzer.

    Parsed from YAML via OmegaConf. ``node_tables``, ``edge_tables`` and
    ``output_gcs_path`` are mandatory (OmegaConf ``MISSING``); the remaining
    fields toggle optional graph-structure computations.

    Example:
        >>> config = load_analyzer_config("gs://bucket/config.yaml")
        >>> config.node_tables[0].bq_table
        'project.dataset.user_nodes'
    """

    node_tables: list[NodeTableSpec] = MISSING  # One spec per BQ node table.
    edge_tables: list[EdgeTableSpec] = MISSING  # One spec per BQ edge table.
    output_gcs_path: str = MISSING  # GCS prefix where the HTML report is written.
    fan_out: Optional[list[int]] = None  # Per-hop fan-out; semantics defined by the analyzer — TODO confirm.
    compute_reciprocity: bool = False  # Opt-in: edge reciprocity metric.
    compute_homophily: bool = False  # Opt-in: label homophily metric.
    compute_connected_components: bool = False  # Opt-in: connected-components analysis.
    compute_clustering: bool = False  # Opt-in: clustering-coefficient analysis.


def load_analyzer_config(config_path: str) -> DataAnalyzerConfig:
    """Load and validate a DataAnalyzerConfig from a YAML file.

    The YAML contents are merged on top of the structured schema so that
    OmegaConf can enforce the mandatory (``MISSING``) fields.

    Args:
        config_path: Local file path or GCS URI to the YAML config.

    Returns:
        Validated DataAnalyzerConfig instance.

    Raises:
        omegaconf.errors.MissingMandatoryValue: If required fields are missing.
    """
    schema = OmegaConf.structured(DataAnalyzerConfig)
    yaml_conf = OmegaConf.load(config_path)
    config: DataAnalyzerConfig = OmegaConf.to_object(  # type: ignore
        OmegaConf.merge(schema, yaml_conf)
    )
    logger.info(
        f"Loaded analyzer config with {len(config.node_tables)} node tables "
        f"and {len(config.edge_tables)} edge tables"
    )
    return config
91 changes: 91 additions & 0 deletions gigl/analytics/data_analyzer/data_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Main orchestrator and CLI entry point for the BQ Data Analyzer."""
import argparse
from typing import Optional

from gigl.analytics.data_analyzer.config import DataAnalyzerConfig, load_analyzer_config
from gigl.analytics.data_analyzer.graph_structure_analyzer import (
DataQualityError,
GraphStructureAnalyzer,
)
from gigl.analytics.data_analyzer.report.report_generator import generate_report
from gigl.analytics.data_analyzer.types import FeatureProfileResult, GraphAnalysisResult
from gigl.common import Uri
from gigl.common.logger import Logger

logger = Logger()


class DataAnalyzer:
    """Orchestrates graph structure analysis, feature profiling, and report generation.

    Example:
        >>> from gigl.analytics.data_analyzer.config import load_analyzer_config
        >>> config = load_analyzer_config("gs://bucket/config.yaml")
        >>> analyzer = DataAnalyzer()
        >>> report_path = analyzer.run(config=config)
    """

    def run(
        self,
        config: DataAnalyzerConfig,
        resource_config_uri: Optional[Uri] = None,
    ) -> str:
        """Run the full analysis pipeline and generate an HTML report.

        Args:
            config: Analyzer configuration.
            resource_config_uri: Optional resource config for Dataflow sizing.

        Returns:
            GCS path to the generated HTML report.
        """
        # Feature profiling is not yet wired up; the report generator accepts
        # None for the profile section.
        profile_result: Optional[FeatureProfileResult] = None

        # A Tier 1 data-quality failure still carries a partial result, so we
        # report on whatever the structure analyzer managed to compute.
        analysis_result: GraphAnalysisResult
        try:
            analysis_result = GraphStructureAnalyzer().analyze(config)
        except DataQualityError as e:
            logger.error(f"Tier 1 data quality failure: {e}")
            analysis_result = e.partial_result

        # TODO: run feature profiler (TFDV/Dataflow) in parallel once implemented.

        html = generate_report(
            analysis_result=analysis_result,
            profile_result=profile_result,
            config=config,
        )

        report_gcs_path = "/".join((config.output_gcs_path.rstrip("/"), "report.html"))
        logger.info(f"Generated report; would upload to {report_gcs_path}")
        # TODO: wire up GCS upload via gigl.common.utils.gcs.GcsUtils

        return report_gcs_path


def main() -> None:
    """CLI entry point for the BQ Data Analyzer.

    Parses ``--analyzer_config_uri`` (required) and ``--resource_config_uri``
    (optional), runs the analyzer, and logs the report location.
    """
    parser = argparse.ArgumentParser(
        description="BQ Data Analyzer: analyze graph data in BigQuery before GNN training"
    )
    parser.add_argument(
        "--analyzer_config_uri",
        required=True,
        help="Path or GCS URI to the analyzer YAML config",
    )
    parser.add_argument(
        "--resource_config_uri",
        required=False,
        help="Path or GCS URI to the resource config for Dataflow sizing",
    )
    args = parser.parse_args()

    config = load_analyzer_config(args.analyzer_config_uri)

    # Bug fix: --resource_config_uri was parsed but never forwarded to
    # DataAnalyzer.run, so the flag was silently ignored. Forward it so
    # Dataflow sizing takes effect once the feature profiler lands.
    # NOTE(review): assumes Uri accepts a raw URI string — confirm against
    # gigl.common.Uri's constructor.
    resource_config_uri: Optional[Uri] = (
        Uri(args.resource_config_uri) if args.resource_config_uri else None
    )

    analyzer = DataAnalyzer()
    report_path = analyzer.run(
        config=config, resource_config_uri=resource_config_uri
    )
    logger.info(f"Report generated at: {report_path}")


if __name__ == "__main__":
main()
57 changes: 57 additions & 0 deletions gigl/analytics/data_analyzer/feature_profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""TFDV feature profiling via Beam/Dataflow.

Builds standalone Beam pipelines that read from BQ tables, run
tfdv.GenerateStatistics(), and produce FACETS HTML visualizations.

Will reuse existing PTransforms:
- GenerateAndVisualizeStats (gigl/src/data_preprocessor/lib/transform/utils.py:120)
- IngestRawFeatures (gigl/src/data_preprocessor/lib/transform/utils.py:85)
- init_beam_pipeline_options (gigl/src/common/utils/dataflow.py)

NOTE: Currently a stub. Full implementation is deferred to a future PR
once the wrapping Dataflow infrastructure is ready. The stub logs a
warning and returns an empty FeatureProfileResult so callers can wire
up their code without blocking on Dataflow.
"""
from typing import Optional

from gigl.analytics.data_analyzer.config import DataAnalyzerConfig
from gigl.analytics.data_analyzer.types import FeatureProfileResult
from gigl.common import Uri
from gigl.common.logger import Logger

logger = Logger()


class FeatureProfiler:
    """Runs TFDV feature profiling on BQ tables via Dataflow.

    Currently a stub. See module docstring.

    Example:
        >>> profiler = FeatureProfiler()
        >>> result = profiler.profile(config)
        >>> # result.facets_html_paths will be empty until full impl lands
    """

    # Warning emitted on every call until the Beam/Dataflow pipelines land.
    _STUB_WARNING = (
        "FeatureProfiler not yet implemented. "
        "Returning empty results. "
        "Full implementation will wire up Beam/Dataflow pipelines "
        "using GenerateAndVisualizeStats and IngestRawFeatures."
    )

    def profile(
        self,
        config: DataAnalyzerConfig,
        resource_config_uri: Optional[Uri] = None,
    ) -> FeatureProfileResult:
        """Run TFDV profiling on all tables in config.

        Args:
            config: Analyzer configuration with table specs.
            resource_config_uri: Optional resource config for Dataflow sizing.

        Returns:
            FeatureProfileResult with GCS paths to TFDV artifacts. Empty
            until the full implementation lands (see module docstring).
        """
        logger.warning(self._STUB_WARNING)
        return FeatureProfileResult()
Loading