feat(jobs): drain zombie NATS streams in periodic health check#1239
A NATS stream whose Django Job is already terminal (or whose Job row has
been deleted) is a zombie: it consumes worker poll cycles and
redelivery-advisory traffic for no reason, yet the existing
cleanup-on-cancel path does not always run. Add a
defense-in-depth sub-check to the 15-min jobs_health_check umbrella.
_run_zombie_streams_check:
- Enumerates every job_{N} stream currently in JetStream via the raw
$JS.API.STREAM.LIST endpoint (nats.py's streams_info() drops the
server-side "created" timestamp we need for the age guard).
- Skips streams younger than Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES
(STALLED_JOBS_MAX_MINUTES * 6 by default) to avoid racing with
freshly-dispatched jobs whose stream is created before
transaction.on_commit persists the Job row.
- Drains (delete_consumer + delete_stream) only when the backing Job is
in JobState.final_states or the row is missing entirely. Running jobs
are left alone regardless of age.
- Surfaces results through an IntegrityCheckResult with
checked/fixed/unfixable counters and logs a per-stream line carrying
status, age, redelivered count, and consumer-drain outcome.
New Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES class attribute for symmetry with
existing STALLED_JOBS_MAX_MINUTES and FAILED_JOBS_DISPLAY_MAX_HOURS
constants. Tighten this if zombie streams start stranding poll cycles
faster than the 60-minute default catches them.
Tests cover the four outcomes (drain on terminal+old, drain on
missing+old, skip on fresh-but-terminal, skip on old-but-running) plus
the unfixable path when the NATS drain itself raises.
Co-Authored-By: Claude <noreply@anthropic.com>
Walkthrough
The changes introduce a zombie stream detection and cleanup mechanism within the job health-check system. A new constant bounds stream retention age. New NATS manager methods enumerate job streams, parse creation timestamps, and populate redelivery counts. A health-check function identifies streams for terminal/missing jobs exceeding the age threshold and attempts deletion via NATS consumer/stream removal. Comprehensive tests validate behavior across success and failure scenarios.
Sequence Diagram
sequenceDiagram
participant HC as Health Check
participant TQM as TaskQueueManager
participant NATS as NATS JetStream
participant DB as Django DB
HC->>TQM: _run_zombie_streams_check()
TQM->>NATS: list_job_stream_snapshots()
NATS-->>TQM: [stream snapshots with created timestamps]
TQM->>TQM: populate_redelivered_counts(snapshots)
loop for each snapshot
TQM->>DB: Job.objects.get(id=job_id)
DB-->>TQM: job (or not found)
alt job is terminal or missing
alt stream age >= ZOMBIE_STREAMS_MAX_AGE_MINUTES
TQM->>NATS: delete_consumer(job_id)
NATS-->>TQM: ack
TQM->>NATS: delete_stream(job_id)
NATS-->>TQM: ack
TQM->>TQM: increment fixed count
else stream too young
TQM->>TQM: skip (not old enough)
end
else job is running
TQM->>TQM: skip (job still active)
end
end
TQM-->>HC: IntegrityCheckResult{checked, fixed, unfixable}
HC->>HC: add to jobs_health_check() output
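
The branch structure in the diagram can be sketched as a small pure function. This is only an illustration of the two guards, not the PR's code: `classify_stream`, `FINAL_STATES`, and the status strings are hypothetical stand-ins for `JobState.final_states()` and the real Job lookup, and the 60-minute default mirrors the value stated in the PR description.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical stand-ins; the real values live on Job / JobState in the PR.
FINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}
ZOMBIE_STREAMS_MAX_AGE = timedelta(minutes=60)


def classify_stream(
    job_status: Optional[str],
    created: datetime,
    now: datetime,
    max_age: timedelta = ZOMBIE_STREAMS_MAX_AGE,
) -> str:
    """Return "drain", "skip_running", or "skip_fresh" for one job_{N} stream.

    job_status is None when the Job row is missing.
    """
    # State guard: an existing, non-terminal Job shields its stream at any age.
    if job_status is not None and job_status not in FINAL_STATES:
        return "skip_running"
    # Age guard: young streams are left alone so a stream created just
    # before its Job row commits is not torn down.
    if now - created < max_age:
        return "skip_fresh"
    # Terminal or missing Job, and old enough: eligible for draining.
    return "drain"
```

Keeping the decision separate from the NATS calls makes the four outcomes listed in the commit message easy to test without a broker.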
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pull request overview
Adds a new “zombie NATS streams” sub-check to the existing periodic jobs_health_check so the system can proactively drain leftover JetStream job_{N} streams after their backing Django Job is terminal (or missing) and sufficiently old, reducing wasted worker polling.
Changes:
- Added JetStream stream enumeration via raw $JS.API.STREAM.LIST, including parsing the server created timestamp.
- Implemented a new zombie_streams sub-check in jobs_health_check to drain eligible streams/consumers and report {checked, fixed, unfixable}.
- Added unit tests covering drain/skip scenarios and pagination.
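
For readers unfamiliar with the timestamp step, here is a minimal sketch of parsing a `created` value. It assumes the server reports an RFC 3339 string, possibly with nanosecond precision, which Python's `datetime` cannot hold; `parse_created` is a hypothetical name and not the helper added in this PR. It returns `None` for missing or unparseable input so the caller can treat the stream's age as unknown.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Date-time, optional fractional seconds of any length, then "Z" or a UTC offset.
_RFC3339 = re.compile(
    r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?(Z|[+-]\d{2}:\d{2})$"
)


def parse_created(value: Optional[str]) -> Optional[datetime]:
    """Parse an RFC 3339 timestamp into an aware UTC datetime, or return None."""
    if not isinstance(value, str):
        return None
    match = _RFC3339.match(value)
    if match is None:
        return None
    base, fraction, offset = match.groups()
    # Truncate nanoseconds to microseconds and pad short fractions to 6 digits.
    micros = (fraction or "")[:6].ljust(6, "0")
    offset = "+00:00" if offset == "Z" else offset
    try:
        parsed = datetime.fromisoformat(f"{base}.{micros}{offset}")
    except ValueError:
        # Well-formed shape but impossible values, e.g. month 13.
        return None
    return parsed.astimezone(timezone.utc)
```

Normalizing to UTC here keeps the later age comparison independent of the server's or worker's local time zone.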
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ami/ml/orchestration/nats_queue.py | Adds timestamp parsing and raw stream listing to support identifying zombie job_{N} streams. |
| ami/jobs/tasks.py | Adds zombie_streams integrity sub-check and wires it into the umbrella health check result. |
| ami/jobs/models.py | Introduces Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES to gate safe draining by age. |
| ami/jobs/tests/test_periodic_beat_tasks.py | Extends health-check tests to cover zombie-stream draining behavior. |
- Invert created=None safety guard: skip (unfixable) rather than drain when a stream's created timestamp is missing or unparseable, preserving the intent of the age guard for unknown-age streams
- Raise nats.errors.Error on NATS STREAM.LIST error payloads so an outage surfaces as unfixable=1 instead of masking as "zero streams"
- Defer _consumer_redelivered_count to drain candidates only via a new populate_redelivered_counts() helper, reducing O(N) NATS round-trips to O(candidates) per beat tick

Co-Authored-By: Claude <noreply@anthropic.com>
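
The error-payload point can be illustrated with a paging loop that refuses to treat an error response as an empty listing. This is a sketch under assumptions: `fetch_page` is a hypothetical callable standing in for the raw request, the response shape (`streams`, `total`, `error`) is assumed to follow the JetStream list-response convention, and `StreamListError` stands in for the `nats.errors.Error` the commit raises.

```python
from typing import Any, Callable, Dict, List

Page = Dict[str, Any]


class StreamListError(RuntimeError):
    """Hypothetical stand-in for the nats.errors.Error raised in the PR."""


def list_all_streams(fetch_page: Callable[[int], Page]) -> List[Dict[str, Any]]:
    """Collect every stream entry by requesting pages at increasing offsets."""
    streams: List[Dict[str, Any]] = []
    offset = 0
    while True:
        page = fetch_page(offset)
        # An error payload must surface to the caller; returning [] here
        # would make an outage look like "zero streams".
        if page.get("error"):
            raise StreamListError(str(page["error"]))
        batch = page.get("streams") or []
        streams.extend(batch)
        offset += len(batch)
        # Stop on an empty page or once the reported total has been read.
        if not batch or offset >= page.get("total", 0):
            return streams
```

The caller can then catch the exception and record the sub-check as unfixable rather than reporting a clean run.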
What this does (plain English)
When an ML job finishes or gets cancelled, we sometimes leave its NATS message queue behind. Workers don't know the job is over, so they keep politely asking that queue for work forever — burning cycles on nothing. This PR adds a periodic cleanup that deletes those orphaned queues once they're clearly abandoned, while keeping a safety window so we don't accidentally tear down a queue that a just-started job is still setting up.
Summary
Adds a zombie_streams sub-check to the existing jobs_health_check periodic beat task. Drains NATS JetStream job_{N} streams whose backing Django Job is in a terminal state (or missing) AND whose stream age exceeds Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES (default 60 minutes).

Zombie streams accumulate when the cleanup-on-cancel path misses a stream on job failure or revocation; workers keep polling the leftover subject for no reason. This is defense-in-depth: the underlying cleanup-on-cancel gap is a separate fix.
Two guards protect against draining live state:
- State guard: a stream is drained only when the Job's status is in JobState.final_states() or the Job row is missing. In-flight Jobs are skipped.
- Age guard: streams younger than ZOMBIE_STREAMS_MAX_AGE_MINUTES are skipped. Protects against the transaction.on_commit race where a NATS stream exists briefly before the Job row is persisted.

Implementation notes:
- Streams are enumerated via the raw $JS.API.STREAM.LIST endpoint (in ami/ml/orchestration/nats_queue.py::list_job_stream_snapshots) because JetStreamContext.streams_info in the currently pinned nats.py drops the server-side created timestamp, which we need to age zombies out with a safety margin. Pages at 256 streams per response so long tails are still fully enumerated.
- Results are reported as {checked, fixed, unfixable} counters consistent with the other sub-checks in JobsHealthCheckResult.

Test plan
- ami.jobs.tests.test_periodic_beat_tasks: unit tests cover drain, skip-fresh, skip-running, missing-job, paginated listing

E2E validation
Invoked jobs_health_check() synchronously against a live local NATS server in three scenarios. All three hit the expected branch of the age/state guard.

1. Drain on terminal+old. Created stream job_999991 with no matching Job row, patched Job.ZOMBIE_STREAMS_MAX_AGE_MINUTES = 0, ran the check. consumer_deleted=False is expected: the fixture only created a stream, no consumer. The stream itself (the entity that was wasting worker poll cycles) was cleanly removed. Post-drain stream_info raised NotFoundError as expected.

2. Skip on fresh+terminal (the transaction.on_commit race guard). Created a fresh stream job_999992, left the threshold at the default 60 min, ran the check. Stream retained. This is the scenario that would break async_api dispatch if it regressed: a freshly-created stream whose Job row hasn't committed yet must not be drained.

3. Skip on old+running. Created a throwaway Job (status=STARTED) and matching stream, patched threshold to 0 so age alone would permit drain. Stream retained. In-flight Job correctly shields the stream even when the age guard would otherwise permit it. The running_job_snapshots sub-check simultaneously picked up the Job as expected (checked=1).

All three scenarios also exercise the raw $JS.API.STREAM.LIST enumeration + UTC→local timestamp conversion against the real NATS server, which is the piece the unit tests mock.