Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/+safe-in-parameter-limit.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Avoid exceeding PostgreSQL's 65,535 query parameter limit when filtering by large lists of IDs. This fixes `OperationalError` crashes during large import and copy operations involving more than 65,535 content units.
1 change: 1 addition & 0 deletions CHANGES/plugin_api/+safe-in-parameter-limit.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `safe_in()` to the plugin API for building `Q` objects that are safe for arbitrarily large value lists, avoiding PostgreSQL's 65,535 query parameter limit.
35 changes: 14 additions & 21 deletions pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
get_prn,
get_view_name_for_model,
reverse,
safe_in,
)
from pulpcore.cache import Cache
from pulpcore.constants import ALL_KNOWN_CONTENT_CHECKSUMS, PROTECTED_REPO_VERSION_MESSAGE
Expand Down Expand Up @@ -988,7 +989,7 @@ def get_content(self, content_qs=None):
Args:
content_qs (django.db.models.QuerySet): The queryset for Content that will be
restricted further to the content present in this repository version. If not given,
``Content.objects.all()`` is used (to return over all content types present in the
`Content.objects.all()` is used (to return over all content types present in the
repository version).

Returns:
Expand All @@ -1004,15 +1005,7 @@ def get_content(self, content_qs=None):
if content_qs is None:
content_qs = Content.objects

content_ids = self.content_ids
if len(content_ids) >= 65535:
# Workaround for PostgreSQL's limit on the number of parameters in a query
content_ids = (
RepositoryVersion.objects.filter(pk=self.pk)
.annotate(cids=Func(F("content_ids"), function="unnest"))
.values_list("cids", flat=True)
)
return content_qs.filter(pk__in=content_ids)
return content_qs.filter(safe_in("pk", self.content_ids))

@property
def content(self):
Expand Down Expand Up @@ -1056,14 +1049,14 @@ def content_batch_qs(self, content_qs=None, order_by_params=("pk",), batch_size=
Args:
content_qs (django.db.models.QuerySet) The queryset for Content that will be
restricted further to the content present in this repository version. If not given,
``Content.objects.all()`` is used (to iterate over all content present in the
`Content.objects.all()` is used (to iterate over all content present in the
repository version). A plugin may want to use a specific subclass of
[pulpcore.plugin.models.Content][] or use e.g. ``filter()`` to select
[pulpcore.plugin.models.Content][] or use e.g. `filter()` to select
a subset of the repository version's content.
order_by_params (tuple of str): The parameters for the ``order_by`` clause
for the content. The Default is ``("pk",)``. This needs to
order_by_params (tuple of str): The parameters for the `order_by` clause
for the content. The Default is `("pk",)`. This needs to
specify a stable order. For example, if you want to iterate by
decreasing creation time stamps use ``("-pulp_created", "pk")`` to
decreasing creation time stamps use `("-pulp_created", "pk")` to
ensure that content records are still sorted by primary key even
if their creation timestamp happens to be equal.
batch_size (int): The maximum batch size.
Expand All @@ -1072,8 +1065,8 @@ def content_batch_qs(self, content_qs=None, order_by_params=("pk",), batch_size=
[django.db.models.QuerySet][]: A QuerySet representing a slice of the content.

Example:
The following code could be used to loop over all ``FileContent`` in
``repository_version``. It prefetches the related
The following code could be used to loop over all `FileContent` in
`repository_version`. It prefetches the related
[pulpcore.plugin.models.ContentArtifact][] instances for every batch::

repository_version = ...
Expand Down Expand Up @@ -1126,8 +1119,8 @@ def added(self, base_version=None):
if not base_version:
return Content.objects.filter(version_memberships__version_added=self)

return Content.objects.filter(pk__in=self.content_ids).exclude(
pk__in=base_version.content_ids
return Content.objects.filter(safe_in("pk", self.content_ids)).exclude(
safe_in("pk", base_version.content_ids)
)

def removed(self, base_version=None):
Expand All @@ -1141,8 +1134,8 @@ def removed(self, base_version=None):
if not base_version:
return Content.objects.filter(version_memberships__version_removed=self)

return Content.objects.filter(pk__in=base_version.content_ids).exclude(
pk__in=self.content_ids
return Content.objects.filter(safe_in("pk", base_version.content_ids)).exclude(
safe_in("pk", self.content_ids)
)

def contains(self, content):
Expand Down
5 changes: 3 additions & 2 deletions pulpcore/app/tasks/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
compute_file_hash,
get_domain,
get_domain_pk,
safe_in,
)
from pulpcore.constants import TASK_STATES
from pulpcore.exceptions.plugin import MissingPlugin
Expand Down Expand Up @@ -410,14 +411,14 @@ def import_repository_version(
for repo_name, content_ids in mapping.items():
repo_name = _get_destination_repo_name(importer, repo_name)
dest_repo = Repository.objects.get(name=repo_name)
content = Content.objects.filter(upstream_id__in=content_ids)
content = Content.objects.filter(safe_in("upstream_id", content_ids))
content_count += len(content_ids)
with dest_repo.new_version() as new_version:
new_version.set_content(content)
else:
# just map all the content to our destination repo
dest_repo = Repository.objects.get(pk=dest_repo_pk)
content = Content.objects.filter(pk__in=resulting_content_ids)
content = Content.objects.filter(safe_in("pk", resulting_content_ids))
content_count += len(resulting_content_ids)
with dest_repo.new_version() as new_version:
new_version.set_content(content)
Expand Down
43 changes: 42 additions & 1 deletion pulpcore/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from django.apps import apps
from django.conf import settings
from django.db import connection
from django.db.models import Model, UUIDField
from django.db.models import Field, Lookup, Model, Q, UUIDField
from rest_framework.reverse import reverse as drf_reverse
from rest_framework.serializers import ValidationError

Expand All @@ -26,6 +26,47 @@
from pulpcore.app.loggers import deprecation_logger
from pulpcore.exceptions.validation import InvalidSignatureError

# PostgreSQL's limit on the number of parameters in a single query. `safe_in()` compares the
# size of a value collection against it to decide when to stop using a plain `__in` lookup.
POSTGRES_MAX_QUERY_PARAMS = 65535


class AnyArray(Lookup):
    """PostgreSQL `= ANY(%s)` lookup that passes a list as a single array parameter.

    psycopg3 adapts the Python list into a PostgreSQL array, so the entire list
    counts as **one** bind parameter regardless of size. This avoids the
    protocol-level 65535-parameter limit that `IN ($1, $2, …)` hits.

    The right-hand side is expected to be a plain iterable of values (not a queryset
    or an expression); each value is prepared with the left-hand field's
    `get_prep_value()`.
    """

    lookup_name = "any_array"

    def get_prep_lookup(self):
        """Return the right-hand-side values as a list prepared by the left-hand field."""
        prep_value = self.lhs.output_field.get_prep_value
        return [prep_value(value) for value in self.rhs]

    def as_sql(self, compiler, connection):
        """Compile the lookup to `<lhs> = ANY(%s)`.

        Returns:
            tuple: The SQL string and a tuple of parameters. The value list is appended as
                one single element so that it is bound as one array parameter.
        """
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        # process_lhs() hands back its params as a list or as a tuple depending on the Django
        # version, so `lhs_params + [...]` can raise TypeError. Unpacking works for both.
        return f"{lhs_sql} = ANY(%s)", (*lhs_params, list(self.rhs))


# Register the lookup on the base `Field` class so that it can be addressed as
# `<field_name>__any_array` (this is how `safe_in()` uses it).
Field.register_lookup(AnyArray)


def safe_in(field_name, values, max_in_params=None):
    """Build a `Q` object for `field__in` that is safe for arbitrarily large lists.

    * If *values* is a queryset (or anything else that is not a list, set, tuple or
      frozenset), the normal `__in` lookup is used — Django turns a queryset into a subquery.
    * If the collection has fewer than *max_in_params* items, `__in` is used as-is.
    * Otherwise `__any_array` is used so the whole list travels as a single
      PostgreSQL array parameter.

    Args:
        field_name (str): Name of the field (or field path) to filter on, without a lookup
            suffix.
        values: The values to match against, or a queryset.
        max_in_params (int): Number of items from which on the array lookup is used instead
            of `__in`. Defaults to a quarter of `POSTGRES_MAX_QUERY_PARAMS`.

    Returns:
        django.db.models.Q: The filter expression, usable in `filter()` and `exclude()`.
    """
    if not isinstance(values, (list, set, tuple, frozenset)):
        return Q(**{f"{field_name}__in": values})

    if max_in_params is None:
        # The parameter limit applies to the query as a whole, not to one lookup. Several
        # `safe_in()` filters are commonly combined in one queryset (e.g. `filter()` plus
        # `exclude()`), next to unrelated parameters, so each individual list must stay well
        # below the hard limit for their sum to fit.
        max_in_params = POSTGRES_MAX_QUERY_PARAMS // 4

    values = list(values)
    lookup = "in" if len(values) < max_in_params else "any_array"
    return Q(**{f"{field_name}__{lookup}": values})

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be a downside if we always used this array method?



# a little cache so viewset_for_model doesn't have to iterate over every app every time
_model_viewset_cache = {}

Expand Down
2 changes: 2 additions & 0 deletions pulpcore/plugin/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
raise_for_unknown_content_units,
resolve_prn,
reverse,
safe_in,
set_current_user,
set_domain,
)
Expand Down Expand Up @@ -59,5 +60,6 @@
"reverse",
"set_current_user",
"resolve_prn",
"safe_in",
"cache_key",
]
87 changes: 87 additions & 0 deletions pulpcore/tests/unit/models/test_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,90 @@ def test_batch_operations_preserve_correctness(repository, db):
assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.PRESENT).count == 40
assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.ADDED).first() is None
assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.REMOVED).count == 60


def test_postgresql_parameter_limit(db, repository):
    """
    Test repository operations with >65535 content units to verify PostgreSQL parameter limit
    workaround.

    PostgreSQL limits queries to 65535 parameters. This test verifies that content, added(),
    and removed() all handle >65535 items correctly.

    The safe_in() utility (used internally) avoids this limit by using `= ANY(%s)`
    (a single array parameter) for large value lists.

    Queries MUST be evaluated via .iterator() because psycopg3 uses client-side binding for
    regular queries (inlining params into the SQL string, bypassing the limit) but server-side
    binding for server-side cursors (.iterator()), which enforces the 65,535 parameter cap.
    Without .iterator() the test passes even when the fix is absent.
    """
    # Number of content units to create; exceeds PostgreSQL's 65535 parameter limit.
    unit_count = 66000

    def pks_of(queryset):
        # .iterator() forces a server-side cursor — the only way to reliably trigger the
        # 65,535-parameter limit in psycopg3.
        return set(queryset.values_list("pk", flat=True).iterator())

    def details_of(version):
        return RepositoryVersionContentDetails.objects.filter(
            repository_version=version, content_type="core.content"
        )

    large_content_set = [Content(pulp_type="core.content") for _ in range(unit_count)]
    Content.objects.bulk_create(large_content_set, batch_size=2000)
    large_pks = sorted([c.pk for c in large_content_set])

    version0 = repository.latest_version()

    # Test 1: Add >65535 content units - tests added() and content with >65535 items
    with repository.new_version() as version1:
        version1.add_content(Content.objects.filter(pk__in=large_pks))

    # Verify content_ids exceeds the PostgreSQL parameter limit threshold
    assert isinstance(version1.content_ids, list)
    assert len(version1.content_ids) >= 65535

    # The content property with >65535 items
    assert len(pks_of(version1.content)) == unit_count

    # Critical: added() must handle >65535 items
    assert len(pks_of(version1.added(base_version=version0))) == unit_count

    # removed() returns nothing (nothing was removed)
    assert len(pks_of(version1.removed(base_version=version0))) == 0

    # Verify RepositoryVersionContentDetails
    rvcd_qs = details_of(version1)
    assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.PRESENT).count == unit_count
    assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.ADDED).count == unit_count
    assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.REMOVED).first() is None

    # Test 2: Remove >65535 content units - tests removed() with >65535 items
    with repository.new_version() as version2:
        version2.remove_content(Content.objects.filter(pk__in=large_pks))

    # The content property returns nothing (all content was removed)
    assert len(pks_of(version2.content)) == 0

    # added() returns nothing (nothing was added)
    assert len(pks_of(version2.added(base_version=version1))) == 0

    # Critical: removed() must handle >65535 items
    assert len(pks_of(version2.removed(base_version=version1))) == unit_count

    # Verify RepositoryVersionContentDetails
    rvcd_qs = details_of(version2)
    assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.PRESENT).first() is None
    assert rvcd_qs.filter(count_type=RepositoryVersionContentDetails.ADDED).first() is None
    assert rvcd_qs.get(count_type=RepositoryVersionContentDetails.REMOVED).count == unit_count

    # Verify we can iterate and fetch content without errors
    first_100 = list(version1.content[:100].values_list("pk", flat=True))
    assert len(first_100) == 100
    # Membership is checked against a set; scanning the 66000-element list for each of the
    # 100 keys would be needlessly slow.
    known_pks = set(large_pks)
    assert all(pk in known_pks for pk in first_100)
Loading