[Improve](streaming-job) use per-table publication instead of ALL TABLES for PostgreSQL CDC#62526
[Improve](streaming-job) use per-table publication instead of ALL TABLES for PostgreSQL CDC#62526JNSimba wants to merge 8 commits intoapache:masterfrom
Conversation
…LES for PostgreSQL CDC Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
There was a problem hiding this comment.
Findings
- Blocking:
publication_nameis accepted as arbitrary non-empty text in FE, but both the create/update path and the cleanup path splice it directly into SQL (fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java,fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java). A user-provided name such asMyPuborpub-nameis valid only when quoted in PostgreSQL, so this feature currently accepts values that later fail at runtime. - Blocking: auto-created-resource cleanup is detected only by prefix (
doris_pub_/doris_cdc_) infs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java. That violates the PR contract that user-provided names are preserved: if a user explicitly setspublication_name = doris_pub_sharedorslot_name = doris_cdc_shared,DROP JOBwill still delete that external resource.
Critical Checkpoints
- Goal of current task: Partially met. Happy-path per-table publication / SHOW visibility are implemented and covered, but the two blockers above break custom-name support and cleanup semantics.
- Modification size / focus: Yes, mostly focused.
- Concurrency: No new concurrency-sensitive path identified.
- Lifecycle management: Needs work. External PostgreSQL resource ownership is inferred from name prefixes instead of persisted ownership.
- Configuration items: New source properties were added, but validation is incomplete for PostgreSQL identifier rules and cleanup semantics.
- Incompatible changes: No FE/BE protocol or storage compatibility issue found.
- Parallel code paths: No matching non-PostgreSQL path appears to need the same publication logic.
- Special conditional checks: Prefix-based auto-created detection is not a safe condition.
- Test coverage: Positive regression and unit coverage exists; missing negative tests for custom identifiers and user-provided prefixed names.
- Observability: Sufficient for this change.
- Transaction / persistence: Source property persistence and replay path looks okay.
- Data writes / modifications: Unsafe external-resource deletion behavior remains.
- FE-BE variable passing: Not applicable.
- Performance: No major concern found.
- Other issues: None beyond the blockers above.
There was a problem hiding this comment.
Pull request overview
Updates Doris PostgreSQL CDC streaming jobs to use per-table PostgreSQL publications (filtered mode) and to persist/show slot/publication names via job source properties, improving WAL decoding efficiency and operational visibility.
Changes:
- Add
slot_name/publication_namesource properties (auto-generated by FE when omitted) and surface them inSHOW JOBSoutput. - Switch Debezium publication autocreation to
filteredmode and addpublish_via_partition_root=truefor PG 13+ when creating filtered publications. - Add regression + unit tests covering generated names, per-table publication contents, and cleanup on job drop.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy | New regression test validating filtered publications, name visibility, and cleanup behavior. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out | Expected snapshot query outputs for the new regression test. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Configure publication name + filtered autocreation; implement slot/publication cleanup logic. |
| fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java | Add publish_via_partition_root to filtered-mode CREATE PUBLICATION on PG 13+. |
| fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java | Unit tests for default slot/publication naming helpers and validator acceptance. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Populate default slot/publication names into sourceProperties for PG jobs so they persist and show up in SHOW/TVF. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Allow slot_name and publication_name keys in streaming job source properties. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java | Add constants + helper methods to generate bounded-length default slot/publication names with UUID suffix. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
…ship and validation
- FE: validate slot_name/publication_name as PostgreSQL unquoted identifiers
(^[a-z_][a-z0-9_]*$, <= 63 chars) to reject malformed or malicious input
before it reaches PG DDL.
- BE: replace prefix-based ownership check with strict regex matching
"doris_cdc_{jobId}_{8hex}" / "doris_pub_{jobId}_{8hex}" so user-supplied
names (including doris_cdc_shared / doris_pub_shared) are never dropped.
- BE: when user supplies slot/publication, set publication.autocreate.mode
to DISABLED and pre-validate that the publication exists and covers every
table in include_tables - fail fast instead of corrupting a user-owned
publication via ALTER PUBLICATION SET TABLE.
- Use PostgresConnectorConfig.PUBLICATION_NAME / AutoCreateMode constants
and DataSourceConfigKeys.DEFAULT_*_PREFIX instead of duplicating naming.
- Regression test: numeric comparison for SucceedTaskCount (avoid
lexicographic pitfall at 10+), replace fixed sleep(5000) with Awaitility
polling for cleanup assertions.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
/review |
There was a problem hiding this comment.
I found blocking compatibility and resource lifecycle regressions in the PostgreSQL CDC changes.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: legacy PostgreSQL CDC jobs created before this PR are no longer restart compatible. Before this change the reader never setpublication.name, so those jobs ran against the Debezium defaultdbz_publicationwithALL_TABLES. This patch now falls back todoris_pub_<jobId>, classifies the job as user managed, setspublication.autocreate.mode=disabled, and validates that the new publication exists. Replayed jobs whose persistedsourcePropertiesdo not containpublication_namewill fail initialization after upgrade, and their legacydoris_cdc_<jobId>slot will also never be cleaned up on drop.fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: cleanup ownership is inferred from the current slot and publication names. Because these source properties remain mutable throughALTER JOB, changing either name after creation makesclose()skip cleanup and leaks the original Doris created slot or publication. Ownership needs to be persisted at creation time, or these properties need to be immutable.
Critical checkpoint conclusions:
- Goal: the PR aims to add per table PostgreSQL publications, custom slot and publication names, SHOW visibility, and cleanup. The new job happy path is covered, but the current code does not safely preserve existing jobs or resource lifecycle.
- Minimality and focus: the change stays localized, but it couples new naming, ownership inference, and cleanup without persisting compatibility or ownership state.
- Concurrency: no new lock order or heavy under lock issue stood out; the reader still uses the existing slot creation lock.
- Lifecycle: slot and publication lifecycle tracking is not reliable because it depends on current name patterns rather than persisted ownership or legacy metadata.
- Configuration items: new source properties are validated, but compatibility for already persisted jobs is incomplete.
- Incompatible changes: yes. Publication naming and default behavior changed without compatibility handling for pre existing persisted jobs.
- Parallel paths: replay or upgrade and ALTER JOB paths are not updated consistently with the create path.
- Special conditions:
isAutoGenerated()is too strong as an ownership predicate; it misses legacy Doris managed resources and any auto resource after name mutation. - Test coverage: validator unit tests and one regression test cover only the fresh job happy path; there is no replay or upgrade or ALTER and cleanup coverage for the new semantics.
- Observability: validation errors are actionable, but the cleanup path still logs only the message on failure, which leaves less detail for diagnosis.
- Transaction and persistence: no FE journal format change is needed, but persisted
sourcePropertiesfrom older jobs are not interpreted compatibly. - Data modification or external state: PostgreSQL publication and slot lifecycle can be broken or leaked.
- FE BE variable passing: not applicable.
- Performance: no material hot path issue stood out in the touched code.
…y ownership
Drop the uuid suffix from default slot/publication names (now
doris_cdc_{jobId} / doris_pub_{jobId}) and stop populating them into
sourceProperties. The cdc client derives defaults at runtime, and the
SHOW layer surfaces them only for display so user intent ("unspecified
= auto") stays intact in storage.
User-provided vs auto ownership is now purely driven by whether
slot_name and publication_name are both set. Mixed specification is
rejected. In user-provided mode Doris touches no PG resources - slot
creation is skipped and close() leaves both intact. In auto mode
initialize detects cross-cluster conflicts by comparing publication
table sets and checking that the slot is not already active.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…able after creation Ownership (Doris-managed vs user-managed) is decided at create time from whether slot_name / publication_name are both set. Letting ALTER JOB rewrite them afterwards would either orphan the resources Doris allocated, or hand Doris a user-owned name to drop on cleanup. The names are also bound to the replication slot's LSN state and cannot be swapped for a live stream. Reject modifications in both ALTER paths (FROM source / cdc_stream TVF) and cover the behavior in the publication regression test. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tion code Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…urce Drop the pairing requirement and let slot_name and publication_name be specified independently. Each is "user-owned" iff its own name is set; Doris creates and drops only the resources it owns. Common patterns now work, e.g. omit slot_name to let Doris manage the slot while pointing at a shared user-managed publication. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add missing output blocks for the user-provided slot/publication case and scope the mixed-mode slot cleanup assertion to this job's auto slot, so leftover slots from other suites on a shared PG instance don't fail the test. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
What problem does this PR solve?
Problem Summary:
Previously, PostgreSQL CDC streaming jobs created publications using
FOR ALL TABLES, which monitors all tables in the database regardless of which tables the job actually needs to capture. This causes unnecessary WAL decoding overhead and network traffic. Additionally, users could not specify customslot_nameorpublication_name, and these values were not visible inSHOW JOBS.This PR makes the following improvements:
FOR TABLE table1, table2instead ofFOR ALL TABLES, only including tables specified ininclude_tables.slot_nameandpublication_namein source properties. If not specified, auto-generated names with formatdoris_cdc_{jobId}_{uuid8}/doris_pub_{jobId}_{uuid8}are used.sourcePropertiesand visible viaSHOW JOBS/ TVF query.publish_via_partition_root = trueoption for PostgreSQL 13+ in FILTERED mode (previously only in ALL_TABLES mode).doris_pub_/doris_cdc_prefix) are dropped when the job is deleted; user-provided ones are preserved.Release note
Support per-table PostgreSQL publication for streaming CDC jobs. Users can now optionally specify
slot_nameandpublication_namein source properties. Auto-generated slot/publication names are visible in SHOW JOBS output.Check List (For Author)
Test
Behavior changed:
FOR ALL TABLEStoFOR TABLE(filtered mode). Slot and publication names now use{jobId}_{uuid8}suffix instead of just{jobId}. Both are now visible in SHOW JOBS output.Does this need documentation?
Check List (For Reviewer who merge this PR)