Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;

import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.paging.Page;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage.BlobGetOption;
Expand All @@ -38,9 +33,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
Expand Down Expand Up @@ -87,86 +79,38 @@ public static GcsCountersOptions create(
public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
@Override
public GcsUtil create(PipelineOptions options) {
GcsOptions gcsOptions = options.as(GcsOptions.class);
Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
return new GcsUtil(
storageBuilder.build(),
storageBuilder.getHttpRequestInitializer(),
gcsOptions.getExecutorService(),
ExperimentalOptions.hasExperiment(options, "use_grpc_for_gcs"),
gcsOptions.getGcpCredential(),
gcsOptions.getGcsUploadBufferSizeBytes(),
gcsOptions.getGcsRewriteDataOpBatchLimit(),
GcsCountersOptions.create(
gcsOptions.getEnableBucketReadMetricCounter()
? gcsOptions.getGcsReadCounterPrefix()
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null),
gcsOptions);
return new GcsUtil(options);
}
}

/** @deprecated use {@link GcsPath#getNonWildcardPrefix(String)} instead. */
@Deprecated
public static String getNonWildcardPrefix(String globExp) {
return GcsPath.getNonWildcardPrefix(globExp);
}

/** @deprecated use {@link GcsPath#isWildcard(GcsPath)} instead. */
@Deprecated
public static boolean isWildcard(GcsPath spec) {
return GcsPath.isWildcard(spec);
}

@VisibleForTesting
GcsUtil(
Storage storageClient,
HttpRequestInitializer httpRequestInitializer,
ExecutorService executorService,
Boolean shouldUseGrpc,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
GcsOptions gcsOptions) {
this.delegate =
new GcsUtilV1(
storageClient,
httpRequestInitializer,
executorService,
shouldUseGrpc,
credentials,
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions.delegate,
gcsOptions);

if (ExperimentalOptions.hasExperiment(gcsOptions, "use_gcsutil_v2")) {
this.delegateV2 = new GcsUtilV2(gcsOptions);
GcsUtil(PipelineOptions options) {
this.delegate = new GcsUtilV1.GcsUtilFactory().create(options);
if (ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2")) {
this.delegateV2 = new GcsUtilV2.GcsUtilFactory().create(options);
} else {
this.delegateV2 = null;
}
}

protected void setStorageClient(Storage storageClient) {
delegate.setStorageClient(storageClient);
}

protected void setBatchRequestSupplier(Supplier<GcsUtilV1.BatchInterface> supplier) {
delegate.setBatchRequestSupplier(supplier);
}

public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
if (delegateV2 != null) {
return delegateV2.expand(gcsPattern);
}
return delegate.expand(gcsPattern);
}

@VisibleForTesting
@Nullable
Integer getUploadBufferSizeBytes() {
return delegate.getUploadBufferSizeBytes();
}

public long fileSize(GcsPath path) throws IOException {
if (delegateV2 != null) {
return delegateV2.fileSize(path);
Expand All @@ -180,13 +124,6 @@ public StorageObject getObject(GcsPath gcsPath) throws IOException {
return delegate.getObject(gcsPath);
}

/** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */
@Deprecated
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.getObject(gcsPath, backoff, sleeper);
}

public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws IOException {
if (delegateV2 != null) {
return delegateV2.getBlob(gcsPath, options);
Expand Down Expand Up @@ -248,11 +185,6 @@ public Page<Blob> listBlobs(
throw new IOException("GcsUtil V2 not initialized.");
}

@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
return delegate.fileSizes(paths);
}

public SeekableByteChannel open(GcsPath path) throws IOException {
return delegate.open(path);
}
Expand Down Expand Up @@ -389,50 +321,6 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException {
}
}

@VisibleForTesting
boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.bucketAccessible(path, backoff, sleeper);
}

@VisibleForTesting
void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
delegate.verifyBucketAccessible(path, backoff, sleeper);
}

@VisibleForTesting
@Nullable
Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
return delegate.getBucket(path, backoff, sleeper);
}

@VisibleForTesting
void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
throws IOException {
delegate.createBucket(projectId, bucket, backoff, sleeper);
}

@VisibleForTesting
void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
delegate.removeBucket(bucket, backoff, sleeper);
}

@VisibleForTesting
List<GcsUtilV1.BatchInterface> makeGetBatches(
Collection<GcsPath> paths, List<StorageObjectOrIOException[]> results) throws IOException {
List<GcsUtilV1.StorageObjectOrIOException[]> legacyResults = new java.util.ArrayList<>();
List<GcsUtilV1.BatchInterface> legacyBatch = delegate.makeGetBatches(paths, legacyResults);

for (GcsUtilV1.StorageObjectOrIOException[] legacyResult : legacyResults) {
StorageObjectOrIOException[] result = new StorageObjectOrIOException[legacyResult.length];
for (int i = 0; i < legacyResult.length; ++i) {
result[i] = StorageObjectOrIOException.fromLegacy(legacyResult[i]);
}
results.add(result);
}

return legacyBatch;
}

public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
throws IOException {
delegate.copy(srcFilenames, destFilenames);
Expand Down Expand Up @@ -497,32 +385,6 @@ public void rename(
}
}

@VisibleForTesting
@SuppressWarnings("JdkObsolete") // for LinkedList
java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
Iterable<String> srcFilenames,
Iterable<String> destFilenames,
boolean deleteSource,
boolean ignoreMissingSource,
boolean ignoreExistingDest)
throws IOException {
return delegate.makeRewriteOps(
srcFilenames, destFilenames, deleteSource, ignoreMissingSource, ignoreExistingDest);
}

@VisibleForTesting
@SuppressWarnings("JdkObsolete") // for LinkedList
List<GcsUtilV1.BatchInterface> makeRewriteBatches(
java.util.LinkedList<GcsUtilV1.RewriteOp> rewrites) throws IOException {
return delegate.makeRewriteBatches(rewrites);
}

@VisibleForTesting
List<GcsUtilV1.BatchInterface> makeRemoveBatches(Collection<String> filenames)
throws IOException {
return delegate.makeRemoveBatches(filenames);
}

public void remove(Collection<String> filenames) throws IOException {
delegate.remove(filenames);
}
Expand Down
Loading
Loading