diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java index f014a8ae22a33..f1d17d8a0f01e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq; import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType; +import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.junit.Assert; import org.junit.Test; @@ -97,4 +98,21 @@ public void testPipeTransferConfigSnapshotSealReq() throws IOException { Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths()); Assert.assertEquals(req.getParameters(), deserializeReq.getParameters()); } + + @Test + public void testPipeTransferConfigSnapshotSealReqPreservesPathPattern() throws IOException { + String snapshotName = "cluster_schema.bin"; + String templateInfoName = "template_info.bin"; + CNSnapshotFileType fileType = CNSnapshotFileType.SCHEMA; + String typeString = "200"; + + PipeTransferConfigSnapshotSealReq req = + PipeTransferConfigSnapshotSealReq.toTPipeTransferReq( + "root.ln.**", snapshotName, 100, templateInfoName, 10, fileType, typeString); + PipeTransferConfigSnapshotSealReq deserializeReq = + PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req); + + Assert.assertEquals( + "root.ln.**", deserializeReq.getParameters().get(ColumnHeaderConstant.PATH_PATTERN)); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java index 8125a980acd3d..a246ea9de903e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java @@ -91,6 +91,28 @@ public void testCreateDatabase() { .orElseThrow(AssertionError::new)); } + @Test + public void testCreateDatabaseWithProcess() { + final DatabaseSchemaPlan includedCreateDatabasePlan = + new DatabaseSchemaPlan( + ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.ln")); + final DatabaseSchemaPlan excludedCreateDatabasePlan = + new DatabaseSchemaPlan( + ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.db")); + final IoTDBPipePatternOperations rootLnPattern = + new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**")); + + Assert.assertEquals( + includedCreateDatabasePlan, + IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR + .process(includedCreateDatabasePlan, rootLnPattern) + .orElseThrow(AssertionError::new)); + Assert.assertFalse( + IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR + .process(excludedCreateDatabasePlan, rootLnPattern) + .isPresent()); + } + @Test public void testAlterDatabase() { final DatabaseSchemaPlan alterDatabasePlan = @@ -201,6 +223,26 @@ public void testCommitSetSchemaTemplate() { .orElseThrow(AssertionError::new)); } + @Test + public void testCommitSetSchemaTemplateWithRootLnPattern() { + final IoTDBPipePatternOperations rootLnPattern = + new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**")); + final CommitSetSchemaTemplatePlan includedSetTemplatePlan = + new CommitSetSchemaTemplatePlan("t1", "root.ln.wf01"); + final CommitSetSchemaTemplatePlan excludedSetTemplatePlan = + new CommitSetSchemaTemplatePlan("t1", "root.db.wf01"); + + Assert.assertEquals( + includedSetTemplatePlan, + IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR + .process(includedSetTemplatePlan, rootLnPattern) + .orElseThrow(AssertionError::new)); + Assert.assertFalse( + IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR + .process(excludedSetTemplatePlan, rootLnPattern) + .isPresent()); + } + @Test public void testPipeUnsetSchemaTemplate() { final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlanOnPrefix = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index cc85e2f4f100e..b9e26ee0fa57d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -160,8 +160,7 @@ private boolean registerDatabase( IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() - && result.status.code != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { LOGGER.error( "Create Database error, statement: {}, result status : {}.", statement, result.status); return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 8a5c4ad060fa6..4c1ed9748119a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -83,6 +83,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; @@ -536,17 +537,29 @@ static LoadTsFileStatement buildLoadTsFileStatementForSync( private TSStatus loadSchemaSnapShot( final Map parameters, final List fileAbsolutePaths) throws IllegalPathException, IOException { + final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE); + final PartialPath databasePath = new PartialPath(databaseName); + + final String pathPattern = parameters.get(ColumnHeaderConstant.PATH_PATTERN); + if (!shouldLoadSchemaSnapshotDatabase(pathPattern, databaseName)) { + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + final PipePattern pipePattern = + PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new); + + final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath); + if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return createDatabaseStatus; + } + final SRStatementGenerator generator = SchemaRegionSnapshotParser.translate2Statements( Paths.get(fileAbsolutePaths.get(0)), fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1)) : null, - new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE))); + databasePath); final Set executionTypes = PipeSchemaRegionSnapshotEvent.getStatementTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); - final PipePattern pipePattern = - PipePattern.parsePatternFromString( - parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new); // Clear to avoid previous exceptions batchVisitor.clear(); @@ -571,6 +584,39 @@ private TSStatus loadSchemaSnapShot( return PipeReceiverStatusHandler.getPriorStatus(results); } + static boolean shouldLoadSchemaSnapshotDatabase( + final String pathPattern, final String databaseName) { + return PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new) + .mayOverlapWithDb(databaseName); + } + + private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) { + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(databasePath); + + final TSStatus status = executeStatementAndClassifyExceptions(statement); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + return Objects.equals( + status.getMessage(), + databasePath.getFullPath() + " has already been created as database") + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + return status; + } + private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) { // We may be able to skip the alter logical view's exception parsing because // the "AlterLogicalViewNode" is itself idempotent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 2f173861f22e4..2a548557d8cf9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -48,6 +48,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.utils.SetThreadName; import org.slf4j.Logger; @@ -208,13 +209,19 @@ private IQueryExecution createQueryExecutionForTreeModel( long startTime) { queryContext.setTimeOut(timeOut); queryContext.setStartTime(startTime); - if (statement instanceof IConfigStatement) { - queryContext.setQueryType(((IConfigStatement) statement).getQueryType()); + final Statement configStatement = + statement instanceof PipeEnrichedStatement + && ((PipeEnrichedStatement) statement).getInnerStatement() + instanceof IConfigStatement + ? ((PipeEnrichedStatement) statement).getInnerStatement() + : statement; + if (configStatement instanceof IConfigStatement) { + queryContext.setQueryType(((IConfigStatement) configStatement).getQueryType()); return new ConfigExecution( queryContext, - statement.getType(), + configStatement.getType(), executor, - statement.accept(new ConfigTaskVisitor(), queryContext)); + configStatement.accept(new ConfigTaskVisitor(), queryContext)); } TreeModelPlanner treeModelPlanner = new TreeModelPlanner( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index 1e279a18febfb..9eac46c8351d4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -110,6 +110,15 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { } } + @Test + public void testSchemaSnapshotDatabaseIsFilteredByPattern() { + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", "root.ln")); + Assert.assertTrue(IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.**", "root.db")); + Assert.assertFalse( + IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", "root.db")); + } + @Test public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception { final Path tsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");