-
Notifications
You must be signed in to change notification settings - Fork 596
fix(cassandra): auto-recover session after Cassandra restart #2997
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,8 @@ | |||||||||||||
| import org.apache.hugegraph.backend.store.BackendSessionPool; | ||||||||||||||
| import org.apache.hugegraph.config.HugeConfig; | ||||||||||||||
| import org.apache.hugegraph.util.E; | ||||||||||||||
| import org.apache.hugegraph.util.Log; | ||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||
|
|
||||||||||||||
| import com.datastax.driver.core.BatchStatement; | ||||||||||||||
| import com.datastax.driver.core.Cluster; | ||||||||||||||
|
|
@@ -34,22 +36,49 @@ | |||||||||||||
| import com.datastax.driver.core.ProtocolOptions.Compression; | ||||||||||||||
| import com.datastax.driver.core.ResultSet; | ||||||||||||||
| import com.datastax.driver.core.ResultSetFuture; | ||||||||||||||
| import com.datastax.driver.core.SimpleStatement; | ||||||||||||||
| import com.datastax.driver.core.SocketOptions; | ||||||||||||||
| import com.datastax.driver.core.Statement; | ||||||||||||||
| import com.datastax.driver.core.exceptions.DriverException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.InvalidQueryException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.NoHostAvailableException; | ||||||||||||||
| import com.datastax.driver.core.exceptions.OperationTimedOutException; | ||||||||||||||
| import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; | ||||||||||||||
|
|
||||||||||||||
| public class CassandraSessionPool extends BackendSessionPool { | ||||||||||||||
|
|
||||||||||||||
| private static final Logger LOG = Log.logger(CassandraSessionPool.class); | ||||||||||||||
|
|
||||||||||||||
| private static final int SECOND = 1000; | ||||||||||||||
| private static final String HEALTH_CHECK_CQL = | ||||||||||||||
| "SELECT now() FROM system.local"; | ||||||||||||||
|
|
||||||||||||||
| private Cluster cluster; | ||||||||||||||
| private final String keyspace; | ||||||||||||||
| private final int maxRetries; | ||||||||||||||
| private final long retryInterval; | ||||||||||||||
| private final long retryMaxDelay; | ||||||||||||||
|
|
||||||||||||||
| public CassandraSessionPool(HugeConfig config, | ||||||||||||||
| String keyspace, String store) { | ||||||||||||||
| super(config, keyspace + "/" + store); | ||||||||||||||
| this.cluster = null; | ||||||||||||||
| this.keyspace = keyspace; | ||||||||||||||
| this.maxRetries = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); | ||||||||||||||
| this.retryInterval = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); | ||||||||||||||
| long reconnectBase = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); | ||||||||||||||
| long reconnectMax = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); | ||||||||||||||
| E.checkArgument(reconnectMax >= reconnectBase, | ||||||||||||||
| "'%s' (%s) must be >= '%s' (%s)", | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), | ||||||||||||||
| reconnectMax, | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), | ||||||||||||||
| reconnectBase); | ||||||||||||||
| this.retryMaxDelay = reconnectMax; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
|
|
@@ -86,6 +115,14 @@ public synchronized void open() { | |||||||||||||
|
|
||||||||||||||
| builder.withSocketOptions(socketOptions); | ||||||||||||||
|
|
||||||||||||||
| // Reconnection policy: let driver keep retrying nodes in background | ||||||||||||||
| // with exponential backoff after they go down (see issue #2740). | ||||||||||||||
| long reconnectBase = config.get( | ||||||||||||||
| CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); | ||||||||||||||
| builder.withReconnectionPolicy( | ||||||||||||||
| new ExponentialReconnectionPolicy(reconnectBase, | ||||||||||||||
| this.retryMaxDelay)); | ||||||||||||||
|
|
||||||||||||||
| // Credential options | ||||||||||||||
| String username = config.get(CassandraOptions.CASSANDRA_USERNAME); | ||||||||||||||
| String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); | ||||||||||||||
|
|
@@ -161,7 +198,7 @@ public void rollback() { | |||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public ResultSet commit() { | ||||||||||||||
| ResultSet rs = this.session.execute(this.batch); | ||||||||||||||
| ResultSet rs = this.executeWithRetry(this.batch); | ||||||||||||||
| // Clear batch if execute() successfully (retained if failed) | ||||||||||||||
| this.batch.clear(); | ||||||||||||||
| return rs; | ||||||||||||||
|
|
@@ -174,6 +211,7 @@ public void commitAsync() { | |||||||||||||
| int processors = Math.min(statements.size(), 1023); | ||||||||||||||
| List<ResultSetFuture> results = new ArrayList<>(processors + 1); | ||||||||||||||
| for (Statement s : statements) { | ||||||||||||||
| // TODO: commitAsync is not retried (async retry semantics are complex) | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Timeline during a Cassandra restart: Both paths commit the same batch type; users cannot know which one they're calling or whether it will be protected. If implementing async retry is genuinely deferred, the TODO should be more explicit about the consequence — callers may see write failures during a Cassandra restart even with |
||||||||||||||
| ResultSetFuture future = this.session.executeAsync(s); | ||||||||||||||
| results.add(future); | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -197,15 +235,77 @@ public ResultSet query(Statement statement) { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(Statement statement) { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| return this.executeWithRetry(statement); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(String statement) { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| return this.executeWithRetry(new SimpleStatement(statement)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public ResultSet execute(String statement, Object... args) { | ||||||||||||||
| return this.session.execute(statement, args); | ||||||||||||||
| return this.executeWithRetry(new SimpleStatement(statement, args)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
|
dpol1 marked this conversation as resolved.
|
||||||||||||||
| * Execute a statement, retrying on transient connectivity failures | ||||||||||||||
| * (NoHostAvailableException / OperationTimedOutException). The driver | ||||||||||||||
| * itself keeps retrying connections in the background via the | ||||||||||||||
| * reconnection policy, so once Cassandra comes back online, a | ||||||||||||||
| * subsequent attempt here will succeed without restarting the server. | ||||||||||||||
| * | ||||||||||||||
| * <p><b>Blocking note:</b> retries block the calling thread via | ||||||||||||||
| * {@link Thread#sleep(long)}. Worst-case a single call blocks for | ||||||||||||||
| * {@code maxRetries * retryMaxDelay} ms. Under high-throughput | ||||||||||||||
| * workloads concurrent threads may pile up in {@code sleep()} during | ||||||||||||||
| * a Cassandra outage. For such deployments lower | ||||||||||||||
| * {@code cassandra.reconnect_max_retries} (default 10) and | ||||||||||||||
| * {@code cassandra.reconnect_max_delay} (default 60000ms) so the | ||||||||||||||
| * request fails fast and pressure is released back to the caller. | ||||||||||||||
| */ | ||||||||||||||
| private ResultSet executeWithRetry(Statement statement) { | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the defaults During a Cassandra outage every in-flight thread sleeps through this sequence. On a busy server, hundreds of threads pile up in Consider lowering the defaults so the query-level retry is a short hiccup buffer, while the driver-level
|
||||||||||||||
| int retries = CassandraSessionPool.this.maxRetries; | ||||||||||||||
|
dpol1 marked this conversation as resolved.
|
||||||||||||||
| long interval = CassandraSessionPool.this.retryInterval; | ||||||||||||||
| long maxDelay = CassandraSessionPool.this.retryMaxDelay; | ||||||||||||||
| DriverException lastError = null; | ||||||||||||||
| for (int attempt = 0; attempt <= retries; attempt++) { | ||||||||||||||
| try { | ||||||||||||||
| return this.session.execute(statement); | ||||||||||||||
| } catch (NoHostAvailableException | OperationTimedOutException e) { | ||||||||||||||
| lastError = e; | ||||||||||||||
| if (attempt >= retries) { | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
| long cap = maxDelay > 0 ? maxDelay : interval; | ||||||||||||||
| long shift = 1L << Math.min(attempt, 20); | ||||||||||||||
| long delay; | ||||||||||||||
| try { | ||||||||||||||
| // Guard against Long overflow when retryInterval is huge. | ||||||||||||||
| delay = Math.min(Math.multiplyExact(interval, shift), cap); | ||||||||||||||
| } catch (ArithmeticException overflow) { | ||||||||||||||
| delay = cap; | ||||||||||||||
| } | ||||||||||||||
| LOG.warn("Cassandra temporarily unavailable ({}), " + | ||||||||||||||
| "retry {}/{} in {} ms", | ||||||||||||||
| e.getClass().getSimpleName(), attempt + 1, | ||||||||||||||
| retries, delay); | ||||||||||||||
| try { | ||||||||||||||
| Thread.sleep(delay); | ||||||||||||||
| } catch (InterruptedException ie) { | ||||||||||||||
| Thread.currentThread().interrupt(); | ||||||||||||||
| throw new BackendException("Interrupted while " + | ||||||||||||||
| "waiting to retry " + | ||||||||||||||
| "Cassandra query", ie); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| // Preserve original exception as cause (stack trace + type) by | ||||||||||||||
| // pre-formatting the message and using the (String, Throwable) | ||||||||||||||
| // constructor explicitly — avoids ambiguity with varargs overloads. | ||||||||||||||
| String msg = String.format( | ||||||||||||||
| "Failed to execute Cassandra query after %s retries: %s", | ||||||||||||||
| retries, | ||||||||||||||
| lastError == null ? "<null>" : lastError.getMessage()); | ||||||||||||||
| throw new BackendException(msg, lastError); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| private void tryOpen() { | ||||||||||||||
|
|
@@ -255,6 +355,61 @@ public boolean hasChanges() { | |||||||||||||
| return this.batch.size() > 0; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Periodic liveness probe invoked by {@link BackendSessionPool} to | ||||||||||||||
| * recover thread-local sessions after Cassandra has been restarted. | ||||||||||||||
| * Reopens the driver session if it was closed and pings the cluster | ||||||||||||||
| * with a lightweight query. Any failure here is swallowed so the | ||||||||||||||
| * caller can still issue the real query, which will drive retries via | ||||||||||||||
| * {@link #executeWithRetry(Statement)}. | ||||||||||||||
| */ | ||||||||||||||
|
dpol1 marked this conversation as resolved.
|
||||||||||||||
| @Override | ||||||||||||||
| public void reconnectIfNeeded() { | ||||||||||||||
| if (!this.opened) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| if (this.session == null || this.session.isClosed()) { | ||||||||||||||
| this.session = null; | ||||||||||||||
| this.tryOpen(); | ||||||||||||||
| } | ||||||||||||||
| if (this.session != null) { | ||||||||||||||
| this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When Current flow: Desired flow: Suggested fix: } catch (DriverException e) {
LOG.debug("Cassandra health-check failed, resetting session: {}",
e.getMessage());
this.session = null; // force re-open on next query via opened()
}This also makes the |
||||||||||||||
| } | ||||||||||||||
| } catch (DriverException e) { | ||||||||||||||
| LOG.debug("Cassandra health-check failed, " + | ||||||||||||||
| "will retry on next query: {}", e.getMessage()); | ||||||||||||||
| } finally { | ||||||||||||||
| // Keep opened flag consistent with session: if tryOpen() | ||||||||||||||
| // failed to reopen, clear opened so the next execute() does | ||||||||||||||
| // not NPE before executeWithRetry() can intercept. | ||||||||||||||
| if (this.session == null) { | ||||||||||||||
| this.opened = false; | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The normal teardown flow is: If Additionally, @Override
public boolean opened() {
if (this.opened && this.session == null) {
this.tryOpen(); // already handles the re-open
}
return this.opened && this.session != null;
}So the NPE concern in the |
||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Force-close the driver session so it is re-opened on the next | ||||||||||||||
| * {@link #opened()} call. Used when a failure is observed and we | ||||||||||||||
| * want to start fresh on the next attempt. | ||||||||||||||
| */ | ||||||||||||||
| @Override | ||||||||||||||
| public void reset() { | ||||||||||||||
| if (this.session == null) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| this.session.close(); | ||||||||||||||
| } catch (Exception e) { | ||||||||||||||
| // Do not swallow Error (OOM / StackOverflow); only log | ||||||||||||||
| // ordinary exceptions raised by the driver on close. | ||||||||||||||
| LOG.warn("Failed to reset Cassandra session", e); | ||||||||||||||
| } finally { | ||||||||||||||
| this.session = null; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public Collection<Statement> statements() { | ||||||||||||||
| return this.batch.getStatements(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reconnect_base_delayCASSANDRA_RECONNECT_BASE_DELAYis read twice from config: once in the constructor (line 72) to validate thebase ≤ maxconstraint, and again here inopen()(line 120-121) to build the reconnection policy.The constructor already stores
reconnectMaxasthis.retryMaxDelay, butreconnectBaseis discarded after validation. This means:config.get()is called redundantly on everyopen().Suggestion: store
reconnectBaseas a field alongsideretryMaxDelayin the constructor, then usethis.retryBaseDelayhere: