Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 109 additions & 2 deletions src/datajoint/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,11 @@ def insert(
directory with a CSV file, the contents of which will be inserted.
replace : bool, optional
If True, replaces the existing tuple.
skip_duplicates : bool, optional
If True, silently skip duplicate inserts.
skip_duplicates : bool or str, optional
If True, silently skip rows whose primary key already exists.
If ``'match'``, skip only if the primary key exists AND all secondary
unique index values also match; raise DuplicateError if the primary
key exists but unique index values differ.
ignore_extra_fields : bool, optional
If False (default), fields that are not in the heading raise error.
allow_direct_insert : bool, optional
Expand Down Expand Up @@ -808,6 +811,8 @@ def insert(
quoted_fields = ",".join(self.adapter.quote_identifier(f) for f in fields)

# Duplicate handling (backend-agnostic)
if skip_duplicates == "match":
raise DataJointError("skip_duplicates='match' is not supported for QueryExpression inserts.")
if skip_duplicates:
duplicate = self.adapter.skip_duplicates_clause(self.full_table_name, self.primary_key)
else:
Expand All @@ -831,6 +836,96 @@ def insert(
# Single batch insert (original behavior)
self._insert_rows(rows, replace, skip_duplicates, ignore_extra_fields)

def _filter_match_duplicates(self, rows):
"""
Filter rows for skip_duplicates='match'.

For each row whose primary key already exists: skip silently if all
secondary unique index values also match the existing row; raise
DuplicateError if the primary key exists but any unique index value
differs.

Rows whose primary key is not yet in the table are kept for insert.
If a kept row subsequently violates a *non-PK* unique constraint at
insert time (e.g. a different row already owns that unique value, or a
concurrent insert created a conflict), the caller is responsible for
catching the resulting DuplicateError.

Parameters
----------
rows : list
Raw rows (dicts, numpy records, or sequences) before encoding.

Returns
-------
list
Rows that should be inserted (PK-duplicate matches removed).
"""
unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]]

# --- normalise every row to a dict so we can inspect values ---
row_dicts = []
for row in rows:
if isinstance(row, np.void):
row_dicts.append({name: row[name] for name in row.dtype.fields})
elif isinstance(row, collections.abc.Mapping):
row_dicts.append(dict(row))
else:
row_dicts.append(dict(zip(self.heading.names, row)))

# --- batch-fetch existing rows whose PK matches any incoming row ---
pk_lookups = []
for rd in row_dicts:
pk = {k: rd[k] for k in self.primary_key if k in rd}
if len(pk) == len(self.primary_key):
pk_lookups.append(pk)

existing_by_pk: dict[tuple, dict] = {}
if pk_lookups:
existing_rows = (self & pk_lookups).fetch(as_dict=True)
for er in existing_rows:
pk_tuple = tuple(er[k] for k in self.primary_key)
existing_by_pk[pk_tuple] = er

# --- decide per row: insert, skip, or raise ---
result = []
for row, rd in zip(rows, row_dicts):
pk = {k: rd[k] for k in self.primary_key if k in rd}
if len(pk) < len(self.primary_key):
# incomplete PK — let the DB raise the real error
result.append(row)
continue

pk_tuple = tuple(pk[k] for k in self.primary_key)
existing_row = existing_by_pk.get(pk_tuple)
if existing_row is None:
# PK not yet in table — include for insert.
# If this row collides on a secondary unique index with a
# *different* existing row, the DB will raise at insert time.
result.append(row)
continue

# PK exists — check every secondary unique index
for cols in unique_col_sets:
for col in cols:
if col not in rd:
# Column not supplied by the caller; the DB will use
# its default. We cannot compare, so skip this column.
continue
new_val = rd[col]
old_val = existing_row.get(col)
if new_val != old_val:
raise DuplicateError(
f"Unique index conflict in {self.table_name}: "
f"primary key {pk} exists but unique index "
f"({', '.join(cols)}) differs — "
f"existing {col}={old_val!r}, new {col}={new_val!r}."
)

# All unique index values match (or there are none) — skip row.

return result

def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
"""
Internal helper to insert a batch of rows.
Expand All @@ -846,6 +941,11 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
ignore_extra_fields : bool
If True, ignore unknown fields.
"""
match_mode = skip_duplicates == "match"
if match_mode:
rows = self._filter_match_duplicates(list(rows))
skip_duplicates = False

# collects the field list from first row (passed by reference)
field_list = []
rows = list(self.__make_row_to_insert(row, field_list, ignore_extra_fields) for row in rows)
Expand Down Expand Up @@ -873,6 +973,13 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
except UnknownAttributeError as err:
raise err.suggest("To ignore extra fields in insert, set ignore_extra_fields=True")
except DuplicateError as err:
if match_mode:
raise DuplicateError(
f"Duplicate entry during skip_duplicates='match' insert into "
f"{self.table_name}. A row with a new primary key may conflict on "
f"a secondary unique index with an existing row, or a concurrent "
f"insert created a conflict. Original error: {err}"
)
raise err.suggest("To ignore duplicate entries in insert, set skip_duplicates=True")

def insert_dataframe(self, df, index_as_pk=None, **insert_kwargs):
Expand Down
Loading