Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,73 @@

package org.apache.iotdb.it.env.cluster.env;

import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
import org.apache.iotdb.itbase.runtime.RequestDelegate;

import org.slf4j.Logger;

import java.sql.SQLException;
import java.util.Collections;

import static org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;

public class AIEnv extends AbstractEnv {
private static final Logger logger = IoTDBTestLogger.logger;

@Override
public void initClusterEnvironment() {
initClusterEnvironment(1, 1);
}

@Override
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum, 600, true);
super.initEnvironment(configNodesNum, dataNodesNum, 600);
}

@Override
public void initClusterEnvironment(
int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, true);
super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
}

@Override
protected void initExtraNodes(String seedConfigNode, int dataNodePort) {
startAINode(seedConfigNode, dataNodePort, getTestClassName());
}

private void startAINode(
final String seedConfigNode, final int clusterIngressPort, final String testClassName) {
final AINodeWrapper aiNodeWrapper =
new AINodeWrapper(
seedConfigNode,
clusterIngressPort,
testClassName,
testMethodName,
index,
EnvUtils.searchAvailablePorts(),
startTime);
extraNodeWrappers.add(aiNodeWrapper);
aiNodeWrapper.setKillPoints(extraNodeKillPoints);
final String aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
aiNodeWrapper.createNodeDir();
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this);

aiNodesDelegate.addRequest(
() -> {
aiNodeWrapper.start();
return null;
});

try {
aiNodesDelegate.requestAll();
} catch (final SQLException e) {
logger.error("Start aiNodes failed", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig;
import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
Expand Down Expand Up @@ -101,14 +100,15 @@ public abstract class AbstractEnv implements BaseEnv {
private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList();
protected List<AINodeWrapper> aiNodeWrapperList = Collections.emptyList();
protected List<AbstractNodeWrapper> extraNodeWrappers = Collections.emptyList();
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
protected int retryCount = 30;
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
protected List<String> extraNodeKillPoints = new ArrayList<>();

/**
* This config object stores the properties set by developers during the test. It will be cleared
Expand Down Expand Up @@ -169,17 +169,10 @@ protected void initEnvironment(final int configNodesNum, final int dataNodesNum)

protected void initEnvironment(
final int configNodesNum, final int dataNodesNum, final int testWorkingRetryCount) {
initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false);
}

protected void initEnvironment(
final int configNodesNum,
final int dataNodesNum,
final int retryCount,
final boolean addAINode) {
this.retryCount = retryCount;
this.retryCount = testWorkingRetryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
this.extraNodeWrappers = new ArrayList<>();

clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
Expand Down Expand Up @@ -258,14 +251,24 @@ protected void initEnvironment(
throw new AssertionError();
}

if (addAINode) {
this.aiNodeWrapperList = new ArrayList<>();
startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), testClassName);
}
initExtraNodes(seedConfigNode, this.dataNodeWrapperList.get(0).getPort());

checkClusterStatusWithoutUnknown();
}

/**
* Hook for subclasses to create and start extra node types (e.g., AINode, StreamNode) beyond the
* core ConfigNode and DataNode. Subclasses should create node wrappers, add them to {@link
* #extraNodeWrappers}, and start them.
*
* @param seedConfigNode the ip:port of the seed ConfigNode
* @param dataNodePort the port of the first DataNode (useful for nodes that need it, e.g.,
* AINode)
*/
protected void initExtraNodes(final String seedConfigNode, final int dataNodePort) {
// Default: no extra nodes. Subclasses override to add nodes.
}

private ConfigNodeWrapper newConfigNode() {
final ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
Expand Down Expand Up @@ -309,39 +312,6 @@ private DataNodeWrapper newDataNode() {
return dataNodeWrapper;
}

private void startAINode(
final String seedConfigNode, final int clusterIngressPort, final String testClassName) {
final String aiNodeEndPoint;
final AINodeWrapper aiNodeWrapper =
new AINodeWrapper(
seedConfigNode,
clusterIngressPort,
testClassName,
testMethodName,
index,
EnvUtils.searchAvailablePorts(),
startTime);
aiNodeWrapperList.add(aiNodeWrapper);
aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
aiNodeWrapper.createNodeDir();
aiNodeWrapper.createLogDir();
final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, this);

aiNodesDelegate.addRequest(
() -> {
aiNodeWrapper.start();
return null;
});

try {
aiNodesDelegate.requestAll();
} catch (final SQLException e) {
logger.error("Start aiNodes failed", e);
}
}

public String getTestClassName() {
final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (final StackTraceElement stackTraceElement : stack) {
Expand Down Expand Up @@ -433,7 +403,7 @@ public void checkClusterStatus(
if (showClusterResp.getNodeStatus().size()
!= configNodeWrapperList.size()
+ dataNodeWrapperList.size()
+ aiNodeWrapperList.size()) {
+ extraNodeWrappers.size()) {
passed = false;
nodeSizePassed = false;
actualNodeSize = showClusterResp.getNodeStatusSize();
Expand Down Expand Up @@ -465,7 +435,7 @@ public void checkClusterStatus(
processStatusMap.put(nodeWrapper, 0);
}
}
for (AINodeWrapper nodeWrapper : aiNodeWrapperList) {
for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) {
boolean alive = nodeWrapper.getInstance().isAlive();
if (!alive) {
processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor());
Expand Down Expand Up @@ -568,14 +538,14 @@ private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatus
configNodeWrapper.start();
}
}
for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) {
if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) {
for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) {
if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) {
logger.info(
"A port is occupied by another AINode {}-{}, restart it",
aiNodeWrapper.getIpAndPortString(),
aiNodeWrapper.getPid());
aiNodeWrapper.stop();
aiNodeWrapper.start();
"A port is occupied by another node {}-{}, restart it",
extraNodeWrapper.getIpAndPortString(),
extraNodeWrapper.getPid());
extraNodeWrapper.stop();
extraNodeWrapper.start();
}
}
} catch (IOException e) {
Expand All @@ -592,8 +562,8 @@ private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatus
public void cleanClusterEnvironment() {
final List<AbstractNodeWrapper> allNodeWrappers =
Stream.concat(
dataNodeWrapperList.stream(),
Stream.concat(configNodeWrapperList.stream(), aiNodeWrapperList.stream()))
Stream.concat(configNodeWrapperList.stream(), dataNodeWrapperList.stream()),
extraNodeWrappers.stream())
.collect(Collectors.toList());
allNodeWrappers.stream()
.findAny()
Expand Down Expand Up @@ -1045,6 +1015,7 @@ public void dumpTestJVMSnapshot() {
public List<AbstractNodeWrapper> getNodeWrapperList() {
final List<AbstractNodeWrapper> result = new ArrayList<>(configNodeWrapperList);
result.addAll(dataNodeWrapperList);
result.addAll(extraNodeWrappers);
return result;
}

Expand Down