
feat(jobs): make NATS queue activity visible in async ML job logs#1222

Merged
mihow merged 14 commits into main from feat/nats-queue-job-logger
Apr 14, 2026

Conversation


@mihow mihow commented Apr 11, 2026

Summary

Closes #1220. Makes NATS queue lifecycle events visible in the per-job log that users see in the UI, so async ML jobs no longer have a silent gap where their NATS activity used to be invisible.

TaskQueueManager now accepts an optional job_logger. When set, lifecycle events are mirrored to both the module logger (unchanged — still feeds stdout / container logs / observability pipelines) and the per-job logger:

Setup

  • Created NATS stream job_<id> / Reusing NATS stream job_<id> (messages=N, last_seq=N)
  • Created NATS consumer job-<id>-consumer (max_deliver=5, ack_wait=30.0s, max_ack_pending=1000, deliver_policy=all, ack_policy=explicit) — config snapshot read from the ConsumerInfo returned by the server
  • Reusing NATS consumer job-<id>-consumer (delivered=N ack_floor=N num_pending=N num_ack_pending=N num_redelivered=N)

Cleanup (forensic)

  • Finalizing NATS consumer job-<id>-consumer before deletion (delivered=N ack_floor=N num_pending=N num_ack_pending=N num_redelivered=N) — logged before the delete calls. This is the single most useful line for post-mortem investigations of jobs that ended in a weird state; without it, the consumer is already gone by the time anyone asks.
  • Deleted NATS consumer job-<id>-consumer / Deleted NATS stream job_<id> and matching failure lines
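A formatter producing the stats line quoted above might look like the following sketch. The helper name is assumed (the PR mentions a `_format_consumer_stats` helper but does not show its body); the nested-attribute shape follows nats-py's `ConsumerInfo`, and fields are read defensively so a missing field degrades to `?` instead of breaking the log line:

```python
from types import SimpleNamespace
from typing import Any


def format_consumer_stats(info: Any) -> str:
    """Sketch: render a ConsumerInfo-like object as the forensic stats line.
    Sequence counters live on nested objects in nats-py (delivered.consumer_seq,
    ack_floor.consumer_seq), hence the two-level getattr."""
    delivered = getattr(getattr(info, "delivered", None), "consumer_seq", "?")
    ack_floor = getattr(getattr(info, "ack_floor", None), "consumer_seq", "?")
    return (
        f"delivered={delivered} ack_floor={ack_floor} "
        f"num_pending={getattr(info, 'num_pending', '?')} "
        f"num_ack_pending={getattr(info, 'num_ack_pending', '?')} "
        f"num_redelivered={getattr(info, 'num_redelivered', '?')}"
    )
```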

Error paths

  • Failed to publish task to stream for job '<id>': <error> — previously only on the module logger, now also on the job logger with traceback preserved on the module side via exc_info=True

What's deliberately NOT logged to job_logger

Per-message and per-poll success events stay on the module logger only — a 10k-image job would otherwise drown its own log:

  • publish_task success (one per image)
  • reserve_tasks (one per worker poll, every few seconds)
  • acknowledge_task success (one per result)

Per-image queueing is routed through manager.log_async at DEBUG level, so it's available in the job log when debug logging is intentionally enabled (off by default). These might be worth surfacing more permanently when we can identify each processing service (#1194).

Implementation notes

Async bridging. Logging from inside the async block goes through the manager's log_async(level, msg, exc_info=False) helper — one entry point that:

  1. Always emits to the module logger (synchronous, fine on the event loop)
  2. Mirrors to job.logger via sync_to_async when one was passed in

The bridge is needed because Django's JobLogHandler.emit() does ORM writes (refresh_from_db + save), which are forbidden from the event loop. Without it, every lifecycle line was silently dropped — the handler's broad except swallowed SynchronousOnlyOperation and routed a warning to the module logger that nobody was watching.

Centralizing this in log_async means the bridge lives in one place instead of being re-implemented at every call site (an earlier draft of this PR had three different logging styles in the publish loop alone — module-direct, module-with-traceback, and a hand-rolled sync_to_async(job.logger.error) bridge).

Future intent — route, not mirror. log_async currently fans granular per-job lifecycle out to both loggers. The longer-term preference (documented inline in the log_async docstring) is to route — granular lifecycle lives on job.logger only, matching the convention in ami.jobs.tasks.save_results and friends, where job.logger.propagate = False and per-job detail is kept out of stdout / NR. The module logger then becomes a clean ops channel for true infra signals (connection failures, NATS-side errors) plus an automatic mirror at WARNING+ so error signals always reach ops dashboards. Kept symmetric for now because async ML processing is still being stabilized and the extra stdout visibility is actively helping debug. Switching the fan-out direction is a follow-up once the per-job UI log is trusted as the canonical inspection surface.

Dedup and early-return. _ensure_stream and _ensure_consumer are called on every publish_task (once per image). After the first call per manager session, subsequent calls skip the NATS round-trip entirely via an early return keyed on two set[int] fields (_streams_logged, _consumers_logged).
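The early-return amounts to roughly this; the class and counter are hypothetical stand-ins (the counter replaces the real `stream_info`/`add_stream` round-trip), while the `_streams_logged` field name and the once-per-image call pattern come from the PR:

```python
import asyncio


class DedupSketch:
    """Per-session dedup sketch; body simplified from the PR description."""

    def __init__(self) -> None:
        self._streams_logged: set[int] = set()
        self.stream_round_trips = 0  # stands in for js.stream_info / add_stream

    async def _ensure_stream(self, job_id: int) -> None:
        if job_id in self._streams_logged:
            return  # already verified this session: skip the NATS round-trip
        self.stream_round_trips += 1
        # ...create-or-reuse logic and the one-time lifecycle log go here...
        self._streams_logged.add(job_id)


async def publish_loop(mgr: DedupSketch, job_id: int, n_images: int) -> None:
    # publish_task calls _ensure_stream once per image, as in the PR.
    for _ in range(n_images):
        await mgr._ensure_stream(job_id)
```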

Concurrency note: Job.cancel() can trigger cleanup_async_job_resources (running in the request thread) while queue_images_to_nats is still in its publish loop (running in the Celery worker), so the stream/consumer can be deleted mid-flight from a different manager session. The early-return stays safe in that case — subsequent publishes in the queue loop fail loudly (publish_task returns False and logs an ERROR) rather than silently recreating an orphan stream without a consumer. That's actually better than the non-deduped baseline behavior, where _ensure_stream would silently recreate the stream on every iteration and the subsequent publishes would succeed to a stream that no worker will ever consume from. Not exercised by a concurrent-cancel test yet; worth adding if we lean harder on this claim.

Exception narrowing. _ensure_consumer previously caught broad Exception on the consumer_info() call, masking auth/API/transient errors as "consumer missing." Narrowed to nats.js.errors.NotFoundError to match the pattern already used in _ensure_stream and _stream_exists.

Caller wiring. cleanup_async_job_resources(job_id) now takes just job_id and resolves the Job (and its per-job logger) internally — matching the save_results pattern in ami/jobs/tasks.py. If the Job row is gone (e.g. the Job.DoesNotExist path in _fail_job), it falls back to the module logger. queue_images_to_nats passes job.logger directly to TaskQueueManager. Callers without a job context (_ack_task_via_nats, the worker-poll view, the DLQ management command) construct TaskQueueManager() without a logger.
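The fallback can be sketched as follows; `resolve_log_target` and `fetch_job` are hypothetical names (the real code resolves the Job via the ORM and catches `Job.DoesNotExist`), and only the fall-back-to-module-logger behavior comes from the PR:

```python
import logging

module_logger = logging.getLogger("ami.ml.orchestration.jobs")


def resolve_log_target(fetch_job, job_id: int) -> logging.Logger:
    """Sketch: pick the per-job logger when the Job row exists, else the
    module logger. `fetch_job` stands in for Job.objects.get(pk=job_id) and
    raises LookupError (standing in for Job.DoesNotExist) when the row is gone."""
    try:
        job = fetch_job(job_id)
    except LookupError:
        # Job row already deleted: fall back to the module logger so cleanup
        # activity is still recorded somewhere.
        return module_logger
    return job.logger
```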

Tests

Added TestTaskQueueManagerJobLogger in ami/ml/orchestration/tests/test_nats_queue.py (7 new tests):

  1. First publish logs stream + consumer creation to job_logger, including config snapshot
  2. Second publish in the same session emits no new lifecycle lines (dedup)
  3. Stream/consumer reuse logs include state and stats
  4. cleanup_job_resources logs the forensic snapshot before delete calls
  5. cleanup_job_resources tolerates a missing consumer without raising
  6. publish_task failures surface on job_logger at ERROR level
  7. TaskQueueManager() without a job_logger still works (module logger fallback)

Tests use a real logging.Logger with a capture handler (not MagicMock), so the sync_to_async bridge in log_async() is exercised on the real code path. All 18 tests in test_nats_queue.py pass. Not covered: concurrent-cancel-during-queue (see Dedup and early-return above).
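The capture-handler pattern might look like this; the PR does not show the test helpers, so `CaptureHandler` and `make_job_logger` are assumed names:

```python
import logging


class CaptureHandler(logging.Handler):
    """Collects formatted messages so assertions run against the real logging
    machinery; a MagicMock logger would bypass the handler path entirely,
    and with it the sync_to_async bridge under test."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def make_job_logger(name: str = "test-job-logger") -> tuple[logging.Logger, CaptureHandler]:
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # mirror the per-job convention noted above
    handler = CaptureHandler()
    logger.addHandler(handler)
    return logger, handler
```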

Manual verification

Deployed to a staging environment and ran async_api jobs end-to-end against real ML workers.

Small smoke test (18 images) — two jobs completed the full lifecycle. A third was canceled mid-flight; its UI log showed:

Finalizing NATS consumer job-<id>-consumer before deletion
  (delivered=50 ack_floor=0 num_pending=0 num_ack_pending=16 num_redelivered=16)

Larger test (636 images) — canceled after ~16 minutes (staging server backlogged on result ingestion). The forensic snapshot:

Queuing 636 images to NATS stream for job '<id>'
Created NATS stream job_<id>
Created NATS consumer job-<id>-consumer (max_deliver=5, ack_wait=30s, ...)
Successfully queued 636/636 images to stream for job '<id>'
... (per-result lines, pre-existing behavior) ...
Finalizing NATS consumer job-<id>-consumer before deletion
  (delivered=920 ack_floor=169 num_pending=366 num_ack_pending=43 num_redelivered=141)
Deleted NATS consumer job-<id>-consumer for job '<id>'
Deleted NATS stream job_<id> for job '<id>'
Cleaned up NATS resources for job <id>

920 delivery attempts for 636 tasks (a ~1.45× redelivery factor), 169 acked, 43 in flight, 141 redelivered. Workers were pulling tasks, but their result POSTs were timing out, so acks did not arrive within the 30 s ack_wait window and NATS redelivered the tasks. Without this line you'd be guessing.
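As a quick sanity check on the quoted factor, using the numbers from the snapshot:

```python
# Redelivery factor: 920 delivery attempts recorded for 636 queued tasks.
delivered_attempts = 920
queued_tasks = 636
factor = delivered_attempts / queued_tasks
print(f"{factor:.2f}x")  # prints 1.45x
```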


Test plan

  • pytest ami/ml/orchestration/tests/test_nats_queue.py — 18 tests pass
  • pytest ami/ml/orchestration/tests/test_cleanup.py — integration tests pass (real NATS + Redis)
  • black, isort, flake8, pyupgrade clean on all touched files (via pre-commit)
  • Manual verification in staging — lifecycle and cleanup lines land in job.logs.stdout end-to-end

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Per‑job logging for async queueing and NATS lifecycle events, including one‑time lifecycle messages and a finalizing stats snapshot; cleanup accepts an optional job logger.
  • Bug Fixes

    • Publish failures and cleanup errors now surface in the job log with safe fallback; reduced duplicate lifecycle noise and lowered routine success verbosity.
  • Tests

    • Added tests for job‑scoped logging, lifecycle deduplication, finalizing stats ordering, error reporting, and logger fallback.

TaskQueueManager now accepts an optional job_logger. When set, lifecycle
events (stream/consumer create+reuse with state/config snapshot, publish
failures, cleanup deletions, and a forensic consumer-stats line before
deletion) are mirrored to the per-job logger in addition to the module
logger. The UI job log now reflects what the NATS layer is actually
doing for that specific job instead of a silent gap.

Per-message and per-poll paths (publish_task success, reserve_tasks,
acknowledge_task) intentionally stay on the module logger only — a
10k-image job would otherwise drown its own log. Lifecycle log lines are
deduped per manager session so a loop over N images still only emits a
single "Created NATS stream" line per job.

cleanup_async_job_resources and queue_images_to_nats pass job.logger
through to TaskQueueManager so real async_api jobs pick up the new
logging without further caller changes.

Closes #1220

netlify bot commented Apr 11, 2026

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 766137c
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69dd97a5e57a2900084855d7


coderabbitai bot commented Apr 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Extended orchestration and TaskQueueManager to accept an optional per-job logger, mirror selected NATS lifecycle and error messages into that job logger, add per-session deduplication of lifecycle messages, propagate job.logger from callers, adjust logging levels and async sync-to-async bridging, and add tests validating job-logger behavior.

Changes

Cohort / File(s) — Summary

  • Orchestration callers (ami/ml/orchestration/jobs.py, ami/jobs/tasks.py) — cleanup_async_job_resources now accepts `job_logger: logging.Logger | None = None`.
  • NATS queue manager (ami/ml/orchestration/nats_queue.py) — Added optional job_logger to TaskQueueManager; implemented async _log(level, msg) to mirror lifecycle/error messages to the per-job logger (safely via sync_to_async), added per-session _streams_logged/_consumers_logged dedupe, rewrote _ensure_stream/_ensure_consumer to probe info and emit one-time create/reuse logs with stats/config, mirrored publish/delete results and errors to the job logger, and added _log_final_consumer_stats.
  • Tests: job-logger behavior (ami/ml/orchestration/tests/test_nats_queue.py) — New async TestTaskQueueManagerJobLogger suite verifying that creation vs. reuse lifecycle messages (with stats/config) are emitted once to the provided job logger, publish failures are mirrored to the job logger, final consumer stats appear before deletion during cleanup, a missing consumer is tolerated, and job_logger=None remains safe.
  • Minor callsite adjustment (ami/ml/orchestration/jobs.py call sites) — Callers updated to forward job.logger where appropriate; simplified one cleanup_async_job_resources(job_id) fallback path when job lookup fails.

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Job / Caller
    participant TQM as TaskQueueManager
    participant NATS as NATS JetStream
    participant ModLog as Module Logger
    participant JobLog as Job Logger

    Caller->>TQM: __init__(..., job_logger=job.logger)
    Caller->>TQM: publish_task(job_id, payload)
    TQM->>NATS: stream_info(job_<id>)
    alt stream exists
        TQM->>ModLog: log(stream reuse)
        TQM->>JobLog: _log(stream reuse + stats)
    else stream missing
        TQM->>NATS: add_stream(job_<id>)
        TQM->>ModLog: log(stream created)
        TQM->>JobLog: _log(stream created)
    end
    TQM->>NATS: consumer_info(job_<id>)
    alt consumer exists
        TQM->>ModLog: log(consumer reuse)
        TQM->>JobLog: _log(consumer reuse + stats)
    else consumer missing
        TQM->>NATS: add_consumer(job_<id>)
        TQM->>ModLog: log(consumer created)
        TQM->>JobLog: _log(consumer created + config)
    end
    TQM->>NATS: publish()
    alt publish succeeds
        TQM->>ModLog: debug(success)
    else publish fails
        TQM->>ModLog: error(...)
        TQM->>JobLog: _log(ERROR, ...)
    end
    Caller->>TQM: cleanup_job_resources(job_id)
    TQM->>NATS: consumer_info(job_<id>)
    TQM->>JobLog: _log_final_consumer_stats(job_id)
    TQM->>NATS: delete_consumer / delete_stream
    TQM->>ModLog: deletion logs
    TQM->>JobLog: _log(deletion logs)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes


Poem

🐇 I hopped through streams and mirrored each cue,

Whispered lifecycle traces so jobs could view.
Created, reused, then finalized with care,
Errors echoed gently into job logs there.
A tiny hop for clarity — logs bloom everywhere.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 78.95%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Title check ✅ — The title clearly and concisely summarizes the main objective: making NATS queue activity visible in async ML job logs, which aligns with the primary change in the changeset.
  • Linked Issues check ✅ — The PR implements the primary coding objective from #1220 (surface NATS lifecycle events in the job logger with config/stats snapshots, forensic consumer logging before deletion, and publish error visibility). #1219 (distinguish transient Redis errors) is explicitly noted as out of scope for this PR.
  • Out of Scope Changes check ✅ — All changes are directly scoped to the #1220 objectives: TaskQueueManager now accepts job_logger, lifecycle events are mirrored, dedup logic prevents redundant NATS calls, and callers are wired appropriately. No unrelated refactoring or scope creep detected.
  • Description check ✅ — The PR description comprehensively addresses all required template sections with substantial detail and context.




netlify bot commented Apr 11, 2026

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 766137c
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69dd97a5f8d78f00088d2dc4

TaskQueueManager._log() was calling job_logger.log() synchronously from
inside async methods (_ensure_stream, _ensure_consumer, cleanup). That
triggers JobLogHandler.emit() which does a Django ORM refresh_from_db +
save — forbidden from an event loop. Every lifecycle line was silently
dropped with "Failed to save logs for job #N: You cannot call this from
an async context", defeating the point of the original change.

Convert _log to async and await sync_to_async(job_logger.log)(...) so
the ORM work runs in a thread. Update all call sites to await. Apply
the same fix to the publish-failure path in queue_images_to_nats.

Verified on ami-demo with job #74: lifecycle lines fired on module
logger but "Failed to save logs" errors swallowed the job-logger mirror.
@mihow mihow changed the title from "feat(nats): surface queue lifecycle events on the per-job logger (#1220)" to "feat(jobs): make NATS queue activity visible in async ML job logs" on Apr 12, 2026
@mihow mihow marked this pull request as ready for review April 12, 2026 00:37
Copilot AI review requested due to automatic review settings April 12, 2026 00:37

Copilot AI left a comment


Pull request overview

This PR makes NATS JetStream queue lifecycle events (stream/consumer setup, cleanup snapshots, and publish failures) visible in the per-job UI log by mirroring selected module-level logs to an optional job_logger, with async-safe bridging for Django ORM-backed log handlers.

Changes:

  • Add optional job_logger to TaskQueueManager and mirror key lifecycle/error logs via an async _log() helper using sync_to_async.
  • Wire job.logger into NATS publishing and cleanup paths so job logs include stream/consumer setup, delete events, and forensic pre-delete consumer stats.
  • Add unit tests validating mirroring, deduping, reuse stats formatting, cleanup snapshot ordering, and failure surfacing.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

  • ami/ml/orchestration/nats_queue.py — Adds job_logger support, async-safe log mirroring, lifecycle dedupe, and pre-delete consumer stats logging.
  • ami/ml/orchestration/jobs.py — Passes the per-job logger into TaskQueueManager for publish/cleanup and bridges one async error log write via sync_to_async.
  • ami/ml/orchestration/tests/test_nats_queue.py — Adds unit tests covering job-logger mirroring behavior and dedupe/forensics expectations.


Comment thread ami/ml/orchestration/nats_queue.py

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/ml/orchestration/nats_queue.py (1)

238-254: ⚠️ Potential issue | 🟠 Major

Only catch NotFoundError to detect missing consumers.

The current code catches all Exception exceptions, treating any consumer_info() failure as "consumer missing". If JetStream returns an auth, API, or transient error here, the code falls through to add_consumer(), which hides the real failure and can emit misleading lifecycle logs.

Change the broad except Exception to except nats.js.errors.NotFoundError so other exceptions propagate correctly. This requires adding the import: from nats.js.errors import NotFoundError.

Suggested fix
         try:
             info = await asyncio.wait_for(
                 self.js.consumer_info(stream_name, consumer_name),
                 timeout=NATS_JETSTREAM_TIMEOUT,
             )
             if job_id not in self._consumers_logged:
                 await self._log(
                     logging.INFO,
                     f"Reusing NATS consumer {consumer_name} ({self._format_consumer_stats(info)})",
                 )
                 self._consumers_logged.add(job_id)
             return
         except asyncio.TimeoutError:
             raise  # NATS unreachable — let caller handle it
-        except Exception:
+        except nats.js.errors.NotFoundError:
             # Consumer doesn't exist, fall through to create it.
             pass

Also add to imports near the top: from nats.js.errors import NotFoundError

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/jobs.py`:
- Around line 47-50: The cleanup function currently forwards whatever _logger it
receives into TaskQueueManager; change cleanup_async_job_resources to accept
job_logger: logging.Logger | None = None and only forward an actual per-job
logger (job.logger) from callers — update callers (e.g., where Job lookup
succeeds) to pass job.logger, and where Job lookup fails pass None; inside
cleanup_async_job_resources use a conditional context: if job_logger is not None
then "async with TaskQueueManager(job_logger=job_logger) as manager" else "async
with TaskQueueManager() as manager" so unrelated module loggers are not used for
per-job NATS lifecycle logging.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ef374171-e1d2-4120-8bd7-e1dce1735926

📥 Commits

Reviewing files that changed from the base of the PR and between 8f8b177 and 7e8ce28.

📒 Files selected for processing (3)
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py

Comment thread ami/ml/orchestration/jobs.py Outdated
…gger to TaskQueueManager in cleanup

Addresses CodeRabbit review on #1222.

1. _ensure_consumer caught broad Exception when consumer_info() failed,
   masking auth/API/transient JetStream errors as "consumer missing" and
   emitting misleading creation logs. Narrowed to NotFoundError to match
   the pattern already used in _ensure_stream (line 208).

2. cleanup_async_job_resources forwarded its `_logger` argument into
   TaskQueueManager as `job_logger`. One caller (_fail_job on the
   Job.DoesNotExist path in ami/jobs/tasks.py:198) passes a plain module
   logger, which would then have cleanup lifecycle lines mirrored into an
   unrelated logger via sync_to_async. Added a separate `job_logger`
   parameter, defaulted to None, and updated the two callers that have
   real job context (_fail_job happy path, cleanup_async_job_if_needed)
   to pass `job.logger` explicitly. The DoesNotExist path leaves
   job_logger=None, so TaskQueueManager falls through to the module
   logger only.

Tests: 18/18 in test_nats_queue.py pass, pre-commit clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)

375-395: Consider passing job_logger in _ack_task_via_nats for consistency.

The _ack_task_via_nats function in ami/jobs/tasks.py (lines 204-211) has access to job_logger but doesn't forward it to TaskQueueManager. While acknowledge_task only logs at debug/error level and doesn't involve lifecycle events, passing job_logger would ensure any acknowledgment failures surface in the job log.

This is a minor consistency improvement and can be deferred if desired.



📥 Commits

Reviewing files that changed from the base of the PR and between 7e8ce28 and 439300a.

📒 Files selected for processing (3)
  • ami/jobs/tasks.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py

…e job_logger pattern

Drop the separate `_logger` parameter that was redundant with `job_logger`.
Follow the existing convention (e.g. save_results in ami/jobs/tasks.py):
`_log = job_logger or logger` — use per-job logger when available, module
logger otherwise.

Callers now read cleanly:
  cleanup_async_job_resources(job.pk, job_logger=job.logger)  # has job
  cleanup_async_job_resources(job_id)                         # no job

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/jobs.py`:
- Around line 120-127: The except block that currently awaits
sync_to_async(job.logger.error)(...) should also emit the full traceback to the
module logger so ops logs keep stack traces; add a module-level
logger.exception(...) call (using the module-level logger variable) inside the
except handler (e.g., in the same except Exception as e in publish/publish_task
path) before or immediately after the sync_to_async(job.logger.error) call so
the exception and traceback are recorded, while keeping the existing
job.logger.error call to write the message to job.logs.stdout.

📥 Commits

Reviewing files that changed from the base of the PR and between 439300a and 9007197.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py
  • ami/ml/orchestration/jobs.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/jobs/tasks.py

Comment thread ami/ml/orchestration/jobs.py
Comment thread ami/ml/orchestration/nats_queue.py Outdated
Comment thread ami/ml/orchestration/nats_queue.py Outdated
# Per-message success logs stay at module level (noise in 10k-image
# jobs), but a failure on even a single publish deserves to surface
# in the job log — otherwise the failure path is invisible to users.
await self._log(logging.ERROR, f"Failed to publish task to stream for job '{job_id}': {e}")
Collaborator Author


Very helpful!

…dcoding in creation log

The "Created NATS consumer" log line was hardcoding max_deliver=5 and
interpolating TASK_TTR for ack_wait. Now reads from the ConsumerInfo
returned by add_consumer(), so the log always reflects what the server
accepted. Added _format_consumer_config() alongside the existing
_format_consumer_stats() for the two different log contexts (creation
vs runtime stats).

Updated test mocks to return ConsumerInfo-like objects with a config
sub-object from add_consumer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)

217-226: Consider defensive access for info.state fields.

While StreamInfo.state should always be present when stream_info() succeeds, the code accesses info.state.messages and info.state.last_seq directly. For consistency with the defensive pattern used in _format_consumer_stats(), you might want to handle potential None values.

♻️ Optional defensive access
             if job_id not in self._streams_logged:
+                state = info.state
+                messages = state.messages if state else "?"
+                last_seq = state.last_seq if state else "?"
                 await self._log(
                     logging.INFO,
                     f"Reusing NATS stream {stream_name} "
-                    f"(messages={info.state.messages}, last_seq={info.state.last_seq})",
+                    f"(messages={messages}, last_seq={last_seq})",
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 217 - 226, The code accesses
info.state.messages and info.state.last_seq directly; make this defensive like
_format_consumer_stats by first grabbing state = info.state or a default, then
use getattr(state, "messages", 0) and getattr(state, "last_seq", 0) (or sensible
string defaults) when building the log message in the block that calls
self.js.stream_info(stream_name), and keep using self._log and
self._streams_logged as before so the log prints safe values even if info.state
is None.
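The defensive access the prompt describes can be sketched as follows — a minimal stand-alone version using `getattr` fallbacks, with hypothetical helper and class names (the real code lives inside `_ensure_stream` in `nats_queue.py`):

```python
def format_stream_state(info) -> str:
    """Build the stream-reuse log fragment without assuming info.state is populated."""
    state = getattr(info, "state", None)
    messages = getattr(state, "messages", "?") if state else "?"
    last_seq = getattr(state, "last_seq", "?") if state else "?"
    return f"(messages={messages}, last_seq={last_seq})"


# Stand-ins for the StreamInfo objects nats-py returns:
class FakeState:
    messages = 42
    last_seq = 7


class FakeInfo:
    state = FakeState()


class EmptyInfo:
    state = None  # the degenerate case the review worries about


print(format_stream_state(FakeInfo()))   # (messages=42, last_seq=7)
print(format_stream_state(EmptyInfo()))  # (messages=?, last_seq=?)
```

The point of the `"?"` fallback is that the reuse line degrades to a still-readable log entry instead of raising `AttributeError` mid-publish.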
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 121-127: The ack_wait value from ConsumerInfo.config (ack_wait) is
in nanoseconds and is being logged directly, producing misleading values; update
the string construction that builds the consumer config summary (the return that
references cfg.ack_wait) to convert ack_wait from nanoseconds to seconds by
dividing by 1e9 when present so the log shows human-readable seconds (e.g., 30s)
instead of nanoseconds.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d5c4336b-33b5-4131-ae23-a5487ad2a802

📥 Commits

Reviewing files that changed from the base of the PR and between 9007197 and f8d09ac.

📒 Files selected for processing (2)
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py
✅ Files skipped from review due to trivial changes (1)
  • ami/ml/orchestration/tests/test_nats_queue.py

…m rendering in config log

Three cleanup items found on self-review:

1. _ensure_stream and _ensure_consumer were calling stream_info/consumer_info
   on every publish_task (once per image). Since the stream and consumer are
   never deleted mid-flight (cleanup uses a separate manager session), added
   an early return when job_id is already in the logged set. Saves 2 NATS
   round-trips per image after the first.

2. _format_consumer_config was rendering enum fields as "DeliverPolicy.ALL"
   instead of "all" — added a _val() helper that unwraps .value when present.

3. Removed now-redundant inner dedup checks (the early return makes them
   always-true).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)

126-132: ⚠️ Potential issue | 🟡 Minor

Verify ack_wait units before rendering as seconds.

At Line 128, ack_wait is logged with an s suffix directly from ConsumerInfo.config. If this value is nanoseconds in your nats-py version, the UI log will be misleading.

In the exact nats.py version used by this repository, what unit is `ConsumerInfo.config.ack_wait` returned in by `consumer_info()` / `add_consumer()` responses: seconds or nanoseconds?
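If `ack_wait` really did arrive in nanoseconds, the fix the bot asks for is a one-line division — a minimal sketch with a hypothetical helper name. (Note that a later commit in this thread pushes back: nats-py already performs this conversion in `from_response`, using the same constant.)

```python
_NANOSECOND = 1e9  # the constant nats-py's _convert_nanoseconds divides by

def ack_wait_seconds(ack_wait_ns: float) -> float:
    """Render a nanosecond ack_wait as human-readable seconds for the log line."""
    return ack_wait_ns / _NANOSECOND

# 30 seconds expressed in nanoseconds:
print(f"ack_wait={ack_wait_seconds(30_000_000_000)}s")  # ack_wait=30.0s
```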

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b5e8d277-43a6-40ab-88b0-fce7f830e46d

📥 Commits

Reviewing files that changed from the base of the PR and between f8d09ac and 4c62d34.

📒 Files selected for processing (1)
  • ami/ml/orchestration/nats_queue.py

mihow and others added 6 commits April 13, 2026 15:03
…tate access

Addresses two CodeRabbit findings:

1. queue_images_to_nats publish exception path only wrote a stringified
   error to the job log. Added logger.exception() on the module logger so
   ops dashboards keep the full traceback. The job.logger.error bridge
   still writes the user-facing message.

2. _ensure_stream accessed info.state.messages / info.state.last_seq
   without defending against None, unlike _format_consumer_stats which
   already does. Match the defensive pattern so the reuse line doesn't
   blow up if the server ever returns a StreamInfo with no state.

Also pushing back on the "ack_wait nanoseconds" finding in the review
thread — nats-py does convert it to seconds in from_response via
_convert_nanoseconds (source: val / _NANOSECOND where _NANOSECOND = 1e9).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…sources

Match the pattern used by save_results in ami/jobs/tasks.py: take only
job_id, resolve job.logger internally, fall back to the module logger
when the Job row is missing. Keeps call sites consistent across the
codebase — cleanup_async_job_resources(job.pk) everywhere — and makes
the job object available inside the function for future use.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…h TaskQueueManager._log

Two _log identifiers in the same module had incompatible semantics:
- nats_queue.TaskQueueManager._log(level, msg) — async coroutine that
  fans out to module + job loggers with sync_to_async bridging.
- jobs.cleanup_async_job_resources local _log = job_logger or logger —
  a plain Logger instance called as _log.info(...) / .error(...).

Rename the local to job_logger and assign directly from
`job.logger if job else logger`, matching the save_results pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Drop redundant ternary: cfg = info.config, not `info.config if info.config is not None else None`
- Merge two-line f-string in _log_final_consumer_stats
- Drop redundant `except asyncio.TimeoutError: raise` in _ensure_consumer
  (now that the catch is narrowed to NotFoundError, TimeoutError propagates
  naturally — matches _ensure_stream style)
- Explicit comment on the intentionally-broad `except Exception` in
  _log_final_consumer_stats clarifying it's different from _ensure_consumer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename TaskQueueManager._log → log_async and route every log line in the
queue_images_to_nats async block through it (debug, error + traceback).
Drops the ad-hoc sync_to_async(job.logger.error) bridge and the separate
logger.exception call — one consistent API, one place that knows how to
bridge JobLogHandler's ORM save through sync_to_async.

log_async also now accepts exc_info=True so callers don't need to pair a
module-only logger.exception with a job-logger error call.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
log_async currently mirrors granular per-job lifecycle to both the module
and job loggers. Document why this is intentional for now (async ML
processing still stabilizing, stdout visibility helping us debug) and
the eventual target shape (route to job logger only at INFO/DEBUG, mirror
at WARNING+). Matches the pattern in ami.jobs.tasks.save_results, where
job.logger.propagate=False keeps granular per-job state out of ops logs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
mihow and others added 2 commits April 13, 2026 18:19
…dedup

The original docstring claimed the stream/consumer won't be deleted
mid-flight because cleanup uses a separate manager session. That's
incomplete — Job.cancel() runs cleanup_async_job_resources in the
request thread while queue_images_to_nats is still in its publish loop
in the Celery worker. So a concurrent delete across manager sessions
is possible.

The early-return is still safe in that scenario, but for a different
reason than the original claim: downstream publish_task fails loudly
(returns False, logs ERROR) when the stream is gone, rather than
silently recreating an orphan stream without a consumer (which is what
the non-deduped baseline would do).

Updates _ensure_stream and _ensure_consumer docstrings to describe the
actual safety argument accurately.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Without this gate, every log_async call fires the job-logger mirror
through sync_to_async — a ThreadPoolExecutor submit per call —
regardless of whether the effective level would drop the record. For a
10k-image queue this amounts to 10k unnecessary thread-pool submissions
when DEBUG is off.

stdlib Logger.log does the same isEnabledFor check internally before
formatting. We need to do it explicitly here because the mirror goes
through sync_to_async, bypassing the in-logger short-circuit.

No behavior change when at least one logger is enabled for the level;
pure short-circuit when both are gated out.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
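The gate described in the commit can be sketched with a simplified synchronous stand-in — in the real `log_async` the job-logger branch goes through `sync_to_async`, which is exactly the thread-pool submission the short-circuit avoids (names and return value here are illustrative):

```python
import logging
from typing import Optional


def log_both(level: int, msg: str, module_logger: logging.Logger,
             job_logger: Optional[logging.Logger]) -> bool:
    """Mirror one record to both loggers, short-circuiting when neither would emit it.

    Returns True only when at least one logger actually emitted the record.
    """
    module_on = module_logger.isEnabledFor(level)
    job_on = job_logger is not None and job_logger.isEnabledFor(level)
    if not (module_on or job_on):
        return False  # neither logger wants it: no formatting, no thread-pool hop
    if module_on:
        module_logger.log(level, msg)
    if job_on:
        job_logger.log(level, msg)  # real code: await sync_to_async(...) here
    return True


mod = logging.getLogger("demo.module")
mod.setLevel(logging.INFO)
print(log_both(logging.DEBUG, "per-image detail", mod, None))  # False
print(log_both(logging.INFO, "stream created", mod, None))     # True
```

This mirrors stdlib `Logger.log`, which performs the same `isEnabledFor` check internally — the explicit copy is needed only because the mirror path bypasses the in-logger short-circuit.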
@mihow mihow merged commit 1c6be7a into main Apr 14, 2026
7 checks passed
@mihow mihow deleted the feat/nats-queue-job-logger branch April 14, 2026 01:53
mihow added a commit that referenced this pull request Apr 14, 2026
…pshots

Two Celery Beat periodic tasks so operators can see ML job health without
waiting for a job to finish:

1. check_stale_jobs_task (every 15 min) — thin wrapper around the existing
   check_stale_jobs() that reconciles jobs stuck in running states past
   FAILED_CUTOFF_HOURS. Previously the function existed but was only
   reachable via the update_stale_jobs management command, so nothing
   ran it automatically.

2. log_running_async_job_stats (every 5 min) — for each incomplete
   async_api job, opens a TaskQueueManager with that job's logger and
   logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered
   snapshot of the NATS consumer. Read-only; no status changes.
   Builds on the lifecycle-logging landed in #1222 so long-running jobs
   now get mid-flight visibility, not just create + cleanup snapshots.

Both registered via migration 0020 so existing deployments pick them up
on the next migrate without manual Beat configuration.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 15, 2026
* feat(jobs): schedule periodic stale-job reconcile + NATS consumer snapshots

Two Celery Beat periodic tasks so operators can see ML job health without
waiting for a job to finish:

1. check_stale_jobs_task (every 15 min) — thin wrapper around the existing
   check_stale_jobs() that reconciles jobs stuck in running states past
   FAILED_CUTOFF_HOURS. Previously the function existed but was only
   reachable via the update_stale_jobs management command, so nothing
   ran it automatically.

2. log_running_async_job_stats (every 5 min) — for each incomplete
   async_api job, opens a TaskQueueManager with that job's logger and
   logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered
   snapshot of the NATS consumer. Read-only; no status changes.
   Builds on the lifecycle-logging landed in #1222 so long-running jobs
   now get mid-flight visibility, not just create + cleanup snapshots.

Both registered via migration 0020 so existing deployments pick them up
on the next migrate without manual Beat configuration.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): address review comments on PR #1227

- log_running_async_job_stats: reuse a single TaskQueueManager per tick
  instead of opening N NATS connections. Cost is now O(1) in the number
  of running async jobs rather than O(n). Guarded outer connection setup
  so a NATS outage drops the tick cleanly instead of crashing the task.
- check_stale_jobs_task: bump expires from 10 to 14 minutes so a delayed
  copy still runs within the 15-minute schedule instead of expiring before
  a worker picks it up under broker pressure.
- migration 0020: use apps.get_model for django_celery_beat models and
  declare an explicit migration dependency so the data migration uses
  historical model state.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): fall back to per-job manager when shared path fails

If opening the shared TaskQueueManager raises at setup (or teardown), try
each job with its own fresh manager before giving up. Defends against a
regression in the shared mutation pattern silently costing us snapshot
visibility for every running async job on every tick. If NATS itself is
down, the fallback loop will fail too and log once per job — same
end-state as before the refactor, just noisier.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): rename beat task to jobs_health_check umbrella

The periodic task is renamed from `check_stale_jobs_task` to
`jobs_health_check` so new job-health checks can share the 15-minute
cadence and `expires` guarantees without a new beat entry. Current
body runs a single `_run_stale_jobs_check()`; future checks plug in
alongside it and return the same `{checked, fixed, unfixable}` shape.

Names chosen to parallel #1188's integrity-check pattern:
- Beat task = noun phrase (`jobs_health_check`) — reads well in flower
- (Future) management command = verb (`manage.py check_jobs`)

Migration 0020 updated in place since it hasn't shipped yet.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): fold snapshot task into umbrella, adopt IntegrityCheckResult

Addresses the takeaway-review findings on PR #1227:

- Fix reverse migration: delete the row we create. The forward step
  registers `jobs.health_check` but the old `delete_periodic_tasks`
  still referenced the pre-rename `jobs.check_stale_jobs`, leaving a
  stranded row on rollback.
- Collapse `log_running_async_job_stats` into a sub-check of the
  umbrella. Drops the second `PeriodicTask` and the shared-connection
  fallback path — on a 15-minute cadence there's no reason to keep
  two beat tasks or defend against a per-manager bug, and a quietly
  hung job now gets a snapshot in the same tick the reconciler will
  decide whether to revoke it.
- Adopt `IntegrityCheckResult` as the shared sub-check shape, housed
  in a new `ami.main.checks.schemas` module so PR #1188 can re-target
  its import without a merge-order dance. Wrap the two sub-checks in
  a `JobsHealthCheckResult` parent dataclass; the umbrella returns
  `dataclasses.asdict(...)` for celery's JSON backend. Observation
  checks leave `fixed=0` and count per-item errors in `unfixable`.

Tests collapse to one `JobsHealthCheckTest` class covering both
sub-checks (reconcile + snapshot) and the edge cases that matter:
per-job snapshot failure, shared-connection setup failure, non-async
jobs skipped, idle deployment returns all zeros.

Co-Authored-By: Claude <noreply@anthropic.com>
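The shared result shape described above might look roughly like this — a sketch in which the sub-check field names are assumptions drawn from the commit text, not the merged code:

```python
import dataclasses


@dataclasses.dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0  # observation checks count per-item errors here


@dataclasses.dataclass
class JobsHealthCheckResult:
    stale_jobs: IntegrityCheckResult
    running_job_snapshots: IntegrityCheckResult


result = JobsHealthCheckResult(
    stale_jobs=IntegrityCheckResult(checked=3, fixed=1),
    running_job_snapshots=IntegrityCheckResult(checked=2, unfixable=1),
)
# asdict() recurses into nested dataclasses, so the umbrella task returns plain
# dicts/ints that Celery's JSON result backend can serialize as-is.
payload = dataclasses.asdict(result)
print(payload["stale_jobs"])  # {'checked': 3, 'fixed': 1, 'unfixable': 0}
```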

* fix(jobs): isolate sub-checks + pre-resolve loggers off event loop

Addresses review findings from the re-review on 6dc7e6e:

- The umbrella had no guard around sub-check calls, so a DB hiccup in
  ``_run_stale_jobs_check`` would kill the snapshot check and fail the
  whole task. Wrap each call in ``_safe_run_sub_check``, which catches,
  logs, and returns ``IntegrityCheckResult(unfixable=1)`` as a sentinel
  — operators watching the task result in Flower see the sub-check
  failed rather than reading all-zeros and assuming all-clear.
- ``Job.logger`` attaches a ``JobLogHandler`` on first access which
  touches the ORM; the file's own docstring says resolve outside the
  event loop, but two accesses were inside the coroutine. Pre-resolve
  into a list of ``(job, job_logger)`` tuples before entering
  ``async_to_sync``.
- Escalate the ``running_job_snapshots`` summary log to WARNING when
  ``errors > 0`` so persistent NATS unavailability is distinguishable
  from a quiet tick in aggregated logs.
- Document that the outer shared-connection except overwrites
  per-iteration error counts on the rare ``__aexit__`` teardown path.

New tests:
- ``test_sub_check_exception_does_not_block_the_other`` — patches the
  snapshot sub-check to raise; stale-jobs still reports correctly and
  snapshots come back as the ``unfixable=1`` sentinel.
- ``test_stale_jobs_fixed_counts_celery_updated_and_revoked_paths`` —
  one stale job with a terminal Celery ``task_id``, one without; both
  branches of ``fixed`` counted so a future refactor dropping one
  branch breaks the test.
- Explicit ``fixed == 0`` assertion in the snapshot test locks the
  observation-only contract.

Co-Authored-By: Claude <noreply@anthropic.com>
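The isolation wrapper described in the first bullet can be sketched as follows — a minimal sketch using the names from the commit, with the logger and result type simplified:

```python
import dataclasses
import logging

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def _safe_run_sub_check(name: str, check_fn) -> IntegrityCheckResult:
    """Isolate one sub-check: a crash becomes an unfixable=1 sentinel
    instead of taking the whole umbrella task down with it."""
    try:
        return check_fn()
    except Exception:
        logger.exception("Health sub-check %r failed", name)
        return IntegrityCheckResult(unfixable=1)


ok = _safe_run_sub_check("stale_jobs", lambda: IntegrityCheckResult(checked=5, fixed=2))
bad = _safe_run_sub_check("snapshots", lambda: 1 / 0)  # simulated DB/NATS hiccup
print(ok)   # IntegrityCheckResult(checked=5, fixed=2, unfixable=0)
print(bad)  # IntegrityCheckResult(checked=0, fixed=0, unfixable=1)
```

The sentinel is what lets an operator watching the task result in Flower distinguish "sub-check crashed" from "all-zeros, all-clear".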

---------

Co-authored-by: Claude <noreply@anthropic.com>