From f96507628f974e4bd48587f3e0bf169278cbdb24 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 17 Apr 2026 14:48:06 +0800 Subject: [PATCH 1/3] [Improve](streaming-job) harden PG slot/publication ownership and validation - Use per-table publication (pgoutput publish_via_partition_root=true via Debezium fork) instead of ALL TABLES, so non-CDC tables are not streamed. - Split ownership per-resource: slot_name / publication_name are independently user-owned or Doris-owned. Doris only creates/drops what it owns, with deterministic default names doris_cdc_{jobId} / doris_pub_{jobId}. - Reject slot_name / publication_name changes in ALTER JOB (immutable). - Validate PG identifiers (lowercase, <=63 chars) at CREATE JOB time. - Move fail-fast resource validation (user-provided slot/pub existence, publication coverage, auto slot active-conflict) from cdc client to FE via StreamingJobUtils.validateSourceResources(). Covers both StreamingInsertJob and cdc_stream TVF paths; runs once on create, not on each BE restart. - Regression tests for publication ownership matrix, restart-FE safety, TVF validation cases, and PG user privilege requirements. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../doris/job/cdc/DataSourceConfigKeys.java | 14 + .../streaming/DataSourceConfigValidator.java | 17 +- .../streaming/PostgresResourceValidator.java | 172 ++++++++ .../insert/streaming/StreamingInsertJob.java | 20 +- .../doris/job/util/StreamingJobUtils.java | 15 + .../trees/plans/commands/AlterJobCommand.java | 23 +- .../CdcStreamTableValuedFunction.java | 12 + .../DataSourceConfigValidatorTest.java | 145 ++++++ .../PostgresReplicationConnection.java | 18 +- .../reader/postgres/PostgresSourceReader.java | 88 +++- ...est_streaming_postgres_job_publication.out | 13 + .../test_streaming_postgres_job_priv.groovy | 4 +- ..._streaming_postgres_job_publication.groovy | 412 ++++++++++++++++++ .../tvf/test_cdc_stream_tvf_postgres.groovy | 76 ++++ 14 files changed, 999 insertions(+), 30 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index d1efc0b4c88536..3e7eaabe2bfb09 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -41,6 +41,20 @@ public class DataSourceConfigKeys { public static final String SSL_MODE = "ssl_mode"; public static final String SSL_ROOTCERT = "ssl_rootcert"; + // PostgreSQL replication slot and publication config + public static final String SLOT_NAME = "slot_name"; + public static 
final String PUBLICATION_NAME = "publication_name"; + public static final String DEFAULT_SLOT_PREFIX = "doris_cdc_"; + public static final String DEFAULT_PUBLICATION_PREFIX = "doris_pub_"; + + public static String defaultSlotName(String jobId) { + return DEFAULT_SLOT_PREFIX + jobId; + } + + public static String defaultPublicationName(String jobId) { + return DEFAULT_PUBLICATION_PREFIX + jobId; + } + // per-table config: key format is "table.." public static final String TABLE = "table"; public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = "exclude_columns"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java index b75e202b1a842e..006c98a48d9171 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java @@ -24,8 +24,13 @@ import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; public class DataSourceConfigValidator { + + // PostgreSQL unquoted identifier: lowercase letters, digits, underscores, not starting with a digit. 
+ private static final Pattern PG_IDENTIFIER_PATTERN = Pattern.compile("^[a-z_][a-z0-9_]*$"); + private static final int PG_MAX_IDENTIFIER_LENGTH = 63; private static final Set ALLOW_SOURCE_KEYS = Sets.newHashSet( DataSourceConfigKeys.JDBC_URL, DataSourceConfigKeys.USER, @@ -40,7 +45,9 @@ public class DataSourceConfigValidator { DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, DataSourceConfigKeys.SNAPSHOT_PARALLELISM, DataSourceConfigKeys.SSL_MODE, - DataSourceConfigKeys.SSL_ROOTCERT + DataSourceConfigKeys.SSL_ROOTCERT, + DataSourceConfigKeys.SLOT_NAME, + DataSourceConfigKeys.PUBLICATION_NAME ); // Known suffixes for per-table config keys (format: "table..") @@ -114,6 +121,14 @@ private static boolean isValidValue(String key, String value) { || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) { return false; } + + // slot_name / publication_name are interpolated into PG DDL without quoting, + // so enforce unquoted-identifier grammar to prevent injection and runtime errors. + if (key.equals(DataSourceConfigKeys.SLOT_NAME) + || key.equals(DataSourceConfigKeys.PUBLICATION_NAME)) { + return value.length() <= PG_MAX_IDENTIFIER_LENGTH + && PG_IDENTIFIER_PATTERN.matcher(value).matches(); + } return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java new file mode 100644 index 00000000000000..d5ad3c42bae29a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.util.StreamingJobUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Fail-fast validation of PostgreSQL replication slot and publication at CREATE JOB time, + * before the CDC client connects. Catches mistakes (user-provided slot/publication missing, + * conflicting jobId) with actionable errors. Validation runs only on initial create; restarts + * skip this path by design, so an active slot held by the previous BE does not self-conflict. 
+ */ +public class PostgresResourceValidator { + + public static void validate(Map sourceProperties, String jobId, List tableNames) + throws JobException { + String slotName = resolveSlotName(sourceProperties, jobId); + String publicationName = resolvePublicationName(sourceProperties, jobId); + boolean slotUserProvided = isSlotUserProvided(sourceProperties); + boolean pubUserProvided = isPublicationUserProvided(sourceProperties); + String pgSchema = sourceProperties.get(DataSourceConfigKeys.SCHEMA); + List qualifiedTables = new ArrayList<>(); + for (String name : tableNames) { + qualifiedTables.add(pgSchema + "." + name); + } + + JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(DataSourceType.POSTGRES, sourceProperties); + try (Connection conn = jdbcClient.getConnection()) { + boolean pubExists = publicationExists(conn, publicationName); + if (!pubExists && pubUserProvided) { + throw new JobException( + "publication does not exist: " + publicationName + + ". Create it before starting the job or omit " + + DataSourceConfigKeys.PUBLICATION_NAME + + " to let Doris create one."); + } + if (pubExists) { + List missing = findMissingTables(conn, publicationName, qualifiedTables); + if (!missing.isEmpty()) { + if (pubUserProvided) { + throw new JobException( + "publication " + publicationName + + " is missing required tables: " + missing + + ". Add them via ALTER PUBLICATION ... ADD TABLE before starting."); + } else { + throw new JobException( + "publication " + publicationName + + " already exists but does not cover the configured" + + " include_tables (missing: " + missing + + "). Another Doris cluster may be using the same jobId." + + " Please set " + DataSourceConfigKeys.PUBLICATION_NAME + + " explicitly to avoid the conflict."); + } + } + } + Boolean slotActive = queryReplicationSlotActive(conn, slotName); + if (slotUserProvided && slotActive == null) { + throw new JobException( + "replication slot does not exist: " + slotName + + ". 
Create it before starting the job or omit " + + DataSourceConfigKeys.SLOT_NAME + + " to let Doris create one."); + } + if (!slotUserProvided && Boolean.TRUE.equals(slotActive)) { + throw new JobException( + "replication slot " + slotName + + " is active, held by another consumer. Another Doris" + + " cluster may be using the same jobId. Please set " + + DataSourceConfigKeys.SLOT_NAME + + " explicitly to avoid the conflict."); + } + } catch (JobException e) { + throw e; + } catch (Exception e) { + throw new JobException( + "Failed to validate PG resources for publication " + publicationName + + ": " + e.getMessage(), e); + } + } + + private static String resolveSlotName(Map config, String jobId) { + String name = config.get(DataSourceConfigKeys.SLOT_NAME); + return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultSlotName(jobId); + } + + private static String resolvePublicationName(Map config, String jobId) { + String name = config.get(DataSourceConfigKeys.PUBLICATION_NAME); + return StringUtils.isNotBlank(name) ? 
name : DataSourceConfigKeys.defaultPublicationName(jobId); + } + + private static boolean isSlotUserProvided(Map config) { + return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.SLOT_NAME)); + } + + private static boolean isPublicationUserProvided(Map config) { + return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.PUBLICATION_NAME)); + } + + private static boolean publicationExists(Connection conn, String publicationName) throws Exception { + try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) { + ps.setString(1, publicationName); + try (ResultSet rs = ps.executeQuery()) { + return rs.next(); + } + } + } + + private static List findMissingTables(Connection conn, String publicationName, List tables) + throws Exception { + Set covered = new HashSet<>(); + try (PreparedStatement ps = conn.prepareStatement( + "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?")) { + ps.setString(1, publicationName); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + covered.add(rs.getString(1) + "." + rs.getString(2)); + } + } + } + List missing = new ArrayList<>(); + for (String table : tables) { + if (!covered.contains(table)) { + missing.add(table); + } + } + return missing; + } + + /** Returns the slot's active flag, or null when the slot does not exist. 
*/ + private static Boolean queryReplicationSlotActive(Connection conn, String slotName) throws Exception { + try (PreparedStatement ps = conn.prepareStatement( + "SELECT active FROM pg_replication_slots WHERE slot_name = ?")) { + ps.setString(1, slotName); + try (ResultSet rs = ps.executeQuery()) { + if (!rs.next()) { + return null; + } + return rs.getBoolean(1); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index d7a325e22f541e..24efbfbda1a414 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -230,6 +230,8 @@ private void initSourceJob() { String includeTables = String.join(",", createTbls); sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, includeTables); } + StreamingJobUtils.validateSourceResources( + dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls); this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, StreamingJobUtils.convertCertFile(getDbId(), sourceProperties)); JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider; @@ -879,7 +881,8 @@ private String getShowSQL() { StringBuilder sb = new StringBuilder(); sb.append("FROM ").append(dataSourceType.name()); sb.append("("); - for (Map.Entry entry : sourceProperties.entrySet()) { + Map displaySourceProps = buildDisplaySourceProperties(); + for (Map.Entry entry : displaySourceProps.entrySet()) { if (entry.getKey().equalsIgnoreCase("password")) { continue; } @@ -984,6 +987,21 @@ private static boolean checkHasSourceJobPriv(ConnectContext ctx, String targetDb return true; } + // PG jobs don't persist default slot/publication names; surface them here so SHOW reflects + // the 
values the cdc client actually uses. + private Map buildDisplaySourceProperties() { + if (dataSourceType != DataSourceType.POSTGRES) { + return sourceProperties; + } + Map display = new LinkedHashMap<>(sourceProperties); + String jobIdStr = String.valueOf(getJobId()); + display.putIfAbsent(DataSourceConfigKeys.SLOT_NAME, + DataSourceConfigKeys.defaultSlotName(jobIdStr)); + display.putIfAbsent(DataSourceConfigKeys.PUBLICATION_NAME, + DataSourceConfigKeys.defaultPublicationName(jobIdStr)); + return display; + } + private String generateEncryptedSql() { makeConnectContext(); TreeMap, String> indexInSqlToString = new TreeMap<>(new Pair.PairComparator<>()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 46a47036ccd655..a349f51dbae849 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -36,6 +36,7 @@ import org.apache.doris.job.cdc.split.SnapshotSplit; import org.apache.doris.job.common.DataSourceType; import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.streaming.PostgresResourceValidator; import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; @@ -448,6 +449,20 @@ public static List getColumns(JdbcClient jdbcClient, * The remoteDB implementation differs for each data source; * refer to the hierarchical mapping in the JDBC catalog. */ + /** + * Validate source-side resources (e.g. PG replication slot and publication) at CREATE JOB time + * so users get an actionable error before the CDC client connects. No-op for sources that + * have no such concept. 
+ */ + public static void validateSourceResources(DataSourceType sourceType, + Map properties, + String jobId, + List tables) throws JobException { + if (sourceType == DataSourceType.POSTGRES) { + PostgresResourceValidator.validate(properties, jobId, tables); + } + } + public static String getRemoteDbName(DataSourceType sourceType, Map properties) { String remoteDb = null; switch (sourceType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index fc63710f93ddf1..089a00f354c839 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -212,6 +212,22 @@ private void checkUnmodifiableSourceProperties(Map originSourceP sourceProperties.get(DataSourceConfigKeys.EXCLUDE_TABLES)), "The exclude_tables property cannot be modified in ALTER JOB"); } + + // slot_name / publication_name decide Doris-vs-user ownership at create time; flipping + // them afterwards would orphan Doris-created resources or let Doris drop user-owned ones. 
+ if (sourceProperties.containsKey(DataSourceConfigKeys.SLOT_NAME)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(DataSourceConfigKeys.SLOT_NAME), + sourceProperties.get(DataSourceConfigKeys.SLOT_NAME)), + "The slot_name property cannot be modified in ALTER JOB"); + } + + if (sourceProperties.containsKey(DataSourceConfigKeys.PUBLICATION_NAME)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(DataSourceConfigKeys.PUBLICATION_NAME), + sourceProperties.get(DataSourceConfigKeys.PUBLICATION_NAME)), + "The publication_name property cannot be modified in ALTER JOB"); + } } private void validateProps(StreamingInsertJob streamingJob) throws AnalysisException { @@ -247,14 +263,17 @@ private void checkUnmodifiableProperties(String originExecuteSql) throws Analysi "The uri property cannot be modified in ALTER JOB"); break; case "cdc_stream": - // type, jdbc_url, database, schema, and table identify the source and cannot be changed. + // type, jdbc_url, database, schema, table identify the source and cannot be changed. + // slot_name / publication_name are fixed at create time to keep ownership stable. // user, password, driver_url, driver_class, etc. are modifiable (credential rotation). 
for (String unmodifiable : new String[] { DataSourceConfigKeys.TYPE, DataSourceConfigKeys.JDBC_URL, DataSourceConfigKeys.DATABASE, DataSourceConfigKeys.SCHEMA, - DataSourceConfigKeys.TABLE}) { + DataSourceConfigKeys.TABLE, + DataSourceConfigKeys.SLOT_NAME, + DataSourceConfigKeys.PUBLICATION_NAME}) { Preconditions.checkArgument( Objects.equals( originTvf.getProperties().getMap().get(unmodifiable), diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index ce91f8a7fe70fc..0bb8994060233f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -25,6 +25,7 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.exception.JobException; import org.apache.doris.job.util.StreamingJobUtils; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -110,6 +112,16 @@ private void validate(Map properties) throws AnalysisException { if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) { throw new AnalysisException("offset is required"); } + DataSourceType sourceType = DataSourceType.valueOf( + properties.get(DataSourceConfigKeys.TYPE).toUpperCase()); + String jobId = properties.getOrDefault(JOB_ID_KEY, + UUID.randomUUID().toString().replace("-", "")); + List tables = Collections.singletonList(properties.get(DataSourceConfigKeys.TABLE)); + try { + StreamingJobUtils.validateSourceResources(sourceType, properties, jobId, tables); + } catch 
(JobException e) { + throw new AnalysisException(e.getMessage(), e); + } } private void generateFileStatus() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java new file mode 100644 index 00000000000000..50fd2ece020edd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.extensions.insert.streaming; + +import org.apache.doris.job.cdc.DataSourceConfigKeys; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class DataSourceConfigValidatorTest { + + private static final int PG_MAX_IDENTIFIER_LENGTH = 63; + + @Test + public void testSlotNameAndPublicationNameAllowed() { + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + props.put(DataSourceConfigKeys.SLOT_NAME, "my_custom_slot"); + props.put(DataSourceConfigKeys.PUBLICATION_NAME, "my_custom_pub"); + // Should not throw + DataSourceConfigValidator.validateSource(props); + } + + @Test + public void testSlotNameAndPublicationNameNotRequired() { + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + // Should not throw without slot_name and publication_name + DataSourceConfigValidator.validateSource(props); + } + + @Test + public void testDefaultSlotNameFormat() { + String slotName = DataSourceConfigKeys.defaultSlotName("12345"); + Assert.assertEquals("doris_cdc_12345", slotName); + Assert.assertTrue(slotName.length() <= PG_MAX_IDENTIFIER_LENGTH); + } + + @Test + public void testDefaultPublicationNameFormat() { + String pubName = DataSourceConfigKeys.defaultPublicationName("12345"); + Assert.assertEquals("doris_pub_12345", pubName); + Assert.assertTrue(pubName.length() <= PG_MAX_IDENTIFIER_LENGTH); + } + + @Test + public void testDefaultNamesWithLargeJobId() { + String maxJobId = String.valueOf(Long.MAX_VALUE); + String slotName = DataSourceConfigKeys.defaultSlotName(maxJobId); + String pubName = DataSourceConfigKeys.defaultPublicationName(maxJobId); + Assert.assertTrue("Slot name should not exceed PG limit, actual: " + slotName.length(), + slotName.length() <= PG_MAX_IDENTIFIER_LENGTH); + Assert.assertTrue("Publication name should not exceed PG limit, actual: " + 
pubName.length(), + pubName.length() <= PG_MAX_IDENTIFIER_LENGTH); + } + + @Test + public void testSlotNameRejectsInvalidPgIdentifiers() { + String[] invalids = { + "MyPub", // uppercase + "pub-name", // hyphen + "pub name", // whitespace + "pub'name", // single quote + "pub\"name", // double quote + "pub;drop", // semicolon (SQL injection attempt) + "1pub", // starts with digit + "", // empty + "pub.name" // dot + }; + for (String invalid : invalids) { + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + props.put(DataSourceConfigKeys.SLOT_NAME, invalid); + try { + DataSourceConfigValidator.validateSource(props); + Assert.fail("Expected IllegalArgumentException for slot_name='" + invalid + "'"); + } catch (IllegalArgumentException expected) { + // ok + } + } + } + + @Test + public void testPublicationNameRejectsInvalidPgIdentifiers() { + String[] invalids = {"MyPub", "pub-name", "pub'x", "pub\"x", "1pub", "pub name"}; + for (String invalid : invalids) { + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + props.put(DataSourceConfigKeys.PUBLICATION_NAME, invalid); + try { + DataSourceConfigValidator.validateSource(props); + Assert.fail("Expected IllegalArgumentException for publication_name='" + invalid + "'"); + } catch (IllegalArgumentException expected) { + // ok + } + } + } + + @Test + public void testSlotNameRejectsOverlongIdentifier() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i <= PG_MAX_IDENTIFIER_LENGTH; i++) { + sb.append('a'); + } + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + props.put(DataSourceConfigKeys.SLOT_NAME, sb.toString()); + try { + DataSourceConfigValidator.validateSource(props); + Assert.fail("Expected IllegalArgumentException for slot_name exceeding " + + PG_MAX_IDENTIFIER_LENGTH + " chars"); + } catch (IllegalArgumentException 
expected) { + // ok + } + } + + @Test + public void testSlotNameAcceptsValidPgIdentifiers() { + String[] valids = {"my_slot", "_slot", "slot1", "a", "slot_with_digits_123"}; + for (String valid : valids) { + Map props = new HashMap<>(); + props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db"); + props.put(DataSourceConfigKeys.SLOT_NAME, valid); + DataSourceConfigValidator.validateSource(props); + } + } +} diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index ac372bbb8cb24e..eeee1e725efce6 100644 --- a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -51,7 +51,7 @@ /** * Copied from Flink Cdc 3.5.0 * - *

Line 192~199: add publish_via_partition_root for partition table. + *

Line 248~251, 258: add publish_via_partition_root for partition table (FILTERED mode). */ public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { @@ -189,12 +189,8 @@ protected void initPublication() { throw new ConnectException( "Publication autocreation is disabled, please create one and restart the connector."); case ALL_TABLES: - boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); - createPublicationStmt = supportPartitionRoot - ? String.format( - "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);", - publicationName) - : String.format( + createPublicationStmt = + String.format( "CREATE PUBLICATION %s FOR ALL TABLES;", publicationName); LOGGER.info( @@ -248,14 +244,18 @@ private void createOrUpdatePublicationModeFilterted( "No table filters found for filtered publication %s", publicationName)); } + boolean supportPartitionRoot = !isUpdate + && ((BaseConnection) pgConnection()).haveMinimumServerVersion(ServerVersion.v13); + String pubViaRootSuffix = supportPartitionRoot + ? " WITH (publish_via_partition_root = true)" : ""; createOrUpdatePublicationStmt = isUpdate ? String.format( "ALTER PUBLICATION %s SET TABLE %s;", publicationName, tableFilterString) : String.format( - "CREATE PUBLICATION %s FOR TABLE %s;", - publicationName, tableFilterString); + "CREATE PUBLICATION %s FOR TABLE %s%s;", + publicationName, tableFilterString, pubViaRootSuffix); LOGGER.info( isUpdate ? 
"Updating Publication with statement '{}'" diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index b6d28510613c94..c0d333996c7e67 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -63,6 +63,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresConnectorConfig.AutoCreateMode; import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.SourceInfo; import io.debezium.connector.postgresql.connection.PostgresConnection; @@ -93,9 +95,12 @@ public PostgresSourceReader() { public void initialize(String jobId, DataSource dataSource, Map config) { PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0); PostgresDialect dialect = new PostgresDialect(sourceConfig); - synchronized (SLOT_CREATION_LOCK) { - LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); - createSlotForGlobalStreamSplit(dialect); + // Slot is caller-owned when slot_name is set; Doris neither creates nor drops it. 
+ if (!isSlotUserProvided(config)) { + synchronized (SLOT_CREATION_LOCK) { + LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); + createSlotForGlobalStreamSplit(dialect); + } } super.initialize(jobId, dataSource, config); // Inject PG schema refresher so the deserializer can fetch accurate column types on DDL @@ -227,6 +232,19 @@ private PostgresSourceConfig generatePostgresConfig( Properties dbzProps = ConfigUtil.getDefaultDebeziumProps(); dbzProps.put("interval.handling.mode", "string"); + + // Publication-only ownership: user-provided = DISABLED, otherwise FILTERED. + String publicationName = resolvePublicationName(cdcConfig, jobId); + String slotName = resolveSlotName(cdcConfig, jobId); + AutoCreateMode autocreateMode = + isPublicationUserProvided(cdcConfig) + ? AutoCreateMode.DISABLED + : AutoCreateMode.FILTERED; + dbzProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationName); + dbzProps.put( + PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name(), + autocreateMode.getValue()); + configFactory.debeziumProperties(dbzProps); // setting ssl @@ -243,7 +261,7 @@ private PostgresSourceConfig generatePostgresConfig( configFactory.serverTimeZone( ConfigUtil.getPostgresServerTimeZoneFromProps(props).toString()); - configFactory.slotName(getSlotName(jobId)); + configFactory.slotName(slotName); configFactory.decodingPluginName("pgoutput"); configFactory.heartbeatInterval( Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS)); @@ -255,8 +273,26 @@ private PostgresSourceConfig generatePostgresConfig( return configFactory.create(subtaskId); } - private String getSlotName(String jobId) { - return "doris_cdc_" + jobId; + private String resolveSlotName(Map config, String jobId) { + String name = config.get(DataSourceConfigKeys.SLOT_NAME); + return StringUtils.isNotBlank(name) ? 
name : DataSourceConfigKeys.defaultSlotName(jobId); + } + + private String resolvePublicationName(Map config, String jobId) { + String name = config.get(DataSourceConfigKeys.PUBLICATION_NAME); + return StringUtils.isNotBlank(name) + ? name + : DataSourceConfigKeys.defaultPublicationName(jobId); + } + + // Per-resource ownership: each name is independent. A set name means the caller owns that + // resource - Doris will neither create nor drop it. + private static boolean isSlotUserProvided(Map config) { + return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.SLOT_NAME)); + } + + private static boolean isPublicationUserProvided(Map config) { + return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.PUBLICATION_NAME)); } @Override @@ -453,21 +489,41 @@ public Map extractBinlogStateOffset(Object splitState) { @Override public void close(JobBaseConfig jobConfig) { super.close(jobConfig); - // drop pg slot + Map config = jobConfig.getConfig(); + String jobId = jobConfig.getJobId(); + String slotName = resolveSlotName(config, jobId); + String pubName = resolvePublicationName(config, jobId); + boolean dropSlot = !isSlotUserProvided(config); + boolean dropPub = !isPublicationUserProvided(config); + if (!dropSlot && !dropPub) { + LOG.info( + "Skipping drop of user-provided slot {} / publication {} for job {}", + slotName, + pubName, + jobId); + return; + } try { PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig); PostgresDialect dialect = new PostgresDialect(sourceConfig); - String slotName = getSlotName(jobConfig.getJobId()); - LOG.info( - "Dropping postgres replication slot {} for job {}", - slotName, - jobConfig.getJobId()); - dialect.removeSlot(slotName); + if (dropSlot) { + LOG.info("Dropping auto-created replication slot {} for job {}", slotName, jobId); + dialect.removeSlot(slotName); + } else { + LOG.info("Skipping drop of user-provided slot {} for job {}", slotName, jobId); + } + if (dropPub) { + LOG.info("Dropping auto-created publication 
{} for job {}", pubName, jobId); + try (PostgresConnection connection = dialect.openJdbcConnection()) { + connection.execute("DROP PUBLICATION IF EXISTS " + pubName); + } + } else { + LOG.info( + "Skipping drop of user-provided publication {} for job {}", pubName, jobId); + } } catch (Exception ex) { LOG.warn( - "Failed to drop postgres replication slot for job {}: {}", - jobConfig.getJobId(), - ex.getMessage()); + "Failed to clean up postgres resources for job {}: {}", jobId, ex.getMessage()); } } } diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out new file mode 100644 index 00000000000000..5071356eb0f6fa --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_pub_snapshot_table1 -- +1 Alice + +-- !select_pub_snapshot_table2 -- +1 Bob + +-- !select_user_pub_table1 -- +1 Alice + +-- !select_user_pub_table2 -- +1 Bob + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 5747438b717cb0..8309bbe204715f 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -90,9 +90,11 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern exception "Failed to init source reader" } - // grant replication to user + // grant replication + CREATE on database + table ownership (needed to auto-create publication) connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { sql """ALTER ROLE ${newPgUser} WITH REPLICATION""" + 
sql """GRANT CREATE ON DATABASE ${pgDB} TO ${newPgUser}""" + sql """ALTER TABLE ${pgDB}.${pgSchema}.${tableName} OWNER TO ${newPgUser}""" } diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy new file mode 100644 index 00000000000000..1d29b659a60751 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verify per-resource ownership of PG replication slot and publication: +// Test 1: auto-generated slot & publication — created on job start, cleaned up on drop +// Test 2: user-provided slot & publication — Doris uses but never drops them +// Test 3: mixed (user publication + auto slot) — only auto slot is dropped on job deletion +// Test 4: slot_name / publication_name are immutable via ALTER JOB +suite("test_streaming_postgres_job_publication", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_pg_pub_job" + def currentDb = (sql "select database()")[0][0] + def table1 = "pub_test_table1" + def table2 = "pub_test_table2" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + sql """drop table if exists ${currentDb}.${table2} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Setup PG tables + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """INSERT INTO 
${pgDB}.${pgSchema}.${table1} (id, name) VALUES (1, 'Alice');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (id, name) VALUES (1, 'Bob');""" + } + + // ========== Test 1: auto-generated slot_name and publication_name ========== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // Verify SHOW contains the expected auto-generated slot_name and publication_name + def jobRow = sql """select Id, ExecuteSql from jobs("type"="insert") where Name='${jobName}'""" + def jobId = jobRow.get(0).get(0).toString() + def executeSql = jobRow.get(0).get(1).toString() + log.info("jobId: ${jobId}, ExecuteSql: ${executeSql}") + def expectedSlot = "doris_cdc_${jobId}" + def expectedPub = "doris_pub_${jobId}" + assert executeSql.contains(expectedSlot) : "ExecuteSql should contain slot_name ${expectedSlot}" + assert executeSql.contains(expectedPub) : "ExecuteSql should contain publication_name ${expectedPub}" + + // Wait for job to start running and create slot/publication on PG + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + if (jobSuccendCount.size() != 1) { + return false + } + def succeedTaskCount = jobSuccendCount.get(0).get(0).toString().toLong() + succeedTaskCount >= 2L + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql 
"""select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + // Verify on PG side: publication is NOT "FOR ALL TABLES" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubResult = sql """SELECT pubname, puballtables FROM pg_publication WHERE pubname = '${expectedPub}'""" + log.info("pg_publication: " + pubResult) + assert pubResult.size() == 1 : "Publication should exist" + assert pubResult[0][1] == false : "Publication should NOT be FOR ALL TABLES" + + // Verify publication contains only the specified tables + def pubTables = sql """SELECT tablename FROM pg_publication_tables WHERE pubname = '${expectedPub}' AND schemaname = '${pgSchema}' ORDER BY tablename""" + log.info("pg_publication_tables: " + pubTables) + assert pubTables.size() == 2 : "Publication should contain exactly 2 tables" + def tableNames = pubTables.collect { it[0] } + assert tableNames.contains(table1) : "Publication should contain ${table1}" + assert tableNames.contains(table2) : "Publication should contain ${table2}" + + // Verify replication slot exists + def slotResult = sql """SELECT slot_name, active FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + log.info("pg_replication_slots: " + slotResult) + assert slotResult.size() == 1 : "Replication slot should exist" + } + + // Check snapshot data synced correctly + qt_select_pub_snapshot_table1 """ SELECT * FROM ${table1} order by id asc """ + qt_select_pub_snapshot_table2 """ SELECT * FROM ${table2} order by id asc """ + + // Drop job and verify PG resources are cleaned up + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + // Poll until cleanup completes - avoids flakiness on slower environments + Awaitility.await() + .atMost(60, SECONDS) + .pollInterval(1, SECONDS) + .untilAsserted { + connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfterDrop = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${expectedPub}'""" + log.info("pg_publication after drop: " + pubAfterDrop) + assert pubAfterDrop[0][0] == 0 : "Publication should be dropped after job deletion" + + def slotAfterDrop = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + log.info("pg_replication_slots after drop: " + slotAfterDrop) + assert slotAfterDrop[0][0] == 0 : "Replication slot should be dropped after job deletion" + } + } + + // ========== Test 2: user-provided slot_name and publication_name ========== + def userSlot = "test_user_slot" + def userPub = "test_user_pub" + + sql """drop table if exists ${currentDb}.${table1} force""" + sql """drop table if exists ${currentDb}.${table2} force""" + + // Pre-create publication AND slot; with user-provided names Doris manages neither. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name) VALUES (1, 'Alice');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (id, name) VALUES (1, 'Bob');""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """CREATE PUBLICATION ${userPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}, ${pgDB}.${pgSchema}.${table2}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + } + sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')""" + } + + sql 
"""CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "slot_name" = "${userSlot}", + "publication_name" = "${userPub}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + if (jobSuccendCount.size() != 1) { + return false + } + def succeedTaskCount = jobSuccendCount.get(0).get(0).toString().toLong() + succeedTaskCount >= 2L + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show job: " + showjob) + throw ex; + } + + qt_select_user_pub_table1 """ SELECT * FROM ${table1} order by id asc """ + qt_select_user_pub_table2 """ SELECT * FROM ${table2} order by id asc """ + + // Drop job: Doris must NOT touch user-provided slot or publication. 
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + sleep(5000) + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${userPub}'""" + assert pubAfter[0][0] == 1 : "User-provided publication must be preserved after job deletion" + def slotAfter = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + assert slotAfter[0][0] == 1 : "User-provided slot must be preserved after job deletion" + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + } + + // ========== Test 3: only publication_name provided (slot is Doris-managed) ========== + def mixedJob = "test_pg_pub_job_mixed" + def mixedPub = "test_mixed_pub" + sql """DROP JOB IF EXISTS where jobname = '${mixedJob}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + sql """drop table if exists ${currentDb}.${table2} force""" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name) VALUES (1, 'Alice');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (id, name) VALUES (1, 'Bob');""" + sql """DROP PUBLICATION IF EXISTS ${mixedPub}""" + sql """CREATE PUBLICATION ${mixedPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}, ${pgDB}.${pgSchema}.${table2}""" + } + + sql """CREATE JOB ${mixedJob} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = 
"org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "publication_name" = "${mixedPub}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${mixedJob}' and ExecuteType='STREAMING' """ + if (jobSuccendCount.size() != 1) { + return false + } + jobSuccendCount.get(0).get(0).toString().toLong() >= 2L + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${mixedJob}'""" + log.info("show job: " + showjob) + throw ex; + } + + def mixedJobRow = sql """select Id, ExecuteSql from jobs("type"="insert") where Name='${mixedJob}'""" + def mixedJobId = mixedJobRow.get(0).get(0).toString() + def mixedExecuteSql = mixedJobRow.get(0).get(1).toString() + def mixedExpectedSlot = "doris_cdc_${mixedJobId}" + def mixedExpectedPub = "doris_pub_${mixedJobId}" + log.info("Mixed job id: ${mixedJobId}, auto slot: ${mixedExpectedSlot}") + assert mixedExecuteSql.contains(mixedExpectedSlot) : "ExecuteSql should contain slot_name ${mixedExpectedSlot}" + assert mixedExecuteSql.contains(mixedPub) : "ExecuteSql should contain user-provided publication_name ${mixedPub}" + + // Drop job: Doris drops the auto-managed slot but preserves the user-provided publication. 
+ sql """DROP JOB IF EXISTS where jobname = '${mixedJob}'""" + + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).untilAsserted { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotCount = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${mixedExpectedSlot}'""" + assert slotCount[0][0] == 0 : "Doris-managed slot ${mixedExpectedSlot} should be dropped" + } + } + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${mixedPub}'""" + assert pubAfter[0][0] == 1 : "User-provided publication must be preserved" + sql """DROP PUBLICATION IF EXISTS ${mixedPub}""" + } + + // ========== Test 4: slot_name / publication_name are immutable via ALTER JOB ========== + def alterJob = "test_pg_pub_alter_immutable" + sql """DROP JOB IF EXISTS where jobname = '${alterJob}'""" + sql """CREATE JOB ${alterJob} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + sql """PAUSE JOB where jobname = '${alterJob}'""" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until( + { + def status = sql """select status from jobs("type"="insert") where Name='${alterJob}'""" + status.size() == 1 && status.get(0).get(0).toString() == "PAUSED" + } + ) + + // Reject ALTER that introduces a new slot_name + test { + sql """ALTER JOB ${alterJob} + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = 
"org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "slot_name" = "some_other_slot", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "The slot_name property cannot be modified in ALTER JOB" + } + + // Reject ALTER that introduces a new publication_name + test { + sql """ALTER JOB ${alterJob} + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1},${table2}", + "publication_name" = "some_other_pub", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "The publication_name property cannot be modified in ALTER JOB" + } + + sql """DROP JOB IF EXISTS where jobname = '${alterJob}'""" + + // Cleanup PG tables + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" + } + sql """drop table if exists ${currentDb}.${table1} force""" + sql """drop table if exists ${currentDb}.${table2} force""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy index f0cf2feb3733bd..498251a9a99362 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy @@ -61,6 +61,82 @@ suite("test_cdc_stream_tvf_postgres", 
"p0,external,pg,external_docker,external_d exception "Unsupported offset: initial" } + // Fail fast when a user-provided slot does not exist on PG. + test { + sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "slot_name" = "tvf_missing_slot_xyz", + "offset" = 'latest') + """ + exception "replication slot does not exist" + } + + // Fail fast when a user-provided publication does not exist on PG. + test { + sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "publication_name" = "tvf_missing_pub_xyz", + "offset" = 'latest') + """ + exception "publication does not exist" + } + + // Fail fast when a user-provided publication exists but does not cover the target table. 
+ def otherTable = "user_info_pg_normal2_tvf" + def partialPub = "tvf_partial_pub" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${otherTable}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${otherTable} ( + "name" varchar(200), + "age" int2, + PRIMARY KEY ("name") + )""" + sql """DROP PUBLICATION IF EXISTS ${partialPub}""" + sql """CREATE PUBLICATION ${partialPub} FOR TABLE ${pgDB}.${pgSchema}.${otherTable}""" + } + try { + test { + sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "publication_name" = "${partialPub}", + "offset" = 'latest') + """ + exception "is missing required tables" + } + } finally { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${partialPub}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${otherTable}""" + } + } + // Here, because PG consumption requires creating a slot first, // we only verify whether the execution can be successful. def result = sql """ From 8c54460b9b6a624fa144b975e75eb850fa17265e Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 17 Apr 2026 19:23:09 +0800 Subject: [PATCH 2/3] [Improve](streaming-job) scope PG resource validation to job-wrapped cdc_stream TVF MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move PG slot/publication validation out of CdcStreamTableValuedFunction into StreamingInsertJob.initInsertJob so standalone cdc_stream TVF queries (no enclosing job) no longer run ownership checks — the job is the unit that owns a slot/publication, not the TVF. 
Populate default slot/publication names in two places: - JdbcTvfSourceOffsetProvider.ensureInitialized so sourceProperties carries the resolved names; cleanMeta -> /api/close uses sourceProperties and without this step cdcclient fell back to dbz_publication on close and skipped dropping the Doris-managed publication on DROP JOB. - JdbcTvfSourceOffsetProvider.rewriteTvfParams so each task's TVF props carry the resolved names for cdcclient ownership logic at create time. Add test_cdc_stream_tvf_publication covering auto / user-provided / mixed ownership on the cdc_stream TVF path. Prune the three standalone-TVF fail-fast cases from test_cdc_stream_tvf_postgres since standalone TVF no longer validates PG resources. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../doris/job/cdc/DataSourceConfigKeys.java | 3 + .../streaming/PostgresResourceValidator.java | 17 +- .../insert/streaming/StreamingInsertJob.java | 23 +- .../jdbc/JdbcTvfSourceOffsetProvider.java | 18 +- .../doris/job/util/StreamingJobUtils.java | 67 ++++- .../CdcStreamTableValuedFunction.java | 25 +- .../reader/postgres/PostgresSourceReader.java | 39 +-- .../tvf/test_cdc_stream_tvf_publication.out | 7 + .../tvf/test_cdc_stream_tvf_postgres.groovy | 79 +----- .../test_cdc_stream_tvf_publication.groovy | 262 ++++++++++++++++++ 10 files changed, 393 insertions(+), 147 deletions(-) create mode 100644 regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index 3e7eaabe2bfb09..d31794766ad7f6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -46,6 +46,9 @@ public class 
DataSourceConfigKeys { public static final String PUBLICATION_NAME = "publication_name"; public static final String DEFAULT_SLOT_PREFIX = "doris_cdc_"; public static final String DEFAULT_PUBLICATION_PREFIX = "doris_pub_"; + // Pre-PR default (Debezium auto-created). Legacy jobs reconnecting on a newer version fall + // back to this name when no publication_name was persisted. + public static final String LEGACY_PUBLICATION_NAME = "dbz_publication"; public static String defaultSlotName(String jobId) { return DEFAULT_SLOT_PREFIX + jobId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java index d5ad3c42bae29a..d35cda9fe41c42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java @@ -46,8 +46,11 @@ public static void validate(Map sourceProperties, String jobId, throws JobException { String slotName = resolveSlotName(sourceProperties, jobId); String publicationName = resolvePublicationName(sourceProperties, jobId); - boolean slotUserProvided = isSlotUserProvided(sourceProperties); - boolean pubUserProvided = isPublicationUserProvided(sourceProperties); + // Pattern-match ownership: name equals the default = Doris-owned (auto); otherwise user. 
+ String defaultSlot = DataSourceConfigKeys.defaultSlotName(jobId); + String defaultPub = DataSourceConfigKeys.defaultPublicationName(jobId); + boolean slotUserProvided = !defaultSlot.equals(slotName); + boolean pubUserProvided = !defaultPub.equals(publicationName); String pgSchema = sourceProperties.get(DataSourceConfigKeys.SCHEMA); List qualifiedTables = new ArrayList<>(); for (String name : tableNames) { @@ -105,6 +108,8 @@ public static void validate(Map sourceProperties, String jobId, throw new JobException( "Failed to validate PG resources for publication " + publicationName + ": " + e.getMessage(), e); + } finally { + jdbcClient.closeClient(); } } @@ -118,14 +123,6 @@ private static String resolvePublicationName(Map config, String return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultPublicationName(jobId); } - private static boolean isSlotUserProvided(Map config) { - return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.SLOT_NAME)); - } - - private static boolean isPublicationUserProvided(Map config) { - return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.PUBLICATION_NAME)); - } - private static boolean publicationExists(Connection conn, String publicationName) throws Exception { try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) { ps.setString(1, publicationName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 24efbfbda1a414..0f498ba0104ac5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -230,7 +230,7 @@ private void initSourceJob() { String includeTables = String.join(",", createTbls); 
sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, includeTables); } - StreamingJobUtils.validateSourceResources( + StreamingJobUtils.resolveAndValidateSource( dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls); this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, StreamingJobUtils.convertCertFile(getDbId(), sourceProperties)); @@ -307,6 +307,9 @@ private void initInsertJob() { this.originTvfProps = currentTvf.getProperties().getMap(); this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName()); this.offsetProvider.ensureInitialized(getJobId(), originTvfProps); + // Validate source-side resources (e.g. PG slot/publication ownership) once at job + // creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job). + StreamingJobUtils.validateTvfSource(tvfType, originTvfProps, String.valueOf(getJobId())); this.offsetProvider.initOnCreate(); // validate offset props, only for s3 cause s3 tvf no offset prop if (jobProperties.getOffsetProperty() != null @@ -881,8 +884,7 @@ private String getShowSQL() { StringBuilder sb = new StringBuilder(); sb.append("FROM ").append(dataSourceType.name()); sb.append("("); - Map displaySourceProps = buildDisplaySourceProperties(); - for (Map.Entry entry : displaySourceProps.entrySet()) { + for (Map.Entry entry : sourceProperties.entrySet()) { if (entry.getKey().equalsIgnoreCase("password")) { continue; } @@ -987,21 +989,6 @@ private static boolean checkHasSourceJobPriv(ConnectContext ctx, String targetDb return true; } - // PG jobs don't persist default slot/publication names; surface them here so SHOW reflects - // the values the cdc client actually uses. 
- private Map buildDisplaySourceProperties() { - if (dataSourceType != DataSourceType.POSTGRES) { - return sourceProperties; - } - Map display = new LinkedHashMap<>(sourceProperties); - String jobIdStr = String.valueOf(getJobId()); - display.putIfAbsent(DataSourceConfigKeys.SLOT_NAME, - DataSourceConfigKeys.defaultSlotName(jobIdStr)); - display.putIfAbsent(DataSourceConfigKeys.PUBLICATION_NAME, - DataSourceConfigKeys.defaultPublicationName(jobIdStr)); - return display; - } - private String generateEncryptedSql() { makeConnectContext(); TreeMap, String> indexInSqlToString = new TreeMap<>(new Pair.PairComparator<>()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java index 803deec145a61b..33da084c3e5a93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java @@ -96,8 +96,16 @@ public JdbcTvfSourceOffsetProvider() { */ @Override public void ensureInitialized(Long jobId, Map originTvfProps) throws JobException { + String type = originTvfProps.get(DataSourceConfigKeys.TYPE); + Preconditions.checkArgument(type != null, "type is required"); + DataSourceType resolvedType = DataSourceType.valueOf(type.toUpperCase()); + + // Populate default slot/pub into sourceProperties so cleanMeta -> /api/close + // carries the resolved names for cdcclient ownership-based cleanup. + Map effective = new HashMap<>(originTvfProps); + StreamingJobUtils.populateDefaultSourceProperties(resolvedType, effective, String.valueOf(jobId)); // Always refresh fields that may be updated via ALTER JOB (e.g. credentials, parallelism). 
- this.sourceProperties = originTvfProps; + this.sourceProperties = effective; this.snapshotParallelism = Integer.parseInt( originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT)); @@ -109,9 +117,7 @@ public void ensureInitialized(Long jobId, Map originTvfProps) th // is reconstructed fresh (getPersistInfo returns null), so jobId is null then too. this.jobId = jobId; this.chunkHighWatermarkMap = new HashMap<>(); - String type = originTvfProps.get(DataSourceConfigKeys.TYPE); - Preconditions.checkArgument(type != null, "type is required"); - this.sourceType = DataSourceType.valueOf(type.toUpperCase()); + this.sourceType = resolvedType; String table = originTvfProps.get(DataSourceConfigKeys.TABLE); Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF"); } @@ -143,6 +149,10 @@ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originComm props.put(CdcStreamTableValuedFunction.META_KEY, new Gson().toJson(offset.generateMeta())); props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, String.valueOf(jobId)); props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, String.valueOf(taskId)); + // Inject default slot/publication names so cdc client ownership logic on BE + // sees the resolved names. Users who specified their own values are preserved + // via putIfAbsent. 
+ StreamingJobUtils.populateDefaultSourceProperties(sourceType, props, String.valueOf(jobId)); return new UnboundTVFRelation( originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index a349f51dbae849..438fb29417912e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -450,19 +450,72 @@ public static List getColumns(JdbcClient jdbcClient, * refer to the hierarchical mapping in the JDBC catalog. */ /** - * Validate source-side resources (e.g. PG replication slot and publication) at CREATE JOB time - * so users get an actionable error before the CDC client connects. No-op for sources that - * have no such concept. + * Populate default resource names into properties, then validate. No-op for sources that + * don't need it. Mutates properties: callers should expect default values to be inserted. */ - public static void validateSourceResources(DataSourceType sourceType, - Map properties, - String jobId, - List tables) throws JobException { + public static void resolveAndValidateSource(DataSourceType sourceType, + Map properties, + String jobId, + List tables) throws JobException { + if (sourceType == DataSourceType.POSTGRES) { + // PG slot/pub: values equal to default = Doris-owned; any other value = user-owned. 
+ // (users cannot specify default names since jobId is unknown pre-CREATE) + properties.putIfAbsent(DataSourceConfigKeys.SLOT_NAME, + DataSourceConfigKeys.defaultSlotName(jobId)); + properties.putIfAbsent(DataSourceConfigKeys.PUBLICATION_NAME, + DataSourceConfigKeys.defaultPublicationName(jobId)); + validateSource(sourceType, properties, jobId, tables); + } + } + + public static void validateSource(DataSourceType sourceType, + Map properties, + String jobId, + List tables) throws JobException { if (sourceType == DataSourceType.POSTGRES) { PostgresResourceValidator.validate(properties, jobId, tables); } } + /** + * Validate source-side resources for a streaming job backed by a TVF. Only cdc_stream + * TVF is subject to source validation (e.g. PG slot/publication ownership); other TVFs + * (s3, ...) are no-ops. + * + *

originTvfProps is treated as read-only (Nereids may hand back an immutable map). + * Defaults are populated into a temporary copy so ownership checks see the effective + * slot/pub names without mutating the caller's map. + */ + public static void validateTvfSource(String tvfType, + Map originTvfProps, + String jobId) throws JobException { + if (!"cdc_stream".equalsIgnoreCase(tvfType)) { + return; + } + DataSourceType sourceType = DataSourceType.valueOf( + originTvfProps.get(DataSourceConfigKeys.TYPE).toUpperCase()); + List tables = Collections.singletonList( + originTvfProps.get(DataSourceConfigKeys.TABLE)); + Map effective = new HashMap<>(originTvfProps); + populateDefaultSourceProperties(sourceType, effective, jobId); + validateSource(sourceType, effective, jobId, tables); + } + + + /** Persist resolved resource names so ownership is self-describing after restart. */ + public static void populateDefaultSourceProperties(DataSourceType sourceType, + Map properties, + String jobId) { + if (sourceType == DataSourceType.POSTGRES) { + // PG slot/pub: values equal to default = Doris-owned; any other value = user-owned. 
+ // (users cannot specify default names since jobId is unknown pre-CREATE) + properties.putIfAbsent(DataSourceConfigKeys.SLOT_NAME, + DataSourceConfigKeys.defaultSlotName(jobId)); + properties.putIfAbsent(DataSourceConfigKeys.PUBLICATION_NAME, + DataSourceConfigKeys.defaultPublicationName(jobId)); + } + } + public static String getRemoteDbName(DataSourceType sourceType, Map properties) { String remoteDb = null; switch (sourceType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index 0bb8994060233f..86ac016f579249 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -25,7 +25,6 @@ import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.common.DataSourceType; -import org.apache.doris.job.exception.JobException; import org.apache.doris.job.util.StreamingJobUtils; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; @@ -37,7 +36,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,6 +61,13 @@ public CdcStreamTableValuedFunction(Map properties) throws Analy private void processProps(Map properties) throws AnalysisException { Map copyProps = new HashMap<>(properties); copyProps.put("format", "json"); + + // Materialize jobId so generateParams has a stable value across TVF invocations. + // Standalone TVF path: not tied to any job, so we generate a random id here. + // TVF-inside-job path: job.id is already injected by rewriteTvfParams. 
+ copyProps.computeIfAbsent(JOB_ID_KEY, + k -> UUID.randomUUID().toString().replace("-", "")); + super.parseCommonProperties(copyProps); this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true"); this.processedParams.put(URI_KEY, URI); @@ -70,7 +75,7 @@ private void processProps(Map properties) throws AnalysisExcepti this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true"); this.processedParams.put(HTTP_METHOD_KEY, "POST"); - String payload = generateParams(properties); + String payload = generateParams(copyProps); this.processedParams.put(HTTP_PAYLOAD_KEY, payload); this.backendConnectProperties.putAll(processedParams); generateFileStatus(); @@ -78,8 +83,7 @@ private void processProps(Map properties) throws AnalysisExcepti private String generateParams(Map properties) throws AnalysisException { FetchRecordRequest recordRequest = new FetchRecordRequest(); - String defaultJobId = UUID.randomUUID().toString().replace("-", ""); - recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY, defaultJobId)); + recordRequest.setJobId(properties.get(JOB_ID_KEY)); recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); recordRequest.setConfig(properties); try { @@ -91,7 +95,6 @@ private String generateParams(Map properties) throws AnalysisExc Map metaMap = objectMapper.readValue(meta, new TypeReference>() {}); recordRequest.setMeta(metaMap); } - return objectMapper.writeValueAsString(recordRequest); } catch (IOException e) { LOG.warn("Failed to serialize fetch record request", e); @@ -112,16 +115,6 @@ private void validate(Map properties) throws AnalysisException { if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) { throw new AnalysisException("offset is required"); } - DataSourceType sourceType = DataSourceType.valueOf( - properties.get(DataSourceConfigKeys.TYPE).toUpperCase()); - String jobId = properties.getOrDefault(JOB_ID_KEY, - UUID.randomUUID().toString().replace("-", "")); - List tables = 
Collections.singletonList(properties.get(DataSourceConfigKeys.TABLE)); - try { - StreamingJobUtils.validateSourceResources(sourceType, properties, jobId, tables); - } catch (JobException e) { - throw new AnalysisException(e.getMessage(), e); - } } private void generateFileStatus() { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index c0d333996c7e67..1d9ec6bd626822 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -95,8 +95,9 @@ public PostgresSourceReader() { public void initialize(String jobId, DataSource dataSource, Map config) { PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0); PostgresDialect dialect = new PostgresDialect(sourceConfig); - // Slot is caller-owned when slot_name is set; Doris neither creates nor drops it. - if (!isSlotUserProvided(config)) { + // Only create the slot when Doris owns it (name == default); user-provided slots must + // pre-exist, validated at CREATE JOB. + if (isSlotDorisOwned(config, jobId)) { synchronized (SLOT_CREATION_LOCK) { LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); createSlotForGlobalStreamSplit(dialect); @@ -233,13 +234,14 @@ private PostgresSourceConfig generatePostgresConfig( Properties dbzProps = ConfigUtil.getDefaultDebeziumProps(); dbzProps.put("interval.handling.mode", "string"); - // Publication-only ownership: user-provided = DISABLED, otherwise FILTERED. + // Doris-owned = FILTERED (auto-create per-table publication); otherwise DISABLED + // (user-provided or legacy dbz_publication already present on PG). 
String publicationName = resolvePublicationName(cdcConfig, jobId); String slotName = resolveSlotName(cdcConfig, jobId); AutoCreateMode autocreateMode = - isPublicationUserProvided(cdcConfig) - ? AutoCreateMode.DISABLED - : AutoCreateMode.FILTERED; + isPublicationDorisOwned(cdcConfig, jobId) + ? AutoCreateMode.FILTERED + : AutoCreateMode.DISABLED; dbzProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationName); dbzProps.put( PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name(), @@ -278,21 +280,24 @@ private String resolveSlotName(Map config, String jobId) { return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultSlotName(jobId); } + // Legacy jobs (created before slot/pub names were persisted) keep no publication_name in + // sourceProperties; fall back to the pre-PR Debezium default so they continue to use the + // existing publication on PG. private String resolvePublicationName(Map config, String jobId) { String name = config.get(DataSourceConfigKeys.PUBLICATION_NAME); - return StringUtils.isNotBlank(name) - ? name - : DataSourceConfigKeys.defaultPublicationName(jobId); + return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.LEGACY_PUBLICATION_NAME; } - // Per-resource ownership: each name is independent. A set name means the caller owns that - // resource - Doris will neither create nor drop it. - private static boolean isSlotUserProvided(Map config) { - return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.SLOT_NAME)); + // Per-resource ownership: Doris owns the resource iff the resolved name equals + // doris_{cdc|pub}_{jobId}. Users cannot specify this name (jobId is unknown pre-CREATE); + // legacy publication resolves to dbz_publication and stays user-owned (not dropped). 
+ private boolean isSlotDorisOwned(Map config, String jobId) { + return DataSourceConfigKeys.defaultSlotName(jobId).equals(resolveSlotName(config, jobId)); } - private static boolean isPublicationUserProvided(Map config) { - return StringUtils.isNotBlank(config.get(DataSourceConfigKeys.PUBLICATION_NAME)); + private boolean isPublicationDorisOwned(Map config, String jobId) { + return DataSourceConfigKeys.defaultPublicationName(jobId) + .equals(resolvePublicationName(config, jobId)); } @Override @@ -493,8 +498,8 @@ public void close(JobBaseConfig jobConfig) { String jobId = jobConfig.getJobId(); String slotName = resolveSlotName(config, jobId); String pubName = resolvePublicationName(config, jobId); - boolean dropSlot = !isSlotUserProvided(config); - boolean dropPub = !isPublicationUserProvided(config); + boolean dropSlot = isSlotDorisOwned(config, jobId); + boolean dropPub = isPublicationDorisOwned(config, jobId); if (!dropSlot && !dropPub) { LOG.info( "Skipping drop of user-provided slot {} / publication {} for job {}", diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.out new file mode 100644 index 00000000000000..b842d45fbac497 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_auto_snapshot -- +1 Alice + +-- !select_user_snapshot -- +1 Alice + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy index 498251a9a99362..ced310a539e4e0 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy @@ -61,81 +61,10 @@ suite("test_cdc_stream_tvf_postgres", "p0,external,pg,external_docker,external_d exception "Unsupported offset: initial" } - // Fail fast when a user-provided slot does not exist on PG. - test { - sql """ - select * from cdc_stream( - "type" = "postgres", - "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", - "driver_url" = "${driver_url}", - "driver_class" = "org.postgresql.Driver", - "user" = "${pgUser}", - "password" = "${pgPassword}", - "database" = "${pgDB}", - "schema" = "${pgSchema}", - "table" = "${table1}", - "slot_name" = "tvf_missing_slot_xyz", - "offset" = 'latest') - """ - exception "replication slot does not exist" - } - - // Fail fast when a user-provided publication does not exist on PG. - test { - sql """ - select * from cdc_stream( - "type" = "postgres", - "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", - "driver_url" = "${driver_url}", - "driver_class" = "org.postgresql.Driver", - "user" = "${pgUser}", - "password" = "${pgPassword}", - "database" = "${pgDB}", - "schema" = "${pgSchema}", - "table" = "${table1}", - "publication_name" = "tvf_missing_pub_xyz", - "offset" = 'latest') - """ - exception "publication does not exist" - } - - // Fail fast when a user-provided publication exists but does not cover the target table. 
- def otherTable = "user_info_pg_normal2_tvf" - def partialPub = "tvf_partial_pub" - connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { - sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${otherTable}""" - sql """CREATE TABLE ${pgDB}.${pgSchema}.${otherTable} ( - "name" varchar(200), - "age" int2, - PRIMARY KEY ("name") - )""" - sql """DROP PUBLICATION IF EXISTS ${partialPub}""" - sql """CREATE PUBLICATION ${partialPub} FOR TABLE ${pgDB}.${pgSchema}.${otherTable}""" - } - try { - test { - sql """ - select * from cdc_stream( - "type" = "postgres", - "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", - "driver_url" = "${driver_url}", - "driver_class" = "org.postgresql.Driver", - "user" = "${pgUser}", - "password" = "${pgPassword}", - "database" = "${pgDB}", - "schema" = "${pgSchema}", - "table" = "${table1}", - "publication_name" = "${partialPub}", - "offset" = 'latest') - """ - exception "is missing required tables" - } - } finally { - connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { - sql """DROP PUBLICATION IF EXISTS ${partialPub}""" - sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${otherTable}""" - } - } + // Standalone cdc_stream TVF does not validate PG slot/publication ownership + // (that check runs only when cdc_stream is nested inside a streaming INSERT job + // — see test_cdc_stream_tvf_publication). The TVF path just forwards whatever + // slot_name / publication_name the user supplies to cdcclient. // Here, because PG consumption requires creating a slot first, // we only verify whether the execution can be successful. 
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.groovy new file mode 100644 index 00000000000000..2a3a163cef5dfc --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_publication.groovy @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// PG slot/publication ownership on the cdc_stream TVF path (streaming INSERT job). +// Test 1: auto-generated slot & publication — default names injected at task scheduling, +// created on PG side, cleaned up on DROP JOB +// Test 2: user-provided slot & publication — Doris uses them but never drops them +// Test 3: mixed (user publication + auto slot) — only the auto slot is dropped +// +// cdc_stream TVF only supports a single source table (`table` property), so each case +// uses one PG table instead of the multi-table include_tables list used by the from-to +// ownership test. 
+suite("test_cdc_stream_tvf_publication", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_cdc_tvf_pub_job" + def currentDb = (sql "select database()")[0][0] + def dorisTable = "cdc_tvf_pub_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def pgTable = "cdc_tvf_pub_src" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + def prepareDorisTable = { + sql """drop table if exists ${currentDb}.${dorisTable} force""" + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} ( + `id` int NOT NULL, + `name` varchar(200) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + } + + def preparePgTable = { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (id, name) VALUES (1, 'Alice')""" + } + } + + def waitJobRunning = { String name -> + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${name}' and ExecuteType='STREAMING'""" + cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2 + }) + } catch 
(Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${name}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${name}'""")) + throw ex + } + } + + // ========== Test 1: auto-generated slot_name and publication_name ========== + prepareDorisTable() + preparePgTable() + + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name) + SELECT id, name FROM cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${pgTable}", + "offset" = "initial", + "snapshot_split_size" = "1" + ) + """ + + def jobRow = sql """select Id from jobs("type"="insert") where Name='${jobName}'""" + def jobId = jobRow.get(0).get(0).toString() + def expectedSlot = "doris_cdc_${jobId}" + def expectedPub = "doris_pub_${jobId}" + log.info("jobId: ${jobId}, expected slot: ${expectedSlot}, pub: ${expectedPub}") + + waitJobRunning(jobName) + + // Defaults are injected at runtime (not in ExecuteSql); verify via PG side.
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubResult = sql """SELECT pubname, puballtables FROM pg_publication WHERE pubname = '${expectedPub}'""" + assert pubResult.size() == 1 : "auto publication ${expectedPub} should exist" + assert pubResult[0][1] == false : "auto publication should NOT be FOR ALL TABLES" + + def pubTables = sql """SELECT tablename FROM pg_publication_tables WHERE pubname = '${expectedPub}' AND schemaname = '${pgSchema}'""" + assert pubTables.size() == 1 : "publication should contain exactly 1 table" + assert pubTables[0][0] == pgTable : "publication should contain ${pgTable}" + + def slotResult = sql """SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + assert slotResult.size() == 1 : "auto replication slot ${expectedSlot} should exist" + } + + qt_select_auto_snapshot """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY id """ + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).untilAsserted { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${expectedPub}'""" + assert pubAfter[0][0] == 0 : "auto publication should be dropped after job deletion" + def slotAfter = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + assert slotAfter[0][0] == 0 : "auto slot should be dropped after job deletion" + } + } + + // ========== Test 2: user-provided slot_name and publication_name ========== + def userSlot = "test_tvf_user_slot" + def userPub = "test_tvf_user_pub" + + prepareDorisTable() + preparePgTable() + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """CREATE PUBLICATION ${userPub} FOR TABLE 
${pgDB}.${pgSchema}.${pgTable}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + } + sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')""" + } + + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name) + SELECT id, name FROM cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${pgTable}", + "slot_name" = "${userSlot}", + "publication_name" = "${userPub}", + "offset" = "initial", + "snapshot_split_size" = "1" + ) + """ + + waitJobRunning(jobName) + qt_select_user_snapshot """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY id """ + + // Drop job: Doris must NOT touch user-provided slot or publication. 
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sleep(5000) + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${userPub}'""" + assert pubAfter[0][0] == 1 : "user-provided publication must be preserved after job deletion" + def slotAfter = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + assert slotAfter[0][0] == 1 : "user-provided slot must be preserved after job deletion" + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + } + + // ========== Test 3: only publication_name provided (slot is Doris-managed) ========== + def mixedJob = "test_cdc_tvf_pub_mixed" + def mixedPub = "test_tvf_mixed_pub" + sql """DROP JOB IF EXISTS where jobname = '${mixedJob}'""" + + prepareDorisTable() + preparePgTable() + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${mixedPub}""" + sql """CREATE PUBLICATION ${mixedPub} FOR TABLE ${pgDB}.${pgSchema}.${pgTable}""" + } + + sql """ + CREATE JOB ${mixedJob} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name) + SELECT id, name FROM cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${pgTable}", + "publication_name" = "${mixedPub}", + "offset" = "initial", + "snapshot_split_size" = "1" + ) + """ + + waitJobRunning(mixedJob) + + def mixedRow = sql """select Id from jobs("type"="insert") where Name='${mixedJob}'""" + def mixedJobId = mixedRow.get(0).get(0).toString() + def mixedExpectedSlot = "doris_cdc_${mixedJobId}" + + connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotResult = sql """SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${mixedExpectedSlot}'""" + assert slotResult.size() == 1 : "auto slot ${mixedExpectedSlot} should exist" + def pubTables = sql """SELECT tablename FROM pg_publication_tables WHERE pubname = '${mixedPub}' AND schemaname = '${pgSchema}'""" + assert pubTables.size() == 1 : "user-provided publication should contain the table" + } + + // Drop job: Doris drops the auto-managed slot but preserves the user-provided publication. + sql """DROP JOB IF EXISTS where jobname = '${mixedJob}'""" + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).untilAsserted { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotCount = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${mixedExpectedSlot}'""" + assert slotCount[0][0] == 0 : "Doris-managed slot ${mixedExpectedSlot} should be dropped" + } + } + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${mixedPub}'""" + assert pubAfter[0][0] == 1 : "user-provided publication must be preserved" + sql """DROP PUBLICATION IF EXISTS ${mixedPub}""" + } + + // Cleanup + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}""" + } + sql """drop table if exists ${currentDb}.${dorisTable} force""" + } +} From 8730818d26e7573135450b76ea7eb1b66dd3b0b0 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 20 Apr 2026 10:38:28 +0800 Subject: [PATCH 3/3] fix --- .../job/offset/jdbc/JdbcTvfSourceOffsetProvider.java | 4 ---- .../tablefunction/CdcStreamTableValuedFunction.java | 11 +++++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java index 33da084c3e5a93..8e2e5eff9dffd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java @@ -149,10 +149,6 @@ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originComm props.put(CdcStreamTableValuedFunction.META_KEY, new Gson().toJson(offset.generateMeta())); props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, String.valueOf(jobId)); props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, String.valueOf(taskId)); - // Inject default slot/publication names so cdc client ownership logic on BE - // sees the resolved names. Users who specified their own values are preserved - // via putIfAbsent. - StreamingJobUtils.populateDefaultSourceProperties(sourceType, props, String.valueOf(jobId)); return new UnboundTVFRelation( originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index 86ac016f579249..24031de5e9b019 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -62,12 +62,15 @@ private void processProps(Map properties) throws AnalysisExcepti Map copyProps = new HashMap<>(properties); copyProps.put("format", "json"); - // Materialize jobId so generateParams has a stable value across TVF invocations. - // Standalone TVF path: not tied to any job, so we generate a random id here. - // TVF-inside-job path: job.id is already injected by rewriteTvfParams. 
- copyProps.computeIfAbsent(JOB_ID_KEY, + // Standalone TVF: random jobId. TVF-in-job: job.id injected by rewriteTvfParams. + String jobId = copyProps.computeIfAbsent(JOB_ID_KEY, k -> UUID.randomUUID().toString().replace("-", "")); + // Default PG slot/pub so cdcclient auto-creates per-job resources + StreamingJobUtils.populateDefaultSourceProperties( + DataSourceType.valueOf(copyProps.get(DataSourceConfigKeys.TYPE).toUpperCase()), + copyProps, jobId); + super.parseCommonProperties(copyProps); this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true"); this.processedParams.put(URI_KEY, URI);