fix(jobs): prevent dangling jobs from going to REVOKED #1276
Conversation
Background: production job 2521 ended REVOKED despite all 4510 NATS messages acked and Redis pending sets empty. Root cause is a race in _update_job_progress: it writes the entire Job.progress JSONB blob without a row lock (PR #1261 dropped select_for_update to break a contention bottleneck), and two concurrent workers can each read the same pre-state then commit last-writer-wins. The slower committer with processed=4509 clobbered the faster committer's processed=4510. Job.progress cannot be the source of truth for state-machine decisions.

This plan introduces:

- AsyncJobStateManager.summarize() — single Redis pipeline call returning a JobCompletionSnapshot with state_present, total, pending_per_stage, failed, is_complete.
- Job.is_actually_complete() — Tier 1 Redis → Tier 2 DB detection-count fallback (logs WARNING) → Tier 3 progress.is_complete() for sync_api jobs.
- Replaces progress.is_complete() reads at the four state-machine call sites (reaper async_api guard, _update_job_progress cleanup gate, task_postrun, task_failure). Job.progress stays as the cosmetic mirror for the UI.

Out of scope (filed as follow-ups in the doc): the underlying race in _update_job_progress, persisting queued_image_ids on Job, fixing the zombie_streams 'CANCELLED' enum crash surfaced during this investigation.

Co-Authored-By: Claude <noreply@anthropic.com>
…essed

Revise per review: the only user-visible site bitten by the _update_job_progress clobber race is the reaper async_api guard at tasks.py:968. Other sites that read progress.is_complete() either run at single-writer moments (task_postrun, task_failure) or self-correct later (cleanup gate, log gating). Narrow the fix accordingly.

Also drops the proposed Job.is_actually_complete() wrapper in favor of inlining the Redis check at the one call site that needs it. Pydantic JobProgress.is_complete() stays pure data; AsyncJobStateManager grows all_tasks_processed() with tri-state bool|None semantics. Naming makes the scope explicit: Redis tracks NATS-task SREM completeness for the process and results stages, not job-wide multi-stage completion.

DB-count fallback (Tier 2) deferred until monitoring shows the Redis-gone WARNING fires often enough to matter. Sync_api jobs are unaffected — dispatch_mode gate keeps them on the existing trust-Celery path.

Co-Authored-By: Claude <noreply@anthropic.com>
Resume instructions for the next session — implementation steps for all_tasks_processed() + reaper guard swap, test cases, e2e chaos validation, follow-up tickets, and the saved diagnostic log paths on the ami-devops host. References the plan doc at docs/claude/planning/job-complete-check.md (commit 10cdac2). Co-Authored-By: Claude <noreply@anthropic.com>
Job 2521 ended REVOKED despite NATS+Redis both confirming all 4510 tasks processed. Root cause: `_update_job_progress` writes the entire `Job.progress` JSONB blob without a row lock since #1261 dropped `select_for_update`. Concurrent workers raced; a slower committer clobbered the faster committer's SUCCESS write, leaving progress at `processed=4509 remaining=1 STARTED`. The reaper read the clobbered `progress.is_complete()` -> False -> REVOKE.

Add `AsyncJobStateManager.all_tasks_processed() -> bool | None` (tri-state) backed directly by Redis SCARD across both pending sets, and inline it at the reaper guard in `check_stale_jobs`. Sync_api jobs unchanged (dispatch_mode gate). When Redis state is absent (cleanup, expiry, never initialized, or transient RedisError), fall back to `progress.is_complete()` and emit a WARNING so monitoring can flag the fallback path.

Other 5 `progress.is_complete()` callers stay as-is — only the reaper guard is bitten by the clobber. Atomic-write follow-up tracked separately.

Co-Authored-By: Claude <noreply@anthropic.com>
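The lost-update shape described above can be reproduced in a minimal, dependency-free sketch. Assumption: `db_row`, `read_progress`, and `commit_progress` are illustrative stand-ins for the ORM read and the whole-blob JSONB write, not the project's API.

```python
# Two workers each finished one of the last two tasks of a 4510-task job.
db_row = {"progress": {"processed": 4508, "remaining": 2, "status": "STARTED"}}

def read_progress():
    # Without SELECT ... FOR UPDATE, both workers can read the same snapshot.
    return dict(db_row["progress"])

def commit_progress(blob):
    # Whole-blob write: last committer wins, regardless of freshness.
    db_row["progress"] = blob

# Both workers read the pre-state before either commits.
snap_a, snap_b = read_progress(), read_progress()

for snap in (snap_a, snap_b):
    snap["processed"] += 1
    snap["remaining"] -= 1
    if snap["remaining"] == 0:
        snap["status"] = "SUCCESS"

commit_progress(snap_a)  # processed=4509, remaining=1
commit_progress(snap_b)  # also processed=4509: one completion is lost

# True state is 4510 processed / 0 remaining, but the blob disagrees.
print(db_row["progress"])  # {'processed': 4509, 'remaining': 1, 'status': 'STARTED'}
```

This is exactly the shape the reaper then misreads: the blob says `remaining=1 STARTED` even though Redis and NATS both know all 4510 tasks finished.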
✅ Deploy Preview for antenna-ssec canceled.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough

This PR enhances the job reaper's terminal-state decision logic for async_api jobs by introducing a Redis-backed completion check.

Changes
Sequence Diagram

sequenceDiagram
participant Reaper as check_stale_jobs<br/>(Reaper)
participant Celery as Celery<br/>AsyncResult
participant AsyncMgr as AsyncJobStateManager
participant Redis as Redis
participant JobProgress as job.progress
Reaper->>Celery: Check terminal state<br/>(SUCCESS/FAILURE)
alt Async API Job with Terminal State
Reaper->>AsyncMgr: all_tasks_processed()?
AsyncMgr->>Redis: Pipeline SCARD on pending sets<br/>+ GET total
alt Redis State Available
Redis-->>AsyncMgr: pending counts & total
alt All Pending Drained
AsyncMgr-->>Reaper: True
Reaper->>Reaper: Keep job terminal<br/>(SUCCESS)
else Pending Remain
AsyncMgr-->>Reaper: False
Reaper->>Reaper: Revoke job<br/>defer completion check
end
else Redis State Absent/Error
Redis-->>AsyncMgr: Error or missing total
AsyncMgr-->>Reaper: None
Reaper->>JobProgress: Fallback to<br/>is_complete()?
alt Complete
JobProgress-->>Reaper: True
Reaper->>Reaper: Log WARNING,<br/>set SUCCESS
else Incomplete
JobProgress-->>Reaper: False
Reaper->>Reaper: Log WARNING,<br/>Revoke
end
end
else Sync API Job or<br/>Non-Terminal State
Reaper->>Reaper: Use standard logic<br/>(skip Redis check)
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes

Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks: ✅ 5 passed
Pull request overview
Updates the async job reaper completion guard to consult Redis (pending-set drain) directly to avoid incorrect REVOKED outcomes caused by concurrent JSONB Job.progress clobbering.
Changes:
- Add `AsyncJobStateManager.all_tasks_processed() -> bool | None` backed by Redis `SCARD`/`GET` to determine whether async NATS-tracked tasks are fully drained.
- Update `check_stale_jobs` to use the Redis-backed completion check for `async_api` jobs, with a warning + `progress.is_complete()` fallback when Redis state is unavailable.
- Add test coverage for the new tri-state Redis oracle and reaper guard behavior (including the "clobbered progress but drained Redis" case).
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/claude/prompts/NEXT_SESSION_PROMPT.md | Adds session/runbook notes for implementing and validating the Redis-backed reaper guard. |
| docs/claude/planning/job-complete-check.md | Planning doc describing rationale, scope, risks, and acceptance criteria for the Redis-backed completion check. |
| ami/ml/orchestration/async_job_state.py | Implements all_tasks_processed() using Redis to determine pending work across stages. |
| ami/ml/tests.py | Adds unit tests covering all_tasks_processed() True/False/None behaviors, including RedisError. |
| ami/jobs/tasks.py | Replaces async reaper guard from progress.is_complete() to Redis-backed all_tasks_processed() with fallback. |
| ami/jobs/tests/test_tasks.py | Adds integration-style tests for the reaper guard outcomes across Redis drained/pending/absent and sync_api unaffected. |
```python
def all_tasks_processed(self) -> bool | None:
    """Tri-state truth signal for NATS-task SREM completeness across both
    process and results pending sets.

    True — both pending sets empty AND total > 0 (or total == 0)
    False — at least one pending set has members
    None — Redis state absent (cleaned up, expired, never initialized,
        or transient RedisError)

    Scope: tracks NATS task lifecycle only; does not know about `collect`
    or any future post-results stages.
    """
    try:
        redis = self._get_redis()
        with redis.pipeline() as pipe:
            for stage in self.STAGES:
                pipe.scard(self._get_pending_key(stage))
            pipe.get(self._total_key)
            results = pipe.execute()
    except RedisError as e:
        logger.warning(f"Redis error reading all_tasks_processed for job {self.job_id}: {e}")
        return None
```
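The decode step that follows the pipeline can be exercised without Redis. This hedged, dependency-free sketch mirrors the documented tri-state semantics; `pending_counts` stands in for the pipeline's `SCARD` results and `total` for the `GET` on the total key (`None` when the key is absent) — the real method's decode may differ in detail.

```python
def all_tasks_processed(pending_counts, total):
    """Mirror of the tri-state decode: True/False on observed state,
    None when the Redis total key is absent."""
    if total is None:
        return None  # Redis state absent: caller warns and falls back
    if any(count > 0 for count in pending_counts):
        return False  # at least one pending set still has members
    return True  # both pending sets drained

print(all_tasks_processed([0, 0], 4510))  # True: both stages drained
print(all_tasks_processed([3, 0], 4510))  # False: process stage pending
print(all_tasks_processed([0, 0], None))  # None: total key gone
```

The `None` branch is what keeps absence (cleanup, TTL expiry, never initialized) distinguishable from a genuinely incomplete job.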
```diff
 is_terminal = celery_state in states.READY_STATES
 is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
-if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
-    is_terminal = False
+if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
+    processed = AsyncJobStateManager(job.pk).all_tasks_processed()
+    if processed is False:
+        is_terminal = False
+    elif processed is None:
+        logger.warning(
+            "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
```
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
docs/claude/prompts/NEXT_SESSION_PROMPT.md (1)
211-215: Add a language to this fenced block. markdownlint MD040 will keep flagging this unlabeled fence. `text` would work fine here since this is captured command output, not executable code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/claude/prompts/NEXT_SESSION_PROMPT.md` around lines 211 - 215, The fenced block containing the git output (the lines starting with "$ git log --oneline fix/job-complete-check ^main" and the two commit lines) is missing a language tag and triggers markdownlint MD040; update the opening fence from ``` to ```text so the block is labeled as plain text. Ensure only the opening fence is changed (leave the content and closing ``` intact) so the block is recognized as non-executable text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 972-983: The current fast-path treats Celery states.SUCCESS and
states.FAILURE as terminal for is_async_api jobs by querying
AsyncJobStateManager(job.pk).all_tasks_processed(), which wrongly forces FAILURE
before the async result path can apply failure-threshold logic (see
update_job_failure()). Change the condition so only states.SUCCESS is considered
a terminal fast-path (i.e., `if is_async_api and celery_state is
states.SUCCESS:`), keep the same handling for processed==False/None and
job.progress.is_complete(), and do not treat states.FAILURE as authoritative
here; add a regression test that simulates celery_state=FAILURE with
Redis-drained (AsyncJobStateManager processed True) and verifies final status is
decided by the async result/failure-threshold logic instead of being forced to
FAILURE.
---
Nitpick comments:
In `@docs/claude/prompts/NEXT_SESSION_PROMPT.md`:
- Around line 211-215: The fenced block containing the git output (the lines
starting with "$ git log --oneline fix/job-complete-check ^main" and the two
commit lines) is missing a language tag and triggers markdownlint MD040; update
the opening fence from ``` to ```text so the block is labeled as plain text.
Ensure only the opening fence is changed (leave the content and closing ```
intact) so the block is recognized as non-executable text.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bd872a1d-acf2-4a53-9a3e-a5b5e6cefa12
📒 Files selected for processing (6)
- ami/jobs/tasks.py
- ami/jobs/tests/test_tasks.py
- ami/ml/orchestration/async_job_state.py
- ami/ml/tests.py
- docs/claude/planning/job-complete-check.md
- docs/claude/prompts/NEXT_SESSION_PROMPT.md
```python
if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
    processed = AsyncJobStateManager(job.pk).all_tasks_processed()
    if processed is False:
        is_terminal = False
    elif processed is None:
        logger.warning(
            "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
            job.pk,
        )
        if not job.progress.is_complete():
            is_terminal = False
    # processed is True -> trust Celery's terminal state
```
Don't treat Celery FAILURE as authoritative for drained async_api jobs.
update_job_failure() deliberately defers post-queue run_job failures for ASYNC_API jobs until the async result path decides the terminal outcome. With this new Redis gate, a job whose NATS work drained but whose run_job task ended in FAILURE will now be forced to FAILURE here, even if the final processed/failed counts should land below FAILURE_THRESHOLD and resolve to SUCCESS.
At minimum, I’d scope this fast-path to states.SUCCESS until there is a non-Celery oracle for the failure-threshold outcome, and add a regression test for celery_state=FAILURE + drained Redis.
Possible narrow fix
```diff
-    if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
+    if is_async_api and celery_state == states.SUCCESS:
         processed = AsyncJobStateManager(job.pk).all_tasks_processed()
         if processed is False:
             is_terminal = False
         elif processed is None:
             logger.warning(
                 "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
                 job.pk,
             )
             if not job.progress.is_complete():
                 is_terminal = False
-    # processed is True -> trust Celery's terminal state
+    # processed is True -> trust Celery SUCCESS
```

Based on learnings, jobs using AsyncJobStateManager should only be set to SUCCESS or FAILURE status when complete (progress >= 1.0), and the final status is determined when the job completes based on the failure threshold.
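The failure-threshold point the reviewer makes can be sketched in isolation. Assumption: `FAILURE_THRESHOLD` and `final_status` are illustrative names; the real logic lives in `update_job_failure()` and may count and weigh failures differently.

```python
FAILURE_THRESHOLD = 0.5  # assumed tolerated fraction of failed tasks

def final_status(processed: int, failed: int, total: int) -> str:
    """Terminal outcome from task counts, independent of run_job's Celery state."""
    if processed + failed < total:
        return "PENDING"  # not drained yet; no terminal decision
    return "FAILURE" if failed / total > FAILURE_THRESHOLD else "SUCCESS"

# run_job's Celery state may be FAILURE, yet with only 2 of 4510 tasks
# failed the threshold logic should still resolve the job to SUCCESS.
print(final_status(4508, 2, 4510))  # SUCCESS
print(final_status(0, 4510, 4510))  # FAILURE
```

This is why scoping the Redis fast-path to `states.SUCCESS` matters: a drained-but-Celery-FAILED job should be decided by counts like these, not forced terminal.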
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
+if is_async_api and celery_state == states.SUCCESS:
     processed = AsyncJobStateManager(job.pk).all_tasks_processed()
     if processed is False:
         is_terminal = False
     elif processed is None:
         logger.warning(
             "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
             job.pk,
         )
         if not job.progress.is_complete():
             is_terminal = False
-    # processed is True -> trust Celery's terminal state
+    # processed is True -> trust Celery SUCCESS
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/jobs/tasks.py` around lines 972 - 983, The current fast-path treats
Celery states.SUCCESS and states.FAILURE as terminal for is_async_api jobs by
querying AsyncJobStateManager(job.pk).all_tasks_processed(), which wrongly
forces FAILURE before the async result path can apply failure-threshold logic
(see update_job_failure()). Change the condition so only states.SUCCESS is
considered a terminal fast-path (i.e., `if is_async_api and celery_state is
states.SUCCESS:`), keep the same handling for processed==False/None and
job.progress.is_complete(), and do not treat states.FAILURE as authoritative
here; add a regression test that simulates celery_state=FAILURE with
Redis-drained (AsyncJobStateManager processed True) and verifies final status is
decided by the async result/failure-threshold logic instead of being forced to
FAILURE.
Low-priority follow-up to #1276. The three sources of truth for the async_api job lifecycle (Job model, Redis sets, NATS consumer state) can each tell a different story; unifying them into a diagnostic view catches bug classes Redis alone can't see (#1247-style stranding, result-handler swallow, hung in-flight workers). The doc proposes snapshot-on-terminal, a diagnostics endpoint, JobReconciler.diagnose(), and an admin UI panel. Tracked in #1285. Co-Authored-By: Claude <noreply@anthropic.com>
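A hedged sketch of what such a reconciler could surface. Assumption: `diagnose()` and its finding strings are purely illustrative; `JobReconciler.diagnose()` is only proposed in the follow-up, not implemented here.

```python
def diagnose(db_processed: int, redis_pending: set, nats_acked: int, total: int) -> list[str]:
    """Compare the three lifecycle sources and report any disagreement."""
    findings = []
    if db_processed != nats_acked:
        findings.append("DB progress disagrees with NATS acks (clobber or swallowed result)")
    if redis_pending and nats_acked == total:
        findings.append("Redis pending entries stranded despite full NATS ack")
    if not redis_pending and db_processed < total:
        findings.append("Redis drained but DB progress lagging")
    return findings or ["consistent"]

# The job-2521 shape: NATS fully acked, Redis drained, DB blob clobbered.
for finding in diagnose(4509, set(), 4510, 4510):
    print(finding)
```

Cross-checking all three sources is what distinguishes a cosmetic clobber (Redis drained, DB lagging) from real stranding (Redis pending despite full ack).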
Summary
Fixes a class of REVOKE-instead-of-SUCCESS reaping where async_api jobs land terminal-Celery-SUCCESS while NATS and Redis both confirm all tasks processed, but `Job.progress.is_complete()` reads False because of a JSONB clobber race in `_update_job_progress`.

Since #1261 dropped `select_for_update` from the progress writer, two concurrent workers can both read the pre-state and commit last-writer-wins. The slower committer's stale snapshot (e.g. `processed=N-1 remaining=1 STARTED`) clobbers the faster committer's `processed=N SUCCESS` blob. The reaper at `check_stale_jobs` then consults `progress.is_complete()` → False → REVOKE — even though every task has been ACKed and SREMed.

Fix

- `AsyncJobStateManager.all_tasks_processed() -> bool | None`: tri-state truth signal backed directly by Redis SCARD across both pending sets (process + results) plus a GET on the total key. True/False on observed state, None when Redis state is genuinely absent (cleanup, TTL expiry, never initialized, or transient `RedisError`).
- Inline the check at the reaper guard in `check_stale_jobs`. Sync_api jobs unchanged (dispatch_mode gate). When Redis returns None, fall back to `progress.is_complete()` and emit a WARNING so monitoring can surface the fallback path.

Behavior change

- Before: the reaper guard read `progress.is_complete()` (racy).
- After: the guard reads the Redis-backed `all_tasks_processed()`, falling back to `progress.is_complete()` only when Redis state is absent.

Other 5 `progress.is_complete()` call sites are untouched — only the reaper guard is bitten by the clobber. Atomic-write follow-up for `_update_job_progress` is a separate ticket.

Test plan

- `ami.ml.tests.TestTaskStateManager` — 7 new cases for `all_tasks_processed()` (fresh init, drained, partial-stage, total=0, never-init, post-cleanup, transient RedisError)
- `ami.jobs.tests.test_tasks.TestCheckStaleJobsReaperGuard` — clobber → SUCCESS, pending → REVOKED, Redis-absent + complete → SUCCESS+WARNING, Redis-absent + incomplete → REVOKED+WARNING, sync_api unaffected
- `ami.jobs` + `ami.ml` regression: 191 tests passing
- e2e: clobber `Job.progress` to mimic the failure shape, trigger `jobs_health_check`, verify SUCCESS not REVOKED

🤖 Generated with Claude Code
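The reaper-guard cases in the test plan can be mirrored as a dependency-free decision table. Assumption: `decide_terminal` is illustrative; the real guard is inlined in `check_stale_jobs` and operates on Celery state plus the job row.

```python
def decide_terminal(processed, progress_complete: bool) -> bool:
    """processed: tri-state Redis answer from all_tasks_processed().
    Returns whether the reaper keeps Celery's terminal state
    (False means it revokes / defers instead)."""
    if processed is False:
        return False  # pending sets still have members
    if processed is None:
        return progress_complete  # WARNING-logged fallback path
    return True  # drained: trust Celery's terminal state

print(decide_terminal(True, progress_complete=False))   # True: clobbered blob, but Redis drained
print(decide_terminal(False, progress_complete=True))   # False: work still pending
print(decide_terminal(None, progress_complete=True))    # True via fallback
print(decide_terminal(None, progress_complete=False))   # False via fallback
```

The first case is the headline fix: a clobbered `Job.progress` no longer forces REVOKED when the Redis pending sets say the job drained.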
Summary by CodeRabbit
Bug Fixes
Tests