Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,22 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(final File latestSnapshotRootDir) {
public boolean loadSnapshot(final File latestSnapshotRootDir) {
try {
executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
.listener()
.tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
return true;
} catch (final IOException e) {
if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
LOGGER.warn(
ConfigNodeMessages.CONFIG_REGION_LISTENING_QUEUE_LISTEN_TO_SNAPSHOT_FAILED_WHEN_STARTUP,
e);
}
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ default boolean clearSnapshot() {
* Load the latest snapshot from given dir.
*
* @param latestSnapshotRootDir dir where the latest snapshot sits
* @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers
* (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose
* snapshot failed to load, which would otherwise silently lose data.
*/
void loadSnapshot(File latestSnapshotRootDir);
boolean loadSnapshot(File latestSnapshotRootDir);

/**
* given a snapshot dir, ask statemachine to provide all snapshot files. By default, it will list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,17 @@ private void clearOldSnapshot() {
}
}

public void loadSnapshot(String snapshotId) {
// TODO: (xingtanzjr) throw exception if the snapshot load failed
recvFolderManager
.getFolders()
.forEach(
dir -> {
stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId));
});
public boolean loadSnapshot(String snapshotId) {
// Load the snapshot from every receive folder. If any of them fails, report the failure so the
// AddPeer coordinator does not activate this peer with incomplete data (which would silently
// lose data on this replica).
boolean success = true;
for (String dir : recvFolderManager.getFolders()) {
if (!stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId))) {
success = false;
}
}
return success;
}

private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,15 @@ public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
status.setMessage(message);
return new TTriggerSnapshotLoadRes(status);
}
impl.loadSnapshot(req.snapshotId);
if (!impl.loadSnapshot(req.snapshotId)) {
String message =
String.format(
"Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId);
LOGGER.error(message);
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
return new TTriggerSnapshotLoadRes(status);
}
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ private void loadSnapshot(File latestSnapshotDir) {
}

// require the application statemachine to load the latest snapshot
applicationStateMachine.loadSnapshot(latestSnapshotDir);
if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) {
logger.error("{}: failed to load snapshot from {}", this, latestSnapshotDir);
}
TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir);
updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public synchronized boolean takeSnapshot(File snapshotDir) {
}

@Override
public synchronized void loadSnapshot(File latestSnapshotRootDir) {
stateMachine.loadSnapshot(latestSnapshotRootDir);
public synchronized boolean loadSnapshot(File latestSnapshotRootDir) {
return stateMachine.loadSnapshot(latestSnapshotRootDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {}
public boolean loadSnapshot(File latestSnapshotRootDir) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.consensus.iot;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.iot.util.TestEntry;
import org.apache.iotdb.consensus.iot.util.TestStateMachine;

import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target
* was silently swallowed, so the new peer was activated and the migration falsely reported
* successful, losing data on the new replica).
*
* <p>It builds a real two-node IoTConsensus group, forces the target peer's {@link
* org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code
* addRemotePeer}:
*
* <ul>
* <li>actually reaches the snapshot-load step (so the failure is the one under test, not an
* earlier step),
* <li>fails with a {@link ConsensusException} instead of silently succeeding,
* <li>does not leave the target peer active with an incompletely-loaded snapshot.
* </ul>
*/
public class AddPeerSnapshotLoadFailureTest {

private final Logger logger = LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class);

private final ConsensusGroupId gid = new DataRegionId(1);

private final int basePort = 9200;

private final List<Peer> peers =
Arrays.asList(
new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)),
new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort)));

private final List<File> peersStorage =
Arrays.asList(
new File("target" + File.separator + "snapshot-load-fail-1"),
new File("target" + File.separator + "snapshot-load-fail-2"));

private final List<List<String>> peersRecvSnapshotDirs =
Arrays.asList(
Arrays.asList(
"target" + File.separator + "snapshot-load-fail-1-recv-1",
"target" + File.separator + "snapshot-load-fail-1-recv-2"),
Arrays.asList(
"target" + File.separator + "snapshot-load-fail-2-recv-1",
"target" + File.separator + "snapshot-load-fail-2-recv-2"));

private final List<IoTConsensus> servers = new ArrayList<>();
private final List<ControllableStateMachine> stateMachines = new ArrayList<>();

/** A {@link TestStateMachine} whose snapshot load can be made to fail on demand. */
private static class ControllableStateMachine extends TestStateMachine {
private volatile boolean failLoadSnapshot = false;
private volatile boolean loadSnapshotInvoked = false;

void setFailLoadSnapshot(boolean failLoadSnapshot) {
this.failLoadSnapshot = failLoadSnapshot;
}

boolean isLoadSnapshotInvoked() {
return loadSnapshotInvoked;
}

@Override
public boolean loadSnapshot(File latestSnapshotRootDir) {
loadSnapshotInvoked = true;
if (failLoadSnapshot) {
return false;
}
return super.loadSnapshot(latestSnapshotRootDir);
}

@Override
public boolean takeSnapshot(File snapshotDir) {
return true;
}

// TestStateMachine does not implement clearSnapshot (the IStateMachine default throws). The
// AddPeer flow calls it in a finally block to clean up the local snapshot, so we provide a
// no-op here; otherwise that cleanup would mask the ConsensusException we are asserting on.
@Override
public boolean clearSnapshot() {
return true;
}
}

@Before
public void setUp() throws Exception {
for (File file : peersStorage) {
file.mkdirs();
stateMachines.add(new ControllableStateMachine());
}
peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs()));
initServer();
}

@After
public void tearDown() throws Exception {
servers.parallelStream().forEach(IoTConsensus::stop);
servers.clear();
for (File file : peersStorage) {
FileUtils.deleteFully(file);
}
peersRecvSnapshotDirs.forEach(
innerList ->
innerList.forEach(
dir -> {
try {
FileUtils.deleteFully(new File(dir));
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}

private void initServer() throws IOException {
Assume.assumeTrue(checkPortAvailable());
try {
for (int i = 0; i < peers.size(); i++) {
int finalI = i;
servers.add(
(IoTConsensus)
ConsensusFactory.getConsensusImpl(
ConsensusFactory.IOT_CONSENSUS,
ConsensusConfig.newBuilder()
.setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setStorageDir(peersStorage.get(i).getAbsolutePath())
.setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i))
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
groupId -> stateMachines.get(finalI))
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
ConsensusFactory.IOT_CONSENSUS))));
}
for (int i = 0; i < peers.size(); i++) {
servers.get(i).start();
}
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
// just succeed when can not bind socket
logger.info("Can not start IoTConsensus because", e);
Assume.assumeTrue(false);
} else {
logger.error("Failed because", e);
Assert.fail("Failed because " + e.getMessage());
}
}
}

@Test
public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws Exception {
// node 0 is the sole initial member; node 1 will be added as a new peer. Mirroring the real
// region-migration flow, the destination peer (node 1) is pre-created locally with the full
// target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer list here).
servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
servers.get(1).createLocalPeer(gid, peers);

// Put some data into the group so the snapshot transfer is meaningful.
for (int i = 0; i < 10; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
}

// Force the target peer (node 1) to fail loading the transferred snapshot.
stateMachines.get(1).setFailLoadSnapshot(true);

// Before the fix, addRemotePeer swallowed the load failure and returned normally, leaving the
// target peer active with incomplete data. It must now surface the failure.
Assert.assertThrows(
ConsensusException.class, () -> servers.get(0).addRemotePeer(gid, peers.get(1)));

// The failure must be the snapshot load itself, i.e. the AddPeer flow actually reached the
// load step on the target rather than aborting earlier.
Assert.assertTrue(
"Target peer's loadSnapshot was never invoked; the failure came from an earlier step",
stateMachines.get(1).isLoadSnapshotInvoked());

// The target peer must not be left active with an incompletely-loaded snapshot.
Assert.assertFalse(
"Target peer was activated despite a failed snapshot load",
servers.get(1).getImpl(gid).isActive());
}

private boolean checkPortAvailable() {
for (Peer peer : this.peers) {
try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) {
logger.info("check port {} success for node {}", peer.getEndpoint().port, peer.getNodeId());
} catch (IOException e) {
logger.error("check port {} failed for node {}", peer.getEndpoint().port, peer.getNodeId());
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,7 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {}
public boolean loadSnapshot(File latestSnapshotRootDir) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,15 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {
public boolean loadSnapshot(File latestSnapshotRootDir) {
File snapshot =
new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot");
try (Scanner scanner = new Scanner(snapshot)) {
integer.set(Integer.parseInt(scanner.next()));
return true;
} catch (FileNotFoundException e) {
logger.error("cannot find snapshot file {}", snapshot);
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public boolean takeSnapshot(File snapshotDir) {
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {}
public boolean loadSnapshot(File latestSnapshotRootDir) {
return true;
}
}

@Before
Expand Down
Loading
Loading