Skip to content

feat(jobs): drain zombie NATS streams in periodic health check#1239

Merged
mihow merged 2 commits intomainfrom
feat/drain-zombie-nats-streams
Apr 16, 2026
Merged

feat(jobs): drain zombie NATS streams in periodic health check#1239
mihow merged 2 commits intomainfrom
feat/drain-zombie-nats-streams

Conversation

@mihow
Copy link
Copy Markdown
Collaborator

@mihow mihow commented Apr 15, 2026

What this does (plain English)

When an ML job finishes or gets cancelled, we sometimes leave its NATS message queue behind. Workers don't know the job is over, so they keep politely asking that queue for work forever — burning cycles on nothing. This PR adds a periodic cleanup that deletes those orphaned queues once they're clearly abandoned, while keeping a safety window so we don't accidentally tear down a queue that a just-started job is still setting up.


Summary

Adds a zombie_streams sub-check to the existing jobs_health_check periodic beat task. Drains NATS JetStream job_{N} streams whose backing Django Job is in a terminal state (or missing) AND whose stream age exceeds Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES (default 60 minutes).

Zombie streams accumulate when the cleanup-on-cancel path misses a stream on job failure or revocation; workers keep polling the leftover subject for no reason. This is defense-in-depth — the underlying cleanup-on-cancel gap is a separate fix.

Two guards protect against draining live state:

  • Terminal-only: streams are only drained if the Job is in JobState.final_states() or the Job row is missing. In-flight Jobs are skipped.
  • Age guard: streams newer than ZOMBIE_STREAMS_MAX_AGE_MINUTES are skipped. Protects against the transaction.on_commit race where a NATS stream exists briefly before the Job row is persisted.

Implementation notes:

  • Uses the raw $JS.API.STREAM.LIST endpoint (in ami/ml/orchestration/nats_queue.py::list_job_stream_snapshots) because JetStreamContext.streams_info in the currently pinned nats.py drops the server-side created timestamp, which we need to age zombies out with a safety margin. Pages at 256 streams per response so long tails are still fully enumerated.
  • Returns {checked, fixed, unfixable} counters consistent with the other sub-checks in JobsHealthCheckResult.

Test plan

  • ami.jobs.tests.test_periodic_beat_tasks — unit tests cover drain, skip-fresh, skip-running, missing-job, paginated listing
  • E2E on local stack against a real NATS server: drain fires on terminal+old, skips fresh+terminal, skips old+running

E2E validation

Invoked jobs_health_check() synchronously against a live local NATS server in three scenarios. All three hit the expected branch of the age/state guard.

1. Drain on terminal+old. Created stream job_999991 with no matching Job row, patched Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES = 0, ran the check:

zombie_streams: checked=1, fixed=1, unfixable=0
INFO Drained zombie NATS stream job_999991 (status=missing, age=0.0h, redelivered=None, consumer_deleted=False)

consumer_deleted=False is expected — the fixture only created a stream, no consumer. The stream itself (the entity that was wasting worker poll cycles) was cleanly removed. Post-drain stream_info raised NotFoundError as expected.

2. Skip on fresh+terminal (the transaction.on_commit race guard). Created a fresh stream job_999992, left the threshold at the default 60 min, ran the check:

zombie_streams: checked=1, fixed=0, unfixable=0

Stream retained. This is the scenario that would break async_api dispatch if it regressed — a freshly-created stream whose Job row hasn't committed yet must not be drained.

3. Skip on old+running. Created a throwaway Job (status=STARTED) and matching stream, patched threshold to 0 so age alone would permit drain:

zombie_streams: checked=1, fixed=0, unfixable=0

Stream retained. In-flight Job correctly shields the stream even when the age guard would otherwise permit it. running_job_snapshots sub-check simultaneously picked up the Job as expected (checked=1).

All three scenarios also exercise the raw $JS.API.STREAM.LIST enumeration + UTC→local timestamp conversion against the real NATS server, which is the piece the unit tests mock.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added automatic cleanup of stale job streams in NATS for jobs in terminal states or no longer tracked in the system.
    • Enhanced health check monitoring to include zombie stream detection and removal.
  • Tests

    • Added comprehensive test coverage for zombie stream detection and cleanup behavior, including edge cases and failure scenarios.

A NATS stream whose Django Job is already terminal (or whose Job row has
been deleted) is a zombie: it consumes worker poll cycles and
redelivery-advisory traffic for no reason, yet the existing
cleanup-on-cancel path does not always run. Add a
defense-in-depth sub-check to the 15-min jobs_health_check umbrella.

_run_zombie_streams_check:
- Enumerates every job_{N} stream currently in JetStream via the raw
  $JS.API.STREAM.LIST endpoint (nats.py's streams_info() drops the
  server-side "created" timestamp we need for the age guard).
- Skips streams younger than Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES
  (STALLED_JOBS_MAX_MINUTES * 6 by default) to avoid racing with
  freshly-dispatched jobs whose stream is created before
  transaction.on_commit persists the Job row.
- Drains (delete_consumer + delete_stream) only when the backing Job is
  in JobState.final_states or the row is missing entirely. Running jobs
  are left alone regardless of age.
- Surfaces results through an IntegrityCheckResult with
  checked/fixed/unfixable counters and logs a per-stream line carrying
  status, age, redelivered count, and consumer-drain outcome.

New Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES class attribute for symmetry with
existing STALLED_JOBS_MAX_MINUTES and FAILED_JOBS_DISPLAY_MAX_HOURS
constants. Tighten this if zombie streams start stranding poll cycles
faster than the 60-minute default catches them.

Tests cover the four outcomes (drain on terminal+old, drain on
missing+old, skip on fresh-but-terminal, skip on old-but-running) plus
the unfixable path when the NATS drain itself raises.

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 15, 2026 23:33
@netlify
Copy link
Copy Markdown

netlify bot commented Apr 15, 2026

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 1e24d51
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69e02fc7eb036b0008b3e1db

@netlify
Copy link
Copy Markdown

netlify bot commented Apr 15, 2026

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 1e24d51
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69e02fc478d4a400087f5413

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 15, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d94e4469-2e64-44da-ae4f-ca2fc3dc5413

📥 Commits

Reviewing files that changed from the base of the PR and between 5090057 and 1e24d51.

📒 Files selected for processing (5)
  • ami/jobs/models.py
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_periodic_beat_tasks.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py

📝 Walkthrough

Walkthrough

The changes introduce a zombie stream detection and cleanup mechanism within the job health-check system. A new constant bounds stream retention age. New NATS manager methods enumerate job streams, parse creation timestamps, and populate redelivery counts. A health-check function identifies streams for terminal/missing jobs exceeding the age threshold and attempts deletion via NATS consumer/stream removal. Comprehensive tests validate behavior across success and failure scenarios.

Changes

Cohort / File(s) Summary
Job model and zombie stream configuration
ami/jobs/models.py
Added class constant ZOMBIE_STREAMS_MAX_AGE_MINUTES derived from STALLED_JOBS_MAX_MINUTES * 6 to define retention threshold for job streams.
Job health-check integration
ami/jobs/tasks.py
Extended JobsHealthCheckResult dataclass with zombie_streams field. Implemented _run_zombie_streams_check() to identify and delete NATS streams for terminal/missing jobs older than threshold. Updated jobs_health_check() to invoke the new sub-check with isolated error handling.
Job health-check tests
ami/jobs/tests/test_periodic_beat_tasks.py
Added mocked NATS manager methods (list_job_stream_snapshots, populate_redelivered_counts, delete_consumer, delete_stream). Implemented multiple test cases covering stream drainage for old terminal jobs, missing jobs, age-threshold skipping, and exception handling as unfixable errors.
NATS stream enumeration and management
ami/ml/orchestration/nats_queue.py
Added _parse_nats_timestamp() to convert RFC3339 timestamps to naive local datetimes. Implemented list_job_stream_snapshots() to enumerate job streams and extract metadata. Added populate_redelivered_counts() for concurrent redelivery count enrichment and _consumer_redelivered_count() helper to fetch per-job consumer info with exception tolerance.
NATS enumeration tests
ami/ml/orchestration/tests/test_nats_queue.py
Added unit tests validating list_job_stream_snapshots() error handling (raising nats.errors.Error on server errors) and successful listing with proper snapshot structure and num_redelivered initialization.

Sequence Diagram

sequenceDiagram
    participant HC as Health Check
    participant TQM as TaskQueueManager
    participant NATS as NATS JetStream
    participant DB as Django DB

    HC->>TQM: _run_zombie_streams_check()
    TQM->>NATS: list_job_stream_snapshots()
    NATS-->>TQM: [stream snapshots with created timestamps]
    TQM->>TQM: populate_redelivered_counts(snapshots)
    loop for each snapshot
        TQM->>DB: Job.objects.get(id=job_id)
        DB-->>TQM: job (or not found)
        alt job is terminal or missing
            alt stream age >= ZOMBIE_STREAMS_MAX_AGE_MINUTES
                TQM->>NATS: delete_consumer(job_id)
                NATS-->>TQM: ack
                TQM->>NATS: delete_stream(job_id)
                NATS-->>TQM: ack
                TQM->>TQM: increment fixed count
            else stream too young
                TQM->>TQM: skip (not old enough)
            end
        else job is running
            TQM->>TQM: skip (job still active)
        end
    end
    TQM-->>HC: IntegrityCheckResult{checked, fixed, unfixable}
    HC->>HC: add to jobs_health_check() output
Loading

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly Related PRs

Poem

🐰 Zombie streams begone! 🌊
Old ghosts of finished jobs drift away,
Health checks sweep the NATS halls clean—
No more haunting the queue, hooray! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 77.78% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: adding a zombie streams drain feature to the periodic health check task.
Description check ✅ Passed The description covers all required sections with substantial detail: summary, list of changes, related issues reference, detailed description with implementation notes, test plan with E2E validation, and addresses deployment concerns.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/drain-zombie-nats-streams

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new “zombie NATS streams” sub-check to the existing periodic jobs_health_check so the system can proactively drain leftover JetStream job_{N} streams after their backing Django Job is terminal (or missing) and sufficiently old, reducing wasted worker polling.

Changes:

  • Added JetStream stream enumeration via raw $JS.API.STREAM.LIST, including parsing the server created timestamp.
  • Implemented a new zombie_streams sub-check in jobs_health_check to drain eligible streams/consumers and report {checked, fixed, unfixable}.
  • Added unit tests covering drain/skip scenarios and pagination.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
ami/ml/orchestration/nats_queue.py Adds timestamp parsing and raw stream listing to support identifying zombie job_{N} streams.
ami/jobs/tasks.py Adds zombie_streams integrity sub-check and wires it into the umbrella health check result.
ami/jobs/models.py Introduces Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES to gate safe draining by age.
ami/jobs/tests/test_periodic_beat_tasks.py Extends health-check tests to cover zombie-stream draining behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ami/ml/orchestration/nats_queue.py
Comment thread ami/ml/orchestration/nats_queue.py Outdated
Comment thread ami/jobs/tasks.py Outdated
- Invert created=None safety guard: skip (unfixable) rather than
  drain when a stream's created timestamp is missing or unparseable,
  preserving the intent of the age guard for unknown-age streams
- Raise nats.errors.Error on NATS STREAM.LIST error payloads so an
  outage surfaces as unfixable=1 instead of masking as "zero streams"
- Defer _consumer_redelivered_count to drain candidates only via a
  new populate_redelivered_counts() helper, reducing O(N) NATS
  round-trips to O(candidates) per beat tick

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow merged commit 461c2c1 into main Apr 16, 2026
7 checks passed
@mihow mihow deleted the feat/drain-zombie-nats-streams branch April 16, 2026 01:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants