fix(jobs): fix dangling jobs from going to revoked#1276

Open
mihow wants to merge 5 commits into main from fix/job-complete-check
Collaborator

@mihow mihow commented Apr 29, 2026

Summary

Fixes a class of reaping errors where async_api jobs are marked REVOKED instead of SUCCESS. These jobs reach a terminal Celery SUCCESS state, and NATS and Redis both confirm that all tasks were processed, but Job.progress.is_complete() reads False because of a JSONB clobber race in _update_job_progress.

Since #1261 dropped select_for_update from the progress writer, two concurrent workers can both read the pre-state and commit last-writer-wins. The slower committer's stale snapshot (e.g. processed=N-1 remaining=1 STARTED) clobbers the faster committer's processed=N SUCCESS blob. The reaper in check_stale_jobs then consults progress.is_complete() → False → REVOKE, even though every task has been ACKed and SREMed.
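The clobber described above can be reproduced in miniature. The sketch below is not the project's code: `db_row`, `read_snapshot`, `commit`, and `apply_result` are hypothetical stand-ins for the `Job.progress` JSONB column and its unlocked read-modify-write cycle, and the totals are made up.

```python
import copy

# Hypothetical stand-in for the single Job.progress JSONB blob.
TOTAL = 10
db_row = {"processed": 8, "remaining": 2, "status": "STARTED"}


def read_snapshot() -> dict:
    """Read the whole blob without a row lock (no select_for_update)."""
    return copy.deepcopy(db_row)


def apply_result(snapshot: dict, processed_now: int) -> dict:
    """Update the in-memory copy with this worker's view of the counts."""
    snapshot["processed"] = processed_now
    snapshot["remaining"] = TOTAL - processed_now
    snapshot["status"] = "SUCCESS" if processed_now == TOTAL else "STARTED"
    return snapshot


def commit(snapshot: dict) -> None:
    """Write the whole blob back; whoever commits last wins."""
    db_row.clear()
    db_row.update(snapshot)


# Both workers read the same pre-state before either one commits.
slow = apply_result(read_snapshot(), processed_now=9)
fast = apply_result(read_snapshot(), processed_now=10)

commit(fast)  # blob briefly says processed=10, SUCCESS
commit(slow)  # stale snapshot overwrites it

print(db_row)  # {'processed': 9, 'remaining': 1, 'status': 'STARTED'}
```

The final blob has the same shape as the failure in this PR: every task is done, yet the persisted progress says one remains.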

Fix

  • AsyncJobStateManager.all_tasks_processed() -> bool | None: tri-state truth signal backed directly by Redis SCARD across both pending sets (process + results) plus a GET on the total key. True/False on observed state, None when Redis state is genuinely absent (cleanup, TTL expiry, never initialized, or transient RedisError).
  • Inlined at the reaper guard in check_stale_jobs. sync_api jobs are unchanged (dispatch_mode gate). When all_tasks_processed() returns None, the guard falls back to progress.is_complete() and emits a WARNING so monitoring can surface the fallback path.
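A minimal sketch of the tri-state interpretation described in the first bullet, assuming the pipeline yields one SCARD count per stage followed by the raw GET of the total key. The helper names `interpret_pipeline` and `all_tasks_processed_sketch`, and the `FakeRedisError` class, are hypothetical; the real method lives on `AsyncJobStateManager` and its interpretation step is not shown in this thread.

```python
from typing import Callable, Optional, Sequence, Tuple


class FakeRedisError(Exception):
    """Stand-in for redis.exceptions.RedisError so the sketch has no dependencies."""


def interpret_pipeline(
    pending_counts: Sequence[int], total_raw: Optional[bytes]
) -> Optional[bool]:
    """Map raw replies (SCARD per stage, GET total) to True / False / None."""
    if total_raw is None:
        # Total key missing: state was cleaned up, expired, or never initialized.
        return None
    # SCARD returns 0 for a missing key, so the total key is what separates
    # "drained" from "absent".
    return all(count == 0 for count in pending_counts)


def all_tasks_processed_sketch(
    fetch: Callable[[], Tuple[Sequence[int], Optional[bytes]]]
) -> Optional[bool]:
    """Wrap the fetch so a transient Redis error degrades to None."""
    try:
        pending_counts, total_raw = fetch()
    except FakeRedisError:
        return None
    return interpret_pipeline(pending_counts, total_raw)


def broken_fetch():
    raise FakeRedisError("connection reset")


print(all_tasks_processed_sketch(lambda: ([0, 0], b"4510")))  # True
print(all_tasks_processed_sketch(lambda: ([0, 1], b"4510")))  # False
print(all_tasks_processed_sketch(lambda: ([0, 0], None)))     # None
print(all_tasks_processed_sketch(broken_fetch))               # None
```

Returning None rather than False on missing state is what lets the caller distinguish "work is still pending" from "Redis cannot answer".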

Behavior change

| dispatch_mode | Celery state | Redis | Old reaper | New reaper |
|---|---|---|---|---|
| async_api | SUCCESS | drained | depended on progress.is_complete() (racy) | trusts Redis → SUCCESS |
| async_api | SUCCESS | pending > 0 | REVOKED | REVOKED |
| async_api | SUCCESS | absent | depended on progress.is_complete() | falls back + WARNING |
| async_api | FAILURE | (any) | same as SUCCESS rules | same as SUCCESS rules |
| sync_api | SUCCESS | n/a | trusts Celery | trusts Celery (unchanged) |
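The table can be read as a small pure function. The sketch below mirrors the guard as posted in this PR (before any review suggestions), using plain strings in place of `celery.states` and `JobDispatchMode`; those string values and the function name `trusts_celery_terminal_state` are assumptions for illustration.

```python
from typing import Optional

# Plain-string stand-ins; the real code uses celery.states and JobDispatchMode.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


def trusts_celery_terminal_state(
    dispatch_mode: str,
    celery_state: str,
    processed: Optional[bool],
    progress_complete: bool,
) -> bool:
    """Return True when the reaper should accept Celery's terminal state."""
    is_terminal = celery_state in READY_STATES
    if dispatch_mode == "async_api" and celery_state in {"SUCCESS", "FAILURE"}:
        if processed is False:
            is_terminal = False  # Redis still has pending tasks
        elif processed is None and not progress_complete:
            is_terminal = False  # Redis absent and progress blob says incomplete
        # processed is True: trust Celery's terminal state
    return is_terminal


# (dispatch_mode, celery_state, processed, progress_complete)
rows = [
    ("async_api", "SUCCESS", True, False),   # clobbered progress, drained Redis
    ("async_api", "SUCCESS", False, True),   # Redis still pending
    ("async_api", "SUCCESS", None, True),    # Redis absent, progress complete
    ("async_api", "SUCCESS", None, False),   # Redis absent, progress incomplete
    ("sync_api", "SUCCESS", None, False),    # dispatch_mode gate skips Redis
]
for row in rows:
    print(row, "->", trusts_celery_terminal_state(*row))
```

The first row is the case this PR targets: the progress blob is stale, but a drained Redis state is enough to keep the job terminal.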

The other five progress.is_complete() call sites are untouched; only the reaper guard is bitten by the clobber. The atomic-write follow-up for _update_job_progress is a separate ticket.

Test plan

  • ami.ml.tests.TestTaskStateManager — 7 new cases for all_tasks_processed() (fresh init, drained, partial-stage, total=0, never-init, post-cleanup, transient RedisError)
  • ami.jobs.tests.test_tasks.TestCheckStaleJobsReaperGuard — clobber → SUCCESS, pending → REVOKED, Redis-absent + complete → SUCCESS+WARNING, Redis-absent + incomplete → REVOKED+WARNING, sync_api unaffected
  • Full ami.jobs + ami.ml regression: 191 tests passing
  • E2E chaos: dispatch real async_api job, hand-clobber Job.progress to mimic the failure shape, trigger jobs_health_check, verify SUCCESS not REVOKED
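One way the "Redis-absent + WARNING" cases might be pinned down is with `unittest`'s `assertLogs`. This is a standalone sketch, not the PR's test code: the logger name `reaper_sketch` and the helper `resolve_when_redis_absent` are hypothetical and cover only the fallback branch.

```python
import logging
import unittest

logger = logging.getLogger("reaper_sketch")  # hypothetical logger name


def resolve_when_redis_absent(job_pk: int, progress_complete: bool) -> bool:
    """Fallback branch only: warn, then defer to the progress blob."""
    logger.warning(
        "Reaper for job %s: Redis state absent, falling back to progress.is_complete()",
        job_pk,
    )
    return progress_complete


class FallbackWarningTest(unittest.TestCase):
    def test_absent_and_complete_stays_terminal(self):
        with self.assertLogs("reaper_sketch", level="WARNING") as captured:
            self.assertTrue(resolve_when_redis_absent(2521, progress_complete=True))
        self.assertIn("Redis state absent", captured.output[0])

    def test_absent_and_incomplete_is_not_terminal(self):
        with self.assertLogs("reaper_sketch", level="WARNING") as captured:
            self.assertFalse(resolve_when_redis_absent(2521, progress_complete=False))
        self.assertEqual(len(captured.output), 1)


# Run the suite explicitly so the sketch does not call sys.exit().
suite = unittest.defaultTestLoader.loadTestsFromTestCase(FallbackWarningTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Asserting on the log record as well as the return value keeps the monitoring signal from silently disappearing in a later refactor.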

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Bug Fixes

    • Improved accuracy of job completion detection for async jobs, preventing jobs from being incorrectly revoked when they have successfully completed processing.
  • Tests

    • Added comprehensive test coverage for job state management across multiple task completion scenarios and system conditions.

mihow and others added 4 commits April 29, 2026 14:00
Background: production job 2521 ended REVOKED despite all 4510 NATS
messages acked and Redis pending sets empty. Root cause is a race in
_update_job_progress: it writes the entire Job.progress JSONB blob
without a row lock (PR #1261 dropped select_for_update to break a
contention bottleneck), and two concurrent workers can each read the
same pre-state then commit last-writer-wins. The slower committer with
processed=4509 clobbered the faster committer's processed=4510.

Job.progress cannot be the source of truth for state-machine decisions.
This plan introduces:

- AsyncJobStateManager.summarize() — single Redis pipeline call
  returning a JobCompletionSnapshot with state_present, total,
  pending_per_stage, failed, is_complete.
- Job.is_actually_complete() — Tier 1 Redis → Tier 2 DB
  detection-count fallback (logs WARNING) → Tier 3 progress.is_complete()
  for sync_api jobs.
- Replaces progress.is_complete() reads at the four state-machine call
  sites (reaper async_api guard, _update_job_progress cleanup gate,
  task_postrun, task_failure). Job.progress stays as the cosmetic
  mirror for the UI.

Out of scope (filed as follow-ups in the doc): the underlying race in
_update_job_progress, persisting queued_image_ids on Job, fixing the
zombie_streams 'CANCELLED' enum crash surfaced during this
investigation.

Co-Authored-By: Claude <noreply@anthropic.com>
…essed

Revise per review: the only user-visible site bitten by the
_update_job_progress clobber race is the reaper async_api guard at
tasks.py:968. Other sites that read progress.is_complete() either run
at single-writer moments (task_postrun, task_failure) or self-correct
later (cleanup gate, log gating). Narrow the fix accordingly.

Also drops the proposed Job.is_actually_complete() wrapper in favor of
inlining the Redis check at the one call site that needs it. Pydantic
JobProgress.is_complete() stays pure data; AsyncJobStateManager grows
all_tasks_processed() with tri-state bool|None semantics. Naming makes
the scope explicit: Redis tracks NATS-task SREM completeness for the
process and results stages, not job-wide multi-stage completion.

DB-count fallback (Tier 2) deferred until monitoring shows the
Redis-gone WARNING fires often enough to matter.

Sync_api jobs are unaffected — dispatch_mode gate keeps them on the
existing trust-Celery path.

Co-Authored-By: Claude <noreply@anthropic.com>
Resume instructions for the next session — implementation steps for
all_tasks_processed() + reaper guard swap, test cases, e2e chaos
validation, follow-up tickets, and the saved diagnostic log paths on
the ami-devops host. References the plan doc at
docs/claude/planning/job-complete-check.md (commit 10cdac2).

Co-Authored-By: Claude <noreply@anthropic.com>
Job 2521 ended REVOKED despite NATS+Redis both confirming all 4510 tasks
processed. Root cause: `_update_job_progress` writes the entire
`Job.progress` JSONB blob without a row lock since #1261 dropped
`select_for_update`. Concurrent workers raced; a slower committer
clobbered the faster committer's SUCCESS write, leaving progress at
`processed=4509 remaining=1 STARTED`. The reaper read the clobbered
`progress.is_complete()` -> False -> REVOKE.

Add `AsyncJobStateManager.all_tasks_processed() -> bool | None`
(tri-state) backed directly by Redis SCARD across both pending sets, and
inline it at the reaper guard in `check_stale_jobs`. Sync_api jobs
unchanged (dispatch_mode gate). When Redis state is absent (cleanup,
expiry, never initialized, or transient RedisError), fall back to
`progress.is_complete()` and emit a WARNING so monitoring can flag the
fallback path.

Other 5 `progress.is_complete()` callers stay as-is — only the reaper
guard is bitten by the clobber. Atomic-write follow-up tracked separately.

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 29, 2026 22:11

netlify Bot commented Apr 29, 2026

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 0a7480b
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69f3e44687e30a0008043335


netlify Bot commented Apr 29, 2026

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 0a7480b
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69f3e4462234a4000882c080

Contributor

coderabbitai Bot commented Apr 29, 2026

Warning

Rate limit exceeded

@mihow has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 50 minutes and 37 seconds before requesting another review.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c13ad608-c2f2-4181-acfb-4de60d33704c

📥 Commits

Reviewing files that changed from the base of the PR and between 36b17c4 and 0a7480b.

📒 Files selected for processing (1)
  • docs/claude/planning/job-state-triangulation.md
📝 Walkthrough

This PR enhances the job reaper's terminal-state decision logic for async_api jobs by introducing a Redis-backed completion check. The AsyncJobStateManager.all_tasks_processed() method queries Redis pending-set drain status across process and results stages. When Celery indicates a terminal state, the reaper now consults Redis to determine true completion, with graceful fallback to job.progress.is_complete() when Redis state is unavailable.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Core Logic: ami/jobs/tasks.py, ami/ml/orchestration/async_job_state.py | Added AsyncJobStateManager.all_tasks_processed() tri-state method that pipelines Redis SCARD queries across pending-set stages and interprets drain completion, returning True/False/None. Updated check_stale_jobs to consult this method for async_api Celery terminal states, with fallback to job.progress.is_complete() and warning log when Redis state absent. |
| Test Coverage: ami/jobs/tests/test_tasks.py, ami/ml/tests.py | Added TestCheckStaleJobsReaperGuard TransactionTestCase with 4+ scenarios covering Redis-present/drained, Redis-present/pending, Redis-absent (with fallback & warning log), and sync_api unchanged cases. Added AsyncJobStateManager.all_tasks_processed() unit tests across fresh initialization, drain stages, zero-image jobs, and error fallback conditions. |
| Documentation: docs/claude/planning/job-complete-check.md, docs/claude/prompts/NEXT_SESSION_PROMPT.md | Added planning and next-session prompt documents specifying the Redis-oracle implementation, tri-state method semantics, reaper guard control flow, expected scenario outcomes, logging format, test matrix, and validation steps. |

Sequence Diagram

sequenceDiagram
    participant Reaper as check_stale_jobs<br/>(Reaper)
    participant Celery as Celery<br/>AsyncResult
    participant AsyncMgr as AsyncJobStateManager
    participant Redis as Redis
    participant JobProgress as job.progress
    
    Reaper->>Celery: Check terminal state<br/>(SUCCESS/FAILURE)
    alt Async API Job with Terminal State
        Reaper->>AsyncMgr: all_tasks_processed()?
        AsyncMgr->>Redis: Pipeline SCARD on pending sets<br/>+ GET total
        alt Redis State Available
            Redis-->>AsyncMgr: pending counts & total
            alt All Pending Drained
                AsyncMgr-->>Reaper: True
                Reaper->>Reaper: Keep job terminal<br/>(SUCCESS)
            else Pending Remain
                AsyncMgr-->>Reaper: False
                Reaper->>Reaper: Revoke job<br/>defer completion check
            end
        else Redis State Absent/Error
            Redis-->>AsyncMgr: Error or missing total
            AsyncMgr-->>Reaper: None
            Reaper->>JobProgress: Fallback to<br/>is_complete()?
            alt Complete
                JobProgress-->>Reaper: True
                Reaper->>Reaper: Log WARNING,<br/>set SUCCESS
            else Incomplete
                JobProgress-->>Reaper: False
                Reaper->>Reaper: Log WARNING,<br/>Revoke
            end
        end
    else Sync API Job or<br/>Non-Terminal State
        Reaper->>Reaper: Use standard logic<br/>(skip Redis check)
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐇 Redis drains the pending queue with grace,
While progress-check stands as fallback place,
Through warnings logged and states refined,
The reaper's guard leaves clobbered jobs behind! 🔄

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description check | ✅ Passed | Description includes all required template sections: Summary, List of Changes, Related Issues, Detailed Description with risk context, How to Test (test plan with checkbox completion), and Deployment Notes. All major information is present and complete. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Title check | ✅ Passed | The PR title 'fix(jobs): fix dangling jobs from going to revoked' accurately summarizes the main change: addressing a bug where jobs that reached terminal SUCCESS were being incorrectly revoked due to a race condition involving stale progress data. |


Contributor

Copilot AI left a comment


Pull request overview

Updates the async job reaper completion guard to consult Redis (pending-set drain) directly to avoid incorrect REVOKED outcomes caused by concurrent JSONB Job.progress clobbering.

Changes:

  • Add AsyncJobStateManager.all_tasks_processed() -> bool | None backed by Redis SCARD/GET to determine whether async NATS-tracked tasks are fully drained.
  • Update check_stale_jobs to use the Redis-backed completion check for async_api jobs, with a warning + progress.is_complete() fallback when Redis state is unavailable.
  • Add test coverage for the new tri-state Redis oracle and reaper guard behavior (including the “clobbered progress but drained Redis” case).

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| docs/claude/prompts/NEXT_SESSION_PROMPT.md | Adds session/runbook notes for implementing and validating the Redis-backed reaper guard. |
| docs/claude/planning/job-complete-check.md | Planning doc describing rationale, scope, risks, and acceptance criteria for the Redis-backed completion check. |
| ami/ml/orchestration/async_job_state.py | Implements all_tasks_processed() using Redis to determine pending work across stages. |
| ami/ml/tests.py | Adds unit tests covering all_tasks_processed() True/False/None behaviors, including RedisError. |
| ami/jobs/tasks.py | Replaces async reaper guard from progress.is_complete() to Redis-backed all_tasks_processed() with fallback. |
| ami/jobs/tests/test_tasks.py | Adds integration-style tests for the reaper guard outcomes across Redis drained/pending/absent and sync_api unaffected. |


Comment on lines +231 to +252
    def all_tasks_processed(self) -> bool | None:
        """Tri-state truth signal for NATS-task SREM completeness across both
        process and results pending sets.

        True — both pending sets empty AND total > 0 (or total == 0)
        False — at least one pending set has members
        None — Redis state absent (cleaned up, expired, never initialized,
        or transient RedisError)

        Scope: tracks NATS task lifecycle only; does not know about `collect`
        or any future post-results stages.
        """
        try:
            redis = self._get_redis()
            with redis.pipeline() as pipe:
                for stage in self.STAGES:
                    pipe.scard(self._get_pending_key(stage))
                pipe.get(self._total_key)
                results = pipe.execute()
        except RedisError as e:
            logger.warning(f"Redis error reading all_tasks_processed for job {self.job_id}: {e}")
            return None
Comment thread ami/jobs/tasks.py
Comment on lines 970 to +974
             is_terminal = celery_state in states.READY_STATES
             is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
-            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
-                is_terminal = False
+            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
+                processed = AsyncJobStateManager(job.pk).all_tasks_processed()
+                if processed is False:
Comment thread ami/jobs/tasks.py
                    is_terminal = False
                elif processed is None:
                    logger.warning(
                        "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
@mihow mihow changed the title from "fix(jobs): reaper checks Redis directly via all_tasks_processed" to "fix(jobs): fix dangling jobs from going to revoked" Apr 29, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
docs/claude/prompts/NEXT_SESSION_PROMPT.md (1)

211-215: Add a language to this fenced block.

markdownlint MD040 will keep flagging this unlabeled fence. text would work fine here since this is captured command output, not executable code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/claude/prompts/NEXT_SESSION_PROMPT.md` around lines 211 - 215, The
fenced block containing the git output (the lines starting with "$ git log
--oneline fix/job-complete-check ^main" and the two commit lines) is missing a
language tag and triggers markdownlint MD040; update the opening fence from ```
to ```text so the block is labeled as plain text. Ensure only the opening fence
is changed (leave the content and closing ``` intact) so the block is recognized
as non-executable text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 972-983: The current fast-path treats Celery states.SUCCESS and
states.FAILURE as terminal for is_async_api jobs by querying
AsyncJobStateManager(job.pk).all_tasks_processed(), which wrongly forces FAILURE
before the async result path can apply failure-threshold logic (see
update_job_failure()). Change the condition so only states.SUCCESS is considered
a terminal fast-path (i.e., `if is_async_api and celery_state ==
states.SUCCESS:`), keep the same handling for processed==False/None and
job.progress.is_complete(), and do not treat states.FAILURE as authoritative
here; add a regression test that simulates celery_state=FAILURE with
Redis-drained (AsyncJobStateManager processed True) and verifies final status is
decided by the async result/failure-threshold logic instead of being forced to
FAILURE.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bd872a1d-acf2-4a53-9a3e-a5b5e6cefa12

📥 Commits

Reviewing files that changed from the base of the PR and between a347d55 and 36b17c4.

📒 Files selected for processing (6)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/tests.py
  • docs/claude/planning/job-complete-check.md
  • docs/claude/prompts/NEXT_SESSION_PROMPT.md

Comment thread ami/jobs/tasks.py
Comment on lines +972 to +983
            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
                processed = AsyncJobStateManager(job.pk).all_tasks_processed()
                if processed is False:
                    is_terminal = False
                elif processed is None:
                    logger.warning(
                        "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
                        job.pk,
                    )
                    if not job.progress.is_complete():
                        is_terminal = False
                # processed is True -> trust Celery's terminal state
Contributor


⚠️ Potential issue | 🟠 Major

Don't treat Celery FAILURE as authoritative for drained async_api jobs.

update_job_failure() deliberately defers post-queue run_job failures for ASYNC_API jobs until the async result path decides the terminal outcome. With this new Redis gate, a job whose NATS work drained but whose run_job task ended in FAILURE will now be forced to FAILURE here, even if the final processed/failed counts should land below FAILURE_THRESHOLD and resolve to SUCCESS.

At minimum, I’d scope this fast-path to states.SUCCESS until there is a non-Celery oracle for the failure-threshold outcome, and add a regression test for celery_state=FAILURE + drained Redis.

Possible narrow fix
-            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
+            if is_async_api and celery_state == states.SUCCESS:
                 processed = AsyncJobStateManager(job.pk).all_tasks_processed()
                 if processed is False:
                     is_terminal = False
                 elif processed is None:
                     logger.warning(
                         "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
                         job.pk,
                     )
                     if not job.progress.is_complete():
                         is_terminal = False
-                # processed is True -> trust Celery's terminal state
+                # processed is True -> trust Celery SUCCESS

Based on learnings, jobs using AsyncJobStateManager should only be set to SUCCESS or FAILURE status when complete (progress >= 1.0), and the final status is determined when the job completes based on the failure threshold.
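To make the concern concrete, here is a rough sketch of how a failure-threshold decision could diverge from Celery's state for `run_job`. The threshold value, the `>=` comparison, and the function name `final_status` are all assumptions; the project's actual `FAILURE_THRESHOLD` and comparison are not shown in this thread.

```python
# Hypothetical value; the real FAILURE_THRESHOLD is not shown in this thread.
FAILURE_THRESHOLD = 0.5


def final_status(total: int, failed: int, threshold: float = FAILURE_THRESHOLD) -> str:
    """Decide the job outcome from result counts rather than from Celery's state."""
    if total == 0:
        return "SUCCESS"  # nothing to process, nothing failed
    failed_ratio = failed / total
    # Assumed rule: a ratio below the threshold resolves to SUCCESS.
    return "FAILURE" if failed_ratio >= threshold else "SUCCESS"


# run_job itself may have ended in Celery FAILURE after queueing, while the
# per-task results say almost everything succeeded.
celery_state = "FAILURE"
print(celery_state, "vs", final_status(total=4510, failed=3))  # FAILURE vs SUCCESS
```

Under these assumptions, a drained job with a handful of failed tasks would resolve to SUCCESS on the async result path, which is the outcome a Celery-FAILURE fast-path would override.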

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}:
-                processed = AsyncJobStateManager(job.pk).all_tasks_processed()
-                if processed is False:
-                    is_terminal = False
-                elif processed is None:
-                    logger.warning(
-                        "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
-                        job.pk,
-                    )
-                    if not job.progress.is_complete():
-                        is_terminal = False
-                # processed is True -> trust Celery's terminal state
+            if is_async_api and celery_state == states.SUCCESS:
+                processed = AsyncJobStateManager(job.pk).all_tasks_processed()
+                if processed is False:
+                    is_terminal = False
+                elif processed is None:
+                    logger.warning(
+                        "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()",
+                        job.pk,
+                    )
+                    if not job.progress.is_complete():
+                        is_terminal = False
+                # processed is True -> trust Celery SUCCESS

Low-priority follow-up to #1276. Three sources of truth for async_api job
lifecycle (Job model, Redis sets, NATS consumer state) lie differently;
unifying them into a diagnostic view catches bug classes Redis alone
can't see (#1247-style stranding, result-handler swallow, hung in-flight
workers). Doc proposes snapshot-on-terminal, diagnostics endpoint,
JobReconciler.diagnose(), and an admin UI panel.

Tracked in #1285.

Co-Authored-By: Claude <noreply@anthropic.com>