-
Notifications
You must be signed in to change notification settings - Fork 3.8k
[Improve](streaming-job) use per-table publication instead of ALL TABLES for PostgreSQL CDC #62526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package org.apache.doris.job.extensions.insert.streaming; | ||
|
|
||
| import org.apache.doris.datasource.jdbc.client.JdbcClient; | ||
| import org.apache.doris.job.cdc.DataSourceConfigKeys; | ||
| import org.apache.doris.job.common.DataSourceType; | ||
| import org.apache.doris.job.exception.JobException; | ||
| import org.apache.doris.job.util.StreamingJobUtils; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
||
| /** | ||
| * Fail-fast validation of PostgreSQL replication slot and publication at CREATE JOB time, | ||
| * before the CDC client connects. Catches mistakes (user-provided slot/publication missing, | ||
| * conflicting jobId) with actionable errors. Validation runs only on initial create; restarts | ||
| * skip this path by design, so an active slot held by the previous BE does not self-conflict. | ||
| */ | ||
| public class PostgresResourceValidator { | ||
|
|
||
| public static void validate(Map<String, String> sourceProperties, String jobId, List<String> tableNames) | ||
| throws JobException { | ||
| String slotName = resolveSlotName(sourceProperties, jobId); | ||
| String publicationName = resolvePublicationName(sourceProperties, jobId); | ||
| // Pattern-match ownership: name equals the default = Doris-owned (auto); otherwise user. | ||
| String defaultSlot = DataSourceConfigKeys.defaultSlotName(jobId); | ||
| String defaultPub = DataSourceConfigKeys.defaultPublicationName(jobId); | ||
| boolean slotUserProvided = !defaultSlot.equals(slotName); | ||
| boolean pubUserProvided = !defaultPub.equals(publicationName); | ||
| String pgSchema = sourceProperties.get(DataSourceConfigKeys.SCHEMA); | ||
| List<String> qualifiedTables = new ArrayList<>(); | ||
| for (String name : tableNames) { | ||
| qualifiedTables.add(pgSchema + "." + name); | ||
| } | ||
|
|
||
| JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(DataSourceType.POSTGRES, sourceProperties); | ||
|
JNSimba marked this conversation as resolved.
|
||
| try (Connection conn = jdbcClient.getConnection()) { | ||
| boolean pubExists = publicationExists(conn, publicationName); | ||
| if (!pubExists && pubUserProvided) { | ||
| throw new JobException( | ||
| "publication does not exist: " + publicationName | ||
| + ". Create it before starting the job or omit " | ||
| + DataSourceConfigKeys.PUBLICATION_NAME | ||
| + " to let Doris create one."); | ||
| } | ||
| if (pubExists) { | ||
| List<String> missing = findMissingTables(conn, publicationName, qualifiedTables); | ||
|
JNSimba marked this conversation as resolved.
|
||
| if (!missing.isEmpty()) { | ||
| if (pubUserProvided) { | ||
| throw new JobException( | ||
| "publication " + publicationName | ||
| + " is missing required tables: " + missing | ||
| + ". Add them via ALTER PUBLICATION ... ADD TABLE before starting."); | ||
| } else { | ||
| throw new JobException( | ||
| "publication " + publicationName | ||
| + " already exists but does not cover the configured" | ||
| + " include_tables (missing: " + missing | ||
| + "). Another Doris cluster may be using the same jobId." | ||
| + " Please set " + DataSourceConfigKeys.PUBLICATION_NAME | ||
| + " explicitly to avoid the conflict."); | ||
| } | ||
| } | ||
| } | ||
| Boolean slotActive = queryReplicationSlotActive(conn, slotName); | ||
| if (slotUserProvided && slotActive == null) { | ||
| throw new JobException( | ||
| "replication slot does not exist: " + slotName | ||
| + ". Create it before starting the job or omit " | ||
| + DataSourceConfigKeys.SLOT_NAME | ||
| + " to let Doris create one."); | ||
| } | ||
| if (!slotUserProvided && Boolean.TRUE.equals(slotActive)) { | ||
| throw new JobException( | ||
| "replication slot " + slotName | ||
| + " is active, held by another consumer. Another Doris" | ||
| + " cluster may be using the same jobId. Please set " | ||
| + DataSourceConfigKeys.SLOT_NAME | ||
| + " explicitly to avoid the conflict."); | ||
| } | ||
| } catch (JobException e) { | ||
| throw e; | ||
| } catch (Exception e) { | ||
| throw new JobException( | ||
| "Failed to validate PG resources for publication " + publicationName | ||
| + ": " + e.getMessage(), e); | ||
| } finally { | ||
| jdbcClient.closeClient(); | ||
| } | ||
| } | ||
|
|
||
| /** Returns the configured slot name, or the jobId-derived default when it is unset or blank. */ | ||
| private static String resolveSlotName(Map<String, String> config, String jobId) { | ||
|     String configured = config.get(DataSourceConfigKeys.SLOT_NAME); | ||
|     if (StringUtils.isBlank(configured)) { | ||
|         return DataSourceConfigKeys.defaultSlotName(jobId); | ||
|     } | ||
|     return configured; | ||
| } | ||
|
|
||
| /** Returns the configured publication name, or the jobId-derived default when it is unset or blank. */ | ||
| private static String resolvePublicationName(Map<String, String> config, String jobId) { | ||
|     String configured = config.get(DataSourceConfigKeys.PUBLICATION_NAME); | ||
|     if (StringUtils.isBlank(configured)) { | ||
|         return DataSourceConfigKeys.defaultPublicationName(jobId); | ||
|     } | ||
|     return configured; | ||
| } | ||
|
|
||
| private static boolean publicationExists(Connection conn, String publicationName) throws Exception { | ||
| try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) { | ||
| ps.setString(1, publicationName); | ||
| try (ResultSet rs = ps.executeQuery()) { | ||
| return rs.next(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static List<String> findMissingTables(Connection conn, String publicationName, List<String> tables) | ||
| throws Exception { | ||
| Set<String> covered = new HashSet<>(); | ||
| try (PreparedStatement ps = conn.prepareStatement( | ||
| "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?")) { | ||
| ps.setString(1, publicationName); | ||
| try (ResultSet rs = ps.executeQuery()) { | ||
| while (rs.next()) { | ||
| covered.add(rs.getString(1) + "." + rs.getString(2)); | ||
| } | ||
| } | ||
| } | ||
| List<String> missing = new ArrayList<>(); | ||
| for (String table : tables) { | ||
| if (!covered.contains(table)) { | ||
| missing.add(table); | ||
| } | ||
| } | ||
| return missing; | ||
| } | ||
|
|
||
| /** Returns the slot's active flag, or null when the slot does not exist. */ | ||
| private static Boolean queryReplicationSlotActive(Connection conn, String slotName) throws Exception { | ||
| try (PreparedStatement ps = conn.prepareStatement( | ||
| "SELECT active FROM pg_replication_slots WHERE slot_name = ?")) { | ||
| ps.setString(1, slotName); | ||
| try (ResultSet rs = ps.executeQuery()) { | ||
| if (!rs.next()) { | ||
| return null; | ||
| } | ||
| return rs.getBoolean(1); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ | |
| import org.apache.doris.job.cdc.split.SnapshotSplit; | ||
| import org.apache.doris.job.common.DataSourceType; | ||
| import org.apache.doris.job.exception.JobException; | ||
| import org.apache.doris.job.extensions.insert.streaming.PostgresResourceValidator; | ||
| import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob; | ||
| import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; | ||
| import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; | ||
|
|
@@ -448,6 +449,73 @@ public static List<Column> getColumns(JdbcClient jdbcClient, | |
| * The remoteDB implementation differs for each data source; | ||
| * refer to the hierarchical mapping in the JDBC catalog. | ||
| */ | ||
| /** | ||
|  * Populates default resource names into {@code properties}, then validates the source. | ||
|  * | ||
|  * <p>Both steps are delegated to {@code populateDefaultSourceProperties} and | ||
|  * {@code validateSource} so the default-name logic lives in one place. Each of those only | ||
|  * acts for PostgreSQL, so this is a no-op for other sources. | ||
|  * Mutates {@code properties}: callers should expect default values to be inserted. | ||
|  * | ||
|  * @param sourceType source kind of the job | ||
|  * @param properties mutable source properties; default slot/publication names may be added | ||
|  * @param jobId      job id used to derive the default names | ||
|  * @param tables     table names handed to the validator | ||
|  * @throws JobException if source validation fails | ||
|  */ | ||
| public static void resolveAndValidateSource(DataSourceType sourceType, | ||
|         Map<String, String> properties, | ||
|         String jobId, | ||
|         List<String> tables) throws JobException { | ||
|     populateDefaultSourceProperties(sourceType, properties, jobId); | ||
|     validateSource(sourceType, properties, jobId, tables); | ||
| } | ||
|
|
||
| /** | ||
|  * Runs source-side validation for the given source type. Only PostgreSQL is validated here; | ||
|  * every other type returns without doing anything. | ||
|  * | ||
|  * @param sourceType source kind of the job | ||
|  * @param properties source properties passed through to the validator | ||
|  * @param jobId      job id passed through to the validator | ||
|  * @param tables     table names passed through to the validator | ||
|  * @throws JobException if the PostgreSQL validator rejects the configuration | ||
|  */ | ||
| public static void validateSource(DataSourceType sourceType, | ||
|         Map<String, String> properties, | ||
|         String jobId, | ||
|         List<String> tables) throws JobException { | ||
|     if (sourceType != DataSourceType.POSTGRES) { | ||
|         return; | ||
|     } | ||
|     PostgresResourceValidator.validate(properties, jobId, tables); | ||
| } | ||
|
|
||
| /** | ||
|  * Validate source-side resources for a streaming job backed by a TVF. Only cdc_stream | ||
|  * TVF is subject to source validation (e.g. PG slot/publication ownership); other TVFs | ||
|  * (s3, ...) are no-ops. | ||
|  * | ||
|  * <p>originTvfProps is treated as read-only (Nereids may hand back an immutable map). | ||
|  * Defaults are populated into a temporary copy so ownership checks see the effective | ||
|  * slot/pub names without mutating the caller's map. | ||
|  * | ||
|  * <p>A missing or unrecognised source type is reported as a {@link JobException} instead of | ||
|  * escaping as an unchecked exception. | ||
|  * | ||
|  * @param tvfType        TVF name; anything other than {@code cdc_stream} (case-insensitive) is skipped | ||
|  * @param originTvfProps TVF properties; read for the source type and table, never modified | ||
|  * @param jobId          job id used to derive default resource names | ||
|  * @throws JobException if the source type is missing or unknown, or if source validation fails | ||
|  */ | ||
| public static void validateTvfSource(String tvfType, | ||
|         Map<String, String> originTvfProps, | ||
|         String jobId) throws JobException { | ||
|     if (!"cdc_stream".equalsIgnoreCase(tvfType)) { | ||
|         return; | ||
|     } | ||
|     String rawType = originTvfProps.get(DataSourceConfigKeys.TYPE); | ||
|     if (rawType == null) { | ||
|         throw new JobException( | ||
|                 "cdc_stream is missing required property: " + DataSourceConfigKeys.TYPE); | ||
|     } | ||
|     DataSourceType sourceType; | ||
|     try { | ||
|         // Locale.ROOT keeps the enum lookup independent of the JVM's default locale. | ||
|         sourceType = DataSourceType.valueOf(rawType.toUpperCase(java.util.Locale.ROOT)); | ||
|     } catch (IllegalArgumentException e) { | ||
|         throw new JobException("unsupported cdc_stream source type: " + rawType, e); | ||
|     } | ||
|     List<String> tables = Collections.singletonList( | ||
|             originTvfProps.get(DataSourceConfigKeys.TABLE)); | ||
|     Map<String, String> effective = new HashMap<>(originTvfProps); | ||
|     populateDefaultSourceProperties(sourceType, effective, jobId); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blocking: the
Please run the same |
||
|     validateSource(sourceType, effective, jobId, tables); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
|  * Writes the resolved resource names into {@code properties} so that ownership is | ||
|  * self-describing after restart. | ||
|  * | ||
|  * <p>For PostgreSQL the jobId-derived default slot and publication names are inserted only | ||
|  * when the corresponding key is absent; other source types leave the map untouched. | ||
|  * | ||
|  * @param sourceType source kind of the job | ||
|  * @param properties mutable source properties to fill in | ||
|  * @param jobId      job id used to derive the default names | ||
|  */ | ||
| public static void populateDefaultSourceProperties(DataSourceType sourceType, | ||
|         Map<String, String> properties, | ||
|         String jobId) { | ||
|     if (sourceType != DataSourceType.POSTGRES) { | ||
|         return; | ||
|     } | ||
|     // PG slot/pub: a value equal to the default marks it Doris-owned; any other value is user-owned. | ||
|     // (users cannot specify default names since jobId is unknown pre-CREATE) | ||
|     String fallbackSlot = DataSourceConfigKeys.defaultSlotName(jobId); | ||
|     properties.putIfAbsent(DataSourceConfigKeys.SLOT_NAME, fallbackSlot); | ||
|     String fallbackPublication = DataSourceConfigKeys.defaultPublicationName(jobId); | ||
|     properties.putIfAbsent(DataSourceConfigKeys.PUBLICATION_NAME, fallbackPublication); | ||
| } | ||
|
|
||
| public static String getRemoteDbName(DataSourceType sourceType, Map<String, String> properties) { | ||
| String remoteDb = null; | ||
| switch (sourceType) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.