diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml
index 58a1b7cd18e0..63cf94e9dc55 100644
--- a/openmetadata-service/pom.xml
+++ b/openmetadata-service/pom.xml
@@ -1017,6 +1017,11 @@
<artifactId>owasp-java-html-sanitizer</artifactId>
<version>${owasp-html-sanitizer.version}</version>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationTestCase.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationTestCase.java
new file mode 100644
index 000000000000..59999cc6ee00
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationTestCase.java
@@ -0,0 +1,10 @@
+package org.openmetadata.service.migration.api;
+
+import java.util.List;
+import org.jdbi.v3.core.Handle;
+
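+/**
+ * A validation hook run around a migration: implementations inspect database state through the
+ * supplied JDBI handle before and after the migration executes and report each check as a
+ * {@link TestResult}.
+ */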
+public interface MigrationTestCase {
+ List<TestResult> validateBefore(Handle handle);
+
+ List<TestResult> validateAfter(Handle handle);
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
index e9df40fff097..08189447eb2e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
@@ -46,7 +46,7 @@ public class MigrationWorkflow {
public static final String SUCCESS_MSG = "Success";
public static final String FAILED_MSG = "Failed due to : ";
public static final String CURRENT = "Current";
- private List migrations;
+ @Getter private List migrations;
private final String nativeSQLScriptRootPath;
private final ConnectionType connectionType;
private final String extensionSQLScriptRootPath;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/TestResult.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/TestResult.java
new file mode 100644
index 000000000000..57b76ac9f301
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/TestResult.java
@@ -0,0 +1,11 @@
+package org.openmetadata.service.migration.api;
+
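+/** Outcome of a single migration validation check. */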
+public record TestResult(String name, boolean passed, String detail) {
+ public static TestResult pass(String name) {
+ return new TestResult(name, true, "");
+ }
+
+ public static TestResult fail(String name, String detail) {
+ return new TestResult(name, false, detail);
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/DatabaseBackupRestore.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/DatabaseBackupRestore.java
new file mode 100644
index 000000000000..53bdb44b72e2
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/DatabaseBackupRestore.java
@@ -0,0 +1,585 @@
+package org.openmetadata.service.util;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.jdbi.v3.core.Handle;
+import org.jdbi.v3.core.Jdbi;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+
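+/**
+ * Exports every base table of the configured MySQL or PostgreSQL database into a gzip-compressed
+ * tar archive: one JSON file per table under tables/, plus a metadata.json manifest recording the
+ * timestamp, database type, and per-table columns, binary columns, and row counts. Rows are read
+ * inside a repeatable-read transaction and paged in batches of {@code batchSize} (default
+ * {@value #DEFAULT_BATCH_SIZE}).
+ */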
+@Slf4j
+public class DatabaseBackupRestore {
+
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+ private static final long MAX_METADATA_SIZE = 10 * 1024 * 1024;
+ private static final Pattern SAFE_IDENTIFIER = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+
+ private final Jdbi jdbi;
+ private final ConnectionType connectionType;
+ private final String databaseName;
+ private final int batchSize;
+
+ public DatabaseBackupRestore(Jdbi jdbi, ConnectionType connectionType, String databaseName) {
+ this(jdbi, connectionType, databaseName, DEFAULT_BATCH_SIZE);
+ }
+
+ public DatabaseBackupRestore(
+ Jdbi jdbi, ConnectionType connectionType, String databaseName, int batchSize) {
+ this.jdbi = jdbi;
+ this.connectionType = connectionType;
+ this.databaseName = databaseName;
+ this.batchSize = batchSize;
+ }
+
+ public List<String> discoverTables(Handle handle) {
+ String sql;
+ if (connectionType == ConnectionType.MYSQL) {
+ sql =
+ "SELECT table_name FROM information_schema.tables "
+ + "WHERE table_type = 'BASE TABLE' AND table_schema = :db ORDER BY table_name";
+ return handle.createQuery(sql).bind("db", databaseName).mapTo(String.class).list();
+ } else {
+ sql =
+ "SELECT table_name FROM information_schema.tables "
+ + "WHERE table_type = 'BASE TABLE' AND table_schema = current_schema() "
+ + "ORDER BY table_name";
+ return handle.createQuery(sql).mapTo(String.class).list();
+ }
+ }
+
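+ // Insertable columns in ordinal order; generated columns (and, on PostgreSQL, sequence-backed
+ // defaults) are excluded.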
+ public List<String> discoverColumns(Handle handle, String tableName) {
+ String sql;
+ if (connectionType == ConnectionType.MYSQL) {
+ sql =
+ "SELECT column_name FROM information_schema.columns "
+ + "WHERE table_schema = :db AND table_name = :table "
+ + "AND (extra NOT LIKE '%GENERATED%' OR extra IS NULL) "
+ + "ORDER BY ordinal_position";
+ return handle
+ .createQuery(sql)
+ .bind("db", databaseName)
+ .bind("table", tableName)
+ .mapTo(String.class)
+ .list();
+ } else {
+ sql =
+ "SELECT column_name FROM information_schema.columns "
+ + "WHERE table_schema = current_schema() AND table_name = :table "
+ + "AND (is_generated = 'NEVER' OR is_generated IS NULL) "
+ + "AND (column_default NOT LIKE 'nextval%' OR column_default IS NULL) "
+ + "ORDER BY ordinal_position";
+ return handle.createQuery(sql).bind("table", tableName).mapTo(String.class).list();
+ }
+ }
+
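+ // Primary-key columns in key order; used to build a deterministic ORDER BY for batched reads.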
+ List<String> discoverPrimaryKeyColumns(Handle handle, String tableName) {
+ String sql;
+ if (connectionType == ConnectionType.MYSQL) {
+ sql =
+ "SELECT kcu.column_name FROM information_schema.key_column_usage kcu "
+ + "WHERE kcu.table_schema = :db AND kcu.table_name = :table "
+ + "AND kcu.constraint_name = 'PRIMARY' "
+ + "ORDER BY kcu.ordinal_position";
+ return handle
+ .createQuery(sql)
+ .bind("db", databaseName)
+ .bind("table", tableName)
+ .mapTo(String.class)
+ .list();
+ } else {
+ sql =
+ "SELECT kcu.column_name "
+ + "FROM information_schema.table_constraints tc "
+ + "JOIN information_schema.key_column_usage kcu "
+ + "ON tc.constraint_name = kcu.constraint_name "
+ + "AND tc.table_schema = kcu.table_schema "
+ + "WHERE tc.table_schema = current_schema() AND tc.table_name = :table "
+ + "AND tc.constraint_type = 'PRIMARY KEY' "
+ + "ORDER BY kcu.ordinal_position";
+ return handle.createQuery(sql).bind("table", tableName).mapTo(String.class).list();
+ }
+ }
+
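+ // Columns holding raw bytes (MySQL blob/binary types, PostgreSQL bytea), tracked per table in
+ // the backup metadata.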
+ Set<String> discoverBinaryColumns(Handle handle, String tableName) {
+ String sql;
+ if (connectionType == ConnectionType.MYSQL) {
+ sql =
+ "SELECT column_name FROM information_schema.columns "
+ + "WHERE table_schema = :db AND table_name = :table "
+ + "AND data_type IN ('blob', 'tinyblob', 'mediumblob', 'longblob', 'binary', 'varbinary')";
+ return new HashSet<>(
+ handle
+ .createQuery(sql)
+ .bind("db", databaseName)
+ .bind("table", tableName)
+ .mapTo(String.class)
+ .list());
+ } else {
+ sql =
+ "SELECT column_name FROM information_schema.columns "
+ + "WHERE table_schema = current_schema() AND table_name = :table "
+ + "AND data_type = 'bytea'";
+ return new HashSet<>(
+ handle.createQuery(sql).bind("table", tableName).mapTo(String.class).list());
+ }
+ }
+
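+ /**
+ * Extracts the database name from a JDBC URL by dropping any query string and taking the segment
+ * after the last '/', e.g. {@code jdbc:mysql://host:3306/openmetadata_db?useSSL=false} yields
+ * {@code openmetadata_db}.
+ */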
+ public static String extractDatabaseName(String jdbcUrl) {
+ String url = jdbcUrl;
+ int questionMark = url.indexOf('?');
+ if (questionMark > 0) {
+ url = url.substring(0, questionMark);
+ }
+ int lastSlash = url.lastIndexOf('/');
+ if (lastSlash < 0 || lastSlash == url.length() - 1) {
+ throw new IllegalArgumentException("Cannot extract database name from JDBC URL: " + jdbcUrl);
+ }
+ String dbName = url.substring(lastSlash + 1);
+ if (dbName.isEmpty()) {
+ throw new IllegalArgumentException("Cannot extract database name from JDBC URL: " + jdbcUrl);
+ }
+ return dbName;
+ }
+
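+ /**
+ * Writes a full backup to {@code backupPath} as a .tar.gz archive. All tables are exported
+ * within a single repeatable-read transaction, each in primary-key order to an entry under
+ * tables/, and a metadata.json manifest describing the backup is appended last.
+ */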
+ public void backup(String backupPath) throws IOException {
+ LOG.info("Starting database backup to {}", backupPath);
+ try (FileOutputStream fos = new FileOutputStream(backupPath);
+ BufferedOutputStream bos = new BufferedOutputStream(fos);
+ GzipCompressorOutputStream gzos = new GzipCompressorOutputStream(bos);
+ TarArchiveOutputStream taos = new TarArchiveOutputStream(gzos)) {
+
+ taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+
+ ObjectNode metadata = MAPPER.createObjectNode();
+ metadata.put("timestamp", Instant.now().toString());
+ metadata.put("version", System.getProperty("project.version", "unknown"));
+ metadata.put("databaseType", connectionType.name());
+ metadata.put("databaseName", databaseName);
+ ObjectNode tablesMetadata = MAPPER.createObjectNode();
+
+ jdbi.useHandle(
+ handle -> {
+ beginRepeatableReadTransaction(handle);
+ try {
+ List<String> tables = discoverTables(handle);
+ LOG.info("Discovered {} tables", tables.size());
+
+ for (String tableName : tables) {
+ backupTable(handle, tableName, taos, tablesMetadata);
+ }
+ } finally {
+ commitTransaction(handle);
+ }
+ });
+
+ metadata.set("tables", tablesMetadata);
+ byte[] metadataBytes = MAPPER.writeValueAsBytes(metadata);
+ TarArchiveEntry metadataEntry = new TarArchiveEntry("metadata.json");
+ metadataEntry.setSize(metadataBytes.length);
+ taos.putArchiveEntry(metadataEntry);
+ taos.write(metadataBytes);
+ taos.closeArchiveEntry();
+
+ LOG.info("Backup completed successfully");
+ }
+ }
+
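+ // Both databases export inside one repeatable-read transaction; MySQL requires the isolation
+ // level to be set before START TRANSACTION, while PostgreSQL accepts it inline with BEGIN.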
+ private void beginRepeatableReadTransaction(Handle handle) {
+ if (connectionType == ConnectionType.MYSQL) {
+ handle.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ handle.execute("START TRANSACTION");
+ } else {
+ handle.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ }
+ }
+
+ private void commitTransaction(Handle handle) {
+ handle.execute("COMMIT");
+ }
+
+ private void backupTable(
+ Handle handle, String tableName, TarArchiveOutputStream taos, ObjectNode tablesMetadata)
+ throws IOException {
+ List<String> columns = discoverColumns(handle, tableName);
+ if (columns.isEmpty()) {
+ LOG.warn("No columns found for table {}, skipping", tableName);
+ return;
+ }
+
+ String quotedColumns = quoteColumns(columns);
+ String quotedTable = quoteIdentifier(tableName);
+
+ List<String> pkColumns = discoverPrimaryKeyColumns(handle, tableName);
+ String orderByClause = buildOrderByClause(pkColumns, columns);
+ Set<String> binaryColumns = discoverBinaryColumns(handle, tableName);
+
+ Path tempFile = Files.createTempFile("backup_" + tableName + "_", ".json");
+ int rowCount;
+ try {
+ rowCount =
+ writeTableToTempFile(
+ handle, quotedColumns, quotedTable, orderByClause, columns, tempFile);
+ addTempFileToTar(taos, tempFile, "tables/" + tableName + ".json");
+ } finally {
+ Files.deleteIfExists(tempFile);
+ }
+
+ ObjectNode tableInfo = MAPPER.createObjectNode();
+ ArrayNode columnsArray = MAPPER.createArrayNode();
+ columns.forEach(columnsArray::add);
+ tableInfo.set("columns", columnsArray);
+ ArrayNode binaryColumnsArray = MAPPER.createArrayNode();
+ binaryColumns.forEach(binaryColumnsArray::add);
+ tableInfo.set("binaryColumns", binaryColumnsArray);
+ tableInfo.put("rowCount", rowCount);
+ tablesMetadata.set(tableName, tableInfo);
+
+ LOG.info("Backed up table {} ({} rows, {} columns)", tableName, rowCount, columns.size());
+ }
+
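+ // Orders exports by the primary key, falling back to the first column when a table has none,
+ // so every paged query carries a fixed ORDER BY.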
+ private String buildOrderByClause(List<String> pkColumns, List<String> allColumns) {
+ List<String> orderColumns = pkColumns.isEmpty() ? List.of(allColumns.get(0)) : pkColumns;
+ return " ORDER BY "
+ + orderColumns.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
+ }
+
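+ // Streams the table as a JSON array to a temp file, paging with LIMIT/OFFSET so large tables
+ // are never held fully in memory.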
+ private int writeTableToTempFile(
+ Handle handle,
+ String quotedColumns,
+ String quotedTable,
+ String orderByClause,
+ List<String> columns,
+ Path tempFile)
+ throws IOException {
+ int rowCount = 0;
+ try (OutputStream os = new BufferedOutputStream(new FileOutputStream(tempFile.toFile()));
+ JsonGenerator gen = new JsonFactory().createGenerator(os)) {
+ gen.setCodec(MAPPER);
+ gen.writeStartArray();
+
+ int offset = 0;
+ while (true) {
+ String sql =
+ String.format(
+ "SELECT %s FROM %s%s LIMIT %d OFFSET %d",
+ quotedColumns, quotedTable, orderByClause, batchSize, offset);
+ List