diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 4d6762793..ef433d6f1 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -45,7 +45,13 @@ def run_job(self, job_id: int) -> None: raise e # self.retry(exc=e, countdown=1, max_retries=1) else: - job.logger.info(f"Running job {job}") + # Log the Redis target at task start so cross-host DB-index drift surfaces + # in every job's log without needing to know where to look. The cluster + # convention is DB 0 = cache, DB 1 = Celery results (see + # config/settings/base.py); the Job state manager also uses the "default" + # connection, so every worker in the pool must agree on the DB number or + # initialize_job writes to one DB while update_state reads from another. + job.logger.info(f"Running job {job} on {_describe_redis_target()}") try: job.run() except Exception as e: @@ -171,9 +177,17 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub if not progress_info: # State keys genuinely missing (the total-images key returned None). # Ack so NATS stops redelivering and fail the job — there's no state - # left to reconcile against. + # left to reconcile against. The reason string is built from a live + # Redis snapshot (DB index, keys present under job:{id}:*) so the + # FAILURE log and the UI progress.errors entry name the actual cause + # instead of the previous hardcoded "likely cleaned up concurrently" + # guess — which conflated DB-index misconfig, eviction, and genuine + # concurrent cleanup into a single misleading string. _ack_task_via_nats(reply_subject, logger) - _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") + _fail_job( + job_id, + f"Job state missing from Redis (stage=process): {state_manager.diagnose_missing_state()}", + ) return try: @@ -255,7 +269,10 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub # first so NATS stops redelivering a message whose state is gone, # then fail the job. Mirrors the stage=process missing-state path. _ack_task_via_nats(reply_subject, job.logger) - _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") + _fail_job( + job_id, + f"Job state missing from Redis (stage=results): {state_manager.diagnose_missing_state()}", + ) return # update complete state based on latest progress info after saving results @@ -304,6 +321,24 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub job.logger.error(error) +def _describe_redis_target() -> str: + """Return a ``host:port/dbN`` string for the "default" Redis connection. + + Logged at the start of every ``run_job`` so DB-index drift across hosts + (the class of misconfig that manifests as silent ``process_nats_pipeline_result`` + FAILUREs on whichever worker happens to read state from the wrong DB) is + visible in each job's log without requiring a separate diagnostic. + """ + try: + from django_redis import get_redis_connection + + redis = get_redis_connection("default") + kwargs = getattr(redis.connection_pool, "connection_kwargs", {}) or {} + return f"redis={kwargs.get('host', '?')}:{kwargs.get('port', '?')}/db{kwargs.get('db', '?')}" + except Exception as e: + return f"redis=(unavailable: {e})" + + def _fail_job(job_id: int, reason: str) -> None: from ami.jobs.models import Job, JobState from ami.ml.orchestration.jobs import cleanup_async_job_resources @@ -313,6 +348,15 @@ def _fail_job(job_id: int, reason: str) -> None: job = Job.objects.select_for_update().get(pk=job_id) if job.status in (JobState.CANCELING, *JobState.final_states()): return + # Mirror the reason into progress.errors so the UI surfaces it + # alongside the FAILURE state. Previously the reason lived only in + # job.logger, which meant the UI showed errors=[] and operators had + # to dig into Celery worker logs to find out why a job died. + try: + job.progress.errors.append(reason) + except Exception: + # Don't let diagnostic-write failures mask the original FAILURE. + pass job.update_status(JobState.FAILURE, save=False) job.finished_at = datetime.datetime.now() job.save(update_fields=["status", "progress", "finished_at"]) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 145e70399..8dfacd553 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -346,10 +346,14 @@ def test_genuinely_missing_state_acks_and_fails_job(self, mock_manager_class, mo mock_ack.assert_called_once() mock_fail.assert_called_once() - # New, accurate message — no longer the misleading "Redis state missing" - # that users saw in the UI for transient connection drops. + # Reason string now leads with the stage and embeds a live Redis + # snapshot (DB index + key listing from diagnose_missing_state) so the + # failure cause — DB-index drift, eviction, or never-initialized — + # is visible in the FAILURE log instead of the previous single + # hardcoded "likely cleaned up concurrently" guess. args, _ = mock_fail.call_args - self.assertIn("Job state keys not found in Redis", args[1]) + self.assertIn("Job state missing from Redis", args[1]) + self.assertIn("stage=process", args[1]) @patch("ami.jobs.tasks._ack_task_via_nats") @patch("ami.jobs.tasks.TaskQueueManager") @@ -625,6 +629,95 @@ def test_task_failure_marks_sync_api_job_failure_and_cleans_up(self, mock_cleanu mock_cleanup.assert_called_once() +class TestFailJob(TransactionTestCase): + """ + Regression tests for ``_fail_job`` — specifically for the reason-string + mirroring into ``progress.errors`` that this PR adds. + + The FAILURE log line alone is not enough for operators; the UI reads + ``progress.errors``, and prior to this PR that list stayed empty on the + missing-Redis-state path. Any regression that stops appending the reason + (e.g. silently dropping it via the defensive ``try/except``) would put + operators back in the position of digging through Celery worker logs to + find out why a job died. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="FailJob Test Project") + self.pipeline = Pipeline.objects.create(name="FailJob Pipeline", slug="fail-job-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="FailJob Collection", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_job(self, dispatch_mode: JobDispatchMode = JobDispatchMode.ASYNC_API) -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name=f"{dispatch_mode} fail-job test", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=dispatch_mode, + ) + job.update_status(JobState.STARTED, save=True) + return job + + @patch("ami.ml.orchestration.jobs.cleanup_async_job_resources") + def test_fail_job_appends_reason_to_progress_errors(self, mock_cleanup): + """ + Reason string must end up in ``job.progress.errors`` (persisted) so the + UI shows the cause of the FAILURE alongside the status change. Before + this PR the reason lived only in ``job.logger`` and the UI showed + ``errors=[]``. A silent regression here would not be caught by the + ``_fail_job`` call-site tests in ``TestProcessNatsPipelineResultError`` + (they mock ``_fail_job`` entirely). + """ + from ami.jobs.tasks import _fail_job + + job = self._make_job() + reason = "Job state missing from Redis (stage=process): redis=host:6379/db1 keys_for_job=" + + _fail_job(job.pk, reason) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.FAILURE) + self.assertIn( + reason, + job.progress.errors, + f"expected reason in progress.errors, got: {job.progress.errors!r}", + ) + # Sanity: the fix also propagates to the DB-persisted copy (i.e. the + # update_fields tuple on job.save includes 'progress'). Re-read from a + # fresh Job instance to prove the append wasn't only visible on the + # in-memory object returned by select_for_update. + reloaded = Job.objects.get(pk=job.pk) + self.assertIn(reason, reloaded.progress.errors) + mock_cleanup.assert_called_once_with(job.pk) + + @patch("ami.ml.orchestration.jobs.cleanup_async_job_resources") + def test_fail_job_is_noop_on_already_final_job(self, mock_cleanup): + """ + If the job is already in a final state (e.g. concurrent cleanup + beat us), ``_fail_job`` must return early without touching status + or progress. This protects against double-failing a job that has + already been reconciled to SUCCESS by the reconciler path. + """ + from ami.jobs.tasks import _fail_job + + job = self._make_job() + job.update_status(JobState.SUCCESS, save=True) + errors_before = list(job.progress.errors) + + _fail_job(job.pk, "should be ignored") + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS) + self.assertEqual(job.progress.errors, errors_before) + mock_cleanup.assert_not_called() + + class TestResultEndpointWithError(APITestCase): """Integration test for the result API endpoint with error results.""" diff --git a/ami/ml/orchestration/async_job_state.py b/ami/ml/orchestration/async_job_state.py index 77efa1d25..5f4bd2dc7 100644 --- a/ami/ml/orchestration/async_job_state.py +++ b/ami/ml/orchestration/async_job_state.py @@ -163,6 +163,18 @@ def update_state( newly_removed = results[0] if processed_image_ids else 0 if total_raw is None: + # Loud diagnostic before the silent None return. The caller will mark + # the job FAILURE based on this result, so the operator needs to see + # *why* the total key is gone. Distinguishes three different causes + # that previously all surfaced as the same hardcoded "likely cleaned + # up concurrently" reason string: Redis DB mismatch across hosts, + # key eviction, and genuinely-never-initialized state. + logger.warning( + "Job %s state missing in Redis (stage=%s): %s", + self.job_id, + stage, + self.diagnose_missing_state(), + ) return None total = int(total_raw) @@ -182,6 +194,53 @@ def update_state( newly_removed=newly_removed, ) + def diagnose_missing_state(self) -> str: + """ + One-line snapshot of what Redis actually holds for this job. + + Called from the missing-state path in ``update_state`` and from + ``_fail_job`` so the FAILURE log and the UI ``progress.errors`` entry + distinguish the three common causes — DB mismatch across hosts, key + eviction, and never-initialized state — instead of a single hardcoded + "likely cleaned up concurrently" guess that all three collapse to. + + Cost: the internal ``SCAN`` runs only on the failure path (once per + job-lifetime FAILURE), and the per-job key fanout is at most four + (pending:process, pending:results, failed, total), so the cost is + negligible compared to the FAILURE branch it only helps diagnose. + + Intentionally defensive: any failure to collect diagnostics is + swallowed, because the caller is already about to fail the job and + an exception from diagnostics would mask the original cause. + """ + try: + redis = self._get_redis() + kwargs = getattr(redis.connection_pool, "connection_kwargs", {}) or {} + db = kwargs.get("db", "?") + host = kwargs.get("host", "?") + port = kwargs.get("port", "?") + # Cursor-safe SCAN over the job's keyspace — cheap even on a busy + # Redis because it's filtered server-side and the per-job fanout is + # at most a handful of keys (pending:process, pending:results, + # failed, total). + keys = sorted(k.decode() if isinstance(k, bytes) else k for k in redis.scan_iter(match=self._pattern())) + sizes: list[str] = [] + for key in keys: + if key == self._total_key: + sizes.append(f"{key}=") + continue + try: + sizes.append(f"{key}=SCARD:{redis.scard(key)}") + except RedisError: + sizes.append(f"{key}=") + keys_summary = ", ".join(sizes) if sizes else "" + return f"redis={host}:{port}/db{db} keys_for_job={keys_summary}" + except Exception as e: + return f"(diagnostics failed: {e})" + + def _pattern(self) -> str: + return f"job:{self.job_id}:*" + def get_progress(self, stage: str) -> "JobStateProgress | None": """Read-only progress snapshot for the given stage.""" try: diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 6bc55bdf1..27466720a 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -1407,6 +1407,35 @@ def test_update_state_returns_none_when_state_genuinely_missing(self): progress = self.manager.update_state({"img1", "img2"}, "process") self.assertIsNone(progress) + def test_diagnose_missing_state_when_never_initialized(self): + """ + Diagnostic string for the "never initialized" case: no keys are + present under ``job:{id}:*``. Output must still name the Redis host + and DB so cross-host DB drift is distinguishable from eviction and + truly-never-initialized state in one log line. + """ + # initialize_job has NOT been called; nothing under job:123:*. + diagnosis = self.manager.diagnose_missing_state() + self.assertIn("redis=", diagnosis) + self.assertIn("/db", diagnosis) + self.assertIn("keys_for_job=", diagnosis) + + def test_diagnose_missing_state_lists_present_keys(self): + """ + Diagnostic string for the partial-cleanup / eviction case: some keys + remain under ``job:{id}:*`` and their SCARDs should appear so the + operator can tell "total key evicted but pending sets still present" + from "nothing here, this DB never saw the job". + """ + self.manager.initialize_job(self.image_ids) + # Drop the total key to simulate eviction while pending sets survive. + redis = self.manager._get_redis() + redis.delete(self.manager._total_key) + + diagnosis = self.manager.diagnose_missing_state() + self.assertIn(f"job:{self.job_id}:pending_images:process=SCARD:", diagnosis) + self.assertNotIn(self.manager._total_key, diagnosis) + class TestSaveResultsRefreshesDeploymentCounts(TestCase): """save_results must refresh Deployment cached counts, not just Event counts.