Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
positiveInt(),
12 * 60 * 60
);

// Driver-level knob: initial backoff fed to the
// ExponentialReconnectionPolicy installed in open(); the driver keeps
// probing a downed host in the background starting at this delay.
public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
new ConfigOption<>(
"cassandra.reconnect_base_delay",
"The base delay in milliseconds used by the driver's " +
"exponential reconnection policy when a Cassandra host " +
"becomes unreachable.",
rangeInt(100L, Long.MAX_VALUE),
1000L
);

// Upper bound for the driver's exponential reconnection backoff; also
// caps the per-attempt wait of the query-time retry loop in
// CassandraSessionPool.executeWithRetry().
public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
        new ConfigOption<>(
                "cassandra.reconnect_max_delay",
                "The maximum delay in milliseconds used by the driver's " +
                "exponential reconnection policy when a Cassandra host " +
                "becomes unreachable.",
                rangeInt(1000L, Long.MAX_VALUE),
                // 10s (was 60s): with a 60s cap a single query-time retry
                // sequence could block its calling thread for minutes during
                // an outage; the driver-level reconnection policy — not the
                // query path — is responsible for riding out long outages.
                10_000L
        );

// Query-time retry count; see CassandraSessionPool.executeWithRetry().
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
        new ConfigOption<>(
                "cassandra.reconnect_max_retries",
                "The maximum number of retries applied at query-time when " +
                "a Cassandra host is temporarily unreachable " +
                "(NoHostAvailableException / OperationTimedOutException). " +
                "Set to 0 to disable query-time retries.",
                rangeInt(0, Integer.MAX_VALUE),
                // 3 (was 10): query-time retries are only a short hiccup
                // buffer — under the old defaults (10 retries, 60s delay cap)
                // one call could sleep ~7 minutes, piling up threads during
                // an outage. Long recoveries are handled by the driver's
                // background reconnection policy instead.
                3
        );

// Base wait between query-time retries; doubled per attempt and capped
// by cassandra.reconnect_max_delay.
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
        new ConfigOption<>(
                "cassandra.reconnect_interval",
                "The interval in milliseconds between query-time retries " +
                "when a Cassandra host is temporarily unreachable. The " +
                "actual wait grows with exponential backoff, capped at " +
                "cassandra.reconnect_max_delay.",
                rangeInt(100L, Long.MAX_VALUE),
                // 1s (was 5s): keeps the worst-case per-call blocking time
                // short so pressure is released back to the caller quickly.
                1000L
        );
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
Expand All @@ -34,22 +36,49 @@
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;

public class CassandraSessionPool extends BackendSessionPool {

private static final Logger LOG = Log.logger(CassandraSessionPool.class);

// 1000 ms; presumably a ms/second conversion constant — its use is not
// visible in this chunk, confirm against the rest of the file.
private static final int SECOND = 1000;
// Lightweight liveness probe issued by reconnectIfNeeded(); queries the
// coordinator's own system.local table.
private static final String HEALTH_CHECK_CQL =
"SELECT now() FROM system.local";

// Created lazily in open(); null until then.
private Cluster cluster;
private final String keyspace;
// Query-time retry knobs, loaded once from the cassandra.reconnect_*
// config options in the constructor.
private final int maxRetries;
private final long retryInterval;
private final long retryMaxDelay;

/**
 * Build a session pool for the given keyspace/store, caching the
 * query-time retry settings and validating that the driver-level
 * reconnection delays are consistent (max >= base).
 */
public CassandraSessionPool(HugeConfig config,
                            String keyspace, String store) {
    super(config, keyspace + "/" + store);
    this.cluster = null;
    this.keyspace = keyspace;

    // Query-time retry settings, read once and cached.
    this.maxRetries = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
    this.retryInterval = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_INTERVAL);

    // Driver-level reconnection delays: reject a max below the base
    // up front rather than letting the driver misbehave later.
    long baseDelay = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
    long maxDelay = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
    E.checkArgument(maxDelay >= baseDelay,
                    "'%s' (%s) must be >= '%s' (%s)",
                    CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
                    maxDelay,
                    CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
                    baseDelay);
    this.retryMaxDelay = maxDelay;
}

@Override
Expand Down Expand Up @@ -86,6 +115,14 @@ public synchronized void open() {

builder.withSocketOptions(socketOptions);

// Reconnection policy: let driver keep retrying nodes in background
// with exponential backoff after they go down (see issue #2740).
long reconnectBase = config.get(
CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Duplicate config read for reconnect_base_delay

CASSANDRA_RECONNECT_BASE_DELAY is read twice from config: once in the constructor (line 72) to validate the base ≤ max constraint, and again here in open() (line 120-121) to build the reconnection policy.

The constructor already stores reconnectMax as this.retryMaxDelay, but reconnectBase is discarded after validation. This means:

  1. config.get() is called redundantly on every open().
  2. If the config object were mutable (or in tests with different config instances), the two reads could theoretically diverge.

Suggestion: store reconnectBase as a field alongside retryMaxDelay in the constructor, then use this.retryBaseDelay here:

// constructor
this.retryBaseDelay = reconnectBase;   // add this field
this.retryMaxDelay  = reconnectMax;

// open()
builder.withReconnectionPolicy(
        new ExponentialReconnectionPolicy(this.retryBaseDelay,
                                          this.retryMaxDelay));

builder.withReconnectionPolicy(
new ExponentialReconnectionPolicy(reconnectBase,
this.retryMaxDelay));

// Credential options
String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
Expand Down Expand Up @@ -161,7 +198,7 @@ public void rollback() {

@Override
public ResultSet commit() {
ResultSet rs = this.session.execute(this.batch);
ResultSet rs = this.executeWithRetry(this.batch);
// Clear batch if execute() successfully (retained if failed)
this.batch.clear();
return rs;
Expand All @@ -174,6 +211,7 @@ public void commitAsync() {
int processors = Math.min(statements.size(), 1023);
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
for (Statement s : statements) {
// TODO: commitAsync is not retried (async retry semantics are complex)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ commitAsync() has no retry protection — inconsistent write reliability

commit() (sync path) wraps all statements in executeWithRetry, but commitAsync() calls this.session.executeAsync(s) directly. This means write operations on the async path get no protection against transient NoHostAvailableException / OperationTimedOutException — exactly the failures this PR targets.

Timeline during a Cassandra restart:

Thread A (sync):   commit() → executeWithRetry() → retries → succeeds ✓
Thread B (async):  commitAsync() → executeAsync() → NoHostAvailableException ✗

Both paths commit the same batch type; users cannot know which one they're calling or whether it will be protected.

If implementing async retry is genuinely deferred, the TODO should be more explicit about the consequence — callers may see write failures during a Cassandra restart even with maxRetries > 0 configured. Consider at minimum logging a warning at startup when maxRetries > 0.

ResultSetFuture future = this.session.executeAsync(s);
results.add(future);

Expand All @@ -197,15 +235,77 @@ public ResultSet query(Statement statement) {
}

/**
 * Execute the given statement, retrying on transient connectivity
 * failures via executeWithRetry(). (The leftover pre-change line
 * `return this.session.execute(statement);` was diff residue that made
 * the replacement line unreachable — removed.)
 */
public ResultSet execute(Statement statement) {
    return this.executeWithRetry(statement);
}

/**
 * Execute a raw CQL string, wrapping it in a SimpleStatement so it
 * flows through the common executeWithRetry() path. (Leftover
 * pre-change `return this.session.execute(statement);` diff residue
 * removed — it made the replacement line unreachable.)
 */
public ResultSet execute(String statement) {
    return this.executeWithRetry(new SimpleStatement(statement));
}

/**
 * Execute a parameterized CQL string with retry protection, wrapping
 * it in a SimpleStatement so it flows through executeWithRetry().
 * (Leftover pre-change `return this.session.execute(statement, args);`
 * diff residue removed — it made the replacement line unreachable.)
 */
public ResultSet execute(String statement, Object... args) {
    return this.executeWithRetry(new SimpleStatement(statement, args));
}

/**
Comment thread
dpol1 marked this conversation as resolved.
* Execute a statement, retrying on transient connectivity failures
* (NoHostAvailableException / OperationTimedOutException). The driver
* itself keeps retrying connections in the background via the
* reconnection policy, so once Cassandra comes back online, a
* subsequent attempt here will succeed without restarting the server.
*
* <p><b>Blocking note:</b> retries block the calling thread via
* {@link Thread#sleep(long)}. Worst-case a single call blocks for
* {@code maxRetries * retryMaxDelay} ms. Under high-throughput
* workloads concurrent threads may pile up in {@code sleep()} during
* a Cassandra outage. For such deployments lower
* {@code cassandra.reconnect_max_retries} (default 10) and
* {@code cassandra.reconnect_max_delay} (default 60000ms) so the
* request fails fast and pressure is released back to the caller.
*/
private ResultSet executeWithRetry(Statement statement) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Default settings allow a single query call to block for up to ~7 minutes

With the defaults maxRetries = 10, reconnect_interval = 5000 ms, and reconnect_max_delay = 60 000 ms, the worst-case wall time for one executeWithRetry() call is:

attempt 0 → sleep(min(5000 * 1, 60000)) =  5 000 ms
attempt 1 → sleep(min(5000 * 2, 60000)) = 10 000 ms
attempt 2 → sleep(min(5000 * 4, 60000)) = 20 000 ms
attempt 3 → sleep(min(5000 * 8, 60000)) = 40 000 ms
attempts 4–9    → sleep(60 000 ms each) = 360 000 ms
                                          ──────────
total blocking time ≈ 435 seconds (~7 min)

During a Cassandra outage every in-flight thread sleeps through this sequence. On a busy server, hundreds of threads pile up in Thread.sleep(), exhausting the thread pool well before maxRetries is hit.

Consider lowering the defaults so the query-level retry is a short hiccup buffer, while the driver-level ExponentialReconnectionPolicy handles the actual node reconnection in the background:

Option Current default Suggested
reconnect_max_retries 10 3
reconnect_interval 5 000 ms 1 000 ms
reconnect_max_delay 60 000 ms 10 000 ms

int retries = CassandraSessionPool.this.maxRetries;
Comment thread
dpol1 marked this conversation as resolved.
long interval = CassandraSessionPool.this.retryInterval;
long maxDelay = CassandraSessionPool.this.retryMaxDelay;
DriverException lastError = null;
for (int attempt = 0; attempt <= retries; attempt++) {
try {
return this.session.execute(statement);
} catch (NoHostAvailableException | OperationTimedOutException e) {
lastError = e;
if (attempt >= retries) {
break;
}
long cap = maxDelay > 0 ? maxDelay : interval;
long shift = 1L << Math.min(attempt, 20);
long delay;
try {
// Guard against Long overflow when retryInterval is huge.
delay = Math.min(Math.multiplyExact(interval, shift), cap);
} catch (ArithmeticException overflow) {
delay = cap;
}
LOG.warn("Cassandra temporarily unavailable ({}), " +
"retry {}/{} in {} ms",
e.getClass().getSimpleName(), attempt + 1,
retries, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BackendException("Interrupted while " +
"waiting to retry " +
"Cassandra query", ie);
}
}
}
// Preserve original exception as cause (stack trace + type) by
// pre-formatting the message and using the (String, Throwable)
// constructor explicitly — avoids ambiguity with varargs overloads.
String msg = String.format(
"Failed to execute Cassandra query after %s retries: %s",
retries,
lastError == null ? "<null>" : lastError.getMessage());
throw new BackendException(msg, lastError);
}

private void tryOpen() {
Expand Down Expand Up @@ -255,6 +355,61 @@ public boolean hasChanges() {
return this.batch.size() > 0;
}

/**
* Periodic liveness probe invoked by {@link BackendSessionPool} to
* recover thread-local sessions after Cassandra has been restarted.
* Reopens the driver session if it was closed and pings the cluster
* with a lightweight query. Any failure here is swallowed so the
* caller can still issue the real query, which will drive retries via
* {@link #executeWithRetry(Statement)}.
*/
Comment thread
dpol1 marked this conversation as resolved.
@Override
public void reconnectIfNeeded() {
if (!this.opened) {
return;
}
try {
if (this.session == null || this.session.isClosed()) {
this.session = null;
this.tryOpen();
}
if (this.session != null) {
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Health-check failure silently leaves the dead session in place

When session.execute(HEALTH_CHECK_CQL) throws a DriverException, the exception is caught and logged at DEBUG — but the session object that produced the failure is kept. The probe's purpose is to detect a dead session early; if it fails the session should be marked unhealthy immediately.

Current flow:

health-check → DriverException
  → LOG.debug(...)
  → session is NOT cleared
  → next executeWithRetry() tries the same dead session
    → pays the full retry cost anyway

Desired flow:

health-check → DriverException
  → session = null   ← mark unhealthy now
  → opened() will call tryOpen() before the next real query

Suggested fix:

} catch (DriverException e) {
    LOG.debug("Cassandra health-check failed, resetting session: {}",
              e.getMessage());
    this.session = null;   // force re-open on next query via opened()
}

This also makes the finally block that sets this.opened = false unnecessary (see separate comment on that).

}
} catch (DriverException e) {
LOG.debug("Cassandra health-check failed, " +
"will retry on next query: {}", e.getMessage());
} finally {
// Keep opened flag consistent with session: if tryOpen()
// failed to reopen, clear opened so the next execute() does
// not NPE before executeWithRetry() can intercept.
if (this.session == null) {
this.opened = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ Mutating this.opened in reconnectIfNeeded() breaks parent-class lifecycle invariants

opened is a field of AbstractBackendSession that tracks the ref-counted lifecycle managed by BackendSessionPool. Setting it to false here bypasses that contract.

The normal teardown flow is:

BackendSessionPool.closeSession()
  → session.detach()         // decrements ref
  → if ref == 0: session.close()
  → threadLocalSession.remove()
  → sessions.remove(threadId)
  → sessionCount.decrementAndGet()

If this.opened = false is set here instead, the session object stays registered in sessions and sessionCount is never decremented. On the next getOrNewSession() call the pool sees opened == false and creates a new session, leaking the old entry.

Additionally, opened() already guards against a null session:

@Override
public boolean opened() {
    if (this.opened && this.session == null) {
        this.tryOpen();           // already handles the re-open
    }
    return this.opened && this.session != null;
}

So the NPE concern in the finally comment is already handled by opened(). The finally block can be removed entirely — just catch the exception and log, letting the normal opened() / executeWithRetry() path handle the rest.

}
}
}

/**
 * Discard the current driver session so the next {@link #opened()}
 * call opens a fresh one. Invoked after an observed failure when a
 * clean restart of the session is wanted. Close errors are logged,
 * never propagated; the session reference is cleared regardless.
 */
@Override
public void reset() {
    if (this.session != null) {
        try {
            this.session.close();
        } catch (Exception e) {
            // Only ordinary exceptions from the driver are swallowed
            // here; Error (OOM / StackOverflow) still propagates.
            LOG.warn("Failed to reset Cassandra session", e);
        } finally {
            this.session = null;
        }
    }
}

/**
 * Return the CQL statements currently buffered in this session's
 * batch (a live view from the underlying BatchStatement).
 */
public Collection<Statement> statements() {
return this.batch.getStatements();
}
Expand Down
Loading
Loading