Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;

import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -97,4 +98,21 @@ public void testPipeTransferConfigSnapshotSealReq() throws IOException {
Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths());
Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
}

@Test
public void testPipeTransferConfigSnapshotSealReqPreservesPathPattern() throws IOException {
String snapshotName = "cluster_schema.bin";
String templateInfoName = "template_info.bin";
CNSnapshotFileType fileType = CNSnapshotFileType.SCHEMA;
String typeString = "200";

PipeTransferConfigSnapshotSealReq req =
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
"root.ln.**", snapshotName, 100, templateInfoName, 10, fileType, typeString);
PipeTransferConfigSnapshotSealReq deserializeReq =
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req);

Assert.assertEquals(
"root.ln.**", deserializeReq.getParameters().get(ColumnHeaderConstant.PATH_PATTERN));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,28 @@ public void testCreateDatabase() {
.orElseThrow(AssertionError::new));
}

@Test
public void testCreateDatabaseWithProcess() {
final DatabaseSchemaPlan includedCreateDatabasePlan =
new DatabaseSchemaPlan(
ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.ln"));
final DatabaseSchemaPlan excludedCreateDatabasePlan =
new DatabaseSchemaPlan(
ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.db"));
final IoTDBPipePatternOperations rootLnPattern =
new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));

Assert.assertEquals(
includedCreateDatabasePlan,
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.process(includedCreateDatabasePlan, rootLnPattern)
.orElseThrow(AssertionError::new));
Assert.assertFalse(
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.process(excludedCreateDatabasePlan, rootLnPattern)
.isPresent());
}

@Test
public void testAlterDatabase() {
final DatabaseSchemaPlan alterDatabasePlan =
Expand Down Expand Up @@ -201,6 +223,26 @@ public void testCommitSetSchemaTemplate() {
.orElseThrow(AssertionError::new));
}

@Test
public void testCommitSetSchemaTemplateWithRootLnPattern() {
final IoTDBPipePatternOperations rootLnPattern =
new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));
final CommitSetSchemaTemplatePlan includedSetTemplatePlan =
new CommitSetSchemaTemplatePlan("t1", "root.ln.wf01");
final CommitSetSchemaTemplatePlan excludedSetTemplatePlan =
new CommitSetSchemaTemplatePlan("t1", "root.db.wf01");

Assert.assertEquals(
includedSetTemplatePlan,
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.process(includedSetTemplatePlan, rootLnPattern)
.orElseThrow(AssertionError::new));
Assert.assertFalse(
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.process(excludedSetTemplatePlan, rootLnPattern)
.isPresent());
}

@Test
public void testPipeUnsetSchemaTemplate() {
final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlanOnPrefix =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ private boolean registerDatabase(
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
&& result.status.code != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
&& result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
LOGGER.error(
"Create Database error, statement: {}, result status : {}.", statement, result.status);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
Expand Down Expand Up @@ -536,17 +537,29 @@ static LoadTsFileStatement buildLoadTsFileStatementForSync(
private TSStatus loadSchemaSnapShot(
final Map<String, String> parameters, final List<String> fileAbsolutePaths)
throws IllegalPathException, IOException {
final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
final PartialPath databasePath = new PartialPath(databaseName);

final String pathPattern = parameters.get(ColumnHeaderConstant.PATH_PATTERN);
if (!shouldLoadSchemaSnapshotDatabase(pathPattern, databaseName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
final PipePattern pipePattern =
PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new);

final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath);
if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return createDatabaseStatus;
}

final SRStatementGenerator generator =
SchemaRegionSnapshotParser.translate2Statements(
Paths.get(fileAbsolutePaths.get(0)),
fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1)) : null,
new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE)));
databasePath);
final Set<StatementType> executionTypes =
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final PipePattern pipePattern =
PipePattern.parsePatternFromString(
parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new);

// Clear to avoid previous exceptions
batchVisitor.clear();
Expand All @@ -571,6 +584,39 @@ private TSStatus loadSchemaSnapShot(
return PipeReceiverStatusHandler.getPriorStatus(results);
}

static boolean shouldLoadSchemaSnapshotDatabase(
final String pathPattern, final String databaseName) {
return PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new)
.mayOverlapWithDb(databaseName);
}

private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) {
final DatabaseSchemaStatement statement =
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(databasePath);

final TSStatus status = executeStatementAndClassifyExceptions(statement);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}

if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
return Objects.equals(
status.getMessage(),
databasePath.getFullPath() + " has already been created as database")
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
: new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
}

if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
}

return status;
}

private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing because
// the "AlterLogicalViewNode" is itself idempotent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.SetThreadName;

import org.slf4j.Logger;
Expand Down Expand Up @@ -208,13 +209,19 @@ private IQueryExecution createQueryExecutionForTreeModel(
long startTime) {
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
if (statement instanceof IConfigStatement) {
queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
final Statement configStatement =
statement instanceof PipeEnrichedStatement
&& ((PipeEnrichedStatement) statement).getInnerStatement()
instanceof IConfigStatement
? ((PipeEnrichedStatement) statement).getInnerStatement()
: statement;
if (configStatement instanceof IConfigStatement) {
queryContext.setQueryType(((IConfigStatement) configStatement).getQueryType());
return new ConfigExecution(
queryContext,
statement.getType(),
configStatement.getType(),
executor,
statement.accept(new ConfigTaskVisitor(), queryContext));
configStatement.accept(new ConfigTaskVisitor(), queryContext));
}
TreeModelPlanner treeModelPlanner =
new TreeModelPlanner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception {
}
}

@Test
public void testSchemaSnapshotDatabaseIsFilteredByPattern() {
Assert.assertTrue(
IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", "root.ln"));
Assert.assertTrue(IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.**", "root.db"));
Assert.assertFalse(
IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", "root.db"));
}

@Test
public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception {
final Path tsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");
Expand Down
Loading