From e8d0ea6c8ac4fd23d8069461dc8349c348376743 Mon Sep 17 00:00:00 2001 From: Pablo Seibelt Date: Fri, 17 Apr 2026 12:09:58 -0400 Subject: [PATCH] Preload source_aliases in process_executor_events PR #56916 added a _eager_load_dag_run_for_validation helper that returns two loader options so DRDataModel.model_validate can walk DagRun.consumed_asset_events[*].asset and source_aliases without triggering lazy loads on a detached ORM instance. The helper's docstring example applies both loaders, and _adopt_or_reset_orphaned_tasks applies both. SchedulerJobRunner.process_executor_events applies only asset_loader and discards alias_loader, leaving AssetEvent.source_aliases lazy-loaded. When the Celery or Kubernetes executor reports a failed-while-queued event for an asset-triggered DagRun whose consumed AssetEvent has a non-empty source_aliases collection, SQLAlchemy has detached the AssetEvent by the time pydantic walks the DagRun, and accessing source_aliases raises DetachedInstanceError wrapped as a pydantic ValidationError. The exception escapes _run_scheduler_loop and the scheduler exits, which orphans in-flight task instances. The next scheduler's _adopt_or_reset_orphaned_tasks picks them up and replays the same validation path, crashing again. Apply alias_loader alongside asset_loader at this call site. Extend test_process_executor_events_with_asset_events so the AssetEvent now carries a non-empty source_aliases collection and the assertion verifies the alias survives into the callback payload. 
Co-Authored-By: Claude --- airflow-core/newsfragments/65422.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 3 ++- .../tests/unit/jobs/test_scheduler_job.py | 21 +++++++++++++------ 3 files changed, 18 insertions(+), 7 deletions(-) create mode 100644 airflow-core/newsfragments/65422.bugfix.rst diff --git a/airflow-core/newsfragments/65422.bugfix.rst b/airflow-core/newsfragments/65422.bugfix.rst new file mode 100644 index 0000000000000..efb484da7234c --- /dev/null +++ b/airflow-core/newsfragments/65422.bugfix.rst @@ -0,0 +1 @@ +Fix scheduler crash on asset-triggered DagRuns by eager-loading ``AssetEvent.source_aliases`` in ``SchedulerJobRunner.process_executor_events``. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index d949012d4f5a4..2256e0b7bea66 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1231,12 +1231,13 @@ def process_executor_events( filter_for_tis = TI.filter_for_tis(tis_with_right_state) if filter_for_tis is None: return len(event_buffer) - asset_loader, _ = _eager_load_dag_run_for_validation() + asset_loader, alias_loader = _eager_load_dag_run_for_validation() query = ( select(TI) .where(filter_for_tis) .options(selectinload(TI.dag_model)) .options(asset_loader) + .options(alias_loader) .options(joinedload(TI.dag_run).selectinload(DagRun.created_dag_version)) .options(joinedload(TI.dag_version)) ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b0f72b91db3eb..21dd6a088ec93 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -849,11 +849,15 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio Test that _process_executor_events handles asset events without DetachedInstanceError. 
Regression test for scheduler crashes when task callbacks are built with - consumed_asset_events that weren't eager-loaded. + consumed_asset_events that weren't eager-loaded. Exercises both + ``AssetEvent.asset`` and ``AssetEvent.source_aliases`` so that missing + either loader option on the TI query surfaces as a ``DetachedInstanceError`` + when the callback's ``DRDataModel`` is built. """ asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group") asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group) - session.add(asset_model) + asset_alias = AssetAliasModel(name="test_alias_executor", group="test_group") + session.add_all([asset_model, asset_alias]) session.flush() with dag_maker(dag_id="test_executor_events_with_assets", schedule=[asset1], fileloc="/test_path1/"): @@ -865,7 +869,9 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio dr = dag_maker.create_dagrun() - # Create asset event and attach to dag run + # Create asset event with an attached source alias so the lazy-loaded + # AssetEvent.source_aliases relationship is non-empty and must be + # eager-loaded to survive a detached ORM instance in the callback. 
asset_event = AssetEvent( asset_id=asset_model.id, source_task_id="upstream_task", @@ -873,6 +879,7 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio source_run_id="upstream_run", source_map_index=-1, ) + asset_event.source_aliases.append(asset_alias) session.add(asset_event) session.flush() dr.consumed_asset_events.append(asset_event) @@ -896,12 +903,14 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio ti1.refresh_from_db(session=session) assert ti1.state == State.FAILED - # Verify callback was created with asset event data + # Verify callback was created with asset event data including aliases self.job_runner.executor.callback_sink.send.assert_called_once() callback_request = self.job_runner.executor.callback_sink.send.call_args.args[0] assert callback_request.context_from_server is not None - assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1 - assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri + events = callback_request.context_from_server.dag_run.consumed_asset_events + assert len(events) == 1 + assert events[0].asset.uri == asset1.uri + assert [alias.name for alias in events[0].source_aliases] == [asset_alias.name] @pytest.mark.usefixtures("testing_dag_bundle") def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: DagMaker):