From c203a2dad8dcac7b9d22333897f1ec1366ff5d03 Mon Sep 17 00:00:00 2001 From: clsu <404083629@qq.com> Date: Tue, 16 Jun 2026 14:53:31 +0800 Subject: [PATCH 1/5] feat: add integration test support for StreamNode --- integration-test/pom.xml | 14 ++ .../org/apache/iotdb/it/env/EnvFactory.java | 4 + .../java/org/apache/iotdb/it/env/EnvType.java | 1 + .../iotdb/it/env/cluster/ClusterConstant.java | 7 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 76 +++++++- .../iotdb/it/env/cluster/env/StreamEnv.java | 45 +++++ .../env/cluster/node/StreamNodeWrapper.java | 170 ++++++++++++++++++ .../itbase/category/StreamClusterIT.java | 3 + .../org/apache/iotdb/itbase/env/BaseEnv.java | 12 ++ 9 files changed, 331 insertions(+), 1 deletion(-) create mode 100644 integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java create mode 100644 integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java create mode 100644 integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java diff --git a/integration-test/pom.xml b/integration-test/pom.xml index c61509ed2c84c..5a290c3f77cbe 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -693,6 +693,20 @@ AI + + StreamClusterIT + + false + + + org.apache.iotdb.itbase.category.ManualIT + org.apache.iotdb.itbase.category.StreamClusterIT + false + true + true + Stream + + DailyIT diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java index 2fbc9672837c8..54e2f48dabd09 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java @@ -23,6 +23,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.env.Cluster1Env; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; +import org.apache.iotdb.it.env.cluster.env.StreamEnv; import org.apache.iotdb.it.env.remote.env.RemoteServerEnv; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.env.BaseEnv; @@ -59,6 +60,9 @@ public static BaseEnv getEnv() { case AI: env = new AIEnv(); break; + case STREAM: + env = new StreamEnv(); + break; case MultiCluster: logger.warn( "EnvFactory only supports EnvType Simple, Cluster1 and Remote, please use MultiEnvFactory instead."); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java index 7c2ee415cf1b4..38ff8f2160bb5 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java @@ -25,6 +25,7 @@ public enum EnvType { Cluster1, MultiCluster, AI, + STREAM, TABLE_SIMPLE, TABLE_CLUSTER1; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index 691a13509d9ee..145911e19d24e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -45,6 +45,11 @@ public class ClusterConstant { "DefaultDataNodeCommonProperties"; public static final String DATA_REGION_PER_DATANODE = "integrationTest.dataRegionPerDataNode"; + // StreamNode constants + public static final String STREAMNODE_INIT_HEAP_SIZE = "StreamNodeInitHeapSize"; + public static final String STREAMNODE_MAX_HEAP_SIZE = "StreamNodeMaxHeapSize"; + public static final String STREAMNODE_MAX_DIRECT_MEMORY_SIZE = "StreamNodeMaxDirectMemorySize"; + // Cluster Configurations public static final String CLUSTER_CONFIGURATIONS = "ClusterConfigurations"; @@ -214,6 +219,8 @@ public class ClusterConstant { public static final String AI_NODE_NAME = "AINode"; + public static final String STREAM_NODE_NAME = "StreamNode"; + public static final String LOCK_FILE_PATH = System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + "lock-"; public static final String TEMPLATE_NODE_PATH = diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 0e7eefb3a4188..1aff69b7775c8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -51,6 +51,7 @@ import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.StreamNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.itbase.env.BaseNodeWrapper; @@ -102,6 +103,7 @@ public abstract class AbstractEnv implements BaseEnv { protected List configNodeWrapperList = Collections.emptyList(); protected List dataNodeWrapperList = Collections.emptyList(); protected List aiNodeWrapperList = Collections.emptyList(); + protected List streamNodeWrapperList = Collections.emptyList(); protected String testMethodName = null; protected int index = 0; protected long startTime; @@ -109,6 +111,7 @@ public abstract class AbstractEnv implements BaseEnv { private IClientManager clientManager; private List configNodeKillPoints = new ArrayList<>(); private List dataNodeKillPoints = new ArrayList<>(); + private List streamNodeKillPoints = new ArrayList<>(); /** * This config object stores the properties set by developers during the test. It will be cleared @@ -177,6 +180,15 @@ protected void initEnvironment( final int dataNodesNum, final int retryCount, final boolean addAINode) { + initEnvironment(configNodesNum, dataNodesNum, retryCount, addAINode, 0); + } + + protected void initEnvironment( + final int configNodesNum, + final int dataNodesNum, + final int retryCount, + final boolean addAINode, + final int streamNodesNum) { this.retryCount = retryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); @@ -263,9 +275,52 @@ protected void initEnvironment( startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), testClassName); } + if (streamNodesNum > 0) { + this.streamNodeWrapperList = new ArrayList<>(); + final List streamNodeEndpoints = new ArrayList<>(); + final RequestDelegate streamNodesDelegate = + new ParallelRequestDelegate<>(streamNodeEndpoints, NODE_START_TIMEOUT, this); + + for (int i = 0; i < streamNodesNum; i++) { + StreamNodeWrapper streamNodeWrapper = newStreamNode(); + streamNodeEndpoints.add(streamNodeWrapper.getIpAndPortString()); + this.streamNodeWrapperList.add(streamNodeWrapper); + streamNodesDelegate.addRequest( + () -> { + streamNodeWrapper.start(); + return null; + }); + } + + try { + streamNodesDelegate.requestAll(); + } catch (final SQLException e) { + logger.error("Start streamNodes failed", e); + throw new AssertionError(); + } + } + checkClusterStatusWithoutUnknown(); } + private StreamNodeWrapper newStreamNode() { + final StreamNodeWrapper streamNodeWrapper = + new StreamNodeWrapper( + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + streamNodeWrapper.createNodeDir(); + streamNodeWrapper.changeConfig(); + streamNodeWrapper.createLogDir(); + streamNodeWrapper.setKillPoints(streamNodeKillPoints); + return streamNodeWrapper; + } + private ConfigNodeWrapper newConfigNode() { final ConfigNodeWrapper configNodeWrapper = new ConfigNodeWrapper( @@ -433,7 +488,8 @@ public void checkClusterStatus( if (showClusterResp.getNodeStatus().size() != configNodeWrapperList.size() + dataNodeWrapperList.size() - + aiNodeWrapperList.size()) { + + aiNodeWrapperList.size() + + streamNodeWrapperList.size()) { passed = false; nodeSizePassed = false; actualNodeSize = showClusterResp.getNodeStatusSize(); @@ -473,6 +529,14 @@ public void checkClusterStatus( processStatusMap.put(nodeWrapper, 0); } } + for (StreamNodeWrapper nodeWrapper : streamNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } + } processStatusPassed = processStatusCheck.test(processStatusMap); if (!processStatusPassed) { @@ -578,6 +642,16 @@ private void handleProcessStatus(Map processStatus aiNodeWrapper.start(); } } + for (StreamNodeWrapper streamNodeWrapper : streamNodeWrapperList) { + if (portOccupationMap.containsValue(streamNodeWrapper.getPid())) { + logger.info( + "A port is occupied by another StreamNode {}-{}, restart it", + streamNodeWrapper.getIpAndPortString(), + streamNodeWrapper.getPid()); + streamNodeWrapper.stop(); + streamNodeWrapper.start(); + } + } } catch (IOException e) { logger.error("Cannot check port occupation", e); } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java new file mode 100644 index 0000000000000..f290b9aa7623f --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.it.env.cluster.env; + +public class StreamEnv extends AbstractEnv { + @Override + public void initClusterEnvironment() { + initClusterEnvironment(1, 1); + } + + @Override + public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { + super.initEnvironment(configNodesNum, dataNodesNum, 600, false, 1); + } + + @Override + public void initClusterEnvironment( + int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { + super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false, 1); + } + + @Override + public void initClusterEnvironment( + int configNodesNum, int dataNodesNum, int streamNodesNum, int testWorkingRetryCount) { + super.initEnvironment( + configNodesNum, dataNodesNum, testWorkingRetryCount, false, streamNodesNum); + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java new file mode 100644 index 0000000000000..c4523084a7d6b --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.it.env.cluster.node; + +import org.apache.iotdb.it.env.cluster.EnvUtils; +import org.apache.iotdb.it.env.cluster.config.MppBaseConfig; +import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig; +import org.apache.iotdb.it.env.cluster.config.MppJVMConfig; + +import org.apache.tsfile.external.commons.io.file.PathUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; + +import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_INIT_HEAP_SIZE; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_MAX_DIRECT_MEMORY_SIZE; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_MAX_HEAP_SIZE; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_NODE_NAME; + +public class StreamNodeWrapper extends AbstractNodeWrapper { + + public StreamNodeWrapper( + final String seedConfigNode, + final String testClassName, + final String testMethodName, + final int[] portList, + final int clusterIndex, + final boolean isMultiCluster, + final long startTime) { + super(testClassName, testMethodName, portList, clusterIndex, isMultiCluster, startTime); + + // Initialize mutable properties + reloadMutableFields(); + + // Initialize immutable properties + immutableNodeProperties.setProperty("sn_seed_config_node", seedConfigNode); + // Set system directory to be under the node working directory, not relative to project root + immutableNodeProperties.setProperty( + "dn_system_dir", getNodePath() + File.separator + "data" + File.separator + "streamnode"); + } + + @Override + protected String getSystemConfigPath() { + return workDirFilePath("conf", "iotdb-streamnode.properties"); + } + + @Override + protected String getDefaultNodeConfigPath() { + return ""; + } + + @Override + protected String getDefaultCommonConfigPath() { + return ""; + } + + @Override + public String getSystemPropertiesPath() { + return workDirFilePath("conf", "iotdb-streamnode.properties"); + } + + @Override + protected MppJVMConfig initVMConfig() { + return MppJVMConfig.builder() + .setInitHeapSize(EnvUtils.getIntFromSysVar(STREAMNODE_INIT_HEAP_SIZE, 256, clusterIndex)) + .setMaxHeapSize(EnvUtils.getIntFromSysVar(STREAMNODE_MAX_HEAP_SIZE, 256, clusterIndex)) + .setMaxDirectMemorySize( + EnvUtils.getIntFromSysVar(STREAMNODE_MAX_DIRECT_MEMORY_SIZE, 256, clusterIndex)) + .setTimezone("Asia/Shanghai") + .build(); + } + + @Override + public final String getId() { + return STREAM_NODE_NAME + getPort(); + } + + @Override + protected void addStartCmdParams(final List params) { + final String workDir = getNodePath(); + final String confDir = workDir + File.separator + "conf"; + params.addAll( + Arrays.asList( + "-Dlogback.configurationFile=" + confDir + File.separator + "logback-streamnode.xml", + "-DIOTDB_HOME=" + workDir, + "-DIOTDB_CONF=" + confDir, + "-DTSFILE_CONF=" + confDir, + "org.apache.iotdb.streamnode.StreamNode", + "-s")); + } + + @Override + String getNodeType() { + return "streamnode"; + } + + @Override + protected void reloadMutableFields() { + // Set basic properties + mutableNodeProperties.setProperty("rpc_address", super.getIp()); + mutableNodeProperties.setProperty("rpc_port", String.valueOf(super.getPort())); + } + + @Override + public void createNodeDir() { + // First, delete any existing data directory that may contain old system.properties + // The system.properties file contains config_node_list which may have old ConfigNode port + // from previous test runs, causing connection failures + final String dataDir = getNodePath() + File.separator + "data"; + try { + PathUtils.deleteDirectory(Paths.get(dataDir)); + } catch (NoSuchFileException e) { + // ignored - data directory may not exist + } catch (IOException e) { + // ignored - data directory may not exist or be deletable + } + // Then call parent method to copy template + super.createNodeDir(); + } + + public void changeConfig() { + try { + reloadMutableFields(); + + MppBaseConfig streamNodeConfig = new MppDataNodeConfig(); + streamNodeConfig.updateProperties(mutableNodeProperties); + streamNodeConfig.updateProperties(immutableNodeProperties); + + streamNodeConfig.persistent(getSystemConfigPath()); + } catch (IOException ex) { + throw new AssertionError("Change the config of StreamNode failed. " + ex); + } + this.jvmConfig.override(jvmConfig); + } + + @Override + public int getMetricPort() { + // no metric currently + return -1; + } + + @Override + public void stopForcibly() { + this.stop(); + } + + /* Abstract methods, which must be implemented in ConfigNode and DataNode. */ + public void renameFile() {} +} diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java new file mode 100644 index 0000000000000..5814533568b16 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java @@ -0,0 +1,3 @@ +package org.apache.iotdb.itbase.category; + +public interface StreamClusterIT {} diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 0aed2cfdf1912..43fab88cf28bf 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -75,6 +75,18 @@ public interface BaseEnv { */ void initClusterEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount); + /** + * Init a cluster with the specified number of ConfigNodes and DataNodes. + * + * @param configNodesNum the number of ConfigNodes. + * @param dataNodesNum the number of DataNodes. + * @param testWorkingRetryCount the retry count when testing the availability of cluster + */ + default void initClusterEnvironment( + int configNodesNum, int dataNodesNum, int streamNodesNum, int testWorkingRetryCount) { + this.initClusterEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); + } + /** Destroy the cluster and all the configurations. */ void cleanClusterEnvironment(); From 05830822adc2a4713aea69d1f35185034a0f993c Mon Sep 17 00:00:00 2001 From: clsu <404083629@qq.com> Date: Tue, 16 Jun 2026 17:27:47 +0800 Subject: [PATCH 2/5] Remove unnecessary implementations --- integration-test/pom.xml | 14 -- .../org/apache/iotdb/it/env/EnvFactory.java | 4 - .../java/org/apache/iotdb/it/env/EnvType.java | 1 - .../iotdb/it/env/cluster/ClusterConstant.java | 7 - .../iotdb/it/env/cluster/env/AIEnv.java | 57 +++++- .../iotdb/it/env/cluster/env/AbstractEnv.java | 162 ++++------------- .../iotdb/it/env/cluster/env/StreamEnv.java | 45 ----- .../env/cluster/node/StreamNodeWrapper.java | 170 ------------------ .../it/env/remote/env/RemoteServerEnv.java | 5 + .../itbase/category/StreamClusterIT.java | 3 - .../org/apache/iotdb/itbase/env/BaseEnv.java | 14 +- 11 files changed, 94 insertions(+), 388 deletions(-) delete mode 100644 integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java delete mode 100644 integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java delete mode 100644 integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 5a290c3f77cbe..c61509ed2c84c 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -693,20 +693,6 @@ AI - - StreamClusterIT - - false - - - org.apache.iotdb.itbase.category.ManualIT - org.apache.iotdb.itbase.category.StreamClusterIT - false - true - true - Stream - - DailyIT diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java index 54e2f48dabd09..2fbc9672837c8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java @@ -23,7 +23,6 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.env.Cluster1Env; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; -import org.apache.iotdb.it.env.cluster.env.StreamEnv; import org.apache.iotdb.it.env.remote.env.RemoteServerEnv; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.env.BaseEnv; @@ -60,9 +59,6 @@ public static BaseEnv getEnv() { case AI: env = new AIEnv(); break; - case STREAM: - env = new StreamEnv(); - break; case MultiCluster: logger.warn( "EnvFactory only supports EnvType Simple, Cluster1 and Remote, please use MultiEnvFactory instead."); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java index 38ff8f2160bb5..7c2ee415cf1b4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java @@ -25,7 +25,6 @@ public enum EnvType { Cluster1, MultiCluster, AI, - STREAM, TABLE_SIMPLE, TABLE_CLUSTER1; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index 145911e19d24e..691a13509d9ee 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -45,11 +45,6 @@ public class ClusterConstant { "DefaultDataNodeCommonProperties"; public static final String DATA_REGION_PER_DATANODE = "integrationTest.dataRegionPerDataNode"; - // StreamNode constants - public static final String STREAMNODE_INIT_HEAP_SIZE = "StreamNodeInitHeapSize"; - public static final String STREAMNODE_MAX_HEAP_SIZE = "StreamNodeMaxHeapSize"; - public static final String STREAMNODE_MAX_DIRECT_MEMORY_SIZE = "StreamNodeMaxDirectMemorySize"; - // Cluster Configurations public static final String CLUSTER_CONFIGURATIONS = "ClusterConfigurations"; @@ -219,8 +214,6 @@ public class ClusterConstant { public static final String AI_NODE_NAME = "AINode"; - public static final String STREAM_NODE_NAME = "StreamNode"; - public static final String LOCK_FILE_PATH = System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + "lock-"; public static final String TEMPLATE_NODE_PATH = diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java index f812e9db3d160..551ee035b822a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java @@ -19,7 +19,22 @@ package org.apache.iotdb.it.env.cluster.env; +import org.apache.iotdb.it.env.cluster.EnvUtils; +import org.apache.iotdb.it.env.cluster.node.AINodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestLogger; +import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate; +import org.apache.iotdb.itbase.runtime.RequestDelegate; + +import org.slf4j.Logger; + +import java.sql.SQLException; +import java.util.Collections; + +import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT; + public class AIEnv extends AbstractEnv { + private static final Logger logger = IoTDBTestLogger.logger; + @Override public void initClusterEnvironment() { initClusterEnvironment(1, 1); @@ -27,12 +42,50 @@ public void initClusterEnvironment() { @Override public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { - super.initEnvironment(configNodesNum, dataNodesNum, 600, true); + super.initEnvironment(configNodesNum, dataNodesNum, 600); } @Override public void initClusterEnvironment( int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { - super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, true); + super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); + } + + @Override + protected void initExtraNodes(String seedConfigNode, int dataNodePort) { + startAINode(seedConfigNode, dataNodePort, getTestClassName()); + } + + private void startAINode( + final String seedConfigNode, final int clusterIngressPort, final String testClassName) { + final AINodeWrapper aiNodeWrapper = + new AINodeWrapper( + seedConfigNode, + clusterIngressPort, + testClassName, + testMethodName, + index, + EnvUtils.searchAvailablePorts(), + startTime); + extraNodeWrappers.add(aiNodeWrapper); + aiNodeWrapper.setKillPoints(extraNodeKillPoints); + final String aiNodeEndPoint = aiNodeWrapper.getIpAndPortString(); + aiNodeWrapper.createNodeDir(); + aiNodeWrapper.createLogDir(); + final RequestDelegate aiNodesDelegate = + new ParallelRequestDelegate<>( + Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this); + + aiNodesDelegate.addRequest( + () -> { + aiNodeWrapper.start(); + return null; + }); + + try { + aiNodesDelegate.requestAll(); + } catch (final SQLException e) { + logger.error("Start aiNodes failed", e); + } } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 1aff69b7775c8..87bf89de373a6 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -47,11 +47,9 @@ import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig; import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig; import org.apache.iotdb.it.env.cluster.config.MppJVMConfig; -import org.apache.iotdb.it.env.cluster.node.AINodeWrapper; import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; -import org.apache.iotdb.it.env.cluster.node.StreamNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.itbase.env.BaseNodeWrapper; @@ -102,8 +100,7 @@ public abstract class AbstractEnv implements BaseEnv { private final Random rand = new Random(); protected List configNodeWrapperList = Collections.emptyList(); protected List dataNodeWrapperList = Collections.emptyList(); - protected List aiNodeWrapperList = Collections.emptyList(); - protected List streamNodeWrapperList = Collections.emptyList(); + protected List extraNodeWrappers = Collections.emptyList(); protected String testMethodName = null; protected int index = 0; protected long startTime; @@ -111,7 +108,7 @@ public abstract class AbstractEnv implements BaseEnv { private IClientManager clientManager; private List configNodeKillPoints = new ArrayList<>(); private List dataNodeKillPoints = new ArrayList<>(); - private List streamNodeKillPoints = new ArrayList<>(); + protected List extraNodeKillPoints = new ArrayList<>(); /** * This config object stores the properties set by developers during the test. It will be cleared @@ -172,26 +169,10 @@ protected void initEnvironment(final int configNodesNum, final int dataNodesNum) protected void initEnvironment( final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) { - initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false); - } - - protected void initEnvironment( - final int configNodesNum, - final int dataNodesNum, - final int retryCount, - final boolean addAINode) { - initEnvironment(configNodesNum, dataNodesNum, retryCount, addAINode, 0); - } - - protected void initEnvironment( - final int configNodesNum, - final int dataNodesNum, - final int retryCount, - final boolean addAINode, - final int streamNodesNum) { this.retryCount = retryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); + this.extraNodeWrappers = new ArrayList<>(); clientManager = new IClientManager.Factory() @@ -270,55 +251,22 @@ protected void initEnvironment( throw new AssertionError(); } - if (addAINode) { - this.aiNodeWrapperList = new ArrayList<>(); - startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), testClassName); - } - - if (streamNodesNum > 0) { - this.streamNodeWrapperList = new ArrayList<>(); - final List streamNodeEndpoints = new ArrayList<>(); - final RequestDelegate streamNodesDelegate = - new ParallelRequestDelegate<>(streamNodeEndpoints, NODE_START_TIMEOUT, this); - - for (int i = 0; i < streamNodesNum; i++) { - StreamNodeWrapper streamNodeWrapper = newStreamNode(); - streamNodeEndpoints.add(streamNodeWrapper.getIpAndPortString()); - this.streamNodeWrapperList.add(streamNodeWrapper); - streamNodesDelegate.addRequest( - () -> { - streamNodeWrapper.start(); - return null; - }); - } - - try { - streamNodesDelegate.requestAll(); - } catch (final SQLException e) { - logger.error("Start streamNodes failed", e); - throw new AssertionError(); - } - } + initExtraNodes(seedConfigNode, this.dataNodeWrapperList.get(0).getPort()); checkClusterStatusWithoutUnknown(); } - private StreamNodeWrapper newStreamNode() { - final StreamNodeWrapper streamNodeWrapper = - new StreamNodeWrapper( - configNodeWrapperList.get(0).getIpAndPortString(), - getTestClassName(), - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); - - streamNodeWrapper.createNodeDir(); - streamNodeWrapper.changeConfig(); - streamNodeWrapper.createLogDir(); - streamNodeWrapper.setKillPoints(streamNodeKillPoints); - return streamNodeWrapper; + /** + * Hook for subclasses to create and start extra node types (e.g., AINode, StreamNode) beyond the + * core ConfigNode and DataNode. Subclasses should create node wrappers, add them to {@link + * #extraNodeWrappers}, and start them. + * + * @param seedConfigNode the ip:port of the seed ConfigNode + * @param dataNodePort the port of the first DataNode (useful for nodes that need it, e.g., + * AINode) + */ + protected void initExtraNodes(final String seedConfigNode, final int dataNodePort) { + // Default: no extra nodes. Subclasses override to add nodes. } private ConfigNodeWrapper newConfigNode() { @@ -364,39 +312,6 @@ private DataNodeWrapper newDataNode() { return dataNodeWrapper; } - private void startAINode( - final String seedConfigNode, final int clusterIngressPort, final String testClassName) { - final String aiNodeEndPoint; - final AINodeWrapper aiNodeWrapper = - new AINodeWrapper( - seedConfigNode, - clusterIngressPort, - testClassName, - testMethodName, - index, - EnvUtils.searchAvailablePorts(), - startTime); - aiNodeWrapperList.add(aiNodeWrapper); - aiNodeEndPoint = aiNodeWrapper.getIpAndPortString(); - aiNodeWrapper.createNodeDir(); - aiNodeWrapper.createLogDir(); - final RequestDelegate aiNodesDelegate = - new ParallelRequestDelegate<>( - Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this); - - aiNodesDelegate.addRequest( - () -> { - aiNodeWrapper.start(); - return null; - }); - - try { - aiNodesDelegate.requestAll(); - } catch (final SQLException e) { - logger.error("Start aiNodes failed", e); - } - } - public String getTestClassName() { final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); for (final StackTraceElement stackTraceElement : stack) { @@ -488,8 +403,7 @@ public void checkClusterStatus( if (showClusterResp.getNodeStatus().size() != configNodeWrapperList.size() + dataNodeWrapperList.size() - + aiNodeWrapperList.size() - + streamNodeWrapperList.size()) { + + extraNodeWrappers.size()) { passed = false; nodeSizePassed = false; actualNodeSize = showClusterResp.getNodeStatusSize(); @@ -521,15 +435,7 @@ public void checkClusterStatus( processStatusMap.put(nodeWrapper, 0); } } - for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { - boolean alive = nodeWrapper.getInstance().isAlive(); - if (!alive) { - processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); - } else { - processStatusMap.put(nodeWrapper, 0); - } - } - for (StreamNodeWrapper nodeWrapper : streamNodeWrapperList) { + for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) { boolean alive = nodeWrapper.getInstance().isAlive(); if (!alive) { processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); @@ -632,24 +538,14 @@ private void handleProcessStatus(Map processStatus configNodeWrapper.start(); } } - for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { - if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { + for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) { + if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) { logger.info( - "A port is occupied by another AINode {}-{}, restart it", - aiNodeWrapper.getIpAndPortString(), - aiNodeWrapper.getPid()); - aiNodeWrapper.stop(); - aiNodeWrapper.start(); - } - } - for (StreamNodeWrapper streamNodeWrapper : streamNodeWrapperList) { - if (portOccupationMap.containsValue(streamNodeWrapper.getPid())) { - logger.info( - "A port is occupied by another StreamNode {}-{}, restart it", - streamNodeWrapper.getIpAndPortString(), - streamNodeWrapper.getPid()); - streamNodeWrapper.stop(); - streamNodeWrapper.start(); + "A port is occupied by another node {}-{}, restart it", + extraNodeWrapper.getIpAndPortString(), + extraNodeWrapper.getPid()); + extraNodeWrapper.stop(); + extraNodeWrapper.start(); } } } catch (IOException e) { @@ -666,8 +562,8 @@ private void handleProcessStatus(Map processStatus public void cleanClusterEnvironment() { final List allNodeWrappers = Stream.concat( - dataNodeWrapperList.stream(), - Stream.concat(configNodeWrapperList.stream(), aiNodeWrapperList.stream())) + Stream.concat(configNodeWrapperList.stream(), dataNodeWrapperList.stream()), + extraNodeWrappers.stream()) .collect(Collectors.toList()); allNodeWrappers.stream() .findAny() @@ -1119,6 +1015,7 @@ public void dumpTestJVMSnapshot() { public List getNodeWrapperList() { final List result = new ArrayList<>(configNodeWrapperList); result.addAll(dataNodeWrapperList); + result.addAll(extraNodeWrappers); return result; } @@ -1616,6 +1513,11 @@ public void registerDataNodeKillPoints(final List killPoints) { this.dataNodeKillPoints = killPoints; } + @Override + public void registerExtraNodeKillPoints(final List killPoints) { + this.extraNodeKillPoints = killPoints; + } + public void clearClientManager() { clientManager.clearAll(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java deleted file mode 100644 index f290b9aa7623f..0000000000000 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/StreamEnv.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.it.env.cluster.env; - -public class StreamEnv extends AbstractEnv { - @Override - public void initClusterEnvironment() { - initClusterEnvironment(1, 1); - } - - @Override - public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { - super.initEnvironment(configNodesNum, dataNodesNum, 600, false, 1); - } - - @Override - public void initClusterEnvironment( - int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { - super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false, 1); - } - - @Override - public void initClusterEnvironment( - int configNodesNum, int dataNodesNum, int streamNodesNum, int testWorkingRetryCount) { - super.initEnvironment( - configNodesNum, dataNodesNum, testWorkingRetryCount, false, streamNodesNum); - } -} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java deleted file mode 100644 index c4523084a7d6b..0000000000000 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/StreamNodeWrapper.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.it.env.cluster.node; - -import org.apache.iotdb.it.env.cluster.EnvUtils; -import org.apache.iotdb.it.env.cluster.config.MppBaseConfig; -import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig; -import org.apache.iotdb.it.env.cluster.config.MppJVMConfig; - -import org.apache.tsfile.external.commons.io.file.PathUtils; - -import java.io.File; -import java.io.IOException; -import java.nio.file.NoSuchFileException; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; - -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_INIT_HEAP_SIZE; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_MAX_DIRECT_MEMORY_SIZE; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAMNODE_MAX_HEAP_SIZE; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_NODE_NAME; - -public class StreamNodeWrapper extends AbstractNodeWrapper { - - public StreamNodeWrapper( - final String seedConfigNode, - final String testClassName, - final String testMethodName, - final int[] portList, - final int clusterIndex, - final boolean isMultiCluster, - final long startTime) { - super(testClassName, testMethodName, portList, clusterIndex, isMultiCluster, startTime); - - // Initialize mutable properties - reloadMutableFields(); - - // Initialize immutable properties - immutableNodeProperties.setProperty("sn_seed_config_node", seedConfigNode); - // Set system directory to be under the node working directory, not relative to project root - immutableNodeProperties.setProperty( - "dn_system_dir", getNodePath() + File.separator + "data" + File.separator + "streamnode"); - } - - @Override - protected String getSystemConfigPath() { - return workDirFilePath("conf", "iotdb-streamnode.properties"); - } - - @Override - protected String getDefaultNodeConfigPath() { - return ""; - } - - @Override - protected String getDefaultCommonConfigPath() { - return ""; - } - - @Override - public String getSystemPropertiesPath() { - return workDirFilePath("conf", "iotdb-streamnode.properties"); - } - - @Override - protected MppJVMConfig initVMConfig() { - return MppJVMConfig.builder() - .setInitHeapSize(EnvUtils.getIntFromSysVar(STREAMNODE_INIT_HEAP_SIZE, 256, clusterIndex)) - .setMaxHeapSize(EnvUtils.getIntFromSysVar(STREAMNODE_MAX_HEAP_SIZE, 256, clusterIndex)) - .setMaxDirectMemorySize( - EnvUtils.getIntFromSysVar(STREAMNODE_MAX_DIRECT_MEMORY_SIZE, 256, clusterIndex)) - .setTimezone("Asia/Shanghai") - .build(); - } - - @Override - public final String getId() { - return STREAM_NODE_NAME + getPort(); - } - - @Override - protected void addStartCmdParams(final List params) { - final String workDir = getNodePath(); - final String confDir = workDir + File.separator + "conf"; - params.addAll( - Arrays.asList( - "-Dlogback.configurationFile=" + confDir + File.separator + "logback-streamnode.xml", - "-DIOTDB_HOME=" + workDir, - "-DIOTDB_CONF=" + confDir, - "-DTSFILE_CONF=" + confDir, - "org.apache.iotdb.streamnode.StreamNode", - "-s")); - } - - @Override - String getNodeType() { - return "streamnode"; - } - - @Override - protected void reloadMutableFields() { - // Set basic properties - mutableNodeProperties.setProperty("rpc_address", super.getIp()); - mutableNodeProperties.setProperty("rpc_port", String.valueOf(super.getPort())); - } - - @Override - public void createNodeDir() { - // First, delete any existing data directory that may contain old system.properties - // The system.properties file contains config_node_list which may have old ConfigNode port - // from previous test runs, causing connection failures - final String dataDir = getNodePath() + File.separator + "data"; - try { - PathUtils.deleteDirectory(Paths.get(dataDir)); - } catch (NoSuchFileException e) { - // ignored - data directory may not exist - } catch (IOException e) { - // ignored - data directory may not exist or be deletable - } - // Then call parent method to copy template - super.createNodeDir(); - } - - public void changeConfig() { - try { - reloadMutableFields(); - - MppBaseConfig streamNodeConfig = new MppDataNodeConfig(); - streamNodeConfig.updateProperties(mutableNodeProperties); - streamNodeConfig.updateProperties(immutableNodeProperties); - - streamNodeConfig.persistent(getSystemConfigPath()); - } catch (IOException ex) { - throw new AssertionError("Change the config of StreamNode failed. " + ex); - } - this.jvmConfig.override(jvmConfig); - } - - @Override - public int getMetricPort() { - // no metric currently - return -1; - } - - @Override - public void stopForcibly() { - this.stop(); - } - - /* Abstract methods, which must be implemented in ConfigNode and DataNode. */ - public void renameFile() {} -} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 586eff60494dc..ca9c7c3e8b174 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -519,4 +519,9 @@ public void registerConfigNodeKillPoints(List killPoints) { public void registerDataNodeKillPoints(List killPoints) { throw new UnsupportedOperationException(); } + + @Override + public void registerExtraNodeKillPoints(List killPoints) { + throw new UnsupportedOperationException(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java deleted file mode 100644 index 5814533568b16..0000000000000 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/category/StreamClusterIT.java +++ /dev/null @@ -1,3 +0,0 @@ -package org.apache.iotdb.itbase.category; - -public interface StreamClusterIT {} diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 43fab88cf28bf..9d9786ebc634c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -75,18 +75,6 @@ public interface BaseEnv { */ void initClusterEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount); - /** - * Init a cluster with the specified number of ConfigNodes and DataNodes. - * - * @param configNodesNum the number of ConfigNodes. - * @param dataNodesNum the number of DataNodes. - * @param testWorkingRetryCount the retry count when testing the availability of cluster - */ - default void initClusterEnvironment( - int configNodesNum, int dataNodesNum, int streamNodesNum, int testWorkingRetryCount) { - this.initClusterEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount); - } - /** Destroy the cluster and all the configurations. */ void cleanClusterEnvironment(); @@ -370,6 +358,8 @@ void ensureNodeStatus(List nodes, List targetStatus void registerDataNodeKillPoints(List killPoints); + void registerExtraNodeKillPoints(List killPoints); + static Properties constructProperties(String username, String password, String sqlDialect) { Properties info = new Properties(); From bf7f57a90842360fd3822939ef71b6c9fea2ceb4 Mon Sep 17 00:00:00 2001 From: clsu <404083629@qq.com> Date: Tue, 16 Jun 2026 17:42:56 +0800 Subject: [PATCH 3/5] Remove unnecessary implementations --- .../org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 5 ----- .../org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java | 5 ----- .../src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 -- 3 files changed, 12 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 87bf89de373a6..15f6a67760616 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -1513,11 +1513,6 @@ public void registerDataNodeKillPoints(final List killPoints) { this.dataNodeKillPoints = killPoints; } - @Override - public void registerExtraNodeKillPoints(final List killPoints) { - this.extraNodeKillPoints = killPoints; - } - public void clearClientManager() { clientManager.clearAll(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index ca9c7c3e8b174..586eff60494dc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -519,9 +519,4 @@ public void registerConfigNodeKillPoints(List killPoints) { public void registerDataNodeKillPoints(List killPoints) { throw new UnsupportedOperationException(); } - - @Override - public void registerExtraNodeKillPoints(List killPoints) { - throw new UnsupportedOperationException(); - } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 9d9786ebc634c..0aed2cfdf1912 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -358,8 +358,6 @@ void ensureNodeStatus(List nodes, List targetStatus void registerDataNodeKillPoints(List killPoints); - void registerExtraNodeKillPoints(List killPoints); - static Properties constructProperties(String username, String password, String sqlDialect) { Properties info = new Properties(); From 7f6c732f7ae4c0e05843b3eb8121de7f7800dea0 Mon Sep 17 00:00:00 2001 From: clsu <404083629@qq.com> Date: Tue, 16 Jun 2026 18:19:45 +0800 Subject: [PATCH 4/5] fix bug --- .../java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 15f6a67760616..97a32e4653374 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -169,7 +169,7 @@ protected void initEnvironment(final int configNodesNum, final int dataNodesNum) protected void initEnvironment( final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) { - this.retryCount = retryCount; + this.retryCount = testWorkingRetryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); this.extraNodeWrappers = new ArrayList<>(); From c8952824a3755b22f5678ad427375568c670c418 Mon Sep 17 00:00:00 2001 From: clsu <404083629@qq.com> Date: Wed, 17 Jun 2026 09:11:22 +0800 Subject: [PATCH 5/5] Optimize the interface --- .../iotdb/it/env/cluster/env/AIEnv.java | 12 ++++++++-- .../iotdb/it/env/cluster/env/AbstractEnv.java | 22 ++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java index 551ee035b822a..cf9520ed6a258 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java @@ -21,6 +21,8 @@ import org.apache.iotdb.it.env.cluster.EnvUtils; import org.apache.iotdb.it.env.cluster.node.AINodeWrapper; +import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate; import org.apache.iotdb.itbase.runtime.RequestDelegate; @@ -29,6 +31,7 @@ import java.sql.SQLException; import java.util.Collections; +import java.util.List; import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT; @@ -52,8 +55,13 @@ public void initClusterEnvironment( } @Override - protected void initExtraNodes(String seedConfigNode, int dataNodePort) { - startAINode(seedConfigNode, dataNodePort, getTestClassName()); + protected void initExtraNodes( + final List configNodeWrappers, + final List dataNodeWrappers, + final String testClassName) { + String seedConfigNode = configNodeWrappers.get(0).getIpAndPortString(); + int dataNodePort = dataNodeWrappers.get(0).getPort(); + startAINode(seedConfigNode, dataNodePort, testClassName); } private void startAINode( diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 97a32e4653374..74c34392fb75f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -251,21 +251,27 @@ protected void initEnvironment( throw new AssertionError(); } - initExtraNodes(seedConfigNode, this.dataNodeWrapperList.get(0).getPort()); + initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName); checkClusterStatusWithoutUnknown(); } /** - * Hook for subclasses to create and start extra node types (e.g., AINode, StreamNode) beyond the - * core ConfigNode and DataNode. Subclasses should create node wrappers, add them to {@link - * #extraNodeWrappers}, and start them. + * Hook method for subclasses to initialize and start extra node types beyond the core ConfigNode + * and DataNode (e.g., AINode, StreamNode, ProxyNode). * - * @param seedConfigNode the ip:port of the seed ConfigNode - * @param dataNodePort the port of the first DataNode (useful for nodes that need it, e.g., - * AINode) + *

Subclasses should create node wrappers, add them to {@link #extraNodeWrappers}, configure + * kill points via {@link #extraNodeKillPoints}, and start the nodes. Subclasses have direct + * access to protected fields: {@code testMethodName}, {@code index}, {@code startTime}. + * + * @param configNodeWrappers list of all ConfigNode wrappers in the cluster (unmodifiable) + * @param dataNodeWrappers list of all DataNode wrappers in the cluster (unmodifiable) + * @param testClassName the test class name for logging and identification purposes */ - protected void initExtraNodes(final String seedConfigNode, final int dataNodePort) { + protected void initExtraNodes( + final List configNodeWrappers, + final List dataNodeWrappers, + final String testClassName) { // Default: no extra nodes. Subclasses override to add nodes. }