[SPARK-56870][SDP] Implement SCD1 Batch Processor; Extend Microbatch with CDC Metadata#55970
[SPARK-56870][SDP] Implement SCD1 Batch Processor; Extend Microbatch with CDC Metadata#55970AnishMahto wants to merge 16 commits into
Conversation
552e33c to
9a566ff
Compare
szehon-ho
left a comment
There was a problem hiding this comment.
Review of the incremental diff on top of #55969 (extend microbatch with CDC metadata). Overall this looks good to merge with minor nits.
What looks good
- The delete/upsert encoding in
_cdc_metadatamatches the SPIP story: mutually exclusivedeleteSequence/upsertSequence, persisted beforecolumnSelectioncan dropdeleteConditioncolumns. resolvedSequencingTypeat processor construction is the right split (flow setup vs per-microbatch work); the Int→Long cast test and incompatible cast test are valuable.- Reserved-column conflict uses
conf.resolverandCaseSensitivityLabels— consistent with session case sensitivity. constructCdcMetadataColdriven offcdcMetadataColSchemawith ordered fields is clean; companion constants keep tests readable.AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT/ SQLSTATE42710is appropriate.- Test coverage for classification, no delete condition, column ordering, cast success/failure, and reserved-name conflict is solid.
Incremental diff is focused and stacks cleanly on #55836 + #55969.
9a566ff to
f9c2aed
Compare
f9c2aed to
02473ba
Compare
szehon-ho
left a comment
There was a problem hiding this comment.
Re-reviewed the incremental diff on #55969. CDC metadata encoding and resolvedSequencingType casting look correct; reserved-column validation and tests LGTM.
Left three inline nits (comment wording, extend input contract, reserved-prefix scope) — all non-blocking. Approved.
| resolvedSequencingType = LongType | ||
| ) | ||
|
|
||
| // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of |
There was a problem hiding this comment.
Nit (non-blocking): this comment still says _cdc_metadata; the implementation uses Scd1BatchProcessor.cdcMetadataColName (__spark_autocdc_metadata). Consider aligning the comment with the constant so future readers are not confused.
| * The returned dataframe has all of the columns in the input microbatch + the CDC metadata | ||
| * column. | ||
| */ | ||
| def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { |
There was a problem hiding this comment.
Nit (non-blocking): deduplicateMicrobatch documents a validated microbatch (non-null, orderable sequencing). Consider mirroring that contract here — e.g. rename microbatchDf → validatedMicrobatch and add a scaladoc @param — so foreachBatch wiring keeps the same precondition for both steps.
| ) | ||
| } | ||
|
|
||
| private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { |
There was a problem hiding this comment.
Nit (non-blocking): this PR only guards __spark_autocdc_metadata on the microbatch (__spark_autocdc_winning_row is covered in #55969 for dedup). When target/auxiliary schemas are wired, worth applying the same reserved-prefix policy there in a follow-up so user columns cannot collide with persisted AutoCDC metadata.
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
This is a stacked PR. Review incremental diff here: AnishMahto/spark@SPARK-56856-SCD1-microbatch-deduplication...SPARK-56870-extend-microbatch-with-cdc-metadata
Preamble:
The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table to keep track of early-arriving out-of-order received events state. Each microbatch will need to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.
Extend Microbatch with CDC Metadata:
After deduplication, all of the incoming rows can be classified as either a delete event or an upsert event (mutually exclusive), and there's at most one per key.
If we identify a row as a delete event, remember its sequencing as its
deleteSequence. If we identify a row as an upsert event, remember its sequencing as itsupsertSequence. That is,deleteSequence/upsertSequenceencode both the sequencing for the row as well as the row classification (delete or upsert).We need to persist this encoded information now, because in future stages we may drop the columns that
deleteConditionneeded to do the classification in the first place, depending on which columns were selected byChangeArgs.columnSelection.Where is the CDC Metadata stored?
Within the microbatch, we append a
_cdc_metadatastruct column, that stores thedeleteSequenceandupsertSequence.This
_cdc_metadatacolumn will eventually also land in the persisted target and auxiliary tables, which are the artifacts of an AutoCDC flow. This column represents operational metadata that the AutoCDC flow has tagged a row with, and is necessary for out-of-order correctness of the SCD decomposition.Users will not be able to opt out of persisting this column in the target table using
ChangeArgs.columnSelection, as it is necessary for correctness. The column will not have a stable public contract, and users should make no assumptions on its contents.