diff --git a/src/datajoint/table.py b/src/datajoint/table.py
index 256fab6e9..0b7fc0969 100644
--- a/src/datajoint/table.py
+++ b/src/datajoint/table.py
@@ -744,8 +744,11 @@ def insert(
             directory with a CSV file, the contents of which will be inserted.
         replace : bool, optional
             If True, replaces the existing tuple.
-        skip_duplicates : bool, optional
-            If True, silently skip duplicate inserts.
+        skip_duplicates : bool or str, optional
+            If True, silently skip rows whose primary key already exists.
+            If ``'match'``, skip only if the primary key exists AND all secondary
+            unique index values also match; raise DuplicateError if the primary
+            key exists but unique index values differ.
         ignore_extra_fields : bool, optional
             If False (default), fields that are not in the heading raise error.
         allow_direct_insert : bool, optional
@@ -808,6 +811,8 @@ def insert(
         quoted_fields = ",".join(self.adapter.quote_identifier(f) for f in fields)
 
         # Duplicate handling (backend-agnostic)
+        if skip_duplicates == "match":
+            raise DataJointError("skip_duplicates='match' is not supported for QueryExpression inserts.")
         if skip_duplicates:
             duplicate = self.adapter.skip_duplicates_clause(self.full_table_name, self.primary_key)
         else:
@@ -831,6 +836,96 @@ def insert(
 
             # Single batch insert (original behavior)
             self._insert_rows(rows, replace, skip_duplicates, ignore_extra_fields)
 
+    def _filter_match_duplicates(self, rows):
+        """
+        Filter rows for skip_duplicates='match'.
+
+        For each row whose primary key already exists: skip silently if all
+        secondary unique index values also match the existing row; raise
+        DuplicateError if the primary key exists but any unique index value
+        differs.
+
+        Rows whose primary key is not yet in the table are kept for insert.
+        If a kept row subsequently violates a *non-PK* unique constraint at
+        insert time (e.g. a different row already owns that unique value, or a
+        concurrent insert created a conflict), the caller is responsible for
+        catching the resulting DuplicateError.
+
+        Parameters
+        ----------
+        rows : list
+            Raw rows (dicts, numpy records, or sequences) before encoding.
+
+        Returns
+        -------
+        list
+            Rows that should be inserted (PK-duplicate matches removed).
+        """
+        unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]]
+
+        # --- normalise every row to a dict so we can inspect values ---
+        row_dicts = []
+        for row in rows:
+            if isinstance(row, np.void):
+                row_dicts.append({name: row[name] for name in row.dtype.fields})
+            elif isinstance(row, collections.abc.Mapping):
+                row_dicts.append(dict(row))
+            else:
+                row_dicts.append(dict(zip(self.heading.names, row)))
+
+        # --- batch-fetch existing rows whose PK matches any incoming row ---
+        pk_lookups = []
+        for rd in row_dicts:
+            pk = {k: rd[k] for k in self.primary_key if k in rd}
+            if len(pk) == len(self.primary_key):
+                pk_lookups.append(pk)
+
+        existing_by_pk: dict[tuple, dict] = {}
+        if pk_lookups:
+            existing_rows = (self & pk_lookups).fetch(as_dict=True)
+            for er in existing_rows:
+                pk_tuple = tuple(er[k] for k in self.primary_key)
+                existing_by_pk[pk_tuple] = er
+
+        # --- decide per row: insert, skip, or raise ---
+        result = []
+        for row, rd in zip(rows, row_dicts):
+            pk = {k: rd[k] for k in self.primary_key if k in rd}
+            if len(pk) < len(self.primary_key):
+                # incomplete PK — let the DB raise the real error
+                result.append(row)
+                continue
+
+            pk_tuple = tuple(pk[k] for k in self.primary_key)
+            existing_row = existing_by_pk.get(pk_tuple)
+            if existing_row is None:
+                # PK not yet in table — include for insert.
+                # If this row collides on a secondary unique index with a
+                # *different* existing row, the DB will raise at insert time.
+                result.append(row)
+                continue
+
+            # PK exists — check every secondary unique index
+            for cols in unique_col_sets:
+                for col in cols:
+                    if col not in rd:
+                        # Column not supplied by the caller; the DB will use
+                        # its default. We cannot compare, so skip this column.
+                        continue
+                    new_val = rd[col]
+                    old_val = existing_row.get(col)
+                    if new_val != old_val:
+                        raise DuplicateError(
+                            f"Unique index conflict in {self.table_name}: "
+                            f"primary key {pk} exists but unique index "
+                            f"({', '.join(cols)}) differs — "
+                            f"existing {col}={old_val!r}, new {col}={new_val!r}."
+                        )
+
+            # All unique index values match (or there are none) — skip row.
+
+        return result
+
     def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
         """
         Internal helper to insert a batch of rows.
@@ -846,6 +941,11 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
         ignore_extra_fields : bool
             If True, ignore unknown fields.
         """
+        match_mode = skip_duplicates == "match"
+        if match_mode:
+            rows = self._filter_match_duplicates(list(rows))
+            skip_duplicates = False
+
         # collects the field list from first row (passed by reference)
         field_list = []
         rows = list(self.__make_row_to_insert(row, field_list, ignore_extra_fields) for row in rows)
@@ -873,6 +973,13 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
         except UnknownAttributeError as err:
             raise err.suggest("To ignore extra fields in insert, set ignore_extra_fields=True")
         except DuplicateError as err:
+            if match_mode:
+                raise DuplicateError(
+                    f"Duplicate entry during skip_duplicates='match' insert into "
+                    f"{self.table_name}. A row with a new primary key may conflict on "
+                    f"a secondary unique index with an existing row, or a concurrent "
+                    f"insert created a conflict. Original error: {err}"
+                ) from err
             raise err.suggest("To ignore duplicate entries in insert, set skip_duplicates=True")
 
     def insert_dataframe(self, df, index_as_pk=None, **insert_kwargs):