Preload source_aliases in process_executor_events#65422

Open
sicarul wants to merge 1 commit into apache:main from sicarul:fix/scheduler-process-executor-events-source-aliases-eager-load

Contributor

@sicarul sicarul commented Apr 17, 2026

Summary

SchedulerJobRunner.process_executor_events builds a DRDataModel for the failing task via DRDataModel.model_validate(ti.dag_run, from_attributes=True). Pydantic walks DagRun.consumed_asset_events, and for each AssetEvent it descends into both asset and source_aliases.

PR #56916 (included in 3.2.0) added the helper _eager_load_dag_run_for_validation to preload those relationships. Its docstring example applies both loaders, and the other call site in _adopt_or_reset_orphaned_tasks applies both. The process_executor_events site only applies the first one:

asset_loader, _ = _eager_load_dag_run_for_validation()
query = (
    select(TI)
    .where(filter_for_tis)
    .options(selectinload(TI.dag_model))
    .options(asset_loader)
    # alias_loader is missing
    ...
)

As a result, when the Celery/Kubernetes executor reports a failed-while-queued event for an asset-triggered DagRun whose consumed AssetEvent has a non-empty source_aliases collection, the AssetEvent instance is already detached from its session by the time pydantic walks it, and accessing source_aliases triggers a lazy load that has no session to run in:

pydantic_core._pydantic_core.ValidationError: 1 validation error for DagRun
consumed_asset_events.0.source_aliases
  Error extracting attribute: DetachedInstanceError: Parent instance
  <AssetEvent at 0x...> is not bound to a Session; lazy load operation
  of attribute 'source_aliases' cannot proceed
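
The failure mode can be illustrated with a stdlib-only toy model. This is a sketch, not Airflow or SQLAlchemy code: ToySession, ToyEvent, DetachedError, eager_loaders, validate and run are all hypothetical names invented for illustration. It assumes only what the description above states: a helper returns two loaders, the call site applies one of them, and validation later reads both attributes after the session is gone.

```python
class DetachedError(RuntimeError):
    """Hypothetical stand-in for SQLAlchemy's DetachedInstanceError."""


class ToySession:
    """Hypothetical session: lazy loads only work while it is open."""

    def __init__(self):
        self.open = True

    def close(self):
        self.open = False


class ToyEvent:
    """Hypothetical event whose two relationships load lazily by default."""

    def __init__(self, session):
        self._session = session
        self._loaded = {}

    def _get(self, name):
        # Already loaded (eagerly or by an earlier access): no session needed.
        if name in self._loaded:
            return self._loaded[name]
        # Not loaded yet: a lazy load needs an open session.
        if not self._session.open:
            raise DetachedError(f"lazy load of {name!r} cannot proceed")
        self._loaded[name] = [f"{name}-row"]
        return self._loaded[name]

    @property
    def asset(self):
        return self._get("asset")

    @property
    def source_aliases(self):
        return self._get("source_aliases")


def eager_loaders():
    """Toy analogue of a helper that returns (asset_loader, alias_loader)."""
    return (lambda ev: ev._get("asset")), (lambda ev: ev._get("source_aliases"))


def validate(event):
    """Reads both attributes, as attribute-based validation would."""
    return {"asset": event.asset, "source_aliases": event.source_aliases}


def run(apply_alias_loader):
    session = ToySession()
    event = ToyEvent(session)
    asset_loader, alias_loader = eager_loaders()
    asset_loader(event)
    if apply_alias_loader:
        alias_loader(event)
    session.close()  # the event is now effectively detached
    return validate(event)
```

In this toy, run(apply_alias_loader=False) raises DetachedError on source_aliases, while run(apply_alias_loader=True) returns both collections because nothing is left to load lazily after the session closes.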

The exception escapes _run_scheduler_loop and the scheduler process exits, which orphans in-flight task instances. The next scheduler's _adopt_or_reset_orphaned_tasks picks them up, their adoption replays the same validation path, and the crash repeats.

Change

Apply the alias_loader alongside asset_loader at this call site. One extra selectinload is issued when the batch contains asset-triggered DagRuns with populated source_aliases; zero runtime impact for DAGs without asset events.
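
To make the cost claim concrete, here is a rough sketch of the general shape of a select-in load, written by hand against the stdlib sqlite3 module. The event and alias tables, the query helper and load_events_with_aliases are hypothetical and much simpler than Airflow's schema, and the SQL that SQLAlchemy actually emits for selectinload will differ. The point is only that the related rows for a whole batch are fetched with one additional IN query, and that no additional query is needed when the batch has no parent rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE event (id INTEGER PRIMARY KEY);
    CREATE TABLE alias (id INTEGER PRIMARY KEY, event_id INTEGER, name TEXT);
    INSERT INTO event (id) VALUES (1), (2), (3);
    INSERT INTO alias (event_id, name) VALUES (1, 'a'), (1, 'b'), (3, 'c');
    """
)

issued = []  # every SQL string sent through query(), for counting


def query(sql, params=()):
    """Run a statement and record it so the number of round trips is visible."""
    issued.append(sql)
    return conn.execute(sql, params).fetchall()


def load_events_with_aliases():
    """Load all events, then all of their aliases with a single IN query."""
    event_ids = [row[0] for row in query("SELECT id FROM event ORDER BY id")]
    aliases = {event_id: [] for event_id in event_ids}
    if event_ids:  # no parents in the batch means no extra query
        placeholders = ", ".join("?" for _ in event_ids)
        rows = query(
            "SELECT event_id, name FROM alias "
            f"WHERE event_id IN ({placeholders}) ORDER BY id",
            event_ids,
        )
        for event_id, name in rows:
            aliases[event_id].append(name)
    return aliases


result = load_events_with_aliases()
```

Here three events are loaded with two statements in total, instead of one statement per event, which is the trade-off the select-in strategy is meant to offer.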

Test

The existing regression test test_process_executor_events_with_asset_events created an AssetEvent with no aliases, so it didn't exercise the missing loader. This PR extends it to attach an AssetAliasModel to the event and to assert that the alias survives into the callback payload:

events = callback_request.context_from_server.dag_run.consumed_asset_events
assert len(events) == 1
assert events[0].asset.uri == asset1.uri
assert [alias.name for alias in events[0].source_aliases] == [asset_alias.name]

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@sicarul sicarul requested review from XD-DENG and ashb as code owners April 17, 2026 15:44
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Apr 17, 2026
@sicarul sicarul marked this pull request as draft April 17, 2026 15:51
PR apache#56916 added a _eager_load_dag_run_for_validation helper that
returns two loader options so DRDataModel.model_validate can walk
DagRun.consumed_asset_events[*].asset and source_aliases without
triggering lazy loads on a detached ORM instance. The helper's
docstring example applies both loaders, and _adopt_or_reset_orphaned_tasks
applies both. SchedulerJobRunner.process_executor_events only applies
asset_loader and discards alias_loader, leaving
AssetEvent.source_aliases lazy-loaded.

When Celery/Kubernetes executor reports a failed-while-queued event
for an asset-triggered DagRun whose consumed AssetEvent has a
non-empty source_aliases collection, SQLAlchemy detaches the
AssetEvent by the time pydantic walks the DagRun, and source_aliases
raises DetachedInstanceError wrapped as a pydantic ValidationError.
The exception escapes _run_scheduler_loop, the scheduler exits,
which orphans in-flight task instances. The next scheduler's
_adopt_or_reset_orphaned_tasks picks them up and replays the same
validation path, crashing again.

Apply alias_loader alongside asset_loader at this call site.

Extend test_process_executor_events_with_asset_events so the
AssetEvent now carries a non-empty source_aliases collection and
the assertion verifies the alias survives into the callback payload.

Co-Authored-By: Claude <noreply@anthropic.com>
@sicarul sicarul force-pushed the fix/scheduler-process-executor-events-source-aliases-eager-load branch from 81b1163 to e8d0ea6 Compare April 17, 2026 16:10
@sicarul sicarul changed the title fix(scheduler): eager-load source_aliases in process_executor_events Preload source_aliases in process_executor_events Apr 17, 2026
@sicarul sicarul marked this pull request as ready for review April 17, 2026 17:45
Contributor Author

sicarul commented Apr 17, 2026

cc: @Lee-W, tagging you since you wrote the original PR, which seems to have introduced this bug, if I'm not mistaken.


Labels

area:Scheduler including HA (high availability) scheduler
