Preload source_aliases in process_executor_events#65422
Open
sicarul wants to merge 1 commit intoapache:mainfrom
Open
Preload source_aliases in process_executor_events#65422sicarul wants to merge 1 commit intoapache:mainfrom
sicarul wants to merge 1 commit intoapache:mainfrom
Conversation
PR apache#56916 added a _eager_load_dag_run_for_validation helper that returns two loader options so DRDataModel.model_validate can walk DagRun.consumed_asset_events[*].asset and source_aliases without triggering lazy loads on a detached ORM instance. The helper's docstring example applies both loaders, and _adopt_or_reset_orphaned_tasks applies both. SchedulerJobRunner.process_executor_events only applies asset_loader and discards alias_loader, leaving AssetEvent.source_aliases lazy-loaded. When Celery/Kubernetes executor reports a failed-while-queued event for an asset-triggered DagRun whose consumed AssetEvent has a non-empty source_aliases collection, SQLAlchemy detaches the AssetEvent by the time pydantic walks the DagRun, and source_aliases raises DetachedInstanceError wrapped as a pydantic ValidationError. The exception escapes _run_scheduler_loop, the scheduler exits, which orphans in-flight task instances. The next scheduler's _adopt_or_reset_orphaned_tasks picks them up and replays the same validation path, crashing again. Apply alias_loader alongside asset_loader at this call site. Extend test_process_executor_events_with_asset_events so the AssetEvent now carries a non-empty source_aliases collection and the assertion verifies the alias survives into the callback payload. Co-Authored-By: Claude <noreply@anthropic.com>
81b1163 to
e8d0ea6
Compare
Contributor
Author
|
cc: @Lee-W tagging you since you wrote the original PR which seems to have introduced this bug if i'm not mistaken. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
SchedulerJobRunner.process_executor_eventsbuilds aDRDataModelfor the failing task viaDRDataModel.model_validate(ti.dag_run, from_attributes=True). Pydantic walksDagRun.consumed_asset_events, and for eachAssetEventit descends into bothassetandsource_aliases.PR #56916 (included in 3.2.0) added the helper
_eager_load_dag_run_for_validationto preload those relationships. Its docstring example applies both loaders, and the other call site in_adopt_or_reset_orphaned_tasksapplies both. Theprocess_executor_eventssite only applies the first one:As a result, when the Celery/Kubernetes executor reports a failed-while-queued event for an asset-triggered DagRun whose consumed
AssetEventhas a non-emptysource_aliases, SQLAlchemy detaches theAssetEventinstance by the time pydantic walks it, andsource_aliaseshits a lazy loader without a session:The exception escapes
_run_scheduler_loop, the scheduler process exits, which orphans in-flight task instances. The next scheduler's_adopt_or_reset_orphaned_taskspicks them up, their adoption replays the same validation path, and the crash repeats.Change
Apply the
alias_loaderalongsideasset_loaderat this call site. One extraselectinloadis issued when the batch contains asset-triggered DagRuns with populatedsource_aliases; zero runtime impact for DAGs without asset events.Test
The existing regression test
test_process_executor_events_with_asset_eventscreated anAssetEventwith no aliases, so it didn't stress the missing loader. Extended to attach anAssetAliasModelto the event and to assert that the alias survives into the callback payload:Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.