feat(jobs): schedule periodic stale-job check #1227

Merged
mihow merged 7 commits into main from feat/periodic-stale-job-check
Apr 15, 2026

Conversation

@mihow
Collaborator

@mihow mihow commented Apr 14, 2026

Summary

One Celery Beat periodic task so ML jobs can't silently hang for days without anyone noticing, and so operators have mid-flight visibility into each running async job.

jobs_health_check (every 15 min) is an umbrella that runs two sub-checks in sequence:

  • stale_jobs — wraps the existing check_stale_jobs() reconciler (ami/jobs/tasks.py:319). The reconciler has been on main since #1169 (fix: revoke stale jobs to "revoked" status instead of "pending"), but nothing was scheduled to run it. Now any job stuck in a running state past FAILED_CUTOFF_HOURS either gets its status synced from Celery or is revoked on the next tick, with async resources (NATS/Redis) cleaned up.
  • running_job_snapshots — for each running async_api job, emits one delivered=… ack_floor=… num_pending=… num_ack_pending=… num_redelivered=… line to the job's own UI log. Builds on the lifecycle logging landed in #1222 (feat(jobs): make NATS queue activity visible in async ML job logs) — previously you got a NATS consumer snapshot on creation and at cleanup, nothing in between. Read-only, observation only.

Running both in the same tick means a quietly hung async job gets a snapshot entry right before the reconciler decides whether to revoke it — one pass, one log trail per incident.

Sub-check shape

Each sub-check returns an IntegrityCheckResult(checked, fixed, unfixable) dataclass (ami/main/checks/schemas.py). The umbrella wraps both in a JobsHealthCheckResult parent dataclass and returns dataclasses.asdict(...) so celery's JSON backend can store the result and Flower can display it:

{
  "stale_jobs":             {"checked": 2, "fixed": 2, "unfixable": 0},
  "running_job_snapshots":  {"checked": 3, "fixed": 0, "unfixable": 0}
}

Semantics: fixed counts rows actually mutated — observation-only checks keep it at 0 and count per-item errors in unfixable. This matches the convention from #1188, which will re-target its IntegrityCheckResult import to the new shared module on rebase.
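As a sketch, the two dataclasses and the serialization step look roughly like this (names taken from the PR; the field defaults and the exact fields of JobsHealthCheckResult are assumptions):

```python
import dataclasses


@dataclasses.dataclass
class IntegrityCheckResult:
    # fixed counts rows actually mutated; observation-only checks keep it
    # at 0 and count per-item errors in unfixable.
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


@dataclasses.dataclass
class JobsHealthCheckResult:
    # Parent shape returned by the umbrella task.
    stale_jobs: IntegrityCheckResult
    running_job_snapshots: IntegrityCheckResult


result = JobsHealthCheckResult(
    stale_jobs=IntegrityCheckResult(checked=2, fixed=2),
    running_job_snapshots=IntegrityCheckResult(checked=3),
)
# asdict() recurses into nested dataclasses, producing the plain nested
# dict that a JSON result backend can store and Flower can display.
payload = dataclasses.asdict(result)
print(payload)
```

The structured-all-the-way-down shape is what lets the task return value above drop straight out of `dataclasses.asdict(...)` with no manual dict-wrapping.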

Naming convention

Intentional asymmetry with #1188: this PR auto-schedules via migration because stale-job reconciliation is a self-healing invariant; #1188 leaves scheduling to admins because its checks are opt-in cleanup. The umbrella pattern itself already exists in the codebase — ami.ml.tasks.check_processing_services_online handles async+sync services in one task body — so this PR is formalizing a convention, not inventing one.

Changes since the takeaway review

Earlier iterations shipped two beat tasks (one reconciler, one observer) and a shared-connection fast path with per-job fallback inside the observer. This push:

  • Folds the observer into the umbrella as a sub-check. Migration 0020 now registers one PeriodicTask, not two.
  • Drops the per-job fallback. On a 15-min cadence there's no performance case for it; one NATS connection per tick is sufficient, and a setup failure just skips snapshots for that tick (the next one tries fresh).
  • Fixes the reverse migration — the old delete_periodic_tasks still pointed at the pre-rename jobs.check_stale_jobs, which would have left a stranded row on rollback.
  • Adopts IntegrityCheckResult and adds a JobsHealthCheckResult parent dataclass so the return shape is structured all the way down instead of dict-wrapping a dataclass.
  • Isolates sub-checks via _safe_run_sub_check so one sub-check raising (e.g. a DB hiccup during reconciliation) no longer kills the whole tick — the failing sub-check returns an unfixable=1 sentinel and the other sub-check still runs.
  • Pre-resolves Job.logger into a list before the event loop so the JobLogHandler attachment stays on a sync context, matching the file's own guidance.
  • Escalates the running_job_snapshots summary log to WARNING when errors > 0 so persistent NATS unavailability is distinguishable from a quiet tick in aggregated logs.
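The isolation described in the _safe_run_sub_check bullet reduces to a small wrapper; here is a minimal sketch under the PR's stated semantics (the real signature in ami/jobs/tasks.py may differ):

```python
import dataclasses
import logging

logger = logging.getLogger("jobs_health_check_sketch")


@dataclasses.dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def _safe_run_sub_check(name, check_fn):
    # One sub-check raising must not kill the whole tick: catch, log, and
    # return an unfixable=1 sentinel so the task result shows the failure
    # instead of reading as all-zeros/all-clear.
    try:
        return check_fn()
    except Exception:
        logger.exception("jobs_health_check sub-check %r failed", name)
        return IntegrityCheckResult(unfixable=1)


def _failing_reconciler():
    raise RuntimeError("simulated DB hiccup during reconciliation")


result = {
    "stale_jobs": dataclasses.asdict(_safe_run_sub_check("stale_jobs", _failing_reconciler)),
    # The other sub-check still runs and reports normally.
    "running_job_snapshots": dataclasses.asdict(
        _safe_run_sub_check("running_job_snapshots", lambda: IntegrityCheckResult(checked=3))
    ),
}
print(result)
```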

Test plan

  • docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs.tests ami.ml.orchestration.tests.test_nats_queue --keepdb — 66 tests pass (46 jobs + 20 NATS queue)
  • migrate jobs 0020 then migrate jobs 0019 then migrate jobs 0020 — reverse removes the row it creates, forward restores it cleanly (see E2E verification below).
  • black --check, flake8, pre-commit clean
  • Local end-to-end run against project 18 covering all four surfaces: forced jobs_health_check.apply() against idle stack, synthetic-stale-job reconciliation, live async_api job snapshot, and reverse-migration round-trip. See E2E verification below.
  • Natural beat firing confirmed by temporarily swapping the crontab to */1 and watching the next two ticks in celerybeat + celeryworker logs (see E2E verification, Test 6). Crontab restored to */15 after the check.

E2E verification (2026-04-14, local stack)

Run against the live antenna stack on detached HEAD at commit 6726eee5. Main antenna directory — same stack the other sessions have been using for #1231's Redis chaos tests.

Test 1 — migration registers the expected PeriodicTask row

name='jobs.health_check'
task='ami.jobs.tasks.jobs_health_check'
crontab='*/15 * * * *'
enabled=True

No pre-rename jobs.check_stale_jobs or jobs.log_running_async_job_stats rows were created.

Test 2 — idle-stack tick returns structured nested dict

Forced jobs_health_check.apply() on a clean project with no stale jobs and no running async jobs:

{
  "stale_jobs":             {"checked": 0, "fixed": 0, "unfixable": 0},
  "running_job_snapshots":  {"checked": 0, "fixed": 0, "unfixable": 0}
}

Task completed in 21 ms. Confirms celery's JSON serializer handles dataclasses.asdict(JobsHealthCheckResult(...)) end-to-end and the summary log fires at INFO severity when errors == 0.

Test 3 — reconciler sub-check revokes a synthetic stale job

Created Job(name="e2e synthetic stale #1227", status=STARTED, updated_at=now-100h) (pk=1489), then forced jobs_health_check.apply():

stale_jobs check: 1 stale job(s), 0 updated from Celery, 1 revoked
result = {"stale_jobs": {"checked": 1, "fixed": 1, "unfixable": 0}, "running_job_snapshots": {"checked": 0, ...}}
after tick: status=REVOKED finished_at=2026-04-14 18:06:27.885160

The reconciler picked up the row, flipped STARTED → REVOKED because no task_id was present (untrusted Celery state path), and populated finished_at. fixed correctly counts the revoked path.
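The decision the reconciler made here can be sketched as pure logic (a simplification inferred from this description, not the actual check_stale_jobs() body; the running-state set and the exact cutoff handling are assumptions, with the 4-hour figure taken from the Cleanup note below):

```python
from datetime import datetime, timedelta, timezone

FAILED_CUTOFF_HOURS = 4  # per the Cleanup section; actual constant lives in the codebase

RUNNING_STATES = {"PENDING", "STARTED", "RETRY"}  # illustrative set


def is_stale(status: str, updated_at: datetime, now: datetime) -> bool:
    cutoff = now - timedelta(hours=FAILED_CUTOFF_HOURS)
    return status in RUNNING_STATES and updated_at < cutoff


def reconcile(status: str, updated_at: datetime, task_id, now: datetime) -> str:
    """Return the action for one job: sync from Celery when there is a
    task_id to trust, otherwise revoke outright (untrusted-state path)."""
    if not is_stale(status, updated_at, now):
        return "leave"
    return "sync_from_celery" if task_id else "revoke"


now = datetime(2026, 4, 14, 18, 6, tzinfo=timezone.utc)
# The synthetic job from Test 3: STARTED, updated 100h ago, no task_id.
print(reconcile("STARTED", now - timedelta(hours=100), None, now))  # revoke
```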

Test 4 — snapshot sub-check lands a mid-flight line in a live async_api job's UI log

Created a fresh 15-image collection on project 18 and started manage.py test_ml_job_e2e --dispatch-mode async_api --pipeline global_moths_2024. Waited for job 1490 to hit STARTED (NATS stream + consumer created), then forced jobs_health_check.apply(). The task result:

{"stale_jobs": {...zeros...}, "running_job_snapshots": {"checked": 1, "fixed": 0, "unfixable": 0}}

And the new log line in job 1490's UI log (read via Job.logs.stdout):

[2026-04-14 18:08:30] INFO NATS consumer status job-1490-consumer
  (delivered=0 ack_floor=0 num_pending=15 num_ack_pending=0 num_redelivered=0)

Evidence for all three contract properties of the snapshot sub-check:

  • Observation-only: fixed=0.
  • Per-job UI log routing: the snapshot entry lives on Job 1490.logs.stdout, not the module logger.
  • Consumer state was accurate: num_pending=15 matches the "Successfully queued 15/15 images" line from job start-up (ADC worker not consuming in this lane, so delivered stayed at 0).
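The snapshot line itself is just formatted consumer stats; a sketch of the formatting (the ConsumerStats container here is a hypothetical stand-in — the real code reads these fields from nats-py's consumer_info() response):

```python
from dataclasses import dataclass


@dataclass
class ConsumerStats:
    # Hypothetical stand-in for the JetStream consumer-info fields the
    # snapshot line reports.
    delivered: int
    ack_floor: int
    num_pending: int
    num_ack_pending: int
    num_redelivered: int


def format_snapshot(consumer_name: str, s: ConsumerStats) -> str:
    return (
        f"NATS consumer status {consumer_name} "
        f"(delivered={s.delivered} ack_floor={s.ack_floor} "
        f"num_pending={s.num_pending} num_ack_pending={s.num_ack_pending} "
        f"num_redelivered={s.num_redelivered})"
    )


# The state observed for job 1490: 15 queued, ADC worker idle in this lane.
line = format_snapshot("job-1490-consumer", ConsumerStats(0, 0, 15, 0, 0))
print(line)
```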

Test 5 — reverse-migration round-trip

The headline finding from the takeaway review: the pre-rename delete_periodic_tasks targeted jobs.check_stale_jobs while the forward created jobs.health_check, orphaning a row on rollback. Verified the fix live:

migrate jobs 0019  →  PeriodicTask.objects.filter(name='jobs.health_check').exists() == False
migrate jobs 0020  →  restored: name='jobs.health_check' task='ami.jobs.tasks.jobs_health_check' crontab='*/15 * * * *' enabled=True

Test 6 — natural beat firing on the live scheduler

To shortcut the 15-minute cadence for verification, temporarily swapped the PeriodicTask crontab to */1 * * * * and called PeriodicTasks.update_changed() so the DatabaseScheduler re-read the row. Watched celerybeat + celeryworker logs for the next two ticks, then restored */15.

celerybeat-1   18:13:00,000  Scheduler: Sending due task jobs.health_check (ami.jobs.tasks.jobs_health_check)
celeryworker-1 18:13:00,002  Task ami.jobs.tasks.jobs_health_check[f29ebbff-...] received
celeryworker-1 18:13:00,024  Task ami.jobs.tasks.jobs_health_check[f29ebbff-...] succeeded in 0.021s:
                             {'stale_jobs': {'checked': 0, 'fixed': 0, 'unfixable': 0},
                              'running_job_snapshots': {'checked': 0, 'fixed': 0, 'unfixable': 0}}
celerybeat-1   18:14:00,000  Scheduler: Sending due task jobs.health_check (ami.jobs.tasks.jobs_health_check)
celeryworker-1 18:14:00,005  Task ami.jobs.tasks.jobs_health_check[46145cab-...] received

Confirms the full pipeline: PeriodicTask → celerybeat scheduler → RabbitMQ → celeryworker → return value on the broker result backend. Crontab restored to */15 immediately after.

Cleanup

Job 1490 was revoked manually (ADC worker not running in this lane, so it would have sat in STARTED until the reconciler caught it at 4 hours old). NATS stream/consumer deleted via cleanup_async_job_resources(1490). Collection 171 left in place as a reference to pair with this PR's E2E. Main-dir branch restored to fix/async-job-state-redis-transient (#1231) with its stash popped.

Out of scope / deliberately not included

  • No new model fields (last_checked_at etc.) — observability lives in logs, not DB.
  • No "disappeared task" / "stuck pending" / "resurrection" state machine — those branches in Add periodic status check for incomplete jobs  #1025 added thresholds without an operator asking for that granularity.
  • No management command yet — natural follow-up (manage.py check_jobs [--dry-run]).
  • No admin UI / system-health page — see follow-up discussion in PR comments.
  • No worker-side inline snapshot cadence (e.g. "log every 100 tasks processed") — complementary to the 15-min umbrella snapshot, worth a separate PR since it touches the async worker loop rather than the scheduler.

…pshots

Two Celery Beat periodic tasks so operators can see ML job health without
waiting for a job to finish:

1. check_stale_jobs_task (every 15 min) — thin wrapper around the existing
   check_stale_jobs() that reconciles jobs stuck in running states past
   FAILED_CUTOFF_HOURS. Previously the function existed but was only
   reachable via the update_stale_jobs management command, so nothing
   ran it automatically.

2. log_running_async_job_stats (every 5 min) — for each incomplete
   async_api job, opens a TaskQueueManager with that job's logger and
   logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered
   snapshot of the NATS consumer. Read-only; no status changes.
   Builds on the lifecycle-logging landed in #1222 so long-running jobs
   now get mid-flight visibility, not just create + cleanup snapshots.

Both registered via migration 0020 so existing deployments pick them up
on the next migrate without manual Beat configuration.

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 14, 2026 14:44
@netlify

netlify bot commented Apr 14, 2026

Deploy Preview for antenna-ssec canceled.

Latest commit: 99d7c57
Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/69ded2488c53e60008a1f434

@netlify

netlify bot commented Apr 14, 2026

Deploy Preview for antenna-preview canceled.

Latest commit: 99d7c57
Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/69ded2486801d00009b14131

@coderabbitai
Contributor

coderabbitai bot commented Apr 14, 2026

📝 Walkthrough

Adds a scheduled umbrella Celery task jobs.health_check (runs every 15 minutes) that combines stale-job reconciliation and running-async-job consumer snapshots, a migration to register the periodic task, NATS consumer-stats logging helpers, shared integrity-check dataclasses, and tests covering the new task and NATS snapshot behavior.

Changes

Cohort / File(s) Summary
Celery Beat Migration
ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py
New data migration that creates/gets a CrontabSchedule (minute="*/15") and a PeriodicTask named jobs.health_check pointing to ami.jobs.tasks.jobs_health_check; rollback deletes PeriodicTask by name.
Umbrella Health Check Task & Helpers
ami/jobs/tasks.py
Added jobs_health_check Celery task (soft/hard time limits, expires) that runs two sub-checks (stale-job reconciliation and running-async-job snapshots), a _safe_run_sub_check wrapper, a JobsHealthCheckResult dataclass, and a _JOBS_HEALTH_BEAT_EXPIRES constant.
Integrity Check Schemas & Package Export
ami/main/checks/schemas.py, ami/main/checks/__init__.py
New IntegrityCheckResult dataclass (checked, fixed, unfixable) and package-level re-export via ami.main.checks.
NATS Queue: stats logging refactor
ami/ml/orchestration/nats_queue.py
Added TaskQueueManager.log_consumer_stats_snapshot(job_id), refactored _log_final_consumer_stats to delegate to new _log_consumer_stats(job_id, *, prefix, suffix=""), consolidated stats-fetch/log logic and preserved tolerant failure behavior.
Tests: jobs health check
ami/jobs/tests/test_periodic_beat_tasks.py
New TestCase covering jobs_health_check() behaviors: stale-job reconciliation counts, running-async-job snapshot invocation and failure-tolerance, async connection failure handling, and filtering non-async jobs.
Tests: NATS snapshot behavior
ami/ml/orchestration/tests/test_nats_queue.py
Added tests for log_consumer_stats_snapshot() verifying INFO logging of consumer stats and graceful handling of NotFoundError from js.consumer_info.
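The delegation described for nats_queue.py — snapshot and final-stats logging sharing one helper — can be sketched like this (simplified and synchronous; the real methods are async coroutines reading live js.consumer_info state, and the stub wiring here is an assumption):

```python
class TaskQueueManagerSketch:
    """Toy model of the stats-logging delegation in TaskQueueManager."""

    def __init__(self, fetch_stats, sink):
        self._fetch_stats = fetch_stats  # stand-in for js.consumer_info(...)
        self._sink = sink                # stand-in for the per-job UI logger

    def _log_consumer_stats(self, job_id, *, prefix, suffix=""):
        # Consolidated fetch/format path both entry points delegate to.
        stats = self._fetch_stats(job_id)
        self._sink.append(f"{prefix} job-{job_id}-consumer ({stats}){suffix}")

    def log_consumer_stats_snapshot(self, job_id):
        # Mid-flight, read-only snapshot (new in this PR).
        self._log_consumer_stats(job_id, prefix="NATS consumer status")

    def _log_final_consumer_stats(self, job_id):
        # Existing cleanup-time log, refactored to use the shared helper.
        self._log_consumer_stats(job_id, prefix="Final NATS consumer stats")


lines: list[str] = []
mgr = TaskQueueManagerSketch(lambda job_id: "num_pending=15", lines)
mgr.log_consumer_stats_snapshot(1490)
mgr._log_final_consumer_stats(1490)
print(lines)
```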

Sequence Diagram(s)

sequenceDiagram
    participant Beat as Celery Beat
    participant Health as jobs.health_check task
    participant Jobs as Job DB / Reconciler
    participant TQM as TaskQueueManager
    participant NATS as NATS JetStream

    Beat->>Health: trigger every 15m
    Health->>Jobs: run stale-job reconciliation (check_stale_jobs)
    Jobs-->>Health: IntegrityCheckResult (checked/fixed/unfixable)
    Health->>TQM: for each running ASYNC_API job -> log_consumer_stats_snapshot(job_id)
    TQM->>NATS: js.consumer_info / fetch consumer stats
    NATS-->>TQM: consumer stats or error
    TQM-->>Health: logs snapshot (tolerant to per-job errors)
    Health-->>Beat: return combined JobsHealthCheckResult (as dict)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes


🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 40.63% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(jobs): schedule periodic stale-job check' clearly and concisely describes the main objective of the PR: scheduling a periodic task to check for stale jobs.
Description check ✅ Passed PR description is comprehensive, well-structured, and follows the required template with all key sections completed.


Contributor

Copilot AI left a comment


Pull request overview

This PR adds two Celery Beat–scheduled monitoring tasks to prevent long-running ML jobs from silently hanging and to provide mid-flight observability into NATS consumer state for async_api jobs.

Changes:

  • Add check_stale_jobs_task (15 min) as a Beat entry point for the existing check_stale_jobs() reconciler.
  • Add log_running_async_job_stats (5 min) to write periodic NATS consumer stats snapshots into each running async job’s per-job UI log.
  • Register both periodic tasks via a django_celery_beat migration and add targeted tests for both the snapshot helper and the Beat tasks.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
ami/jobs/tasks.py Adds two new Celery tasks for stale-job reconciliation and periodic NATS consumer snapshot logging.
ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py Creates django_celery_beat.PeriodicTask entries for the two new monitoring tasks.
ami/jobs/tests/test_periodic_beat_tasks.py Adds unit tests validating task outputs and that snapshots run per running async job without cross-job blockage.
ami/ml/orchestration/nats_queue.py Adds a public log_consumer_stats_snapshot() method to log read-only mid-flight consumer stats.
ami/ml/orchestration/tests/test_nats_queue.py Adds tests for snapshot logging behavior and missing-consumer tolerance.


Comment thread ami/jobs/tasks.py Outdated
Comment thread ami/jobs/tasks.py Outdated
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py (1)

14-37: Consider update_or_create so existing beat entries are corrected, not just preserved.

get_or_create won’t fix stale task/crontab values when a task name already exists. update_or_create keeps deployed scheduler config aligned with code.

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py`:
- Around line 4-6: The migration function create_periodic_tasks currently
imports CrontabSchedule and PeriodicTask directly from
django_celery_beat.models; update it to use the migration-safe historical model
loader via apps.get_model (e.g., apps.get_model('django_celery_beat',
'CrontabSchedule') and apps.get_model('django_celery_beat', 'PeriodicTask'))
wherever those models are referenced (including the later usages around the
block noted at lines ~40-42) so the migration uses the historical models from
the migration state rather than direct imports.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 99886835-f5ce-46e0-9154-c8cfc756a402

📥 Commits

Reviewing files that changed from the base of the PR and between 1c6be7a and 14a449f.

📒 Files selected for processing (5)
  • ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_periodic_beat_tasks.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py

Comment thread ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py
@mihow mihow changed the title feat(jobs): schedule periodic stale-job reconcile + NATS consumer snapshots feat(jobs): schedule periodic stale-job check Apr 14, 2026
mihow and others added 3 commits April 14, 2026 08:14
- log_running_async_job_stats: reuse a single TaskQueueManager per tick
  instead of opening N NATS connections. Cost is now O(1) in the number
  of running async jobs rather than O(n). Guarded outer connection setup
  so a NATS outage drops the tick cleanly instead of crashing the task.
- check_stale_jobs_task: bump expires from 10 to 14 minutes so a delayed
  copy still runs within the 15-minute schedule instead of expiring before
  a worker picks it up under broker pressure.
- migration 0020: use apps.get_model for django_celery_beat models and
  declare an explicit migration dependency so the data migration uses
  historical model state.

Co-Authored-By: Claude <noreply@anthropic.com>
If opening the shared TaskQueueManager raises at setup (or teardown), try
each job with its own fresh manager before giving up. Defends against a
regression in the shared mutation pattern silently costing us snapshot
visibility for every running async job on every tick. If NATS itself is
down, the fallback loop will fail too and log once per job — same
end-state as before the refactor, just more noisy.

Co-Authored-By: Claude <noreply@anthropic.com>
The periodic task is renamed from `check_stale_jobs_task` to
`jobs_health_check` so new job-health checks can share the 15-minute
cadence and `expires` guarantees without a new beat entry. Current
body runs a single `_run_stale_jobs_check()`; future checks plug in
alongside it and return the same `{checked, fixed, unfixable}` shape.

Names chosen to parallel #1188's integrity-check pattern:
- Beat task = noun phrase (`jobs_health_check`) — reads well in flower
- (Future) management command = verb (`manage.py check_jobs`)

Migration 0020 updated in place since it hasn't shipped yet.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow and others added 2 commits April 14, 2026 14:14
…kResult

Addresses the takeaway-review findings on PR #1227:

- Fix reverse migration: delete the row we create. The forward step
  registers `jobs.health_check` but the old `delete_periodic_tasks`
  still referenced the pre-rename `jobs.check_stale_jobs`, leaving a
  stranded row on rollback.
- Collapse `log_running_async_job_stats` into a sub-check of the
  umbrella. Drops the second `PeriodicTask` and the shared-connection
  fallback path — on a 15-minute cadence there's no reason to keep
  two beat tasks or defend against a per-manager bug, and a quietly
  hung job now gets a snapshot in the same tick the reconciler will
  decide whether to revoke it.
- Adopt `IntegrityCheckResult` as the shared sub-check shape, housed
  in a new `ami.main.checks.schemas` module so PR #1188 can re-target
  its import without a merge-order dance. Wrap the two sub-checks in
  a `JobsHealthCheckResult` parent dataclass; the umbrella returns
  `dataclasses.asdict(...)` for celery's JSON backend. Observation
  checks leave `fixed=0` and count per-item errors in `unfixable`.

Tests collapse to one `JobsHealthCheckTest` class covering both
sub-checks (reconcile + snapshot) and the edge cases that matter:
per-job snapshot failure, shared-connection setup failure, non-async
jobs skipped, idle deployment returns all zeros.

Co-Authored-By: Claude <noreply@anthropic.com>
Addresses review findings from the re-review on 6dc7e6e:

- The umbrella had no guard around sub-check calls, so a DB hiccup in
  ``_run_stale_jobs_check`` would kill the snapshot check and fail the
  whole task. Wrap each call in ``_safe_run_sub_check``, which catches,
  logs, and returns ``IntegrityCheckResult(unfixable=1)`` as a sentinel
  — operators watching the task result in Flower see the sub-check
  failed rather than reading all-zeros and assuming all-clear.
- ``Job.logger`` attaches a ``JobLogHandler`` on first access which
  touches the ORM; the file's own docstring says resolve outside the
  event loop, but two accesses were inside the coroutine. Pre-resolve
  into a list of ``(job, job_logger)`` tuples before entering
  ``async_to_sync``.
- Escalate the ``running_job_snapshots`` summary log to WARNING when
  ``errors > 0`` so persistent NATS unavailability is distinguishable
  from a quiet tick in aggregated logs.
- Document that the outer shared-connection except overwrites
  per-iteration error counts on the rare ``__aexit__`` teardown path.

New tests:
- ``test_sub_check_exception_does_not_block_the_other`` — patches the
  snapshot sub-check to raise; stale-jobs still reports correctly and
  snapshots come back as the ``unfixable=1`` sentinel.
- ``test_stale_jobs_fixed_counts_celery_updated_and_revoked_paths`` —
  one stale job with a terminal Celery ``task_id``, one without; both
  branches of ``fixed`` counted so a future refactor dropping one
  branch breaks the test.
- Explicit ``fixed == 0`` assertion in the snapshot test locks the
  observation-only contract.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py (1)

15-25: Prefer update_or_create to prevent config drift if the task already exists.

get_or_create won’t correct an existing jobs.health_check row with outdated task/crontab/description. update_or_create keeps this migration idempotent and convergent.

🔧 Suggested change
-    PeriodicTask.objects.get_or_create(
-        name="jobs.health_check",
-        defaults={
-            "task": "ami.jobs.tasks.jobs_health_check",
-            "crontab": schedule,
-            "description": (
-                "Umbrella job-health checks: stale-job reconciler plus a NATS "
-                "consumer snapshot for each running async_api job."
-            ),
-        },
-    )
+    PeriodicTask.objects.update_or_create(
+        name="jobs.health_check",
+        defaults={
+            "task": "ami.jobs.tasks.jobs_health_check",
+            "crontab": schedule,
+            "description": (
+                "Umbrella job-health checks: stale-job reconciler plus a NATS "
+                "consumer snapshot for each running async_api job."
+            ),
+        },
+    )
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 449-507: The check _run_running_job_snapshot_check undercounts
failures because TaskQueueManager.log_consumer_stats_snapshot swallows errors
and doesn't signal failure; update that API to return a boolean (or similar
success flag) and have _run_running_job_snapshot_check call and inspect the
return value instead of relying only on exceptions: change
TaskQueueManager.log_consumer_stats_snapshot (and any internal helpers it uses)
to return True on success / False on failure, then in
_run_running_job_snapshot_check's loop (where
manager.log_consumer_stats_snapshot(job.pk) is awaited) check the result and
increment errors when it returns False (preserving the existing exception
handling for shared-connection setup/teardown via the outer try/except and
logger.exception).


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 07c1e35f-8fbe-4d9f-a0d8-79abfacb568a

📥 Commits

Reviewing files that changed from the base of the PR and between 14a449f and 6726eee.

📒 Files selected for processing (6)
  • ami/jobs/migrations/0020_schedule_job_monitoring_beat_tasks.py
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_periodic_beat_tasks.py
  • ami/main/checks/__init__.py
  • ami/main/checks/schemas.py
  • ami/ml/orchestration/nats_queue.py
✅ Files skipped from review due to trivial changes (1)
  • ami/main/checks/__init__.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/ml/orchestration/nats_queue.py

Comment thread ami/jobs/tasks.py
Comment on lines +449 to +507
```python
def _run_running_job_snapshot_check() -> IntegrityCheckResult:
    """Log a NATS consumer snapshot for each running async_api job.

    Observation-only: ``fixed`` stays 0 because no state is altered. Jobs
    that error during snapshot are counted in ``unfixable`` — a persistently
    stuck job will be picked up on the next tick by ``_run_stale_jobs_check``.
    """
    from ami.jobs.models import Job, JobDispatchMode, JobState

    running_jobs = list(
        Job.objects.filter(
            status__in=JobState.running_states(),
            dispatch_mode=JobDispatchMode.ASYNC_API,
        )
    )
    if not running_jobs:
        return IntegrityCheckResult()

    # Resolve each job's per-job logger synchronously before entering the
    # event loop — ``Job.logger`` attaches a ``JobLogHandler`` on first access
    # which touches the Django ORM, so it is only safe to call from a sync
    # context.
    job_loggers = [(job, job.logger) for job in running_jobs]
    errors = 0

    async def _snapshot_all() -> None:
        nonlocal errors
        # One NATS connection per tick — on a 15-min cadence a per-job fallback
        # is not worth the code. If the shared connection fails to set up, we
        # skip this tick's snapshots and try fresh on the next one.
        async with TaskQueueManager(job_logger=job_loggers[0][1]) as manager:
            for job, job_logger in job_loggers:
                try:
                    # ``log_async`` reads ``job_logger`` fresh each call, so
                    # swapping per iteration routes lifecycle lines to the
                    # right job's UI log.
                    manager.job_logger = job_logger
                    await manager.log_consumer_stats_snapshot(job.pk)
                except Exception:
                    errors += 1
                    logger.exception("Failed to snapshot NATS consumer stats for job %s", job.pk)

    try:
        async_to_sync(_snapshot_all)()
    except Exception:
        # Covers both ``__aenter__`` setup failures (no iteration ran) and the
        # rare ``__aexit__`` teardown failure after a clean loop. In the
        # teardown case this overwrites the per-iteration count with the total
        # — accepted: a persistent failure will show up again next tick.
        logger.exception("Shared-connection snapshot failed; marking tick unfixable")
        errors = len(running_jobs)

    log_fn = logger.warning if errors else logger.info
    log_fn(
        "running_job_snapshots check: %d running async job(s), %d error(s)",
        len(running_jobs),
        errors,
    )
    return IntegrityCheckResult(checked=len(running_jobs), fixed=0, unfixable=errors)
```
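For reference, the result-shape plumbing the PR summary describes can be sketched with plain dataclasses. The names follow the summary's mention of `ami/main/checks/schemas.py`; the field defaults here are assumptions:

```python
from dataclasses import asdict, dataclass, field


@dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


@dataclass
class JobsHealthCheckResult:
    stale_jobs: IntegrityCheckResult = field(default_factory=IntegrityCheckResult)
    running_job_snapshots: IntegrityCheckResult = field(default_factory=IntegrityCheckResult)


# asdict() recurses into nested dataclasses and yields plain dicts,
# which Celery's JSON result backend can store and Flower can display.
result = JobsHealthCheckResult(stale_jobs=IntegrityCheckResult(checked=2, fixed=2))
print(asdict(result)["stale_jobs"])  # {'checked': 2, 'fixed': 2, 'unfixable': 0}
```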
Contributor

@coderabbitai coderabbitai bot Apr 14, 2026


⚠️ Potential issue | 🟡 Minor

running_job_snapshots.unfixable can undercount real per-job failures.

_run_running_job_snapshot_check increments errors only when log_consumer_stats_snapshot raises, but that method is implemented to swallow common snapshot-fetch failures internally and return early. This can report unfixable=0 even when snapshots failed for individual jobs. Consider returning a success/failure signal from log_consumer_stats_snapshot (or a companion API) and count from that result instead of exception flow.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tasks.py` around lines 449 - 507, The check
_run_running_job_snapshot_check undercounts failures because
TaskQueueManager.log_consumer_stats_snapshot swallows errors and doesn't signal
failure; update that API to return a boolean (or similar success flag) and have
_run_running_job_snapshot_check call and inspect the return value instead of
relying only on exceptions: change TaskQueueManager.log_consumer_stats_snapshot
(and any internal helpers it uses) to return True on success / False on failure,
then in _run_running_job_snapshot_check's loop (where
manager.log_consumer_stats_snapshot(job.pk) is awaited) check the result and
increment errors when it returns False (preserving the existing exception
handling for shared-connection setup/teardown via the outer try/except and
logger.exception).

Collaborator Author


If snapshots failed, we won't see logs. But I do like this idea. Perhaps unfixable could actually increment when jobs have alarming stats.

Contributor


🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
rg -n "_log_consumer_stats|log_consumer_stats_snapshot|ConsumerInfo|num_redelivered|num_pending|num_ack_pending|ack_floor|formatted_stats" ami/ml/orchestration/nats_queue.py -A 3 -B 1
```

Length of output: 2063


🏁 Script executed:

```shell
#!/bin/bash
rg -n "_log_consumer_stats" ami/ml/orchestration/nats_queue.py -A 30 | grep -A 30 "async def _log_consumer_stats"
```

Length of output: 1346


@mihow — both observations are valid, and they compound each other:

  1. Silent on failure: _log_consumer_stats currently catches all exceptions and does only logger.debug(...), so a fetch failure produces no INFO/WARNING log and no exception to the _run_running_job_snapshot_check loop. An operator watching job logs sees nothing, which looks identical to a healthy quiet job.

  2. Alarming-stats idea: This would make unfixable semantically richer and more actionable. Instead of log_consumer_stats_snapshot → None, it could return ConsumerInfo | None, and _run_running_job_snapshot_check could then inspect the result across two axes:

```python
info = await manager.log_consumer_stats_snapshot(job.pk)
if info is None:
    errors += 1  # fetch failed → can't observe
elif _is_alarming(info):
    errors += 1  # observed, but stats are concerning
```

For _is_alarming, the most actionable thresholds from the available ConsumerInfo fields would be:

  • num_redelivered above some threshold — repeated redelivery usually signals a processing loop or a stuck acker
  • num_ack_pending significantly exceeding num_pending + 1 — messages being consumed but not acknowledged

A simple start could be a single configurable threshold on num_redelivered (e.g., > 10) since that's the clearest "something is wrong" signal without being noisy for normal in-flight jobs.

Would you like me to draft the changes to log_consumer_stats_snapshot / _log_consumer_stats (return ConsumerInfo | None) and the updated _run_running_job_snapshot_check loop (with the _is_alarming helper)? Happy to also add a WARNING-level log in _log_consumer_stats's except branch so fetch failures are visible at the INFO tier too.
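The two-axis check above could be sketched roughly as follows. The `ConsumerStats` stand-in and the threshold value are assumptions for illustration; in the real code the stats would come from `nats-py`'s `ConsumerInfo`:

```python
from dataclasses import dataclass


# Hypothetical stand-in for the fields read off nats-py's ConsumerInfo;
# the real object comes from consumer_info() on the JetStream client.
@dataclass
class ConsumerStats:
    num_redelivered: int
    num_ack_pending: int
    num_pending: int


REDELIVERY_THRESHOLD = 10  # assumed default, per the suggestion above


def is_alarming(stats: ConsumerStats, redelivery_threshold: int = REDELIVERY_THRESHOLD) -> bool:
    """Flag consumer stats that suggest a stuck or looping worker."""
    if stats.num_redelivered > redelivery_threshold:
        return True  # repeated redelivery: processing loop or stuck acker
    if stats.num_ack_pending > stats.num_pending + 1:
        return True  # messages consumed but not acknowledged
    return False
```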


🧠 Learnings used
Learnt from: mihow
Repo: RolnickLab/antenna PR: 1130
File: ami/ml/orchestration/nats_queue.py:217-217
Timestamp: 2026-02-13T23:11:30.276Z
Learning: In the RolnickLab/antenna project (Pydantic v1), use .dict() for serialization of Pydantic models instead of .model_dump(). Update code paths in Python files where model_dump() is used to ensure compatibility with Pydantic v1 behavior. If any files are later migrated to Pydantic v2, consider switching to .model_dump() or using model_dump(..., mode='json') as appropriate.

Learnt from: mihow
Repo: RolnickLab/antenna PR: 1222
File: ami/ml/orchestration/nats_queue.py:126-132
Timestamp: 2026-04-13T22:08:32.779Z
Learning: In code that uses the `nats-py` JetStream client, do not divide `ConsumerInfo.config.ack_wait` (or the `ack_wait` returned by `consumer_info()` / `add_consumer()`) by `1e9`. `nats-py` already deserializes the server’s raw nanosecond value by dividing by `_NANOSECOND` (1e9), so application-level code should treat `ack_wait` as already human-readable seconds (e.g., `30.0` means 30 seconds). Only apply nanosecond-to-second conversion if you are working with the raw server nanosecond field directly (not the `ack_wait` property exposed by `ConsumerInfo`/`ConsumerConfig`).

@mihow mihow merged commit 6928a9f into main Apr 15, 2026
7 checks passed
@mihow mihow deleted the feat/periodic-stale-job-check branch April 15, 2026 00:14
mihow added a commit that referenced this pull request Apr 15, 2026
… stalled-job reaper

Two related fixes for the premature ``cleanup_async_job_resources`` path
that flips async_api jobs to FAILURE within ~30-55s of dispatch while
NATS is still delivering results:

1. ``Job.update_progress`` no longer silently bumps ``stage.progress = 1``
   when ``stage.status`` is in a final state. The bumped value made
   ``is_complete()`` return True and triggered cleanup mid-flight. The
   trigger was ``_update_job_progress`` writing ``status=FAILURE`` at
   partial progress as soon as ``failed/total`` crossed
   FAILURE_THRESHOLD (very easy early in a job — 1-2 errors out of the
   first few results). Progress is a measurement; leave it alone. The
   honest FAILURE trip happens when the stage actually reaches 100%.

2. Split ``Job.FAILED_CUTOFF_HOURS`` (originally added in PR #368 to
   hide failed jobs from UI listings after 3 days) into:
   - ``FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3`` — original UI/API filter
   - ``STALLED_JOBS_MAX_MINUTES = 10`` — reaper threshold
   The 72h value was reused by PR #1227's stale-job reaper for an
   entirely different purpose, leaving stuck jobs invisible for 3 days.
   ``check_stale_jobs`` now defaults to 10 minutes against ``updated_at``
   (already in place from #1227), so a job whose worker pool stops
   pulling messages gets reaped within one Beat tick (~10-25 min).
   ``--hours`` arg on the management command becomes ``--minutes`` to
   match.

Together: fix #1 stops the false-positive FAILUREs; fix #2 ensures true
positives are caught quickly without depending on the misfiring coercion.

Co-Authored-By: Claude <noreply@anthropic.com>
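The threshold split in this commit can be illustrated with a minimal staleness predicate. The constant names and values are copied from the message above; the function itself is an illustrative stand-in for the `updated_at` filter in `check_stale_jobs`, not the real implementation:

```python
from datetime import datetime, timedelta, timezone

FAILED_JOBS_DISPLAY_MAX_HOURS = 24 * 3  # UI/API display filter, kept separate
STALLED_JOBS_MAX_MINUTES = 10           # reaper threshold


def is_stalled(updated_at: datetime, now: datetime,
               max_minutes: int = STALLED_JOBS_MAX_MINUTES) -> bool:
    """True when a running job's last update is older than the reaper threshold."""
    return now - updated_at > timedelta(minutes=max_minutes)
```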
mihow added a commit that referenced this pull request Apr 15, 2026
When the reaper revokes a job, log a single WARN line capturing the
state needed to triage "why was this stalled?" without grepping back
through prior tick logs:

  - minutes since last update vs threshold
  - previous_status, dispatch_mode, celery_state
  - per-stage progress + status summary
  - pointer to running_job_snapshots for prior NATS consumer state

Pairs with PR #1227's per-tick consumer snapshots: every 15 min, each
running async_api job gets a NATS state snapshot; this new line tells
operators which of those snapshots is the last one before revocation.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 15, 2026
…(3 days -> 10 min) (#1235)

* fix(jobs): stop coercing partial-progress to 100% on FAILURE, tighten stalled-job reaper

* feat(jobs): log per-job diagnostic on stalled-job revoke
---------

Co-authored-by: Claude <noreply@anthropic.com>
