
[SPARK-56856][SDP] Implement SCD1 Batch Processor; Microbatch Deduplication #55969

Open
AnishMahto wants to merge 19 commits into apache:master from AnishMahto:SPARK-56856-SCD1-microbatch-deduplication


@AnishMahto
Contributor

Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7


This is a stacked PR. Review incremental diff here: AnishMahto/spark@SPARK-56838-introduce-ChangeArgs...SPARK-56856-SCD1-microbatch-deduplication


Preamble:

The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD1 replication semantics.

SCD1 flows also maintain an "auxiliary" table that tracks state for early-arriving, out-of-order events. Each microbatch must be reconciled against this auxiliary table as well, and must update the auxiliary table's state appropriately for future microbatches.

Microbatch Deduplication:

The first step of microbatch reconciliation for SCD1 is deduplicating the microbatch such that there is a single row per key.

Since SCD1 only maintains the latest state per key from the change data source, within a microbatch only the row with the latest sequencing value per key matters; all other rows for that key are dropped.
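The deduplication described above can be sketched with Spark's `max_by` aggregate. This is a minimal illustration, not the code in this PR: the object name `DedupSketch`, the function `latestPerKey`, its parameters, and the fixed temporary column name are all hypothetical, and it assumes plain column names (no dots or other characters that need quoting).

```scala
import org.apache.spark.sql.{DataFrame, functions => F}

object DedupSketch {
  // Hypothetical helper: keep one row per key, choosing the row with the
  // greatest value in `sequenceCol`. Ties are resolved arbitrarily.
  def latestPerKey(batch: DataFrame, keys: Seq[String], sequenceCol: String): DataFrame = {
    require(keys.nonEmpty, "at least one key column is required")
    // Assumed not to clash with any real column in the microbatch.
    val packed = "__winning_row"
    val allColumns = batch.columns.toSeq.map(F.col)
    batch
      .groupBy(keys.map(F.col): _*)
      // max_by returns a single column, so pack the whole row into a struct first.
      .agg(F.max_by(F.struct(allColumns: _*), F.col(sequenceCol)).as(packed))
      // Unpack the struct to restore the original column names and order.
      .select(F.col(s"$packed.*"))
  }
}
```

Called as `DedupSketch.latestPerKey(microbatchDf, Seq("id"), "seq")`, this returns at most one row per `id`. Field nullability in the output may differ from the input, since the fields are extracted from a nullable struct.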

@AnishMahto AnishMahto changed the title [SPARK-56856][SDP] SCD1 Microbatch Deduplication [SPARK-56856][SDP] Implement SCD1 Batch Processor; Microbatch Deduplication May 19, 2026
Member

@szehon-ho szehon-ho left a comment


Review of the incremental SCD1 microbatch dedup diff (on top of #55836). A few nits below.

*
* The schema of the returned dataframe matches the schema of the microbatch exactly.
*/
def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = {
Member


The scaladoc documents tie-breaking and null sequencing behavior; consider adding tests for:

  1. Equal sequencing for the same key: even a lightweight test that documents non-determinism (or runs twice) would lock in the contract.
  2. Null sequencing: max_by has subtle null ordering (see DataFrameAggregateSuite "max_by"); worth defining expected CDC behavior or asserting we reject nulls upstream.
  3. Single row per key (no-op): cheap sanity check that one input row passes through unchanged.

Not blocking if you prefer to add these when merge logic lands.

Contributor Author


Added these tests. FYI, I'm also going to add microbatch validation that disallows null sequencing when I put together the foreachBatch body in https://issues.apache.org/jira/browse/SPARK-56953.
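For readers following along, the three edge cases in this thread can be modelled in plain Scala without a Spark session. This is only a sketch of the intended contract under stated assumptions: `Change`, `MicrobatchModel`, and their fields are hypothetical, null sequencing is modelled as `None`, and the upstream rejection of nulls is assumed from the reply above rather than taken from merged code.

```scala
// Hypothetical plain-Scala model of a change row; not a Spark or SDP type.
final case class Change(key: String, sequence: Option[Long], payload: String)

object MicrobatchModel {
  // Reject null (None) sequencing before any deduplication happens.
  def validate(batch: Seq[Change]): Unit =
    require(batch.forall(_.sequence.isDefined), "sequencing value must not be null")

  // Keep one row per key: the row with the greatest sequence value.
  def deduplicate(batch: Seq[Change]): Seq[Change] = {
    validate(batch)
    batch.groupBy(_.key).values.map(_.maxBy(_.sequence.get)).toSeq
  }
}
```

Scala's `maxBy` happens to return the first maximal element on a tie, but the thread treats ties as non-deterministic, so a test for the equal-sequencing case should only assert that the surviving row is one of the tied rows.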

.toImmutableArraySeq

microbatchDf
.groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*)
Member


If changeArgs.keys is empty, groupBy() collapses the entire microbatch into a single group (one output row). Worth guarding with require(changeArgs.keys.nonEmpty, ...) here or validating at ChangeArgs construction in the registration PR.

Contributor Author


Yeah we will be validating against empty keys on ChangeArgs construction once we get to AutoCDC flow registration within the SDP engine - both SCD1/SCD2 semantics would break if there is an empty key set.
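A small sketch of the construction-time guard discussed in this thread, together with a plain-Scala demonstration of why an empty key set is dangerous. `ChangeArgsSketch` is a hypothetical stand-in with assumed fields, not the real `ChangeArgs`, and `EmptyKeysDemo` models grouping with ordinary collections rather than a DataFrame.

```scala
// Hypothetical stand-in for ChangeArgs; the real class is defined elsewhere
// and may carry different fields.
final case class ChangeArgsSketch(keys: Seq[String], sequenceCol: String) {
  // Fail at construction so no flow can run with an empty key set.
  require(keys.nonEmpty, "AutoCDC requires at least one key column")
}

object EmptyKeysDemo {
  type Row = Map[String, Any]

  // Number of groups produced when grouping rows by the given key columns.
  // With no keys, every row maps to the same empty key, giving one group.
  def groupCount(rows: Seq[Row], keys: Seq[String]): Int =
    rows.groupBy(row => keys.map(row)).size
}
```

With two rows that have distinct `id` values, `EmptyKeysDemo.groupCount(rows, Seq("id"))` yields two groups while `EmptyKeysDemo.groupCount(rows, Nil)` yields one, which mirrors the collapse the reviewer describes.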

def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = {
// The `max_by` API can only return a single column, so pack/unpack the entire row into a
// temporary column before and after the `max_by` operation.
val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row")
Member


tempColName generates a fresh UUID on every deduplicateMicrobatch call, so the logical plan column name differs across invocations. Fine for correctness; just a heads-up if you later add plan-golden / EXPLAIN tests — you may want a stable internal name with a collision-safe prefix instead. Non-blocking.

Contributor Author

@AnishMahto AnishMahto May 19, 2026


Good callout. I see that Spark CDC uses the "__spark_cdc" reserved prefix, so I'm choosing to adopt "__spark_autocdc" as the reserved prefix for SDP system column names.
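To make the trade-off in this thread concrete, here is a hedged sketch comparing a UUID-suffixed temporary name with a stable name under the reserved prefix. Only the prefix string `__spark_autocdc` comes from the comment above; the object `TempColumns`, its method names, and the exact name format are assumptions for illustration and do not reflect `OutOfOrderCdcMergeUtils`.

```scala
import java.util.UUID

object TempColumns {
  // Prefix named in the thread; how it is combined with a base name is assumed.
  val ReservedPrefix = "__spark_autocdc"

  // Fresh name on every call: collision-safe, but differs between plans.
  def randomName(base: String): String =
    s"${base}_${UUID.randomUUID().toString.replace("-", "")}"

  // Stable name: identical across calls, so plan output stays reproducible.
  def stableName(base: String): String = s"${ReservedPrefix}_$base"

  // A stable name is only safe if user columns never use the reserved prefix.
  def requireNoReservedColumns(columns: Seq[String]): Unit = {
    val clashes = columns.filter(_.startsWith(ReservedPrefix))
    require(clashes.isEmpty, s"columns use reserved prefix: ${clashes.mkString(", ")}")
  }
}
```

The stable variant trades the UUID's built-in uniqueness for an explicit check on the input schema, which is what makes golden-plan or EXPLAIN-based tests feasible.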

@AnishMahto AnishMahto requested a review from szehon-ho May 19, 2026 21:52

2 participants