diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
index a9ccf97765..7f4c2e9801 100644
--- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
+++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java
@@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
             positiveInt(),
             12 * 60 * 60
     );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_base_delay",
+                    "The base delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(100L, Long.MAX_VALUE),
+                    1000L
+            );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_max_delay",
+                    "The maximum delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(1000L, Long.MAX_VALUE),
+                    60_000L
+            );
+
+    public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
+            new ConfigOption<>(
+                    "cassandra.reconnect_max_retries",
+                    "The maximum number of retries applied at query-time when " +
+                    "a Cassandra host is temporarily unreachable " +
+                    "(NoHostAvailableException / OperationTimedOutException). " +
+                    "Set to 0 to disable query-time retries.",
+                    rangeInt(0, Integer.MAX_VALUE),
+                    10
+            );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
+            new ConfigOption<>(
+                    "cassandra.reconnect_interval",
+                    "The interval in milliseconds between query-time retries " +
+                    "when a Cassandra host is temporarily unreachable. The " +
+                    "actual wait grows with exponential backoff, capped at " +
+                    "cassandra.reconnect_max_delay.",
+                    rangeInt(100L, Long.MAX_VALUE),
+                    5000L
+            );
 }
diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
index 7a9ffa2b91..92d7367f94 100644
--- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
+++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java
@@ -26,6 +26,8 @@ import org.apache.hugegraph.backend.store.BackendSessionPool;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Cluster;
@@ -34,22 +36,49 @@ import com.datastax.driver.core.ProtocolOptions.Compression;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.DriverException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
 
 public class CassandraSessionPool extends BackendSessionPool {
 
+    private static final Logger LOG = Log.logger(CassandraSessionPool.class);
+    private static final int SECOND = 1000;
+    private static final String HEALTH_CHECK_CQL =
+            "SELECT now() FROM system.local";
 
     private Cluster cluster;
     private final String keyspace;
+ private final int maxRetries; + private final long retryInterval; + private final long retryMaxDelay; public CassandraSessionPool(HugeConfig config, String keyspace, String store) { super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; + this.maxRetries = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); + this.retryInterval = config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); + long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + long reconnectMax = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); + E.checkArgument(reconnectMax >= reconnectBase, + "'%s' (%s) must be >= '%s' (%s)", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), + reconnectMax, + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + reconnectBase); + this.retryMaxDelay = reconnectMax; } @Override @@ -86,6 +115,14 @@ public synchronized void open() { builder.withSocketOptions(socketOptions); + // Reconnection policy: let driver keep retrying nodes in background + // with exponential backoff after they go down (see issue #2740). 
+ long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + builder.withReconnectionPolicy( + new ExponentialReconnectionPolicy(reconnectBase, + this.retryMaxDelay)); + // Credential options String username = config.get(CassandraOptions.CASSANDRA_USERNAME); String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); @@ -161,7 +198,7 @@ public void rollback() { @Override public ResultSet commit() { - ResultSet rs = this.session.execute(this.batch); + ResultSet rs = this.executeWithRetry(this.batch); // Clear batch if execute() successfully (retained if failed) this.batch.clear(); return rs; @@ -174,6 +211,7 @@ public void commitAsync() { int processors = Math.min(statements.size(), 1023); List results = new ArrayList<>(processors + 1); for (Statement s : statements) { + // TODO: commitAsync is not retried (async retry semantics are complex) ResultSetFuture future = this.session.executeAsync(s); results.add(future); @@ -197,15 +235,77 @@ public ResultSet query(Statement statement) { } public ResultSet execute(Statement statement) { - return this.session.execute(statement); + return this.executeWithRetry(statement); } public ResultSet execute(String statement) { - return this.session.execute(statement); + return this.executeWithRetry(new SimpleStatement(statement)); } public ResultSet execute(String statement, Object... args) { - return this.session.execute(statement, args); + return this.executeWithRetry(new SimpleStatement(statement, args)); + } + + /** + * Execute a statement, retrying on transient connectivity failures + * (NoHostAvailableException / OperationTimedOutException). The driver + * itself keeps retrying connections in the background via the + * reconnection policy, so once Cassandra comes back online, a + * subsequent attempt here will succeed without restarting the server. + * + *
+     * <p>
Blocking note: retries block the calling thread via + * {@link Thread#sleep(long)}. Worst-case a single call blocks for + * {@code maxRetries * retryMaxDelay} ms. Under high-throughput + * workloads concurrent threads may pile up in {@code sleep()} during + * a Cassandra outage. For such deployments lower + * {@code cassandra.reconnect_max_retries} (default 10) and + * {@code cassandra.reconnect_max_delay} (default 60000ms) so the + * request fails fast and pressure is released back to the caller. + */ + private ResultSet executeWithRetry(Statement statement) { + int retries = CassandraSessionPool.this.maxRetries; + long interval = CassandraSessionPool.this.retryInterval; + long maxDelay = CassandraSessionPool.this.retryMaxDelay; + DriverException lastError = null; + for (int attempt = 0; attempt <= retries; attempt++) { + try { + return this.session.execute(statement); + } catch (NoHostAvailableException | OperationTimedOutException e) { + lastError = e; + if (attempt >= retries) { + break; + } + long cap = maxDelay > 0 ? maxDelay : interval; + long shift = 1L << Math.min(attempt, 20); + long delay; + try { + // Guard against Long overflow when retryInterval is huge. + delay = Math.min(Math.multiplyExact(interval, shift), cap); + } catch (ArithmeticException overflow) { + delay = cap; + } + LOG.warn("Cassandra temporarily unavailable ({}), " + + "retry {}/{} in {} ms", + e.getClass().getSimpleName(), attempt + 1, + retries, delay); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BackendException("Interrupted while " + + "waiting to retry " + + "Cassandra query", ie); + } + } + } + // Preserve original exception as cause (stack trace + type) by + // pre-formatting the message and using the (String, Throwable) + // constructor explicitly — avoids ambiguity with varargs overloads. 
+ String msg = String.format( + "Failed to execute Cassandra query after %s retries: %s", + retries, + lastError == null ? "" : lastError.getMessage()); + throw new BackendException(msg, lastError); } private void tryOpen() { @@ -255,6 +355,61 @@ public boolean hasChanges() { return this.batch.size() > 0; } + /** + * Periodic liveness probe invoked by {@link BackendSessionPool} to + * recover thread-local sessions after Cassandra has been restarted. + * Reopens the driver session if it was closed and pings the cluster + * with a lightweight query. Any failure here is swallowed so the + * caller can still issue the real query, which will drive retries via + * {@link #executeWithRetry(Statement)}. + */ + @Override + public void reconnectIfNeeded() { + if (!this.opened) { + return; + } + try { + if (this.session == null || this.session.isClosed()) { + this.session = null; + this.tryOpen(); + } + if (this.session != null) { + this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + } + } catch (DriverException e) { + LOG.debug("Cassandra health-check failed, " + + "will retry on next query: {}", e.getMessage()); + } finally { + // Keep opened flag consistent with session: if tryOpen() + // failed to reopen, clear opened so the next execute() does + // not NPE before executeWithRetry() can intercept. + if (this.session == null) { + this.opened = false; + } + } + } + + /** + * Force-close the driver session so it is re-opened on the next + * {@link #opened()} call. Used when a failure is observed and we + * want to start fresh on the next attempt. + */ + @Override + public void reset() { + if (this.session == null) { + return; + } + try { + this.session.close(); + } catch (Exception e) { + // Do not swallow Error (OOM / StackOverflow); only log + // ordinary exceptions raised by the driver on close. 
+ LOG.warn("Failed to reset Cassandra session", e); + } finally { + this.session = null; + } + } + public Collection statements() { return this.batch.getStatements(); } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java index ef5a8e896b..139a29c145 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java @@ -17,11 +17,13 @@ package org.apache.hugegraph.unit.cassandra; +import java.util.Collections; import java.util.Map; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.hugegraph.backend.store.cassandra.CassandraOptions; +import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool; import org.apache.hugegraph.backend.store.cassandra.CassandraStore; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.config.OptionSpace; @@ -30,7 +32,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -192,4 +198,99 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() { Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config); }); } + + @Test + public void testReconnectOptionsHaveSensibleDefaults() { + // Runtime-reconnection options must exist with non-zero defaults so + // HugeGraph keeps running when Cassandra restarts (issue #2740). 
+        Assert.assertEquals(1000L, (long) CassandraOptions
+                            .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue());
+        Assert.assertEquals(60_000L, (long) CassandraOptions
+                            .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue());
+        Assert.assertEquals(10, (int) CassandraOptions
+                            .CASSANDRA_RECONNECT_MAX_RETRIES.defaultValue());
+        Assert.assertEquals(5000L, (long) CassandraOptions
+                            .CASSANDRA_RECONNECT_INTERVAL.defaultValue());
+    }
+
+    @Test
+    public void testReconnectOptionsAreOverridable() {
+        String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name();
+        String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name();
+        String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
+                         .name();
+        String interval = CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name();
+
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(base, 500L);
+        conf.setProperty(max, 30_000L);
+        conf.setProperty(retries, 3);
+        conf.setProperty(interval, 1000L);
+        HugeConfig config = new HugeConfig(conf);
+
+        Assert.assertEquals(500L, (long) config.get(
+                            CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY));
+        Assert.assertEquals(30_000L, (long) config.get(
+                            CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY));
+        Assert.assertEquals(3, (int) config.get(
+                            CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES));
+        Assert.assertEquals(1000L, (long) config.get(
+                            CassandraOptions.CASSANDRA_RECONNECT_INTERVAL));
+    }
+
+    @Test
+    public void testReconnectRetriesCanBeDisabled() {
+        String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
+                         .name();
+        Configuration conf = new PropertiesConfiguration();
+        conf.setProperty(retries, 0);
+        HugeConfig config = new HugeConfig(conf);
+        Assert.assertEquals(0, (int) config.get(
+                            CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES));
+    }
+
+    @Test
+    public void testExecuteWithRetrySucceedsAfterTransientFailures() {
+        Configuration conf = new PropertiesConfiguration();
+        HugeConfig config = new HugeConfig(conf);
+        CassandraSessionPool pool = new CassandraSessionPool(config,
+                                                             "ks", "store");
+        Whitebox.setInternalState(pool, "maxRetries", 3);
+        Whitebox.setInternalState(pool, "retryInterval", 1L);
+        Whitebox.setInternalState(pool, "retryMaxDelay", 10L);
+
+        com.datastax.driver.core.Session driverSession = Mockito.mock(
+                com.datastax.driver.core.Session.class);
+        ResultSet rs = Mockito.mock(ResultSet.class);
+        NoHostAvailableException transientFailure =
+                new NoHostAvailableException(Collections.emptyMap());
+        Mockito.when(driverSession.execute(Mockito.any(Statement.class)))
+               .thenThrow(transientFailure)
+               .thenThrow(transientFailure)
+               .thenReturn(rs);
+
+        CassandraSessionPool.Session session = pool.new Session();
+        Whitebox.setInternalState(session, "session", driverSession);
+
+        ResultSet result = session.execute("SELECT now() FROM system.local");
+        Assert.assertSame(rs, result);
+        Mockito.verify(driverSession, Mockito.times(3))
+               .execute(Mockito.any(Statement.class));
+    }
+
+    @Test
+    public void testReconnectOptionsExposeExpectedKeys() {
+        Assert.assertEquals("cassandra.reconnect_base_delay",
+                            CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY
+                                            .name());
+        Assert.assertEquals("cassandra.reconnect_max_delay",
+                            CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY
+                                            .name());
+        Assert.assertEquals("cassandra.reconnect_max_retries",
+                            CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES
+                                            .name());
+        Assert.assertEquals("cassandra.reconnect_interval",
+                            CassandraOptions.CASSANDRA_RECONNECT_INTERVAL
+                                            .name());
+    }
+}