From ba40a9c25c746258331fe72c6de578314609ae87 Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Fri, 27 Mar 2026 14:20:41 -0500 Subject: [PATCH 1/2] feat: add skip_duplicates='match' mode to insert/insert1 (fixes #1049) New skip_duplicates='match' option: a row is skipped only if a row with the same primary key already exists AND all secondary unique index values also match. If the primary key exists but unique index values differ, DuplicateError is raised. Compared to skip_duplicates=True (which silently skips any row whose primary key is already present), 'match' mode detects when incoming data conflicts with existing data on non-PK unique constraints. Implementation: two-query approach (select-then-insert) via the new _filter_match_duplicates() method, which works identically for both MySQL and PostgreSQL backends. Unique index metadata is read from self.heading.indexes (populated at heading load time). Not supported for QueryExpression inserts (raises DataJointError). Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/datajoint/table.py | 76 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/src/datajoint/table.py b/src/datajoint/table.py index 256fab6e9..f85b9e53e 100644 --- a/src/datajoint/table.py +++ b/src/datajoint/table.py @@ -744,8 +744,11 @@ def insert( directory with a CSV file, the contents of which will be inserted. replace : bool, optional If True, replaces the existing tuple. - skip_duplicates : bool, optional - If True, silently skip duplicate inserts. + skip_duplicates : bool or str, optional + If True, silently skip rows whose primary key already exists. + If ``'match'``, skip only if the primary key exists AND all secondary + unique index values also match; raise DuplicateError if the primary + key exists but unique index values differ. ignore_extra_fields : bool, optional If False (default), fields that are not in the heading raise error. allow_direct_insert : bool, optional @@ -808,6 +811,8 @@ def insert( quoted_fields = ",".join(self.adapter.quote_identifier(f) for f in fields) # Duplicate handling (backend-agnostic) + if skip_duplicates == "match": + raise DataJointError("skip_duplicates='match' is not supported for QueryExpression inserts.") if skip_duplicates: duplicate = self.adapter.skip_duplicates_clause(self.full_table_name, self.primary_key) else: @@ -831,6 +836,69 @@ def insert( # Single batch insert (original behavior) self._insert_rows(rows, replace, skip_duplicates, ignore_extra_fields) + def _filter_match_duplicates(self, rows): + """ + Filter rows for skip_duplicates='match'. + + For each row: if a row with the same primary key already exists and all + secondary unique index values also match, skip the row silently. + If the primary key exists but unique index values differ, raise DuplicateError. + + Parameters + ---------- + rows : list + Raw rows (dicts, numpy records, or sequences) before encoding. + + Returns + ------- + list + Rows that should be inserted. + """ + unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]] + + result = [] + for row in rows: + # Normalize row to dict + if isinstance(row, np.void): + row_dict = {name: row[name] for name in row.dtype.fields} + elif isinstance(row, collections.abc.Mapping): + row_dict = dict(row) + else: + row_dict = dict(zip(self.heading.names, row)) + + # Build PK restriction + pk_dict = {pk: row_dict[pk] for pk in self.primary_key if pk in row_dict} + if len(pk_dict) < len(self.primary_key): + result.append(row) + continue + + existing = (self & pk_dict).fetch(limit=1, as_dict=True) + if not existing: + result.append(row) + continue + + existing_row = existing[0] + + # Check all unique index columns for a match + all_match = True + for cols in unique_col_sets: + for col in cols: + if col in row_dict and col in existing_row: + if row_dict[col] != existing_row[col]: + all_match = False + break + if not all_match: + break + + if not all_match: + raise DuplicateError( + f"Unique index conflict in {self.table_name}: " + f"a row with the same primary key exists but unique index values differ." + ) + # else: silently skip — existing row is an exact match + + return result + def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields): """ Internal helper to insert a batch of rows. @@ -846,6 +914,10 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields): ignore_extra_fields : bool If True, ignore unknown fields. """ + if skip_duplicates == "match": + rows = self._filter_match_duplicates(list(rows)) + skip_duplicates = False + # collects the field list from first row (passed by reference) field_list = [] rows = list(self.__make_row_to_insert(row, field_list, ignore_extra_fields) for row in rows) From 7a837aaa0bf0a31d34e0d5782f571d5f0d59d3fc Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Fri, 27 Mar 2026 15:29:04 -0500 Subject: [PATCH 2/2] refactor: redesign skip_duplicates='match' for correctness Address several edge cases in the 'match' mode implementation: - Batch PK fetch: replace row-by-row SELECT with a single batch query for all incoming PKs. Reduces O(n) queries to O(1). - Better error messages: report the specific unique index and column values that differ, not just "values differ". - Non-PK unique conflicts: when a row's PK is new but it collides on a secondary unique index with a *different* existing row, the DB-level DuplicateError is now caught and re-raised with context explaining the likely cause (secondary unique or race condition). - Race conditions: if a concurrent insert creates a PK that passed the pre-check, the DB-level error is caught and reported clearly. - Columns not supplied: if a unique index column is not in the incoming row dict (DB will use its default), skip comparison for that column rather than false-flagging a mismatch. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/datajoint/table.py | 99 ++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/src/datajoint/table.py b/src/datajoint/table.py index f85b9e53e..0b7fc0969 100644 --- a/src/datajoint/table.py +++ b/src/datajoint/table.py @@ -840,9 +840,16 @@ def _filter_match_duplicates(self, rows): """ Filter rows for skip_duplicates='match'. - For each row: if a row with the same primary key already exists and all - secondary unique index values also match, skip the row silently. - If the primary key exists but unique index values differ, raise DuplicateError. + For each row whose primary key already exists: skip silently if all + secondary unique index values also match the existing row; raise + DuplicateError if the primary key exists but any unique index value + differs. + + Rows whose primary key is not yet in the table are kept for insert. + If a kept row subsequently violates a *non-PK* unique constraint at + insert time (e.g. a different row already owns that unique value, or a + concurrent insert created a conflict), the caller is responsible for + catching the resulting DuplicateError. Parameters ---------- @@ -852,50 +859,70 @@ def _filter_match_duplicates(self, rows): Returns ------- list - Rows that should be inserted. + Rows that should be inserted (PK-duplicate matches removed). """ unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]] - result = [] + # --- normalise every row to a dict so we can inspect values --- + row_dicts = [] for row in rows: - # Normalize row to dict if isinstance(row, np.void): - row_dict = {name: row[name] for name in row.dtype.fields} + row_dicts.append({name: row[name] for name in row.dtype.fields}) elif isinstance(row, collections.abc.Mapping): - row_dict = dict(row) + row_dicts.append(dict(row)) else: - row_dict = dict(zip(self.heading.names, row)) - - # Build PK restriction - pk_dict = {pk: row_dict[pk] for pk in self.primary_key if pk in row_dict} - if len(pk_dict) < len(self.primary_key): + row_dicts.append(dict(zip(self.heading.names, row))) + + # --- batch-fetch existing rows whose PK matches any incoming row --- + pk_lookups = [] + for rd in row_dicts: + pk = {k: rd[k] for k in self.primary_key if k in rd} + if len(pk) == len(self.primary_key): + pk_lookups.append(pk) + + existing_by_pk: dict[tuple, dict] = {} + if pk_lookups: + existing_rows = (self & pk_lookups).fetch(as_dict=True) + for er in existing_rows: + pk_tuple = tuple(er[k] for k in self.primary_key) + existing_by_pk[pk_tuple] = er + + # --- decide per row: insert, skip, or raise --- + result = [] + for row, rd in zip(rows, row_dicts): + pk = {k: rd[k] for k in self.primary_key if k in rd} + if len(pk) < len(self.primary_key): + # incomplete PK — let the DB raise the real error result.append(row) continue - existing = (self & pk_dict).fetch(limit=1, as_dict=True) - if not existing: + pk_tuple = tuple(pk[k] for k in self.primary_key) + existing_row = existing_by_pk.get(pk_tuple) + if existing_row is None: + # PK not yet in table — include for insert. + # If this row collides on a secondary unique index with a + # *different* existing row, the DB will raise at insert time. result.append(row) continue - existing_row = existing[0] - - # Check all unique index columns for a match - all_match = True + # PK exists — check every secondary unique index for cols in unique_col_sets: for col in cols: - if col in row_dict and col in existing_row: - if row_dict[col] != existing_row[col]: - all_match = False - break - if not all_match: - break + if col not in rd: + # Column not supplied by the caller; the DB will use + # its default. We cannot compare, so skip this column. + continue + new_val = rd[col] + old_val = existing_row.get(col) + if new_val != old_val: + raise DuplicateError( + f"Unique index conflict in {self.table_name}: " + f"primary key {pk} exists but unique index " + f"({', '.join(cols)}) differs — " + f"existing {col}={old_val!r}, new {col}={new_val!r}." + ) - if not all_match: - raise DuplicateError( - f"Unique index conflict in {self.table_name}: " - f"a row with the same primary key exists but unique index values differ." - ) - # else: silently skip — existing row is an exact match + # All unique index values match (or there are none) — skip row. return result @@ -914,7 +941,8 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields): ignore_extra_fields : bool If True, ignore unknown fields. """ - if skip_duplicates == "match": + match_mode = skip_duplicates == "match" + if match_mode: rows = self._filter_match_duplicates(list(rows)) skip_duplicates = False @@ -945,6 +973,13 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields): except UnknownAttributeError as err: raise err.suggest("To ignore extra fields in insert, set ignore_extra_fields=True") except DuplicateError as err: + if match_mode: + raise DuplicateError( + f"Duplicate entry during skip_duplicates='match' insert into " + f"{self.table_name}. A row with a new primary key may conflict on " + f"a secondary unique index with an existing row, or a concurrent " + f"insert created a conflict. Original error: {err}" + ) raise err.suggest("To ignore duplicate entries in insert, set skip_duplicates=True") def insert_dataframe(self, df, index_as_pk=None, **insert_kwargs):