feat: add check_occurrences for occurrence data integrity#1188
Conversation
📝 Walkthrough

Introduces a data integrity checking system for occurrence and detection relationships across the project. Includes a core checking module with optional automated fixes, a Django management command for manual execution, a Celery periodic task for scheduled monitoring, comprehensive test coverage, and design specification documentation.
Sequence Diagram

```mermaid
sequenceDiagram
    participant User as User/Scheduler
    participant Cmd as Management Command<br/>or Celery Task
    participant Module as check_occurrences<br/>Module
    participant DB as Database
    participant Log as Logger
    User->>Cmd: Execute with project_id, fix
    Cmd->>Module: call check_occurrences(project_id, fix)
    Module->>DB: Query occurrences missing determinations
    DB-->>Module: Return missing determination IDs
    Module->>DB: Query orphaned occurrences
    DB-->>Module: Return orphaned occurrence IDs
    Module->>DB: Query orphaned detections
    DB-->>Module: Return orphaned detection IDs
    alt fix is True
        Module->>DB: Update missing determinations
        DB-->>Module: Confirm updates
        Module->>DB: Delete orphaned occurrences
        DB-->>Module: Confirm deletions
        Module->>Log: Log fixed counts and summary
    else fix is False
        Module->>Log: Log issue counts and summary
    end
    Module-->>Cmd: Return OccurrenceCheckReport
    Cmd->>User: Display summary output
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Pull request overview
Adds a reusable data-integrity check for occurrences/detections, with optional repair actions, and exposes it via a management command, a Celery task, and documentation to support ongoing monitoring and manual remediation.
Changes:
- Introduces `check_occurrences()` and an `OccurrenceCheckReport` for detecting (and optionally fixing) common occurrence/detection integrity issues.
- Adds a `check_occurrences` management command and a Celery task for periodic/reporting usage.
- Adds tests and a design/spec document describing queries, fixes, and operational usage.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/superpowers/specs/2026-03-25-check-occurrences-design.md | Design notes/spec for the new integrity check and operational plan. |
| ami/main/checks.py | Implements check_occurrences() and OccurrenceCheckReport. |
| ami/main/management/commands/check_occurrences.py | Adds CLI entrypoint to run the check (optionally with --fix). |
| ami/main/tasks.py | Adds Celery task to run the check periodically (report-only). |
| ami/main/tests.py | Adds TestCheckOccurrences coverage for detection, fixes, project scoping, and summary. |
```python
det_qs = Detection.objects.all()
if project_id is not None:
    occ_qs = occ_qs.filter(project_id=project_id)
    det_qs = det_qs.filter(source_image__deployment__project_id=project_id)
```
When scoping detections by project, this uses source_image__deployment__project_id. Elsewhere in the codebase project scoping is typically done via detection__source_image__project_id / source_image__project_id (and SourceImage.save() backfills project from deployment). Using source_image__project_id here would be more consistent and avoids edge cases if deployment/project ever diverge.
```diff
-det_qs = det_qs.filter(source_image__deployment__project_id=project_id)
+det_qs = det_qs.filter(source_image__project_id=project_id)
```
```python
self.event = Event.objects.create(
    deployment=self.deployment,
    project=self.project,
    start=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
```
Event.group_by is a required field (non-null, no default). Creating an Event without group_by will raise an integrity error in tests. Set a deterministic group_by value in the test setup (e.g., based on the start date) to satisfy the model constraint.
```diff
 start=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
+group_by="2024-01-01",
```
```python
detection=detection,
taxon=self.taxon,
score=0.9,
terminal=True,
```
Classification.timestamp is non-nullable. This test helper creates a Classification without a timestamp, which will fail at runtime. Provide a timestamp (e.g., from the source image/event start) when creating the classification.
```diff
 terminal=True,
+timestamp=self.event.start,
```
```python
other_event = Event.objects.create(
    deployment=other_deployment,
    project=other_project,
    start=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
```
Same issue as above: Event.group_by is required. This other_event creation in the project filter test omits group_by and will error. Add a group_by value here as well.
```diff
 start=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
+group_by=self.event.group_by,
```
```python
import logging

from django.core.management.base import BaseCommand

from ami.main.checks import check_occurrences

logger = logging.getLogger(__name__)
```
logging/logger are defined but never used in this command. Please remove the unused import/variable to avoid lint noise and keep the command minimal.
```diff
-import logging
-
 from django.core.management.base import BaseCommand

 from ami.main.checks import check_occurrences
-
-logger = logging.getLogger(__name__)
```
```python
label = "Missing determination"
count = len(report.missing_determination)
if fix and report.fixed_determinations:
    self.stdout.write(f"  {label}: {count} found, {report.fixed_determinations} fixed")
elif count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")
```
In --fix mode, the output only shows “found, fixed/deleted” when the fixed/deleted count is non-zero; if fixes were attempted but none were applied (or only partially applied), the output degrades to the same “X found” warning as report-only mode. Consider always printing the fixed/deleted counts when --fix is set (including 0), and optionally highlighting when fixed != found so operators can tell whether anything was actually repaired.
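One hedged way to implement that suggestion, as a sketch only: a hypothetical `format_line` helper (plain strings here, not the command's `self.stdout`/styling) that always emits the fixed count in fix mode and flags the gap when `fixed != found`.

```python
def format_line(label: str, found: int, fixed: int = 0, fix_mode: bool = False) -> str:
    """Format one integrity-report line.

    In fix mode the fixed count is always shown (including 0), and a
    partial repair is called out so operators can see what remains.
    """
    if not fix_mode:
        return f"{label}: {found} found"
    suffix = "" if fixed == found else f" ({found - fixed} unresolved)"
    return f"{label}: {found} found, {fixed} fixed{suffix}"
```

With this, a run where nothing was repaired reads `12 found, 0 fixed (12 unresolved)` instead of collapsing back to the report-only `12 found`.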
```python
deleted_count, _ = orphaned_occ.delete()
report.deleted_occurrences = deleted_count
logger.info("Deleted %d orphaned occurrences", deleted_count)
```
QuerySet.delete() returns the total number of objects deleted across all cascades, not just Occurrence rows. As a result deleted_occurrences can be inflated (e.g., if related rows are cascaded), and the management command/test expectations may become inaccurate. Consider computing the occurrence count before deleting, or use the per-model breakdown from the second value returned by delete() to record only the Occurrence deletions.
```diff
-deleted_count, _ = orphaned_occ.delete()
-report.deleted_occurrences = deleted_count
-logger.info("Deleted %d orphaned occurrences", deleted_count)
+deleted_total, per_model_counts = orphaned_occ.delete()
+deleted_occurrences = per_model_counts.get(Occurrence._meta.label, 0)
+report.deleted_occurrences = deleted_occurrences
+logger.info("Deleted %d orphaned occurrences", deleted_occurrences)
```
Actionable comments posted: 4
🧹 Nitpick comments (1)
ami/main/tests.py (1)
**3846-3863**: Extend the project-scope test to cover orphaned detections too.

`check_occurrences()` scopes orphaned detections through `source_image__deployment__project_id`, which is a different path from the occurrence filters. `test_project_filter()` currently only proves the occurrence side, so a regression in detection scoping would slip through.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/main/tests.py` around lines 3846 - 3863, test_project_filter only verifies occurrence scoping but not orphaned detection scoping; extend the test to also create an orphaned detection whose source_image is tied to a deployment/project different from self.project and then assert that check_occurrences(project_id=self.project.pk).orphaned_occurrences does not include that detection; specifically create an entity that exercises the detection path used by check_occurrences (the source_image__deployment__project_id lookup) and confirm orphaned_occurrences length remains 0.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/main/checks.py`:
- Around line 72-80: Wrap the per-occurrence repair call so a DB exception on
one record doesn't abort the loop: inside the loop iterating over
missing.iterator() that calls update_occurrence_determination(occ,
current_determination=None, save=True), catch exceptions (e.g., Exception or the
specific DB error) around that call, log a warning including the occurrence
id/context and the exception, and continue; still increment
report.fixed_determinations only on successful updates and keep the final
logger.info(...) unchanged so the rest of the repair pass and subsequent orphan
checks run.
In `@ami/main/management/commands/check_occurrences.py`:
- Around line 33-69: The summary footer currently prints SUCCESS on --fix runs
even when unresolved issues remain; update the final summary logic to compute
remaining issues after attempted fixes (e.g. remaining = max(0,
len(report.missing_determination) - (report.fixed_determinations or 0)) + max(0,
len(report.orphaned_occurrences) - (report.deleted_occurrences or 0)) +
len(report.orphaned_detections)) and then: if remaining > 0 print a NOTICE that
unresolved issues remain (instead of SUCCESS), if fix is true and remaining == 0
print SUCCESS ("Done. Applied fixes."), if not fix and report.has_issues keep
the existing NOTICE prompt, otherwise print SUCCESS ("No issues found."). Use
the existing symbols report, fix, report.fixed_determinations,
report.deleted_occurrences, report.missing_determination,
report.orphaned_occurrences, report.orphaned_detections, and report.has_issues
to implement this.
In `@docs/superpowers/specs/2026-03-25-check-occurrences-design.md`:
- Around line 136-144: Update the "Tests" path in the File locations table to
point to the actual test file that contains TestCheckOccurrences; replace
`ami/main/tests/test_checks.py` with `ami/main/tests.py` so readers are directed
to the file containing the TestCheckOccurrences test class referenced in the PR.
- Around line 95-118: Three fenced code blocks in the check-occurrences design
doc are missing language tags; add a language tag of "bash" to the command block
containing "manage.py check_occurrences..." and add "text" to the two
output/result blocks (the "Checking occurrence integrity..." block and the
"Missing determination..." block) so the fences read ```bash and ```text
respectively, preserving the exact block contents and indentation.
---
Nitpick comments:
In `@ami/main/tests.py`:
- Around line 3846-3863: test_project_filter only verifies occurrence scoping
but not orphaned detection scoping; extend the test to also create an orphaned
detection whose source_image is tied to a deployment/project different from
self.project and then assert that
check_occurrences(project_id=self.project.pk).orphaned_occurrences does not
include that detection; specifically create an entity that exercises the
detection path used by check_occurrences (the
source_image__deployment__project_id lookup) and confirm orphaned_occurrences
length remains 0.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4a2a97e4-f4af-405e-9029-fbed1d39994b
📒 Files selected for processing (5)
- ami/main/checks.py
- ami/main/management/commands/check_occurrences.py
- ami/main/tasks.py
- ami/main/tests.py
- docs/superpowers/specs/2026-03-25-check-occurrences-design.md
```python
if fix and report.missing_determination:
    for occ in missing.iterator():
        if update_occurrence_determination(occ, current_determination=None, save=True):
            report.fixed_determinations += 1
    logger.info(
        "Fixed %d/%d missing determinations",
        report.fixed_determinations,
        len(report.missing_determination),
    )
```
Keep one bad occurrence from aborting the whole repair pass.
update_occurrence_determination(..., save=True) re-queries and saves the row, and the callee does not swallow DB errors. Right now any exception there stops the rest of the batch, so later repairs and the orphan checks never run.
🛠️ Suggested fix
```diff
 if fix and report.missing_determination:
     for occ in missing.iterator():
-        if update_occurrence_determination(occ, current_determination=None, save=True):
-            report.fixed_determinations += 1
+        try:
+            if update_occurrence_determination(occ, current_determination=None, save=True):
+                report.fixed_determinations += 1
+        except Exception:
+            logger.exception("Failed to fix missing determination for occurrence %s", occ.pk)
     logger.info(
         "Fixed %d/%d missing determinations",
         report.fixed_determinations,
         len(report.missing_determination),
     )
```

🤖 Prompt for AI Agents
)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/main/checks.py` around lines 72 - 80, Wrap the per-occurrence repair call
so a DB exception on one record doesn't abort the loop: inside the loop
iterating over missing.iterator() that calls
update_occurrence_determination(occ, current_determination=None, save=True),
catch exceptions (e.g., Exception or the specific DB error) around that call,
log a warning including the occurrence id/context and the exception, and
continue; still increment report.fixed_determinations only on successful updates
and keep the final logger.info(...) unchanged so the rest of the repair pass and
subsequent orphan checks run.
```python
report = check_occurrences(project_id=project_id, fix=fix)

# Missing determination
label = "Missing determination"
count = len(report.missing_determination)
if fix and report.fixed_determinations:
    self.stdout.write(f"  {label}: {count} found, {report.fixed_determinations} fixed")
elif count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Orphaned occurrences
label = "Orphaned occurrences"
count = len(report.orphaned_occurrences)
if fix and report.deleted_occurrences:
    self.stdout.write(f"  {label}: {count} found, {report.deleted_occurrences} deleted")
elif count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Orphaned detections
label = "Orphaned detections"
count = len(report.orphaned_detections)
if count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Summary
if report.has_issues and not fix:
    self.stdout.write(self.style.NOTICE("\nRun with --fix to repair fixable issues."))
elif report.has_issues and fix:
    self.stdout.write(self.style.SUCCESS("\nDone. Applied fixes."))
else:
    self.stdout.write(self.style.SUCCESS("\nNo issues found."))
```
Don't end --fix runs with a success footer when issues remain.
Orphaned detections are never auto-fixed, and the other two categories can be only partially repaired. This branch still prints SUCCESS, so the command can look clean even when the counts above show unresolved problems.
🛠️ Suggested fix
report = check_occurrences(project_id=project_id, fix=fix)
+ remaining_missing = max(len(report.missing_determination) - report.fixed_determinations, 0)
+ remaining_orphaned_occurrences = max(len(report.orphaned_occurrences) - report.deleted_occurrences, 0)
+ remaining_issues = remaining_missing + remaining_orphaned_occurrences + len(report.orphaned_detections)
@@
- elif report.has_issues and fix:
- self.stdout.write(self.style.SUCCESS("\nDone. Applied fixes."))
+ elif fix and remaining_issues:
+ self.stdout.write(
+ self.style.WARNING(
+ f"\nDone. Applied fixes, but {remaining_issues} issue(s) still require attention."
+ )
+ )
+ elif fix:
+ self.stdout.write(self.style.SUCCESS("\nDone. All fixable issues were repaired."))
else:
self.stdout.write(self.style.SUCCESS("\nNo issues found."))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
report = check_occurrences(project_id=project_id, fix=fix)
remaining_missing = max(len(report.missing_determination) - report.fixed_determinations, 0)
remaining_orphaned_occurrences = max(len(report.orphaned_occurrences) - report.deleted_occurrences, 0)
remaining_issues = remaining_missing + remaining_orphaned_occurrences + len(report.orphaned_detections)

# Missing determination
label = "Missing determination"
count = len(report.missing_determination)
if fix and report.fixed_determinations:
    self.stdout.write(f"  {label}: {count} found, {report.fixed_determinations} fixed")
elif count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Orphaned occurrences
label = "Orphaned occurrences"
count = len(report.orphaned_occurrences)
if fix and report.deleted_occurrences:
    self.stdout.write(f"  {label}: {count} found, {report.deleted_occurrences} deleted")
elif count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Orphaned detections
label = "Orphaned detections"
count = len(report.orphaned_detections)
if count:
    self.stdout.write(self.style.WARNING(f"  {label}: {count} found"))
else:
    self.stdout.write(f"  {label}: 0")

# Summary
if report.has_issues and not fix:
    self.stdout.write(self.style.NOTICE("\nRun with --fix to repair fixable issues."))
elif fix and remaining_issues:
    self.stdout.write(
        self.style.WARNING(
            f"\nDone. Applied fixes, but {remaining_issues} issue(s) still require attention."
        )
    )
elif fix:
    self.stdout.write(self.style.SUCCESS("\nDone. All fixable issues were repaired."))
else:
    self.stdout.write(self.style.SUCCESS("\nNo issues found."))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/main/management/commands/check_occurrences.py` around lines 33 - 69, The
summary footer currently prints SUCCESS on --fix runs even when unresolved
issues remain; update the final summary logic to compute remaining issues after
attempted fixes (e.g. remaining = max(0, len(report.missing_determination) -
(report.fixed_determinations or 0)) + max(0, len(report.orphaned_occurrences) -
(report.deleted_occurrences or 0)) + len(report.orphaned_detections)) and then:
if remaining > 0 print a NOTICE that unresolved issues remain (instead of
SUCCESS), if fix is true and remaining == 0 print SUCCESS ("Done. Applied
fixes."), if not fix and report.has_issues keep the existing NOTICE prompt,
otherwise print SUCCESS ("No issues found."). Use the existing symbols report,
fix, report.fixed_determinations, report.deleted_occurrences,
report.missing_determination, report.orphaned_occurrences,
report.orphaned_detections, and report.has_issues to implement this.
| ``` | ||
| manage.py check_occurrences [--project-id N] [--fix] | ||
| ``` | ||
|
|
||
| Output format: | ||
| ``` | ||
| Checking occurrence integrity... | ||
| Project: Vermont Atlas of Life (#5) | ||
|
|
||
| Missing determination: 12 found, 12 fixed | ||
| Orphaned occurrences: 3 found, 3 deleted | ||
| Orphaned detections: 0 found | ||
|
|
||
| Done. Fixed 15 issues. | ||
| ``` | ||
|
|
||
| Without `--fix`: | ||
| ``` | ||
| Missing determination: 12 found | ||
| Orphaned occurrences: 3 found | ||
| Orphaned detections: 0 found | ||
|
|
||
| Found 15 issues. Run with --fix to repair. | ||
| ``` |
Add language tags to these fenced blocks.
markdownlint is already flagging these three fences. Annotating them as bash / text clears the warning and improves rendering.
📝 Suggested doc fix
````diff
-```
+```bash
 manage.py check_occurrences [--project-id N] [--fix]
 ```

 Output format:
-```
+```text
 Checking occurrence integrity...
 Project: Vermont Atlas of Life (#5)
@@
 Done. Fixed 15 issues.
 ```

 Without `--fix`:
-```
+```text
 Missing determination: 12 found
@@
 Found 15 issues. Run with --fix to repair.
 ```
````
🧰 Tools
🪛 markdownlint-cli2 (0.21.0)
[warning] 95-95: Fenced code blocks should have a language specified (MD040, fenced-code-language)
[warning] 100-100: Fenced code blocks should have a language specified (MD040, fenced-code-language)
[warning] 112-112: Fenced code blocks should have a language specified (MD040, fenced-code-language)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @docs/superpowers/specs/2026-03-25-check-occurrences-design.md around lines
95 - 118, Three fenced code blocks in the check-occurrences design doc are
missing language tags; add a language tag of "bash" to the command block
containing "manage.py check_occurrences..." and add "text" to the two
output/result blocks (the "Checking occurrence integrity..." block and the
"Missing determination..." block) so the fences read bash and text
respectively, preserving the exact block contents and indentation.
</details>
## File locations

| Component | Path |
|-----------|------|
| Core function | `ami/main/checks.py` |
| Management command | `ami/main/management/commands/check_occurrences.py` |
| Celery task | `ami/main/tasks.py` (add to existing) |
| Tests | `ami/main/tests/test_checks.py` |
Point the spec at the actual test location.
The table says the coverage lives in ami/main/tests/test_checks.py, but this PR adds TestCheckOccurrences to ami/main/tests.py. Leaving the old path here will send readers to the wrong place.
📝 Suggested doc fix
```diff
-| Tests | `ami/main/tests/test_checks.py` |
+| Tests | `ami/main/tests.py` (`TestCheckOccurrences`) |
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/superpowers/specs/2026-03-25-check-occurrences-design.md` around lines
136 - 144, Update the "Tests" path in the File locations table to point to the
actual test file that contains TestCheckOccurrences; replace
`ami/main/tests/test_checks.py` with `ami/main/tests.py` so readers are directed
to the file containing the TestCheckOccurrences test class referenced in the PR.
See another backend implementation at #1185
…heck

Adds a reusable integrity-check framework under `ami/main/checks/`. Each check pair is a `get_*` queryset function (composable, side-effect free) and a `reconcile_*` action function returning an IntegrityCheckResult.

This PR ships one check: occurrences with classifications but no determination set, caused by partial pipeline save failures observed in the demo environment.

- `ami/main/checks/occurrences.py` — query + reconcile for missing determinations, scoped by project, job, or explicit occurrence ids. Each row is fixed in its own try/except so one failure doesn't abort the batch.
- `ami/main/management/commands/check_data_integrity.py` — CLI with `--dry-run` (default) / `--no-dry-run`, `--project`, `--job`.
- `ami/main/tasks.py` — `check_data_integrity` Celery task for scheduling via django_celery_beat in the admin.
- `ami/main/tests.py::TestIntegrityChecks` — 7 tests covering query scoping (project, classifications, occurrence-ids), dry-run no-op behavior, reconciliation fixing nulled determinations, and project isolation.

Future checks add a new module under `checks/` and extend the command and task dispatch.

Co-Authored-By: Claude <noreply@anthropic.com>
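The `get_*`/`reconcile_*` split described in that commit can be sketched roughly as below. This is a hedged illustration, not the shipped code: plain dicts stand in for ORM rows, and only the `IntegrityCheckResult` field names (`checked`/`fixed`/`unfixable`) follow the commits; everything else is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def get_missing_determinations(rows):
    # Query half of the check pair: composable and side-effect free.
    return [r for r in rows if r["determination"] is None]


def reconcile_missing_determinations(rows, dry_run=True):
    # Action half: each row is repaired in its own try/except so one
    # failure doesn't abort the rest of the batch.
    result = IntegrityCheckResult()
    for row in get_missing_determinations(rows):
        result.checked += 1
        if dry_run:
            continue
        try:
            row["determination"] = row["best_classification"]
            result.fixed += 1
        except Exception:
            result.unfixable += 1
    return result
```

A dry run (the default in the commit's CLI) counts issues without mutating anything; only `--no-dry-run` applies the per-row fix.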
Force-pushed 103dce5 to 62f22aa
The periodic task is renamed from `check_stale_jobs_task` to
`jobs_health_check` so new job-health checks can share the 15-minute
cadence and `expires` guarantees without a new beat entry. Current
body runs a single `_run_stale_jobs_check()`; future checks plug in
alongside it and return the same `{checked, fixed, unfixable}` shape.
Names chosen to parallel RolnickLab#1188's integrity-check pattern:
- Beat task = noun phrase (`jobs_health_check`) — reads well in flower
- (Future) management command = verb (`manage.py check_jobs`)
Migration 0020 updated in place since it hasn't shipped yet.
Co-Authored-By: Claude <noreply@anthropic.com>
|
Claude says: Naming alignment note — we're landing a sibling beat task in #1227 ( One small asymmetry worth resolving eventually, in this PR or a follow-up:
Happy to address either of these on your side when you pick this up — flagging now so we stay aligned.
…kResult

Addresses the takeaway-review findings on PR #1227:

- Fix reverse migration: delete the row we create. The forward step registers `jobs.health_check` but the old `delete_periodic_tasks` still referenced the pre-rename `jobs.check_stale_jobs`, leaving a stranded row on rollback.
- Collapse `log_running_async_job_stats` into a sub-check of the umbrella. Drops the second `PeriodicTask` and the shared-connection fallback path — on a 15-minute cadence there's no reason to keep two beat tasks or defend against a per-manager bug, and a quietly hung job now gets a snapshot in the same tick the reconciler will decide whether to revoke it.
- Adopt `IntegrityCheckResult` as the shared sub-check shape, housed in a new `ami.main.checks.schemas` module so PR #1188 can re-target its import without a merge-order dance.

Wrap the two sub-checks in a `JobsHealthCheckResult` parent dataclass; the umbrella returns `dataclasses.asdict(...)` for celery's JSON backend. Observation checks leave `fixed=0` and count per-item errors in `unfixable`.

Tests collapse to one `JobsHealthCheckTest` class covering both sub-checks (reconcile + snapshot) and the edge cases that matter: per-job snapshot failure, shared-connection setup failure, non-async jobs skipped, idle deployment returns all zeros.

Co-Authored-By: Claude <noreply@anthropic.com>
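The result shape that commit describes can be sketched as follows. A minimal illustration only: the dataclass and field names follow the commit message, but the example values are made up.

```python
import dataclasses


@dataclasses.dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


@dataclasses.dataclass
class JobsHealthCheckResult:
    stale_jobs: IntegrityCheckResult
    running_job_snapshots: IntegrityCheckResult


result = JobsHealthCheckResult(
    stale_jobs=IntegrityCheckResult(checked=3, fixed=1),
    # Observation-only sub-check: fixed stays 0, per-item errors land in unfixable.
    running_job_snapshots=IntegrityCheckResult(checked=2, unfixable=1),
)
# asdict() recurses into nested dataclasses, producing plain dicts that
# celery's JSON result backend can serialize.
payload = dataclasses.asdict(result)
```

Returning `dataclasses.asdict(...)` from the umbrella task keeps the structured result while staying JSON-serializable.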
* feat(jobs): schedule periodic stale-job reconcile + NATS consumer snapshots

  Two Celery Beat periodic tasks so operators can see ML job health without waiting for a job to finish:

  1. `check_stale_jobs_task` (every 15 min) — a thin wrapper around the existing `check_stale_jobs()` that reconciles jobs stuck in running states past `FAILED_CUTOFF_HOURS`. Previously the function existed but was only reachable via the `update_stale_jobs` management command, so nothing ran it automatically.
  2. `log_running_async_job_stats` (every 5 min) — for each incomplete `async_api` job, opens a `TaskQueueManager` with that job's logger and logs a delivered/ack_floor/num_pending/num_ack_pending/num_redelivered snapshot of the NATS consumer. Read-only; no status changes.

  Builds on the lifecycle logging landed in #1222, so long-running jobs now get mid-flight visibility, not just create and cleanup snapshots. Both tasks are registered via migration 0020, so existing deployments pick them up on the next migrate without manual Beat configuration.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): address review comments on PR #1227

  - `log_running_async_job_stats`: reuse a single `TaskQueueManager` per tick instead of opening N NATS connections. Cost is now O(1) in the number of running async jobs rather than O(n). Guarded the outer connection setup so a NATS outage drops the tick cleanly instead of crashing the task.
  - `check_stale_jobs_task`: bump `expires` from 10 to 14 minutes so a delayed copy still runs within the 15-minute schedule instead of expiring before a worker picks it up under broker pressure.
  - Migration 0020: use `apps.get_model` for the django_celery_beat models and declare an explicit migration dependency so the data migration uses historical model state.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): fall back to per-job manager when shared path fails

  If opening the shared `TaskQueueManager` raises at setup (or teardown), try each job with its own fresh manager before giving up. Defends against a regression in the shared-mutation pattern silently costing us snapshot visibility for every running async job on every tick. If NATS itself is down, the fallback loop will fail too and log once per job — the same end state as before the refactor, just noisier.

  Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): rename beat task to jobs_health_check umbrella

  The periodic task is renamed from `check_stale_jobs_task` to `jobs_health_check` so new job-health checks can share the 15-minute cadence and `expires` guarantees without a new beat entry. The current body runs a single `_run_stale_jobs_check()`; future checks plug in alongside it and return the same `{checked, fixed, unfixable}` shape.

  Names chosen to parallel #1188's integrity-check pattern:

  - Beat task = noun phrase (`jobs_health_check`) — reads well in Flower
  - (Future) management command = verb (`manage.py check_jobs`)

  Migration 0020 is updated in place since it hasn't shipped yet.

  Co-Authored-By: Claude <noreply@anthropic.com>

* refactor(jobs): fold snapshot task into umbrella, adopt IntegrityCheckResult

  Addresses the takeaway-review findings on PR #1227:

  - Fix the reverse migration: delete the row we create. The forward step registers `jobs.health_check`, but the old `delete_periodic_tasks` still referenced the pre-rename `jobs.check_stale_jobs`, leaving a stranded row on rollback.
  - Collapse `log_running_async_job_stats` into a sub-check of the umbrella. Drops the second `PeriodicTask` and the shared-connection fallback path — on a 15-minute cadence there's no reason to keep two beat tasks or defend against a per-manager bug, and a quietly hung job now gets a snapshot in the same tick the reconciler decides whether to revoke it.
  - Adopt `IntegrityCheckResult` as the shared sub-check shape, housed in a new `ami.main.checks.schemas` module so PR #1188 can re-target its import without a merge-order dance.

  Wrap the two sub-checks in a `JobsHealthCheckResult` parent dataclass; the umbrella returns `dataclasses.asdict(...)` for Celery's JSON backend. Observation checks leave `fixed=0` and count per-item errors in `unfixable`.

  Tests collapse to one `JobsHealthCheckTest` class covering both sub-checks (reconcile + snapshot) and the edge cases that matter: per-job snapshot failure, shared-connection setup failure, non-async jobs skipped, idle deployment returns all zeros.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): isolate sub-checks + pre-resolve loggers off event loop

  Addresses review findings from the re-review on 6dc7e6e:

  - The umbrella had no guard around sub-check calls, so a DB hiccup in `_run_stale_jobs_check` would kill the snapshot check and fail the whole task. Wrap each call in `_safe_run_sub_check`, which catches, logs, and returns `IntegrityCheckResult(unfixable=1)` as a sentinel — operators watching the task result in Flower see that the sub-check failed rather than reading all zeros and assuming all clear.
  - `Job.logger` attaches a `JobLogHandler` on first access, which touches the ORM; the file's own docstring says to resolve loggers outside the event loop, but two accesses were inside the coroutine. Pre-resolve into a list of `(job, job_logger)` tuples before entering `async_to_sync`.
  - Escalate the `running_job_snapshots` summary log to WARNING when `errors > 0` so persistent NATS unavailability is distinguishable from a quiet tick in aggregated logs.
  - Document that the outer shared-connection `except` overwrites per-iteration error counts on the rare `__aexit__` teardown path.

  New tests:

  - `test_sub_check_exception_does_not_block_the_other` — patches the snapshot sub-check to raise; stale jobs still report correctly and snapshots come back as the `unfixable=1` sentinel.
  - `test_stale_jobs_fixed_counts_celery_updated_and_revoked_paths` — one stale job with a terminal Celery `task_id`, one without; both branches of `fixed` are counted, so a future refactor dropping one branch breaks the test.
  - An explicit `fixed == 0` assertion in the snapshot test locks in the observation-only contract.

  Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
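The sub-check isolation described in the last commit can be sketched as below. This is a hypothetical reconstruction, not the actual `ami.jobs` code: the `IntegrityCheckResult` fields and the `_safe_run_sub_check` name come from the commit messages, while the `jobs_health_check` body here takes a plain mapping of callables for illustration.

```python
import logging
from dataclasses import asdict, dataclass

logger = logging.getLogger(__name__)


@dataclass
class IntegrityCheckResult:
    # Shared sub-check shape per the commit messages: {checked, fixed, unfixable}.
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def _safe_run_sub_check(name, func):
    """Run one sub-check; a crash becomes an unfixable=1 sentinel instead of
    killing the whole umbrella task."""
    try:
        return func()
    except Exception:
        logger.exception("Sub-check %r failed", name)
        # Non-zero unfixable lets operators distinguish "check crashed"
        # from an all-zeros "all clear" in the task result.
        return IntegrityCheckResult(unfixable=1)


def jobs_health_check(sub_checks):
    """Umbrella task body: run each sub-check, return JSON-safe dicts."""
    return {name: asdict(_safe_run_sub_check(name, fn)) for name, fn in sub_checks.items()}
```

One failing sub-check then surfaces as `{"checked": 0, "fixed": 0, "unfixable": 1}` in the result dict while the others still report normally.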
Summary
Adds a reusable data integrity check framework under `ami/main/checks/`. Each check is a pair of functions: a `get_*` queryset (side-effect free, composable) and a `reconcile_*` action returning an `IntegrityCheckResult`. Future checks add a new module under `checks/` and plug into the management command and Celery task.

This PR ships one check: occurrences with classifications but no determination set — the partial-save failure observed in the demo environment.
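The `get_*`/`reconcile_*` pair can be illustrated with a minimal sketch over plain dicts instead of Django querysets. The function names and the `checked`/`fixed`/`unfixable` result fields follow this PR's description; everything else (the dict shape, the "first classification wins" repair rule) is an assumption for illustration only.

```python
from dataclasses import dataclass


@dataclass
class IntegrityCheckResult:
    checked: int = 0
    fixed: int = 0
    unfixable: int = 0


def get_occurrences_missing_determination(occurrences):
    # Side-effect free "queryset": rows that have classifications
    # but no determination set.
    return [o for o in occurrences if o["classifications"] and o["determination"] is None]


def reconcile_missing_determinations(occurrences, dry_run=True):
    candidates = get_occurrences_missing_determination(occurrences)
    result = IntegrityCheckResult(checked=len(candidates))
    if dry_run:
        return result  # report counts only, touch nothing
    for occ in candidates:
        # Each row is repaired in its own try/except so one bad row
        # doesn't abort the batch.
        try:
            occ["determination"] = occ["classifications"][0]
            result.fixed += 1
        except Exception:
            result.unfixable += 1
    return result
```

Keeping the `get_*` function free of side effects is what lets the same queryset serve the dry-run report, the fixing pass, and the tests.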
Components
- `ami/main/checks/occurrences.py` — `get_occurrences_missing_determination()` and `reconcile_missing_determinations()`, scoped by `project_id`, `job_id`, or explicit `occurrence_ids`. Each row is fixed in its own `try/except` so one failure doesn't abort the batch.
- `ami/main/management/commands/check_data_integrity.py` — `--dry-run` (default) / `--no-dry-run`, plus `--project`, `--job`.
- `ami/main/tasks.py` — `check_data_integrity` Celery task, schedulable via django_celery_beat in the Django admin.
- `ami/main/tests.py::TestIntegrityChecks` — 7 tests against the real CI database.

CLI
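Based on the flags listed above, invocations would look roughly like the following sketch (the project and job IDs are made up for illustration):

```shell
# Report-only pass over one project (dry run is the default)
python manage.py check_data_integrity --project 42

# Apply fixes, limited to occurrences touched by a specific job
python manage.py check_data_integrity --no-dry-run --project 42 --job 7
```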
Tests
`TestIntegrityChecks` covers:

- `get_*` includes only occurrences with classifications but no determination
- `get_*` excludes occurrences with no classifications at all
- `reconcile_*` in `dry_run=True` mode reports counts but doesn't modify the DB
- `reconcile_*` in `dry_run=False` mode repairs nulled determinations

All 7 tests pass against the CI Postgres stack.
Related
Alternate implementation: #1185 (`fix/null-determination-resilience`). This PR adopts the same `get_*`/`reconcile_*` separation and `ReconcileResult`-style return, but lays it out as an extensible package (`checks/`) rather than a single file, with the expectation that future integrity checks add one module per domain.