From 98246cec9063c4a2acf2134c848cf3fea847cc4b Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 20:05:15 +0800 Subject: [PATCH] Propagate snapshot load failure during IoTConsensus AddPeer During region migration, when the target peer failed to load the transferred snapshot, the failure was silently swallowed: the target's IoTConsensus RPC handler returned SUCCESS regardless, so the coordinator activated the new peer and marked AddRegionPeerProcedure / RegionMigrateProcedure successful. The migration was reported complete while the destination replica actually had no data, leading to silent data loss once the source replica was dropped. The coordinator side already handles a non-SUCCESS triggerSnapshotLoad response correctly (it throws ConsensusGroupModifyPeerException, which fails the AddPeer task and rolls the procedure back without deleting the source replica). The only broken link was that snapshot-load failure was never reportable, because IStateMachine.loadSnapshot returned void and the implementations swallowed errors. Change IStateMachine.loadSnapshot to return boolean (true on success): - DataRegionStateMachine / SchemaRegionStateMachine / ConfigRegionStateMachine return false when loading fails (and SchemaRegionStateMachine now guards its body so an exception is reported rather than thrown). - IoTConsensusServerImpl.loadSnapshot returns false if loading from any receive folder fails (removing the long-standing TODO). - IoTConsensusRPCServiceProcessor.triggerSnapshotLoad returns a non-SUCCESS status when loadSnapshot fails, so the coordinator's existing error path fires and AddPeer fails instead of falsely succeeding. - SimpleConsensusServerImpl forwards the boolean; the Ratis ApplicationStateMachineProxy logs a failure (its behavior is otherwise unchanged). Test state machines updated accordingly. 
Add AddPeerSnapshotLoadFailureTest: a real two-node IoTConsensus group where the target's loadSnapshot is forced to fail; it verifies that addRemotePeer reaches the load step, throws ConsensusException, and does not leave the target peer active. The test fails against the old code and passes with the fix. --- .../ConfigRegionStateMachine.java | 4 +- .../apache/iotdb/consensus/IStateMachine.java | 5 +- .../consensus/iot/IoTConsensusServerImpl.java | 19 +- .../IoTConsensusRPCServiceProcessor.java | 10 +- .../ratis/ApplicationStateMachineProxy.java | 4 +- .../simple/SimpleConsensusServerImpl.java | 4 +- .../iotdb/consensus/EmptyStateMachine.java | 4 +- .../iot/AddPeerSnapshotLoadFailureTest.java | 244 ++++++++++++++++++ .../consensus/iot/util/TestStateMachine.java | 4 +- .../iotdb/consensus/ratis/TestUtils.java | 4 +- .../consensus/simple/SimpleConsensusTest.java | 4 +- .../dataregion/DataRegionStateMachine.java | 6 +- .../SchemaRegionStateMachine.java | 22 +- 13 files changed, 306 insertions(+), 28 deletions(-) create mode 100644 iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b3029e6602808..1c2fb38470b61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -246,7 +246,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { try { executor.loadSnapshot(latestSnapshotRootDir); // We recompute the snapshot for pipe listener when 
loading snapshot @@ -254,12 +254,14 @@ public void loadSnapshot(final File latestSnapshotRootDir) { PipeConfigNodeAgent.runtime() .listener() .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); + return true; } catch (final IOException e) { if (PipeConfigNodeAgent.runtime().listener().isOpened()) { LOGGER.warn( ConfigNodeMessages.CONFIG_REGION_LISTENING_QUEUE_LISTEN_TO_SNAPSHOT_FAILED_WHEN_STARTUP, e); } + return false; } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index 3354c83699b54..c7705d93896e8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -112,8 +112,11 @@ default boolean clearSnapshot() { * Load the latest snapshot from given dir. * * @param latestSnapshotRootDir dir where the latest snapshot sits + * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers + * (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose + * snapshot failed to load, which would otherwise silently lose data. */ - void loadSnapshot(File latestSnapshotRootDir); + boolean loadSnapshot(File latestSnapshotRootDir); /** * given a snapshot dir, ask statemachine to provide all snapshot files. 
By default, it will list diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index f3c7d4e50ae26..27a2691d63136 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -511,14 +511,17 @@ private void clearOldSnapshot() { } } - public void loadSnapshot(String snapshotId) { - // TODO: (xingtanzjr) throw exception if the snapshot load failed - recvFolderManager - .getFolders() - .forEach( - dir -> { - stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId)); - }); + public boolean loadSnapshot(String snapshotId) { + // Load the snapshot from every receive folder. If any of them fails, report the failure so the + // AddPeer coordinator does not activate this peer with incomplete data (which would silently + // lose data on this replica). 
+ boolean success = true; + for (String dir : recvFolderManager.getFolders()) { + if (!stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId))) { + success = false; + } + } + return success; } private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index c9a9901dbcf6f..91e17b370ca1e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -343,7 +343,15 @@ public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req) status.setMessage(message); return new TTriggerSnapshotLoadRes(status); } - impl.loadSnapshot(req.snapshotId); + if (!impl.loadSnapshot(req.snapshotId)) { + String message = + String.format( + "Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId); + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + return new TTriggerSnapshotLoadRes(status); + } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f206..dc009a71887e0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -268,7 +268,9 @@ private void loadSnapshot(File latestSnapshotDir) { } // require the application statemachine to load the latest snapshot - applicationStateMachine.loadSnapshot(latestSnapshotDir); + if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) { + logger.error("{}: failed to load snapshot from {}", this, latestSnapshotDir); + } TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir); updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 4ed4a7b41ce41..b76bcdeaaebbc 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -89,7 +89,7 @@ public synchronized boolean takeSnapshot(File snapshotDir) { } @Override - public synchronized void loadSnapshot(File latestSnapshotRootDir) { - stateMachine.loadSnapshot(latestSnapshotRootDir); + public synchronized boolean loadSnapshot(File latestSnapshotRootDir) { + return stateMachine.loadSnapshot(latestSnapshotRootDir); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index aafa0be5bb5ff..997120b00f96d 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -54,5 +54,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean 
loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java new file mode 100644 index 0000000000000..aa69907e6ed4e --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.consensus.iot; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.iot.util.TestEntry; +import org.apache.iotdb.consensus.iot.util.TestStateMachine; + +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target + * was silently swallowed, so the new peer was activated and the migration falsely reported + * successful, losing data on the new replica). + * + *

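<p>It builds a real two-node IoTConsensus group, forces the target peer's {@link + * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code + * addRemotePeer}: + * + * <ul> + *   <li>actually reaches the snapshot-load step on the target peer; + *   <li>throws a {@code ConsensusException}; + *   <li>does not leave the target peer active. + * </ul>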
+ */ +public class AddPeerSnapshotLoadFailureTest { + + private final Logger logger = LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class); + + private final ConsensusGroupId gid = new DataRegionId(1); + + private final int basePort = 9200; + + private final List peers = + Arrays.asList( + new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)), + new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort))); + + private final List peersStorage = + Arrays.asList( + new File("target" + File.separator + "snapshot-load-fail-1"), + new File("target" + File.separator + "snapshot-load-fail-2")); + + private final List> peersRecvSnapshotDirs = + Arrays.asList( + Arrays.asList( + "target" + File.separator + "snapshot-load-fail-1-recv-1", + "target" + File.separator + "snapshot-load-fail-1-recv-2"), + Arrays.asList( + "target" + File.separator + "snapshot-load-fail-2-recv-1", + "target" + File.separator + "snapshot-load-fail-2-recv-2")); + + private final List servers = new ArrayList<>(); + private final List stateMachines = new ArrayList<>(); + + /** A {@link TestStateMachine} whose snapshot load can be made to fail on demand. */ + private static class ControllableStateMachine extends TestStateMachine { + private volatile boolean failLoadSnapshot = false; + private volatile boolean loadSnapshotInvoked = false; + + void setFailLoadSnapshot(boolean failLoadSnapshot) { + this.failLoadSnapshot = failLoadSnapshot; + } + + boolean isLoadSnapshotInvoked() { + return loadSnapshotInvoked; + } + + @Override + public boolean loadSnapshot(File latestSnapshotRootDir) { + loadSnapshotInvoked = true; + if (failLoadSnapshot) { + return false; + } + return super.loadSnapshot(latestSnapshotRootDir); + } + + @Override + public boolean takeSnapshot(File snapshotDir) { + return true; + } + + // TestStateMachine does not implement clearSnapshot (the IStateMachine default throws). 
The + // AddPeer flow calls it in a finally block to clean up the local snapshot, so we provide a + // no-op here; otherwise that cleanup would mask the ConsensusException we are asserting on. + @Override + public boolean clearSnapshot() { + return true; + } + } + + @Before + public void setUp() throws Exception { + for (File file : peersStorage) { + file.mkdirs(); + stateMachines.add(new ControllableStateMachine()); + } + peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs())); + initServer(); + } + + @After + public void tearDown() throws Exception { + servers.parallelStream().forEach(IoTConsensus::stop); + servers.clear(); + for (File file : peersStorage) { + FileUtils.deleteFully(file); + } + peersRecvSnapshotDirs.forEach( + innerList -> + innerList.forEach( + dir -> { + try { + FileUtils.deleteFully(new File(dir)); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + private void initServer() throws IOException { + Assume.assumeTrue(checkPortAvailable()); + try { + for (int i = 0; i < peers.size(); i++) { + int finalI = i; + servers.add( + (IoTConsensus) + ConsensusFactory.getConsensusImpl( + ConsensusFactory.IOT_CONSENSUS, + ConsensusConfig.newBuilder() + .setThisNodeId(peers.get(i).getNodeId()) + .setThisNode(peers.get(i).getEndpoint()) + .setStorageDir(peersStorage.get(i).getAbsolutePath()) + .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i)) + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .build(), + groupId -> stateMachines.get(finalI)) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + ConsensusFactory.CONSTRUCT_FAILED_MSG, + ConsensusFactory.IOT_CONSENSUS)))); + } + for (int i = 0; i < peers.size(); i++) { + servers.get(i).start(); + } + } catch (IOException e) { + if (e.getCause() instanceof StartupException) { + // just succeed when can not bind socket + logger.info("Can not start IoTConsensus because", e); + Assume.assumeTrue(false); + } else { + 
logger.error("Failed because", e); + Assert.fail("Failed because " + e.getMessage()); + } + } + } + + @Test + public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws Exception { + // node 0 is the sole initial member; node 1 will be added as a new peer. Mirroring the real + // region-migration flow, the destination peer (node 1) is pre-created locally with the full + // target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer list here). + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); + servers.get(1).createLocalPeer(gid, peers); + + // Put some data into the group so the snapshot transfer is meaningful. + for (int i = 0; i < 10; i++) { + servers.get(0).write(gid, new TestEntry(i, peers.get(0))); + } + + // Force the target peer (node 1) to fail loading the transferred snapshot. + stateMachines.get(1).setFailLoadSnapshot(true); + + // Before the fix, addRemotePeer swallowed the load failure and returned normally, leaving the + // target peer active with incomplete data. It must now surface the failure. + Assert.assertThrows( + ConsensusException.class, () -> servers.get(0).addRemotePeer(gid, peers.get(1))); + + // The failure must be the snapshot load itself, i.e. the AddPeer flow actually reached the + // load step on the target rather than aborting earlier. + Assert.assertTrue( + "Target peer's loadSnapshot was never invoked; the failure came from an earlier step", + stateMachines.get(1).isLoadSnapshotInvoked()); + + // The target peer must not be left active with an incompletely-loaded snapshot. 
+ Assert.assertFalse( + "Target peer was activated despite a failed snapshot load", + servers.get(1).getImpl(gid).isActive()); + } + + private boolean checkPortAvailable() { + for (Peer peer : this.peers) { + try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) { + logger.info("check port {} success for node {}", peer.getEndpoint().port, peer.getNodeId()); + } catch (IOException e) { + logger.error("check port {} failed for node {}", peer.getEndpoint().port, peer.getNodeId()); + return false; + } + } + return true; + } +} diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java index a879a03478457..9454dcb0c4392 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java @@ -142,5 +142,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 5cac2173396a3..3217d1cc58e0c 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -177,13 +177,15 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { File snapshot = new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot"); try (Scanner scanner = new Scanner(snapshot)) { integer.set(Integer.parseInt(scanner.next())); + 
return true; } catch (FileNotFoundException e) { logger.error("cannot find snapshot file {}", snapshot); + return false; } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index d50ab992f6f9c..ff062fb63a85b 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -123,7 +123,9 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } @Before diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index 2de1ec9fdc0ce..62bb498afa19a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -124,7 +124,7 @@ public boolean clearSnapshot() { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { String databaseName = region.getDatabaseName(); String dataRegionIdString = region.getDataRegionIdString(); DataRegionId regionId = new DataRegionId(Integer.parseInt(dataRegionIdString)); @@ -141,14 +141,16 @@ public void loadSnapshot(File latestSnapshotRootDir) { .loadSnapshotForStateMachine()); if (newRegion == null) { logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, latestSnapshotRootDir); - return; + return false; } this.region = newRegion; ChunkCache.getInstance().clear(); 
TimeSeriesMetadataCache.getInstance().clear(); BloomFilterCache.getInstance().clear(); + return true; } catch (Exception e) { logger.error(DataNodeMiscMessages.EXCEPTION_REPLACING_DATA_REGION, e); + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index edab5862d11e8..3f647ced7a5e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -136,14 +136,20 @@ public boolean takeSnapshot(final File snapshotDir) { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { - schemaRegion.loadSnapshot(latestSnapshotRootDir); - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .loadSnapshot(latestSnapshotRootDir); - // We recompute the snapshot for pipe listener when loading snapshot - // to recover the newest snapshot in cache - listen2Snapshot4PipeListener(false); + public boolean loadSnapshot(final File latestSnapshotRootDir) { + try { + schemaRegion.loadSnapshot(latestSnapshotRootDir); + PipeDataNodeAgent.runtime() + .schemaListener(schemaRegion.getSchemaRegionId()) + .loadSnapshot(latestSnapshotRootDir); + // We recompute the snapshot for pipe listener when loading snapshot + // to recover the newest snapshot in cache + listen2Snapshot4PipeListener(false); + return true; + } catch (Exception e) { + logger.error("Failed to load snapshot from {}", latestSnapshotRootDir, e); + return false; + } } public void listen2Snapshot4PipeListener(final boolean isTmp) {