From fc3d2916d28748f5c2e40b01a93bad2712e6cdd7 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Sat, 18 Apr 2026 14:45:02 +0200 Subject: [PATCH 1/2] fix(cassandra): auto-recover session after Cassandra restart - Register ExponentialReconnectionPolicy on the Cluster builder so the Datastax driver keeps retrying downed nodes in the background. - Wrap every Session.execute() in executeWithRetry() with exponential backoff on transient connectivity failures. - Implement reconnectIfNeeded()/reset() so the pool reopens closed sessions and issues a lightweight health-check (SELECT now() FROM system.local) before subsequent queries. - Add tunable options: cassandra.reconnect_base_delay, cassandra.reconnect_max_delay, cassandra.reconnect_max_retries, cassandra.reconnect_interval. - Add unit tests covering defaults, overrides, disabling retries and option keys. Fixes #2740 --- .../store/cassandra/CassandraOptions.java | 42 ++++++ .../store/cassandra/CassandraSessionPool.java | 136 +++++++++++++++++- .../unit/cassandra/CassandraTest.java | 101 +++++++++++++ 3 files changed, 275 insertions(+), 4 deletions(-) diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java index a9ccf97765..7f4c2e9801 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java @@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() { positiveInt(), 12 * 60 * 60 ); + + public static final ConfigOption CASSANDRA_RECONNECT_BASE_DELAY = + new ConfigOption<>( + "cassandra.reconnect_base_delay", + "The base delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(100L, Long.MAX_VALUE), + 1000L + ); + + public static final ConfigOption CASSANDRA_RECONNECT_MAX_DELAY = + new ConfigOption<>( + "cassandra.reconnect_max_delay", + "The maximum delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(1000L, Long.MAX_VALUE), + 60_000L + ); + + public static final ConfigOption CASSANDRA_RECONNECT_MAX_RETRIES = + new ConfigOption<>( + "cassandra.reconnect_max_retries", + "The maximum number of retries applied at query-time when " + + "a Cassandra host is temporarily unreachable " + + "(NoHostAvailableException / OperationTimedOutException). " + + "Set to 0 to disable query-time retries.", + rangeInt(0, Integer.MAX_VALUE), + 10 + ); + + public static final ConfigOption CASSANDRA_RECONNECT_INTERVAL = + new ConfigOption<>( + "cassandra.reconnect_interval", + "The interval in milliseconds between query-time retries " + + "when a Cassandra host is temporarily unreachable. The " + + "actual wait grows with exponential backoff, capped at " + + "cassandra.reconnect_max_delay.", + rangeInt(100L, Long.MAX_VALUE), + 5000L + ); } diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java index 7a9ffa2b91..120dbd5f68 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java @@ -26,6 +26,8 @@ import org.apache.hugegraph.backend.store.BackendSessionPool; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Cluster; @@ -34,22 +36,37 @@ import com.datastax.driver.core.ProtocolOptions.Compression; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.DriverException; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.OperationTimedOutException; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; public class CassandraSessionPool extends BackendSessionPool { + private static final Logger LOG = Log.logger(CassandraSessionPool.class); + private static final int SECOND = 1000; + private static final String HEALTH_CHECK_CQL = + "SELECT now() FROM system.local"; private Cluster cluster; private final String keyspace; + private int maxRetries; + private long retryInterval; + private long retryMaxDelay; public CassandraSessionPool(HugeConfig config, String keyspace, String store) { super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; + this.maxRetries = 0; + this.retryInterval = 0L; + this.retryMaxDelay = 0L; } @Override @@ -86,6 +103,26 @@ public synchronized void open() { builder.withSocketOptions(socketOptions); + // Reconnection policy: let driver keep retrying nodes in background + // with exponential backoff after they go down (see issue #2740). + long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + long reconnectMax = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); + E.checkArgument(reconnectMax >= reconnectBase, + "'%s' (%s) must be >= '%s' (%s)", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), + reconnectMax, + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + reconnectBase); + builder.withReconnectionPolicy( + new ExponentialReconnectionPolicy(reconnectBase, reconnectMax)); + this.retryMaxDelay = reconnectMax; + this.maxRetries = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); + this.retryInterval = config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); + // Credential options String username = config.get(CassandraOptions.CASSANDRA_USERNAME); String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); @@ -161,7 +198,7 @@ public void rollback() { @Override public ResultSet commit() { - ResultSet rs = this.session.execute(this.batch); + ResultSet rs = this.executeWithRetry(this.batch); // Clear batch if execute() successfully (retained if failed) this.batch.clear(); return rs; @@ -197,15 +234,59 @@ public ResultSet query(Statement statement) { } public ResultSet execute(Statement statement) { - return this.session.execute(statement); + return this.executeWithRetry(statement); } public ResultSet execute(String statement) { - return this.session.execute(statement); + return this.executeWithRetry(new SimpleStatement(statement)); } public ResultSet execute(String statement, Object... args) { - return this.session.execute(statement, args); + return this.executeWithRetry(new SimpleStatement(statement, args)); + } + + /** + * Execute a statement, retrying on transient connectivity failures + * (NoHostAvailableException / OperationTimedOutException). The driver + * itself keeps retrying connections in the background via the + * reconnection policy, so once Cassandra comes back online, a + * subsequent attempt here will succeed without restarting the server. + * See issue #2740. + */ + private ResultSet executeWithRetry(Statement statement) { + int retries = CassandraSessionPool.this.maxRetries; + long interval = CassandraSessionPool.this.retryInterval; + long maxDelay = CassandraSessionPool.this.retryMaxDelay; + DriverException lastError = null; + for (int attempt = 0; attempt <= retries; attempt++) { + try { + return this.session.execute(statement); + } catch (NoHostAvailableException | OperationTimedOutException e) { + lastError = e; + if (attempt >= retries) { + break; + } + long delay = Math.min(interval * (1L << Math.min(attempt, 20)), + maxDelay > 0 ? maxDelay : interval); + LOG.warn("Cassandra temporarily unavailable ({}), " + + "retry {}/{} in {} ms", + e.getClass().getSimpleName(), attempt + 1, + retries, delay); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BackendException("Interrupted while " + + "waiting to retry " + + "Cassandra query", ie); + } + } + } + throw new BackendException("Failed to execute Cassandra query " + + "after %s retries: %s", + lastError, retries, + lastError == null ? "" : + lastError.getMessage()); } private void tryOpen() { @@ -255,6 +336,53 @@ public boolean hasChanges() { return this.batch.size() > 0; } + /** + * Periodic liveness probe invoked by {@link BackendSessionPool} to + * recover thread-local sessions after Cassandra has been restarted. + * Reopens the driver session if it was closed and pings the cluster + * with a lightweight query. Any failure here is swallowed so the + * caller can still issue the real query, which will drive retries via + * {@link #executeWithRetry(Statement)}. + */ + @Override + public void reconnectIfNeeded() { + if (!this.opened) { + return; + } + try { + if (this.session == null || this.session.isClosed()) { + this.session = null; + this.tryOpen(); + if (this.session == null) { + return; + } + } + this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + } catch (DriverException e) { + LOG.debug("Cassandra health-check failed, " + + "will retry on next query: {}", e.getMessage()); + } + } + + /** + * Force-close the driver session so it is re-opened on the next + * {@link #opened()} call. Used when a failure is observed and we + * want to start fresh on the next attempt. + */ + @Override + public void reset() { + if (this.session == null) { + return; + } + try { + this.session.close(); + } catch (Throwable e) { + LOG.warn("Failed to reset Cassandra session", e); + } finally { + this.session = null; + } + } + public Collection statements() { return this.batch.getStatements(); } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java index ef5a8e896b..139a29c145 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java @@ -17,11 +17,13 @@ package org.apache.hugegraph.unit.cassandra; +import java.util.Collections; import java.util.Map; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.hugegraph.backend.store.cassandra.CassandraOptions; +import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool; import org.apache.hugegraph.backend.store.cassandra.CassandraStore; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.config.OptionSpace; @@ -30,7 +32,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -192,4 +198,99 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() { Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config); }); } + + @Test + public void testReconnectOptionsHaveSensibleDefaults() { + // Runtime-reconnection options must exist with non-zero defaults so + // HugeGraph keeps running when Cassandra restarts (issue #2740). + Assert.assertEquals(1000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue()); + Assert.assertEquals(60_000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue()); + Assert.assertEquals(10, (int) CassandraOptions + .CASSANDRA_RECONNECT_MAX_RETRIES.defaultValue()); + Assert.assertEquals(5000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_INTERVAL.defaultValue()); + } + + @Test + public void testReconnectOptionsAreOverridable() { + String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(); + String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(); + String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES + .name(); + String interval = CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(); + + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(base, 500L); + conf.setProperty(max, 30_000L); + conf.setProperty(retries, 3); + conf.setProperty(interval, 1000L); + HugeConfig config = new HugeConfig(conf); + + Assert.assertEquals(500L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY)); + Assert.assertEquals(30_000L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY)); + Assert.assertEquals(3, (int) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES)); + Assert.assertEquals(1000L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL)); + } + + @Test + public void testReconnectRetriesCanBeDisabled() { + String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES + .name(); + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(retries, 0); + HugeConfig config = new HugeConfig(conf); + Assert.assertEquals(0, (int) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES)); + } + + @Test + public void testExecuteWithRetrySucceedsAfterTransientFailures() { + Configuration conf = new PropertiesConfiguration(); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + Whitebox.setInternalState(pool, "maxRetries", 3); + Whitebox.setInternalState(pool, "retryInterval", 1L); + Whitebox.setInternalState(pool, "retryMaxDelay", 10L); + + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + ResultSet rs = Mockito.mock(ResultSet.class); + NoHostAvailableException transientFailure = + new NoHostAvailableException(Collections.emptyMap()); + Mockito.when(driverSession.execute(Mockito.any(Statement.class))) + .thenThrow(transientFailure) + .thenThrow(transientFailure) + .thenReturn(rs); + + CassandraSessionPool.Session session = pool.new Session(); + Whitebox.setInternalState(session, "session", driverSession); + + ResultSet result = session.execute("SELECT now() FROM system.local"); + Assert.assertSame(rs, result); + Mockito.verify(driverSession, Mockito.times(3)) + .execute(Mockito.any(Statement.class)); + } + + @Test + public void testReconnectOptionsExposeExpectedKeys() { + Assert.assertEquals("cassandra.reconnect_base_delay", + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY + .name()); + Assert.assertEquals("cassandra.reconnect_max_delay", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY + .name()); + Assert.assertEquals("cassandra.reconnect_max_retries", + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES + .name()); + Assert.assertEquals("cassandra.reconnect_interval", + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL + .name()); + } } From 5ac39906a3cb335983acc6f708d2b1e7e5ed984c Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Mon, 20 Apr 2026 16:35:11 +0200 Subject: [PATCH 2/2] fix: Address reviewer feedback --- .../store/cassandra/CassandraSessionPool.java | 93 ++++++++++++------- 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java index 120dbd5f68..92d7367f94 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java @@ -55,18 +55,30 @@ public class CassandraSessionPool extends BackendSessionPool { private Cluster cluster; private final String keyspace; - private int maxRetries; - private long retryInterval; - private long retryMaxDelay; + private final int maxRetries; + private final long retryInterval; + private final long retryMaxDelay; public CassandraSessionPool(HugeConfig config, String keyspace, String store) { super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; - this.maxRetries = 0; - this.retryInterval = 0L; - this.retryMaxDelay = 0L; + this.maxRetries = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); + this.retryInterval = config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); + long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + long reconnectMax = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); + E.checkArgument(reconnectMax >= reconnectBase, + "'%s' (%s) must be >= '%s' (%s)", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), + reconnectMax, + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + reconnectBase); + this.retryMaxDelay = reconnectMax; } @Override @@ -107,21 +119,9 @@ public synchronized void open() { // with exponential backoff after they go down (see issue #2740). long reconnectBase = config.get( CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); - long reconnectMax = config.get( - CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); - E.checkArgument(reconnectMax >= reconnectBase, - "'%s' (%s) must be >= '%s' (%s)", - CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), - reconnectMax, - CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), - reconnectBase); builder.withReconnectionPolicy( - new ExponentialReconnectionPolicy(reconnectBase, reconnectMax)); - this.retryMaxDelay = reconnectMax; - this.maxRetries = config.get( - CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); - this.retryInterval = config.get( - CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); + new ExponentialReconnectionPolicy(reconnectBase, + this.retryMaxDelay)); // Credential options String username = config.get(CassandraOptions.CASSANDRA_USERNAME); @@ -211,6 +211,7 @@ public void commitAsync() { int processors = Math.min(statements.size(), 1023); List results = new ArrayList<>(processors + 1); for (Statement s : statements) { + // TODO: commitAsync is not retried (async retry semantics are complex) ResultSetFuture future = this.session.executeAsync(s); results.add(future); @@ -251,7 +252,15 @@ public ResultSet execute(String statement, Object... args) { * itself keeps retrying connections in the background via the * reconnection policy, so once Cassandra comes back online, a * subsequent attempt here will succeed without restarting the server. - * See issue #2740. + * + *

Blocking note: retries block the calling thread via + * {@link Thread#sleep(long)}. Worst-case a single call blocks for + * {@code maxRetries * retryMaxDelay} ms. Under high-throughput + * workloads concurrent threads may pile up in {@code sleep()} during + * a Cassandra outage. For such deployments lower + * {@code cassandra.reconnect_max_retries} (default 10) and + * {@code cassandra.reconnect_max_delay} (default 60000ms) so the + * request fails fast and pressure is released back to the caller. */ private ResultSet executeWithRetry(Statement statement) { int retries = CassandraSessionPool.this.maxRetries; @@ -266,8 +275,15 @@ private ResultSet executeWithRetry(Statement statement) { if (attempt >= retries) { break; } - long delay = Math.min(interval * (1L << Math.min(attempt, 20)), - maxDelay > 0 ? maxDelay : interval); + long cap = maxDelay > 0 ? maxDelay : interval; + long shift = 1L << Math.min(attempt, 20); + long delay; + try { + // Guard against Long overflow when retryInterval is huge. + delay = Math.min(Math.multiplyExact(interval, shift), cap); + } catch (ArithmeticException overflow) { + delay = cap; + } LOG.warn("Cassandra temporarily unavailable ({}), " + "retry {}/{} in {} ms", e.getClass().getSimpleName(), attempt + 1, @@ -282,11 +298,14 @@ private ResultSet executeWithRetry(Statement statement) { } } } - throw new BackendException("Failed to execute Cassandra query " + - "after %s retries: %s", - lastError, retries, - lastError == null ? "" : - lastError.getMessage()); + // Preserve original exception as cause (stack trace + type) by + // pre-formatting the message and using the (String, Throwable) + // constructor explicitly — avoids ambiguity with varargs overloads. + String msg = String.format( + "Failed to execute Cassandra query after %s retries: %s", + retries, + lastError == null ? "" : lastError.getMessage()); + throw new BackendException(msg, lastError); } private void tryOpen() { @@ -353,14 +372,20 @@ public void reconnectIfNeeded() { if (this.session == null || this.session.isClosed()) { this.session = null; this.tryOpen(); - if (this.session == null) { - return; - } } - this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + if (this.session != null) { + this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + } } catch (DriverException e) { LOG.debug("Cassandra health-check failed, " + "will retry on next query: {}", e.getMessage()); + } finally { + // Keep opened flag consistent with session: if tryOpen() + // failed to reopen, clear opened so the next execute() does + // not NPE before executeWithRetry() can intercept. + if (this.session == null) { + this.opened = false; + } } } @@ -376,7 +401,9 @@ public void reset() { } try { this.session.close(); - } catch (Throwable e) { + } catch (Exception e) { + // Do not swallow Error (OOM / StackOverflow); only log + // ordinary exceptions raised by the driver on close. LOG.warn("Failed to reset Cassandra session", e); } finally { this.session = null;