diff --git a/CHANGES/+safe-in-parameter-limit.bugfix b/CHANGES/+safe-in-parameter-limit.bugfix new file mode 100644 index 00000000000..f6ff013e232 --- /dev/null +++ b/CHANGES/+safe-in-parameter-limit.bugfix @@ -0,0 +1 @@ +Avoid exceeding PostgreSQL's 65,535 query parameter limit when filtering by large lists of IDs. This fixes `OperationalError` crashes during large import and copy operations involving more than 65,535 content units. \ No newline at end of file diff --git a/CHANGES/plugin_api/+safe-in-parameter-limit.feature b/CHANGES/plugin_api/+safe-in-parameter-limit.feature new file mode 100644 index 00000000000..8ef75db667a --- /dev/null +++ b/CHANGES/plugin_api/+safe-in-parameter-limit.feature @@ -0,0 +1 @@ +Added `safe_in()` to the plugin API for building `Q` objects that are safe for arbitrarily large value lists, avoiding PostgreSQL's 65,535 query parameter limit. \ No newline at end of file diff --git a/pulpcore/app/models/repository.py b/pulpcore/app/models/repository.py index 3f1207cb77d..729272a5015 100644 --- a/pulpcore/app/models/repository.py +++ b/pulpcore/app/models/repository.py @@ -25,6 +25,7 @@ get_prn, get_view_name_for_model, reverse, + safe_in, ) from pulpcore.cache import Cache from pulpcore.constants import ALL_KNOWN_CONTENT_CHECKSUMS, PROTECTED_REPO_VERSION_MESSAGE @@ -988,7 +989,7 @@ def get_content(self, content_qs=None): Args: content_qs (django.db.models.QuerySet): The queryset for Content that will be restricted further to the content present in this repository version. If not given, - ``Content.objects.all()`` is used (to return over all content types present in the + `Content.objects.all()` is used (to return over all content types present in the repository version). Returns: @@ -1004,15 +1005,7 @@ def get_content(self, content_qs=None): if content_qs is None: content_qs = Content.objects - content_ids = self.content_ids - if len(content_ids) >= 65535: - # Workaround for PostgreSQL's limit on the number of parameters in a query - content_ids = ( - RepositoryVersion.objects.filter(pk=self.pk) - .annotate(cids=Func(F("content_ids"), function="unnest")) - .values_list("cids", flat=True) - ) - return content_qs.filter(pk__in=content_ids) + return content_qs.filter(safe_in("pk", self.content_ids)) @property def content(self): @@ -1056,14 +1049,14 @@ def content_batch_qs(self, content_qs=None, order_by_params=("pk",), batch_size= Args: content_qs (django.db.models.QuerySet) The queryset for Content that will be restricted further to the content present in this repository version. If not given, - ``Content.objects.all()`` is used (to iterate over all content present in the + `Content.objects.all()` is used (to iterate over all content present in the repository version). A plugin may want to use a specific subclass of - [pulpcore.plugin.models.Content][] or use e.g. ``filter()`` to select + [pulpcore.plugin.models.Content][] or use e.g. `filter()` to select a subset of the repository version's content. - order_by_params (tuple of str): The parameters for the ``order_by`` clause - for the content. The Default is ``("pk",)``. This needs to + order_by_params (tuple of str): The parameters for the `order_by` clause + for the content. The Default is `("pk",)`. This needs to specify a stable order. For example, if you want to iterate by - decreasing creation time stamps use ``("-pulp_created", "pk")`` to + decreasing creation time stamps use `("-pulp_created", "pk")` to ensure that content records are still sorted by primary key even if their creation timestamp happens to be equal. batch_size (int): The maximum batch size. @@ -1072,8 +1065,8 @@ def content_batch_qs(self, content_qs=None, order_by_params=("pk",), batch_size= [django.db.models.QuerySet][]: A QuerySet representing a slice of the content. Example: - The following code could be used to loop over all ``FileContent`` in - ``repository_version``. It prefetches the related + The following code could be used to loop over all `FileContent` in + `repository_version`. It prefetches the related [pulpcore.plugin.models.ContentArtifact][] instances for every batch:: repository_version = ... @@ -1126,8 +1119,8 @@ def added(self, base_version=None): if not base_version: return Content.objects.filter(version_memberships__version_added=self) - return Content.objects.filter(pk__in=self.content_ids).exclude( - pk__in=base_version.content_ids + return Content.objects.filter(safe_in("pk", self.content_ids)).exclude( + safe_in("pk", base_version.content_ids) ) def removed(self, base_version=None): @@ -1141,8 +1134,8 @@ def removed(self, base_version=None): if not base_version: return Content.objects.filter(version_memberships__version_removed=self) - return Content.objects.filter(pk__in=base_version.content_ids).exclude( - pk__in=self.content_ids + return Content.objects.filter(safe_in("pk", base_version.content_ids)).exclude( + safe_in("pk", self.content_ids) ) def contains(self, content): diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py index 337d41cbbc6..627e55ce751 100644 --- a/pulpcore/app/tasks/importer.py +++ b/pulpcore/app/tasks/importer.py @@ -39,6 +39,7 @@ compute_file_hash, get_domain, get_domain_pk, + safe_in, ) from pulpcore.constants import TASK_STATES from pulpcore.exceptions.plugin import MissingPlugin @@ -410,14 +411,14 @@ def import_repository_version( for repo_name, content_ids in mapping.items(): repo_name = _get_destination_repo_name(importer, repo_name) dest_repo = Repository.objects.get(name=repo_name) - content = Content.objects.filter(upstream_id__in=content_ids) + content = Content.objects.filter(safe_in("upstream_id", content_ids)) content_count += len(content_ids) with dest_repo.new_version() as new_version: new_version.set_content(content) else: # just map all the content to our destination repo dest_repo = Repository.objects.get(pk=dest_repo_pk) - content = Content.objects.filter(pk__in=resulting_content_ids) + content = Content.objects.filter(safe_in("pk", resulting_content_ids)) content_count += len(resulting_content_ids) with dest_repo.new_version() as new_version: new_version.set_content(content) diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index 237ad50e67a..41b4eeba1e7 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -16,7 +16,7 @@ from django.apps import apps from django.conf import settings from django.db import connection -from django.db.models import Model, UUIDField +from django.db.models import Field, Lookup, Model, Q, UUIDField from rest_framework.reverse import reverse as drf_reverse from rest_framework.serializers import ValidationError @@ -26,6 +26,47 @@ from pulpcore.app.loggers import deprecation_logger from pulpcore.exceptions.validation import InvalidSignatureError +POSTGRES_MAX_QUERY_PARAMS = 65535 + + +class AnyArray(Lookup): + """PostgreSQL `= ANY(%s)` lookup that passes a list as a single array parameter. + + psycopg3 adapts the Python list into a PostgreSQL array, so the entire list + counts as **one** bind parameter regardless of size. This avoids the + protocol-level 65535-parameter limit that `IN ($1, $2, …)` hits. + """ + + lookup_name = "any_array" + + def get_prep_lookup(self): + return [self.lhs.output_field.get_prep_value(v) for v in self.rhs] + + def as_sql(self, compiler, connection): + lhs, lhs_params = self.process_lhs(compiler, connection) + return f"{lhs} = ANY(%s)", lhs_params + [list(self.rhs)] + + +Field.register_lookup(AnyArray) + + +def safe_in(field_name, values): + """Build a `Q` object for `field__in` that is safe for arbitrarily large lists. + + * If *values* is already a queryset (or other non-collection type), the + normal `__in` lookup is used — Django turns it into a subquery. + * If the collection has fewer than 65 535 items, `__in` is used as-is. + * Otherwise `__any_array` is used so the whole list travels as a single + PostgreSQL array parameter. + """ + if not isinstance(values, (list, set, tuple, frozenset)): + return Q(**{f"{field_name}__in": values}) + values = list(values) + if len(values) < POSTGRES_MAX_QUERY_PARAMS: + return Q(**{f"{field_name}__in": values}) + return Q(**{f"{field_name}__any_array": values}) + + # a little cache so viewset_for_model doesn't have to iterate over every app every time _model_viewset_cache = {} diff --git a/pulpcore/plugin/util.py b/pulpcore/plugin/util.py index b8e63db206f..4a9cd1d5dd9 100644 --- a/pulpcore/plugin/util.py +++ b/pulpcore/plugin/util.py @@ -27,6 +27,7 @@ raise_for_unknown_content_units, resolve_prn, reverse, + safe_in, set_current_user, set_domain, ) @@ -59,5 +60,6 @@ "reverse", "set_current_user", "resolve_prn", + "safe_in", "cache_key", ] diff --git a/pulpcore/tests/unit/models/test_repository.py b/pulpcore/tests/unit/models/test_repository.py index 621a86373dd..9520d1db061 100644 --- a/pulpcore/tests/unit/models/test_repository.py +++ b/pulpcore/tests/unit/models/test_repository.py @@ -788,3 +788,90 @@ def test_batch_operations_preserve_correctness(repository, db): assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.PRESENT).count == 40 assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.ADDED).first() is None assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.REMOVED).count == 60 + + +def test_postgresql_parameter_limit(db, repository): + """ + Test repository operations with >65535 content units to verify PostgreSQL parameter limit + workaround. + + PostgreSQL limits queries to 65535 parameters. This test verifies that content, added(), + and removed() all handle >65535 items correctly. + + The safe_in() utility (used internally) avoids this limit by using ``= ANY(%s)`` + (a single array parameter) for large value lists. + + Queries MUST be evaluated via .iterator() because psycopg3 uses client-side binding for + regular queries (inlining params into the SQL string, bypassing the limit) but server-side + binding for server-side cursors (.iterator()), which enforces the 65,535 parameter cap. + Without .iterator() the test passes even when the fix is absent. + """ + # Create 66000 content units (exceeds PostgreSQL's 65535 parameter limit) + large_content_set = [Content(pulp_type="core.content") for _ in range(66000)] + Content.objects.bulk_create(large_content_set, batch_size=2000) + large_pks = sorted([c.pk for c in large_content_set]) + + version0 = repository.latest_version() + + # Test 1: Add >65535 content units - tests added() and content with >65535 items + with repository.new_version() as version1: + version1.add_content(Content.objects.filter(pk__in=large_pks)) + + # Verify content_ids exceeds the PostgreSQL parameter limit threshold + assert isinstance(version1.content_ids, list) + assert len(version1.content_ids) >= 65535 + + # Test the content property with >65535 items + # .iterator() forces a server-side cursor — the only way to reliably trigger the + # 65,535-parameter limit in psycopg3. + content_pks = set(version1.content.values_list("pk", flat=True).iterator()) + assert len(content_pks) == 66000 + + # Test the added() method with >65535 items + added_pks = set(version1.added(base_version=version0).values_list("pk", flat=True).iterator()) + assert len(added_pks) == 66000 # Critical: added() must handle >65535 items + + # Test the removed() method returns nothing (nothing was removed) + removed_pks = set( + version1.removed(base_version=version0).values_list("pk", flat=True).iterator() + ) + assert len(removed_pks) == 0 + + # Verify RepositoryVersionContentDetails + rvcd_qs = RepositoryVersionContentDetails.objects.filter( + repository_version=version1, content_type="core.content" + ) + assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.PRESENT).count == 66000 + assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.ADDED).count == 66000 + assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.REMOVED).first() is None + + # Test 2: Remove >65535 content units - tests removed() with >65535 items + with repository.new_version() as version2: + version2.remove_content(Content.objects.filter(pk__in=large_pks)) + + # Test the content property returns nothing (all content was removed) + content_pks = set(version2.content.values_list("pk", flat=True).iterator()) + assert len(content_pks) == 0 + + # Test the added() method returns nothing (nothing was added) + added_pks = set(version2.added(base_version=version1).values_list("pk", flat=True).iterator()) + assert len(added_pks) == 0 + + # Test the removed() method with >65535 items + removed_pks = set( + version2.removed(base_version=version1).values_list("pk", flat=True).iterator() + ) + assert len(removed_pks) == 66000 # Critical: removed() must handle >65535 items + + # Verify RepositoryVersionContentDetails + rvcd_qs = RepositoryVersionContentDetails.objects.filter( + repository_version=version2, content_type="core.content" + ) + assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.PRESENT).first() is None + assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.ADDED).first() is None + assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.REMOVED).count == 66000 + + # Verify we can iterate and fetch content without errors + first_100 = list(version1.content[:100].values_list("pk", flat=True)) + assert len(first_100) == 100 + assert all(pk in large_pks for pk in first_100)