Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ public class DataSourceConfigKeys {
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";

// PostgreSQL replication slot and publication config
public static final String SLOT_NAME = "slot_name";
public static final String PUBLICATION_NAME = "publication_name";
public static final String DEFAULT_SLOT_PREFIX = "doris_cdc_";
public static final String DEFAULT_PUBLICATION_PREFIX = "doris_pub_";
// Pre-PR default (Debezium auto-created). Legacy jobs reconnecting on a newer version fall
// back to this name when no publication_name was persisted.
public static final String LEGACY_PUBLICATION_NAME = "dbz_publication";

/** Default Doris-owned replication slot name for the given job id. */
public static String defaultSlotName(String jobId) {
    StringBuilder name = new StringBuilder(DEFAULT_SLOT_PREFIX);
    return name.append(jobId).toString();
}

/** Default Doris-owned publication name for the given job id. */
public static String defaultPublicationName(String jobId) {
    StringBuilder name = new StringBuilder(DEFAULT_PUBLICATION_PREFIX);
    return name.append(jobId).toString();
}

// per-table config: key format is "table.<tableName>.<suffix>"
public static final String TABLE = "table";
public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = "exclude_columns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@

import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class DataSourceConfigValidator {

// PostgreSQL unquoted identifier: lowercase letters, digits, underscores, not starting with a digit.
private static final Pattern PG_IDENTIFIER_PATTERN = Pattern.compile("^[a-z_][a-z0-9_]*$");
private static final int PG_MAX_IDENTIFIER_LENGTH = 63;
private static final Set<String> ALLOW_SOURCE_KEYS = Sets.newHashSet(
DataSourceConfigKeys.JDBC_URL,
DataSourceConfigKeys.USER,
Expand All @@ -40,7 +45,9 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SSL_MODE,
DataSourceConfigKeys.SSL_ROOTCERT
DataSourceConfigKeys.SSL_ROOTCERT,
DataSourceConfigKeys.SLOT_NAME,
DataSourceConfigKeys.PUBLICATION_NAME
);
Comment thread
JNSimba marked this conversation as resolved.

// Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
Expand Down Expand Up @@ -114,6 +121,14 @@ private static boolean isValidValue(String key, String value) {
|| value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
}

// slot_name / publication_name are interpolated into PG DDL without quoting,
// so enforce unquoted-identifier grammar to prevent injection and runtime errors.
if (key.equals(DataSourceConfigKeys.SLOT_NAME)
|| key.equals(DataSourceConfigKeys.PUBLICATION_NAME)) {
return value.length() <= PG_MAX_IDENTIFIER_LENGTH
&& PG_IDENTIFIER_PATTERN.matcher(value).matches();
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.util.StreamingJobUtils;

import org.apache.commons.lang3.StringUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Fail-fast validation of PostgreSQL replication slot and publication at CREATE JOB time,
* before the CDC client connects. Catches mistakes (user-provided slot/publication missing,
* conflicting jobId) with actionable errors. Validation runs only on initial create; restarts
* skip this path by design, so an active slot held by the previous BE does not self-conflict.
*/
public class PostgresResourceValidator {

public static void validate(Map<String, String> sourceProperties, String jobId, List<String> tableNames)
throws JobException {
String slotName = resolveSlotName(sourceProperties, jobId);
String publicationName = resolvePublicationName(sourceProperties, jobId);
// Pattern-match ownership: name equals the default = Doris-owned (auto); otherwise user.
String defaultSlot = DataSourceConfigKeys.defaultSlotName(jobId);
String defaultPub = DataSourceConfigKeys.defaultPublicationName(jobId);
boolean slotUserProvided = !defaultSlot.equals(slotName);
boolean pubUserProvided = !defaultPub.equals(publicationName);
String pgSchema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
List<String> qualifiedTables = new ArrayList<>();
for (String name : tableNames) {
qualifiedTables.add(pgSchema + "." + name);
}

JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(DataSourceType.POSTGRES, sourceProperties);
Comment thread
JNSimba marked this conversation as resolved.
try (Connection conn = jdbcClient.getConnection()) {
boolean pubExists = publicationExists(conn, publicationName);
if (!pubExists && pubUserProvided) {
throw new JobException(
"publication does not exist: " + publicationName
+ ". Create it before starting the job or omit "
+ DataSourceConfigKeys.PUBLICATION_NAME
+ " to let Doris create one.");
}
if (pubExists) {
List<String> missing = findMissingTables(conn, publicationName, qualifiedTables);
Comment thread
JNSimba marked this conversation as resolved.
if (!missing.isEmpty()) {
if (pubUserProvided) {
throw new JobException(
"publication " + publicationName
+ " is missing required tables: " + missing
+ ". Add them via ALTER PUBLICATION ... ADD TABLE before starting.");
} else {
throw new JobException(
"publication " + publicationName
+ " already exists but does not cover the configured"
+ " include_tables (missing: " + missing
+ "). Another Doris cluster may be using the same jobId."
+ " Please set " + DataSourceConfigKeys.PUBLICATION_NAME
+ " explicitly to avoid the conflict.");
}
}
}
Boolean slotActive = queryReplicationSlotActive(conn, slotName);
if (slotUserProvided && slotActive == null) {
throw new JobException(
"replication slot does not exist: " + slotName
+ ". Create it before starting the job or omit "
+ DataSourceConfigKeys.SLOT_NAME
+ " to let Doris create one.");
}
if (!slotUserProvided && Boolean.TRUE.equals(slotActive)) {
throw new JobException(
"replication slot " + slotName
+ " is active, held by another consumer. Another Doris"
+ " cluster may be using the same jobId. Please set "
+ DataSourceConfigKeys.SLOT_NAME
+ " explicitly to avoid the conflict.");
}
} catch (JobException e) {
throw e;
} catch (Exception e) {
throw new JobException(
"Failed to validate PG resources for publication " + publicationName
+ ": " + e.getMessage(), e);
} finally {
jdbcClient.closeClient();
}
}

private static String resolveSlotName(Map<String, String> config, String jobId) {
String name = config.get(DataSourceConfigKeys.SLOT_NAME);
return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultSlotName(jobId);
}

private static String resolvePublicationName(Map<String, String> config, String jobId) {
String name = config.get(DataSourceConfigKeys.PUBLICATION_NAME);
return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultPublicationName(jobId);
}

private static boolean publicationExists(Connection conn, String publicationName) throws Exception {
try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) {
ps.setString(1, publicationName);
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
}
}

private static List<String> findMissingTables(Connection conn, String publicationName, List<String> tables)
throws Exception {
Set<String> covered = new HashSet<>();
try (PreparedStatement ps = conn.prepareStatement(
"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?")) {
ps.setString(1, publicationName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
covered.add(rs.getString(1) + "." + rs.getString(2));
}
}
}
List<String> missing = new ArrayList<>();
for (String table : tables) {
if (!covered.contains(table)) {
missing.add(table);
}
}
return missing;
}

/** Returns the slot's active flag, or null when the slot does not exist. */
private static Boolean queryReplicationSlotActive(Connection conn, String slotName) throws Exception {
try (PreparedStatement ps = conn.prepareStatement(
"SELECT active FROM pg_replication_slots WHERE slot_name = ?")) {
ps.setString(1, slotName);
try (ResultSet rs = ps.executeQuery()) {
if (!rs.next()) {
return null;
}
return rs.getBoolean(1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ private void initSourceJob() {
String includeTables = String.join(",", createTbls);
sourceProperties.put(DataSourceConfigKeys.INCLUDE_TABLES, includeTables);
}
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType,
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties));
JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
Expand Down Expand Up @@ -305,6 +307,9 @@ private void initInsertJob() {
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
// Validate source-side resources (e.g. PG slot/publication ownership) once at job
// creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
StreamingJobUtils.validateTvfSource(tvfType, originTvfProps, String.valueOf(getJobId()));
this.offsetProvider.initOnCreate();
// validate offset props, only for s3 cause s3 tvf no offset prop
if (jobProperties.getOffsetProperty() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,16 @@ public JdbcTvfSourceOffsetProvider() {
*/
@Override
public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) throws JobException {
String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
Preconditions.checkArgument(type != null, "type is required");
DataSourceType resolvedType = DataSourceType.valueOf(type.toUpperCase());

// Populate default slot/pub into sourceProperties so cleanMeta -> /api/close
// carries the resolved names for cdcclient ownership-based cleanup.
Map<String, String> effective = new HashMap<>(originTvfProps);
StreamingJobUtils.populateDefaultSourceProperties(resolvedType, effective, String.valueOf(jobId));
// Always refresh fields that may be updated via ALTER JOB (e.g. credentials, parallelism).
this.sourceProperties = originTvfProps;
this.sourceProperties = effective;
this.snapshotParallelism = Integer.parseInt(
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
Expand All @@ -109,9 +117,7 @@ public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) th
// is reconstructed fresh (getPersistInfo returns null), so jobId is null then too.
this.jobId = jobId;
this.chunkHighWatermarkMap = new HashMap<>();
String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
Preconditions.checkArgument(type != null, "type is required");
this.sourceType = DataSourceType.valueOf(type.toUpperCase());
this.sourceType = resolvedType;
String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.PostgresResourceValidator;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
Expand Down Expand Up @@ -448,6 +449,73 @@ public static List<Column> getColumns(JdbcClient jdbcClient,
* The remoteDB implementation differs for each data source;
* refer to the hierarchical mapping in the JDBC catalog.
*/
/**
 * Populate default resource names into properties, then validate. No-op for sources that
 * don't need it. Mutates properties: callers should expect default values to be inserted.
 *
 * @param sourceType source type of the streaming job
 * @param properties mutable source properties; default slot/publication names are inserted
 * @param jobId      job id used to derive Doris-owned default names
 * @param tables     table names the job replicates (used by resource validation)
 * @throws JobException when source-side resources are missing or conflicting
 */
public static void resolveAndValidateSource(DataSourceType sourceType,
                                            Map<String, String> properties,
                                            String jobId,
                                            List<String> tables) throws JobException {
    if (sourceType == DataSourceType.POSTGRES) {
        // Reuse the single source of truth for default slot/publication population instead
        // of duplicating the putIfAbsent logic here — keeps the create path and the
        // restart/cleanup path (populateDefaultSourceProperties callers) in sync.
        populateDefaultSourceProperties(sourceType, properties, jobId);
        validateSource(sourceType, properties, jobId, tables);
    }
}

/**
 * Run source-specific resource validation. Currently only PostgreSQL carries external
 * resources (replication slot / publication) that need pre-flight checks.
 *
 * @throws JobException when the source's resources are missing or conflicting
 */
public static void validateSource(DataSourceType sourceType,
                                  Map<String, String> properties,
                                  String jobId,
                                  List<String> tables) throws JobException {
    switch (sourceType) {
        case POSTGRES:
            PostgresResourceValidator.validate(properties, jobId, tables);
            break;
        default:
            // other sources have no server-side resources to pre-validate
            break;
    }
}

/**
* Validate source-side resources for a streaming job backed by a TVF. Only cdc_stream
* TVF is subject to source validation (e.g. PG slot/publication ownership); other TVFs
* (s3, ...) are no-ops.
*
* <p>originTvfProps is treated as read-only (Nereids may hand back an immutable map).
* Defaults are populated into a temporary copy so ownership checks see the effective
* slot/pub names without mutating the caller's map.
*/
public static void validateTvfSource(String tvfType,
Map<String, String> originTvfProps,
String jobId) throws JobException {
if (!"cdc_stream".equalsIgnoreCase(tvfType)) {
return;
}
DataSourceType sourceType = DataSourceType.valueOf(
originTvfProps.get(DataSourceConfigKeys.TYPE).toUpperCase());
List<String> tables = Collections.singletonList(
originTvfProps.get(DataSourceConfigKeys.TABLE));
Map<String, String> effective = new HashMap<>(originTvfProps);
populateDefaultSourceProperties(sourceType, effective, jobId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: the CREATE JOB ... DO INSERT ... SELECT FROM cdc_stream(...) path still bypasses the new FE identifier validation.

CreateJobCommand.validate() only calls DataSourceConfigValidator.validateSource() for FROM POSTGRES jobs. Here validateTvfSource() goes straight to PostgresResourceValidator, so slot_name / publication_name values like Bad-Name or pub;drop are accepted in the TVF-backed job path even though the PR description says CREATE-time validation now covers both paths. Those raw values later reach the cdc client DDL and fail only at runtime.

Please run the same DataSourceConfigValidator.validateSource() checks on the TVF props before PostgresResourceValidator.validate().

validateSource(sourceType, effective, jobId, tables);
}


/** Persist resolved resource names so ownership is self-describing after restart. */
public static void populateDefaultSourceProperties(DataSourceType sourceType,
                                                   Map<String, String> properties,
                                                   String jobId) {
    if (sourceType != DataSourceType.POSTGRES) {
        return;
    }
    // PG slot/pub: values equal to default = Doris-owned; any other value = user-owned.
    // (users cannot specify default names since jobId is unknown pre-CREATE)
    // Null-mapped keys are overwritten too, matching Map.putIfAbsent semantics.
    if (properties.get(DataSourceConfigKeys.SLOT_NAME) == null) {
        properties.put(DataSourceConfigKeys.SLOT_NAME,
                DataSourceConfigKeys.defaultSlotName(jobId));
    }
    if (properties.get(DataSourceConfigKeys.PUBLICATION_NAME) == null) {
        properties.put(DataSourceConfigKeys.PUBLICATION_NAME,
                DataSourceConfigKeys.defaultPublicationName(jobId));
    }
}

public static String getRemoteDbName(DataSourceType sourceType, Map<String, String> properties) {
String remoteDb = null;
switch (sourceType) {
Expand Down
Loading
Loading