Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand All @@ -38,6 +39,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.options.ValueProvider;
Expand All @@ -61,6 +63,9 @@ public class SpannerAccessor implements AutoCloseable {
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";

// Default wait time for session creation
static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5);

/** Instance ID to use when connecting to an experimental host. */
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";

Expand Down Expand Up @@ -270,6 +275,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
builder.setCredentials(credentials.get());
}

ValueProvider<java.time.Duration> waitForSessionCreationDuration =
spannerConfig.getWaitForSessionCreationDuration();
java.time.Duration waitDuration =
Optional.ofNullable(waitForSessionCreationDuration)
.map(ValueProvider::get)
.orElse(DEFAULT_SESSION_WAIT_DURATION);
builder.setSessionPoolOption(
SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build());

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public String getHostValue() {

public abstract @Nullable ValueProvider<Credentials> getCredentials();

public abstract @Nullable ValueProvider<java.time.Duration> getWaitForSessionCreationDuration();

abstract Builder toBuilder();

public static SpannerConfig create() {
Expand Down Expand Up @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings(

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

abstract Builder setWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration);

public abstract SpannerConfig build();
}

Expand Down Expand Up @@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider<Boolean> plainText)
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Sets the wait time for a multiplexed session to be available when creating a database client.
*
* <p>Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation.
*
* @param waitForSessionCreationDuration The duration to wait. Defaults to {@link
* SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}.
* @return {@link SpannerConfig}
*/
public SpannerConfig withWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration) {
return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build();
}

public SpannerConfig withWaitForSessionCreationDuration(
java.time.Duration waitForSessionCreationDuration) {
return withWaitForSessionCreationDuration(
ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
Expand Down Expand Up @@ -118,6 +119,10 @@ public void setUp() throws Exception {
SpannerOptions.newBuilder()
.setProjectId(project)
.disableGrpcGcpExtension()
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
.build())
.build()
.getService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
Expand Down Expand Up @@ -125,6 +126,10 @@ public void setUp() throws Exception {
SpannerOptions.newBuilder()
.setProjectId(project)
.disableGrpcGcpExtension()
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
.build())
.build()
.getService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import java.util.ArrayList;
Expand Down Expand Up @@ -81,6 +82,10 @@ protected void before() throws Throwable {
.setProjectId(projectId)
.setHost(host)
.disableGrpcGcpExtension()
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
.build())
.build()
.getService();
databaseAdminClient = spanner.getDatabaseAdminClient();
Expand Down
Loading