feat(jobs): schedule periodic stale-job check #1227
Conversation
…pshots

Two Celery Beat periodic tasks so operators can see ML job health without waiting for a job to finish:

1. `check_stale_jobs_task` (every 15 min) — thin wrapper around the existing `check_stale_jobs()` that reconciles jobs stuck in running states past `FAILED_CUTOFF_HOURS`. Previously the function existed but was only reachable via the `update_stale_jobs` management command, so nothing ran it automatically.
2. `log_running_async_job_stats` (every 5 min) — for each incomplete async_api job, opens a `TaskQueueManager` with that job's logger and logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered snapshot of the NATS consumer. Read-only; no status changes.

Builds on the lifecycle logging landed in #1222, so long-running jobs now get mid-flight visibility, not just create + cleanup snapshots. Both tasks are registered via migration 0020, so existing deployments pick them up on the next `migrate` without manual Beat configuration.

Co-Authored-By: Claude <noreply@anthropic.com>
📝 Walkthrough

Adds a scheduled umbrella Celery task.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Beat as Celery Beat
    participant Health as jobs.health_check task
    participant Jobs as Job DB / Reconciler
    participant TQM as TaskQueueManager
    participant NATS as NATS JetStream
    Beat->>Health: trigger every 15m
    Health->>Jobs: run stale-job reconciliation (check_stale_jobs)
    Jobs-->>Health: IntegrityCheckResult (checked/fixed/unfixable)
    Health->>TQM: for each running ASYNC_API job -> log_consumer_stats_snapshot(job_id)
    TQM->>NATS: js.consumer_info / fetch consumer stats
    NATS-->>TQM: consumer stats or error
    TQM-->>Health: logs snapshot (tolerant to per-job errors)
    Health-->>Beat: return combined JobsHealthCheckResult (as dict)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pull request overview
This PR adds two Celery Beat–scheduled monitoring tasks to prevent long-running ML jobs from silently hanging and to provide mid-flight observability into NATS consumer state for async_api jobs.
Changes:
- Add `check_stale_jobs_task` (15 min) as a Beat entry point for the existing `check_stale_jobs()` reconciler.
- Add `log_running_async_job_stats` (5 min) to write periodic NATS consumer stats snapshots into each running async job's per-job UI log.
- Register both periodic tasks via a `django_celery_beat` migration and add targeted tests for both the snapshot helper and the Beat tasks.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| ami/jobs/tasks.py | Adds two new Celery tasks for stale-job reconciliation and periodic NATS consumer snapshot logging. |
| ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py | Creates `django_celery_beat.PeriodicTask` entries for the two new monitoring tasks. |
| ami/jobs/tests/test_periodic_beat_tasks.py | Adds unit tests validating task outputs and that snapshots run per running async job without cross-job blockage. |
| ami/ml/orchestration/nats_queue.py | Adds a public `log_consumer_stats_snapshot()` method to log read-only mid-flight consumer stats. |
| ami/ml/orchestration/tests/test_nats_queue.py | Adds tests for snapshot logging behavior and missing-consumer tolerance. |
Actionable comments posted: 1
🧹 Nitpick comments (1)

ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py (1)

Lines 14-37: Consider `update_or_create` so existing beat entries are corrected, not just preserved. `get_or_create` won't fix stale `task`/`crontab` values when a task name already exists; `update_or_create` keeps deployed scheduler config aligned with code.
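The semantics the review is pointing at can be illustrated with a toy in-memory sketch (plain-Python stand-ins that mirror the behavior of Django's `get_or_create`/`update_or_create`; this is not Django ORM code):

```python
def get_or_create(store: dict, name: str, defaults: dict):
    """Mirror of Django's semantics: an existing row is returned untouched."""
    if name in store:
        return store[name], False
    store[name] = dict(defaults)
    return store[name], True


def update_or_create(store: dict, name: str, defaults: dict):
    """Mirror of Django's semantics: the row is created or overwritten."""
    created = name not in store
    store[name] = dict(defaults)
    return store[name], created


# A deployment where the beat entry already exists with a stale dotted path.
store = {"jobs.health_check": {"task": "old.dotted.path"}}
fresh = {"task": "ami.jobs.tasks.jobs_health_check"}

row, _ = get_or_create(store, "jobs.health_check", fresh)
print(row["task"])  # old.dotted.path -- stale config survives the migration

row, _ = update_or_create(store, "jobs.health_check", fresh)
print(row["task"])  # ami.jobs.tasks.jobs_health_check -- converged
```

The second call is why `update_or_create` keeps a re-run migration convergent: it overwrites the stale row instead of returning it unchanged.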
Inline comment (actionable):

In `ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py`, around lines 4-6: the migration function `create_periodic_tasks` currently imports `CrontabSchedule` and `PeriodicTask` directly from `django_celery_beat.models`. Use the migration-safe historical model loader instead — `apps.get_model('django_celery_beat', 'CrontabSchedule')` and `apps.get_model('django_celery_beat', 'PeriodicTask')` — wherever those models are referenced (including the later usages around lines ~40-42), so the migration uses the historical models from the migration state rather than direct imports.
- `log_running_async_job_stats`: reuse a single `TaskQueueManager` per tick instead of opening N NATS connections. The connection cost is now O(1) rather than O(n) in the number of running async jobs. Guarded the outer connection setup so a NATS outage drops the tick cleanly instead of crashing the task.
- `check_stale_jobs_task`: bump `expires` from 10 to 14 minutes so a delayed copy still runs within the 15-minute schedule instead of expiring before a worker picks it up under broker pressure.
- migration 0020: use `apps.get_model` for `django_celery_beat` models and declare an explicit migration dependency so the data migration uses historical model state.

Co-Authored-By: Claude <noreply@anthropic.com>
If opening the shared TaskQueueManager raises at setup (or teardown), try each job with its own fresh manager before giving up. This defends against a regression in the shared-mutation pattern silently costing us snapshot visibility for every running async job on every tick. If NATS itself is down, the fallback loop will fail too and log once per job — the same end state as before the refactor, just noisier.

Co-Authored-By: Claude <noreply@anthropic.com>
The periodic task is renamed from `check_stale_jobs_task` to
`jobs_health_check` so new job-health checks can share the 15-minute
cadence and `expires` guarantees without a new beat entry. Current
body runs a single `_run_stale_jobs_check()`; future checks plug in
alongside it and return the same `{checked, fixed, unfixable}` shape.
Names chosen to parallel #1188's integrity-check pattern:
- Beat task = noun phrase (`jobs_health_check`) — reads well in flower
- (Future) management command = verb (`manage.py check_jobs`)
Migration 0020 updated in place since it hasn't shipped yet.
Co-Authored-By: Claude <noreply@anthropic.com>
…kResult

Addresses the takeaway-review findings on PR #1227:

- Fix the reverse migration: delete the row we create. The forward step registers `jobs.health_check`, but the old `delete_periodic_tasks` still referenced the pre-rename `jobs.check_stale_jobs`, leaving a stranded row on rollback.
- Collapse `log_running_async_job_stats` into a sub-check of the umbrella. Drops the second `PeriodicTask` and the shared-connection fallback path — on a 15-minute cadence there's no reason to keep two beat tasks or defend against a per-manager bug, and a quietly hung job now gets a snapshot in the same tick the reconciler will decide whether to revoke it.
- Adopt `IntegrityCheckResult` as the shared sub-check shape, housed in a new `ami.main.checks.schemas` module so PR #1188 can re-target its import without a merge-order dance. Wrap the two sub-checks in a `JobsHealthCheckResult` parent dataclass; the umbrella returns `dataclasses.asdict(...)` for Celery's JSON backend. Observation checks leave `fixed=0` and count per-item errors in `unfixable`.

Tests collapse to one `JobsHealthCheckTest` class covering both sub-checks (reconcile + snapshot) and the edge cases that matter: per-job snapshot failure, shared-connection setup failure, non-async jobs skipped, idle deployment returns all zeros.

Co-Authored-By: Claude <noreply@anthropic.com>
Addresses review findings from the re-review on 6dc7e6e:

- The umbrella had no guard around sub-check calls, so a DB hiccup in ``_run_stale_jobs_check`` would kill the snapshot check and fail the whole task. Wrap each call in ``_safe_run_sub_check``, which catches, logs, and returns ``IntegrityCheckResult(unfixable=1)`` as a sentinel — operators watching the task result in Flower see that the sub-check failed rather than reading all-zeros and assuming all-clear.
- ``Job.logger`` attaches a ``JobLogHandler`` on first access, which touches the ORM; the file's own docstring says to resolve it outside the event loop, but two accesses were inside the coroutine. Pre-resolve into a list of ``(job, job_logger)`` tuples before entering ``async_to_sync``.
- Escalate the ``running_job_snapshots`` summary log to WARNING when ``errors > 0`` so persistent NATS unavailability is distinguishable from a quiet tick in aggregated logs.
- Document that the outer shared-connection ``except`` overwrites per-iteration error counts on the rare ``__aexit__`` teardown path.

New tests:

- ``test_sub_check_exception_does_not_block_the_other`` — patches the snapshot sub-check to raise; stale-jobs still reports correctly and snapshots come back as the ``unfixable=1`` sentinel.
- ``test_stale_jobs_fixed_counts_celery_updated_and_revoked_paths`` — one stale job with a terminal Celery ``task_id``, one without; both branches of ``fixed`` are counted, so a future refactor dropping one branch breaks the test.
- Explicit ``fixed == 0`` assertion in the snapshot test locks in the observation-only contract.

Co-Authored-By: Claude <noreply@anthropic.com>
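The guard described in this commit can be sketched as follows. This is a minimal self-contained illustration, not the PR's actual code: the `IntegrityCheckResult` fields come from the PR description, while the exact signatures of `_safe_run_sub_check` and the umbrella body are assumed.

```python
import dataclasses
import logging

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def _safe_run_sub_check(name, check_fn):
    """Run one sub-check; on any exception, log it and return an
    ``unfixable=1`` sentinel so the other sub-checks still run and the
    failure stays visible in the task result."""
    try:
        return check_fn()
    except Exception:
        logger.exception("Sub-check %r failed", name)
        return IntegrityCheckResult(unfixable=1)


def jobs_health_check_body(stale_check, snapshot_check):
    """Umbrella body: both sub-checks run even if one raises."""
    return {
        "stale_jobs": dataclasses.asdict(
            _safe_run_sub_check("stale_jobs", stale_check)
        ),
        "running_job_snapshots": dataclasses.asdict(
            _safe_run_sub_check("running_job_snapshots", snapshot_check)
        ),
    }
```

With this shape, a raising `stale_check` still yields a complete dict whose `stale_jobs` entry reports `unfixable: 1`, instead of an all-zeros result that reads as all-clear.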
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py (1)
Lines 15-25: Prefer `update_or_create` to prevent config drift if the task already exists. `get_or_create` won't correct an existing `jobs.health_check` row with outdated `task`/`crontab`/`description`; `update_or_create` keeps this migration idempotent and convergent.

🔧 Suggested change

```diff
-    PeriodicTask.objects.get_or_create(
+    PeriodicTask.objects.update_or_create(
         name="jobs.health_check",
         defaults={
             "task": "ami.jobs.tasks.jobs_health_check",
             "crontab": schedule,
             "description": (
                 "Umbrella job-health checks: stale-job reconciler plus a NATS "
                 "consumer snapshot for each running async_api job."
             ),
         },
     )
```
The diff hunk under review (`_run_running_job_snapshot_check` in `ami/jobs/tasks.py`):

```python
def _run_running_job_snapshot_check() -> IntegrityCheckResult:
    """Log a NATS consumer snapshot for each running async_api job.

    Observation-only: ``fixed`` stays 0 because no state is altered. Jobs
    that error during snapshot are counted in ``unfixable`` — a persistently
    stuck job will be picked up on the next tick by ``_run_stale_jobs_check``.
    """
    from ami.jobs.models import Job, JobDispatchMode, JobState

    running_jobs = list(
        Job.objects.filter(
            status__in=JobState.running_states(),
            dispatch_mode=JobDispatchMode.ASYNC_API,
        )
    )
    if not running_jobs:
        return IntegrityCheckResult()

    # Resolve each job's per-job logger synchronously before entering the
    # event loop — ``Job.logger`` attaches a ``JobLogHandler`` on first access
    # which touches the Django ORM, so it is only safe to call from a sync
    # context.
    job_loggers = [(job, job.logger) for job in running_jobs]
    errors = 0

    async def _snapshot_all() -> None:
        nonlocal errors
        # One NATS connection per tick — on a 15-min cadence a per-job fallback
        # is not worth the code. If the shared connection fails to set up, we
        # skip this tick's snapshots and try fresh on the next one.
        async with TaskQueueManager(job_logger=job_loggers[0][1]) as manager:
            for job, job_logger in job_loggers:
                try:
                    # ``log_async`` reads ``job_logger`` fresh each call, so
                    # swapping per iteration routes lifecycle lines to the
                    # right job's UI log.
                    manager.job_logger = job_logger
                    await manager.log_consumer_stats_snapshot(job.pk)
                except Exception:
                    errors += 1
                    logger.exception("Failed to snapshot NATS consumer stats for job %s", job.pk)

    try:
        async_to_sync(_snapshot_all)()
    except Exception:
        # Covers both ``__aenter__`` setup failures (no iteration ran) and the
        # rare ``__aexit__`` teardown failure after a clean loop. In the
        # teardown case this overwrites the per-iteration count with the total
        # — accepted: a persistent failure will show up again next tick.
        logger.exception("Shared-connection snapshot failed; marking tick unfixable")
        errors = len(running_jobs)

    log_fn = logger.warning if errors else logger.info
    log_fn(
        "running_job_snapshots check: %d running async job(s), %d error(s)",
        len(running_jobs),
        errors,
    )
    return IntegrityCheckResult(checked=len(running_jobs), fixed=0, unfixable=errors)
```
`running_job_snapshots.unfixable` can undercount real per-job failures.

`_run_running_job_snapshot_check` increments `errors` only when `log_consumer_stats_snapshot` raises, but that method is implemented to swallow common snapshot-fetch failures internally and return early. This can report `unfixable=0` even when snapshots failed for individual jobs. Consider returning a success/failure signal from `log_consumer_stats_snapshot` (or a companion API) and counting from that result instead of relying on exception flow.
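One way to implement this suggestion is to count from a returned success flag rather than from exceptions alone. A minimal sketch, where `snapshot_fn` is a hypothetical stand-in for a revised `log_consumer_stats_snapshot` that returns a boolean instead of silently swallowing fetch failures:

```python
def run_snapshot_check(job_ids, snapshot_fn):
    """Count an error whenever a snapshot either raises or reports failure.

    snapshot_fn(job_id) -> bool stands in for a revised
    log_consumer_stats_snapshot that returns False on a swallowed
    fetch failure instead of returning None.
    """
    errors = 0
    for job_id in job_ids:
        try:
            ok = snapshot_fn(job_id)
        except Exception:
            ok = False  # preserve the existing exception-counting path
        if not ok:
            errors += 1  # new: count internally-swallowed failures too
    return errors
```

For example, `run_snapshot_check([1, 2, 3], lambda j: j != 2)` counts one error even though nothing raised.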
If snapshots failed, we won't see logs. But I do like this idea. Perhaps unfixable could actually increment when jobs have alarming stats.
@mihow — both observations are valid, and they compound each other:
1. Silent on failure: `_log_consumer_stats` currently catches all exceptions and does only `logger.debug(...)`, so a fetch failure produces no INFO/WARNING log and no exception for the `_run_running_job_snapshot_check` loop. An operator watching job logs sees nothing, which looks identical to a healthy quiet job.
2. Alarming-stats idea: this would make `unfixable` semantically richer and more actionable. Instead of `log_consumer_stats_snapshot → None`, it could return `ConsumerInfo | None`, and `_run_running_job_snapshot_check` could then inspect the result across two axes:

```python
info = await manager.log_consumer_stats_snapshot(job.pk)
if info is None:
    errors += 1  # fetch failed → can't observe
elif _is_alarming(info):
    errors += 1  # observed, but stats are concerning
```

For `_is_alarming`, the most actionable thresholds from the available `ConsumerInfo` fields would be:

- `num_redelivered` above some threshold — repeated redelivery usually signals a processing loop or a stuck acker
- `num_ack_pending` significantly exceeding `num_pending + 1` — messages being consumed but not acknowledged
A simple start could be a single configurable threshold on num_redelivered (e.g., > 10) since that's the clearest "something is wrong" signal without being noisy for normal in-flight jobs.
Would you like me to draft the changes to `log_consumer_stats_snapshot` / `_log_consumer_stats` (return `ConsumerInfo | None`) and the updated `_run_running_job_snapshot_check` loop (with the `_is_alarming` helper)? Happy to also add a WARNING-level log in `_log_consumer_stats`'s except branch so fetch failures are visible at the INFO tier too.
…ic-stale-job-check
… stalled-job reaper

Two related fixes for the premature ``cleanup_async_job_resources`` path that flips async_api jobs to FAILURE within ~30-55s of dispatch while NATS is still delivering results:

1. ``Job.update_progress`` no longer silently bumps ``stage.progress = 1`` when ``stage.status`` is in a final state. The bumped value made ``is_complete()`` return True and triggered cleanup mid-flight. The trigger was ``_update_job_progress`` writing ``status=FAILURE`` at partial progress as soon as ``failed/total`` crossed FAILURE_THRESHOLD (very easy early in a job — 1-2 errors out of the first few results). Progress is a measurement; leave it alone. The honest FAILURE trip happens when the stage actually reaches 100%.
2. Split ``Job.FAILED_CUTOFF_HOURS`` (originally added in PR #368 to hide failed jobs from UI listings after 3 days) into:
   - ``FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3`` — original UI/API filter
   - ``STALLED_JOBS_MAX_MINUTES = 10`` — reaper threshold
   The 72h value was reused by PR #1227's stale-job reaper for an entirely different purpose, leaving stuck jobs invisible for 3 days. ``check_stale_jobs`` now defaults to 10 minutes against ``updated_at`` (already in place from #1227), so a job whose worker pool stops pulling messages gets reaped within one Beat tick (~10-25 min). The ``--hours`` arg on the management command becomes ``--minutes`` to match.

Together: fix #1 stops the false-positive FAILUREs; fix #2 ensures true positives are caught quickly without depending on the misfiring coercion.

Co-Authored-By: Claude <noreply@anthropic.com>
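The split thresholds described in the commit above can be sketched as a pure-Python check. The two constants come from the commit text; `is_stalled` is a hypothetical helper for illustration, not the actual `check_stale_jobs` implementation:

```python
from datetime import datetime, timedelta, timezone

FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3  # UI/API filter: hide old failed jobs
STALLED_JOBS_MAX_MINUTES = 10           # reaper threshold against updated_at


def is_stalled(updated_at, now=None):
    """True when a running job's updated_at is older than the reaper cutoff."""
    now = now or datetime.now(timezone.utc)
    return now - updated_at > timedelta(minutes=STALLED_JOBS_MAX_MINUTES)
```

Under this split, a job last updated 11 minutes ago is already a reaper candidate, while the 72-hour constant is used only for display filtering.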
When the reaper revokes a job, log a single WARN line capturing the state needed to triage "why was this stalled?" without grepping back through prior tick logs:

- minutes since last update vs threshold
- previous_status, dispatch_mode, celery_state
- per-stage progress + status summary
- pointer to running_job_snapshots for prior NATS consumer state

Pairs with PR #1227's per-tick consumer snapshots: every 15 min, each running async_api job gets a NATS state snapshot; this new line tells operators which of those snapshots is the last one before revocation.

Co-Authored-By: Claude <noreply@anthropic.com>
…(3 days -> 10 min) (#1235)
Summary
One Celery Beat periodic task so ML jobs can't silently hang for days without anyone noticing, and so operators have mid-flight visibility into each running async job.
`jobs_health_check` (every 15 min) is an umbrella that runs two sub-checks in sequence:

- `stale_jobs` — wraps the existing `check_stale_jobs()` reconciler (ami/jobs/tasks.py:319). The reconciler has been on `main` since #1169 ("fix: revoke stale jobs to 'revoked' status instead of 'pending'") but nothing was scheduled to run it. Now any job stuck in a running state past `FAILED_CUTOFF_HOURS` either gets its status synced from Celery or revoked on the next tick, with async resources (NATS/Redis) cleaned up.
- `running_job_snapshots` — for each running `async_api` job, emits one `delivered=… ack_floor=… num_pending=… num_ack_pending=… num_redelivered=…` line to the job's own UI log. Builds on the lifecycle logging landed in #1222 ("feat(jobs): make NATS queue activity visible in async ML job logs") — previously you got a NATS consumer snapshot on creation and at cleanup, nothing in between. Read-only, observation only.

Running both in the same tick means a quietly hung async job gets a snapshot entry right before the reconciler decides whether to revoke it — one pass, one log trail per incident.
Sub-check shape
Each sub-check returns an `IntegrityCheckResult(checked, fixed, unfixable)` dataclass (ami/main/checks/schemas.py). The umbrella wraps both in a `JobsHealthCheckResult` parent dataclass and returns `dataclasses.asdict(...)` so Celery's JSON backend can store the result and Flower can display it:

```json
{
  "stale_jobs": {"checked": 2, "fixed": 2, "unfixable": 0},
  "running_job_snapshots": {"checked": 3, "fixed": 0, "unfixable": 0}
}
```
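This return shape can be reproduced with a small self-contained sketch. Field names and the nested dict mirror the PR description; the zero defaults are an assumption:

```python
from dataclasses import asdict, dataclass


@dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


@dataclass
class JobsHealthCheckResult:
    stale_jobs: IntegrityCheckResult
    running_job_snapshots: IntegrityCheckResult


result = JobsHealthCheckResult(
    stale_jobs=IntegrityCheckResult(checked=2, fixed=2),
    running_job_snapshots=IntegrityCheckResult(checked=3),
)
# asdict() recurses into nested dataclasses, producing the JSON-safe
# nested dict shown above.
payload = asdict(result)
```

Returning `asdict(...)` rather than the dataclass itself is what keeps the result JSON-serializable for the Celery result backend.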
Semantics: `fixed` counts rows actually mutated — observation-only checks keep it at 0 and count per-item errors in `unfixable`. This matches the convention from #1188, which will re-target its `IntegrityCheckResult` import to the new shared module on rebase.
Naming convention

- Beat task = noun phrase (`jobs_health_check`) — reads naturally in Flower, admin, and logs.
- (Future) management command = verb (`manage.py check_jobs`), symmetric with #1188's `check_data_integrity` ("feat: add check_occurrences for occurrence data integrity").

Intentional asymmetry with #1188: this PR auto-schedules via migration because stale-job reconciliation is a self-healing invariant; #1188 leaves scheduling to admins because its checks are opt-in cleanup. The umbrella pattern itself already exists in the codebase — `ami.ml.tasks.check_processing_services_online` handles async+sync services in one task body — so this PR is formalizing a convention, not inventing one.

Changes since the takeaway review
Earlier iterations shipped two beat tasks (one reconciler, one observer) and a shared-connection fast path with per-job fallback inside the observer. This push:

- fixes the reverse migration: `delete_periodic_tasks` still pointed at the pre-rename `jobs.check_stale_jobs`, which would have left a stranded row on rollback.
- adopts `IntegrityCheckResult` and adds a `JobsHealthCheckResult` parent dataclass so the return shape is structured all the way down instead of dict-wrapping a dataclass.
- wraps each sub-check in `_safe_run_sub_check` so one sub-check raising (e.g. a DB hiccup during reconciliation) no longer kills the whole tick — the failing sub-check returns an `unfixable=1` sentinel and the other sub-check still runs.
- pre-resolves `Job.logger` into a list before the event loop so the `JobLogHandler` attachment stays on a sync context, matching the file's own guidance.
- escalates the `running_job_snapshots` summary log to WARNING when `errors > 0` so persistent NATS unavailability is distinguishable from a quiet tick in aggregated logs.

Test plan
- `docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs.tests ami.ml.orchestration.tests.test_nats_queue --keepdb` — 66 tests pass (46 jobs + 20 NATS queue)
- `migrate jobs 0020` then `migrate jobs 0019` then `migrate jobs 0020` — reverse removes the row it creates, forward restores it cleanly (see E2E verification below).
- `black --check`, `flake8`, pre-commit clean
- `jobs_health_check.apply()` against idle stack, synthetic-stale-job reconciliation, live async_api job snapshot, and reverse-migration round-trip. See E2E verification below.
- Natural beat firing checked by temporarily swapping the crontab to `*/1` and watching the next two ticks in celerybeat + celeryworker logs (see E2E verification, Test 6). Crontab restored to `*/15` after the check.

E2E verification (2026-04-14, local stack)
Run against the live antenna stack on detached HEAD at commit `6726eee5`. Main antenna directory — same stack the other sessions have been using for #1231's Redis chaos tests.

Test 1 — migration registers the expected
`PeriodicTask` row.

No pre-rename `jobs.check_stale_jobs` or `jobs.log_running_async_job_stats` rows were created.

Test 2 — idle-stack tick returns structured nested dict
jobs_health_check.apply()on a clean project with no stale jobs and no running async jobs:{ "stale_jobs": {"checked": 0, "fixed": 0, "unfixable": 0}, "running_job_snapshots": {"checked": 0, "fixed": 0, "unfixable": 0} }Task completed in 21 ms. Confirms celery's JSON serializer handles
dataclasses.asdict(JobsHealthCheckResult(...))end-to-end and the summary log fires at INFO severity whenerrors == 0.Test 3 — reconciler sub-check revokes a synthetic stale job
Created
Job(name="e2e synthetic stale #1227", status=STARTED, updated_at=now-100h)(pk=1489), then forcedjobs_health_check.apply():The reconciler picked up the row, flipped STARTED → REVOKED because no
task_idwas present (untrusted Celery state path), and populatedfinished_at.fixedcorrectly counts the revoked path.Test 4 — snapshot sub-check lands a mid-flight line in a live async_api job's UI log
Created a fresh 15-image collection on project 18 and started
manage.py test_ml_job_e2e --dispatch-mode async_api --pipeline global_moths_2024. Waited for job 1490 to hit STARTED (NATS stream + consumer created), then forcedjobs_health_check.apply(). The task result:{"stale_jobs": {...zeros...}, "running_job_snapshots": {"checked": 1, "fixed": 0, "unfixable": 0}}And the new log line in job 1490's UI log (read via
Job.logs.stdout):Evidence for all three contract properties of the snapshot sub-check:
fixed=0.Job 1490.logs.stdout, not the module logger.num_pending=15matches the "Successfully queued 15/15 images" line from job start-up (ADC worker not consuming in this lane, so delivered stayed at 0).Test 5 — reverse-migration round-trip
The headline finding from the takeaway review: the pre-rename `delete_periodic_tasks` targeted `jobs.check_stale_jobs` while the forward created `jobs.health_check`, orphaning a row on rollback. Verified the fix live.

Test 6 — natural beat firing on the live scheduler
To shortcut the 15-minute cadence for verification, temporarily swapped the `PeriodicTask` crontab to `*/1 * * * *` and called `PeriodicTasks.update_changed()` so the DatabaseScheduler re-read the row. Watched celerybeat + celeryworker logs for the next two ticks, then restored `*/15`.
PeriodicTask→ celerybeat scheduler → RabbitMQ → celeryworker → return value on the broker result backend. Crontab restored to*/15immediately after.Cleanup
Job 1490 was revoked manually (ADC worker not running in this lane, so it would have sat in STARTED until the reconciler caught it at 4 hours old). NATS stream/consumer deleted via
cleanup_async_job_resources(1490). Collection 171 left in place as a reference to pair with this PR's E2E. Main-dir branch restored tofix/async-job-state-redis-transient(#1231) with its stash popped.Out of scope / deliberately not included
- DB columns (`last_checked_at` etc.) — observability lives in logs, not DB.
- A management command wrapper (`manage.py check_jobs [--dry-run]`).