Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* (YAML) Added WriteToDatadog transform ([#38362](https://github.com/apache/beam/issues/38362)).
* (Java) Flink 2.1 and 2.2 support is added ([#38947](https://github.com/apache/beam/issues/38947)) ([#38978](https://github.com/apache/beam/issues/38978)); Flink 1.17 and 1.18 support is dropped.
* (Python) MqttIO is now supported in Python via cross-language ([#21060](https://github.com/apache/beam/issues/21060)).
* Added support for attributing BigQuery API quota and billing to a specific GCP project (quota project): `quota_project_id` parameter in `ReadFromBigQuery` or `--quota_project_id` pipeline option (Python), `--bigQueryQuotaProjectId` pipeline option (Java), and `bigqueryio.WithQuotaProject` read/query option (Go) ([#37431](https://github.com/apache/beam/issues/37431)).

## Breaking Changes

Expand Down
30 changes: 27 additions & 3 deletions sdks/go/pkg/beam/io/bigqueryio/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
bq "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

// writeSizeLimit is the maximum number of rows allowed by BQ in a write.
Expand Down Expand Up @@ -88,14 +89,14 @@ func NewQualifiedTableName(s string) (QualifiedTableName, error) {
// Read reads all rows from the given table. The table must have a schema
// compatible with the given type, t, and Read returns a PCollection<t>. If the
// table has more rows than t, then Read is implicitly a projection.
func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection {
func Read(s beam.Scope, project, table string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection {
mustParseTable(table)

s = s.Scope("bigquery.Read")

stmt := constructSelectStatement(t, bigQueryTag, table)

return query(s, project, stmt, t)
return query(s, project, stmt, t, options...)
}

func constructSelectStatement(t reflect.Type, tagKey string, table string) string {
Expand All @@ -114,6 +115,9 @@ func constructSelectStatement(t reflect.Type, tagKey string, table string) strin
type QueryOptions struct {
// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
UseStandardSQL bool
// QuotaProject is the GCP project ID used for quota and billing attribution
// of the BigQuery API calls, if different from the project the data resides in.
// When it is empty, clientOptions adds no quota-project option to the
// BigQuery client.
QuotaProject string
}

// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
Expand All @@ -124,6 +128,26 @@ func UseStandardSQL() func(qo *QueryOptions) error {
}
}

// WithQuotaProject returns a query option that records project as the GCP
// project ID to which quota and billing of the BigQuery API calls are
// attributed, for use when that differs from the project the data resides in.
// The credentials used must have the serviceusage.services.use permission on
// that project.
func WithQuotaProject(project string) func(qo *QueryOptions) error {
	setQuotaProject := func(opts *QueryOptions) error {
		opts.QuotaProject = project
		return nil
	}
	return setQuotaProject
}

// clientOptions translates qo into the options handed to the BigQuery client
// constructor. It yields a nil slice unless a quota project is configured, in
// which case the slice holds the single matching quota-project option.
func clientOptions(qo QueryOptions) []option.ClientOption {
	if qo.QuotaProject == "" {
		return nil
	}
	return []option.ClientOption{option.WithQuotaProject(qo.QuotaProject)}
}

// Query executes a query. The output must have a schema compatible with the given
// type, t. It returns a PCollection<t>.
func Query(s beam.Scope, project, q string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection {
Expand Down Expand Up @@ -157,7 +181,7 @@ type queryFn struct {
}

func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X)) error {
client, err := bigquery.NewClient(ctx, f.Project)
client, err := bigquery.NewClient(ctx, f.Project, clientOptions(f.Options)...)
if err != nil {
return err
}
Expand Down
19 changes: 19 additions & 0 deletions sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,22 @@ func Test_mustInferSchema(t *testing.T) {
})
}
}

// TestWithQuotaProject checks that applying the WithQuotaProject option stores
// the given project ID on the QueryOptions and reports no error.
func TestWithQuotaProject(t *testing.T) {
	const want = "quota-project"

	var qo QueryOptions
	apply := WithQuotaProject(want)
	if err := apply(&qo); err != nil {
		t.Fatalf("WithQuotaProject() returned err: %v", err)
	}
	if got := qo.QuotaProject; got != want {
		t.Errorf("qo.QuotaProject = %q, want %q", got, want)
	}
}

// TestClientOptions checks how many client options are derived from a
// QueryOptions value: none without a quota project, exactly one with it.
func TestClientOptions(t *testing.T) {
	unset := clientOptions(QueryOptions{})
	if n := len(unset); n != 0 {
		t.Errorf("clientOptions(no quota) = %d options, want 0", n)
	}

	withQuota := clientOptions(QueryOptions{QuotaProject: "quota-project"})
	if n := len(withQuota); n != 1 {
		t.Errorf("clientOptions(with quota) = %d options, want 1", n)
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ public interface BigQueryOptions

void setBigQueryProject(String value);

/**
 * Returns the GCP project ID to which quota and billing of BigQuery API requests are attributed;
 * see the option description below for the exact semantics and required permission.
 */
@Description(
"GCP project ID used for quota and billing attribution of BigQuery API requests "
+ "(sets the X-Goog-User-Project header), if different from the project the data "
+ "resides in. If unspecified, the project associated with the credentials is used. "
+ "The credentials used must have the serviceusage.services.use permission on this "
+ "project.")
String getBigQueryQuotaProjectId();

/** Sets the quota project ID reported by {@link #getBigQueryQuotaProjectId()}. */
void setBigQueryQuotaProjectId(String value);

@Description("Maximum (best effort) size of a single append to the storage API.")
@Default.Integer(2 * 1024 * 1024)
Integer getStorageApiAppendThresholdBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
Expand Down Expand Up @@ -1738,6 +1739,27 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Inte
}
}

/**
 * Applies a quota project to the given credentials when possible.
 *
 * <p>The quota project sets the {@code X-Goog-User-Project} header so that BigQuery API requests
 * are billed against that project's quota.
 *
 * @param credential the credentials to decorate; may be null
 * @param quotaProjectId the quota project to apply; null or empty means none is configured
 * @return the input credentials unchanged when they are null, when no quota project is
 *     configured, or when they are not {@link GoogleCredentials} (a warning is logged in that
 *     last case); otherwise the result of {@code createWithQuotaProject(quotaProjectId)}
 */
@VisibleForTesting
static @Nullable Credentials maybeWithQuotaProjectId(
    @Nullable Credentials credential, @Nullable String quotaProjectId) {
  // Nothing to decorate, or nothing to decorate with.
  if (credential == null || Strings.isNullOrEmpty(quotaProjectId)) {
    return credential;
  }
  if (!(credential instanceof GoogleCredentials)) {
    LOG.warn(
        "Credentials of type {} do not support a quota project. "
            + "The bigQueryQuotaProjectId option will be ignored.",
        credential.getClass().getName());
    return credential;
  }
  GoogleCredentials googleCredentials = (GoogleCredentials) credential;
  return googleCredentials.createWithQuotaProject(quotaProjectId);
}

/** Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */
private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
// Do not log 404. It clutters the output and is possibly even required by the
Expand All @@ -1748,7 +1770,8 @@ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
httpRequestInitializer.setReadTimeout(options.getHTTPReadTimeout());
httpRequestInitializer.setWriteTimeout(options.getHTTPWriteTimeout());
ImmutableList.Builder<HttpRequestInitializer> initBuilder = ImmutableList.builder();
Credentials credential = options.getGcpCredential();
Credentials credential =
maybeWithQuotaProjectId(options.getGcpCredential(), options.getBigQueryQuotaProjectId());
initBuilder.add(
credential == null
? new NullCredentialInitializer()
Expand Down Expand Up @@ -1787,6 +1810,10 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setEndpoint(trimSchemaIfNecessary(endpoint));
}
@Nullable String quotaProjectId = options.getBigQueryQuotaProjectId();
if (!Strings.isNullOrEmpty(quotaProjectId)) {
builder.setQuotaProjectId(quotaProjectId);
}
return BigQueryWriteClient.create(
builder
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
Expand Down Expand Up @@ -1911,6 +1938,10 @@ public void onRetryAttempt(Status status, Metadata metadata) {
if (!Strings.isNullOrEmpty(endpoint)) {
settingsBuilder.setEndpoint(trimSchemaIfNecessary(endpoint));
}
@Nullable String quotaProjectId = options.getBigQueryQuotaProjectId();
if (!Strings.isNullOrEmpty(quotaProjectId)) {
settingsBuilder.setQuotaProjectId(quotaProjectId);
}

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.Credentials;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.QuotaProjectIdProvider;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
Expand All @@ -87,6 +91,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
Expand Down Expand Up @@ -2173,6 +2178,71 @@ public RetryInfo parseBytes(byte[] serialized) {
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
}

/**
 * Verifies that the configured quota project ID is visible in the settings of the clients
 * obtained from both the storage client and the write stream service.
 */
@Test
public void testQuotaProjectIdOverrides() throws IOException {
  final String quotaProject = "my-quota-project";
  BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
  options.setBigQueryQuotaProjectId(quotaProject);

  BigQueryServicesImpl.StorageClientImpl storageClient =
      new BigQueryServicesImpl.StorageClientImpl(options);
  assertEquals(quotaProject, storageClient.getClient().getSettings().getQuotaProjectId());

  BigQueryServicesImpl.WriteStreamServiceImpl writeStreamService =
      new BigQueryServicesImpl.WriteStreamServiceImpl(options);
  assertEquals(quotaProject, writeStreamService.getClient().getSettings().getQuotaProjectId());
}

/**
 * Covers every branch of {@code maybeWithQuotaProjectId}: quota applied to Google credentials,
 * pass-through for a null or empty quota project, pass-through for null credentials, and
 * pass-through for credentials that are not {@link GoogleCredentials}.
 */
@Test
public void testMaybeWithQuotaProjectId() throws IOException {
  final String quotaProject = "my-quota-project";
  AccessToken fakeToken = AccessToken.newBuilder().setTokenValue("fake-token").build();
  GoogleCredentials googleCredentials = GoogleCredentials.create(fakeToken);

  // A configured quota project shows up on the returned credentials.
  Credentials quotaApplied =
      BigQueryServicesImpl.maybeWithQuotaProjectId(googleCredentials, quotaProject);
  assertEquals(quotaProject, ((QuotaProjectIdProvider) quotaApplied).getQuotaProjectId());

  // With no quota project, the very same credentials come back.
  assertEquals(
      googleCredentials, BigQueryServicesImpl.maybeWithQuotaProjectId(googleCredentials, null));
  // An empty quota project counts as "not configured".
  assertEquals(
      googleCredentials, BigQueryServicesImpl.maybeWithQuotaProjectId(googleCredentials, ""));
  // Null credentials stay null.
  assertNull(BigQueryServicesImpl.maybeWithQuotaProjectId(null, quotaProject));

  // A bare Credentials implementation has no quota project support and is returned as-is.
  Credentials nonGoogleCredentials =
      new Credentials() {
        @Override
        public String getAuthenticationType() {
          return "test";
        }

        @Override
        public Map<String, List<String>> getRequestMetadata(java.net.URI uri) {
          return Collections.emptyMap();
        }

        @Override
        public boolean hasRequestMetadata() {
          return false;
        }

        @Override
        public boolean hasRequestMetadataOnly() {
          return true;
        }

        @Override
        public void refresh() {}
      };
  assertEquals(
      nonGoogleCredentials,
      BigQueryServicesImpl.maybeWithQuotaProjectId(nonGoogleCredentials, quotaProject));
}

@Test
public void testEndpointOverrides() throws IOException {
BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
Expand Down
42 changes: 42 additions & 0 deletions sdks/python/apache_beam/internal/gcp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,48 @@ def get_service_credentials(pipeline_options):
return _Credentials.get_service_credentials(pipeline_options)


def with_quota_project(credentials, quota_project_id):
  """For internal use only; no backwards-compatibility guarantees.

  Apply a quota project to credentials if supported.

  The quota project is used to bill API requests to a specific GCP project,
  separate from the project that owns the service account or data.

  Args:
    credentials: The credentials object (either _ApitoolsCredentialsAdapter
      or a google.auth credentials object). May be None.
    quota_project_id: The GCP project ID to use for quota and billing. None
      or an empty string means no quota project is configured.

  Returns:
    Credentials with the quota project applied. The original credentials are
    returned unchanged if google-auth is unavailable, if credentials is None,
    if no quota project is configured, or if the credentials do not support
    a quota project (a warning is logged in that last case). Wrapped
    credentials are returned re-wrapped in a new _ApitoolsCredentialsAdapter.
  """
  # An empty ID (e.g. from `--quota_project_id=`) is treated as unset, the same
  # way the Java and Go SDKs treat it; otherwise it would be applied as a real
  # quota project or trigger a misleading "will be ignored" warning.
  if not _GOOGLE_AUTH_AVAILABLE or credentials is None or not quota_project_id:
    return credentials

  # Wrapped credentials expose the underlying google-auth object through
  # get_google_auth_credentials(); anything else is used directly.
  is_wrapped = hasattr(credentials, 'get_google_auth_credentials')
  if is_wrapped:
    underlying_creds = credentials.get_google_auth_credentials()
  else:
    underlying_creds = credentials

  if not hasattr(underlying_creds, 'with_quota_project'):
    _LOGGER.warning(
        'Credentials of type %s do not support quota project. '
        'The quota_project_id parameter will be ignored.',
        type(underlying_creds).__name__)
    return credentials

  new_creds = underlying_creds.with_quota_project(quota_project_id)
  # Hand back the same kind of object the caller passed in.
  if is_wrapped:
    return _ApitoolsCredentialsAdapter(new_creds)
  return new_creds


if _GOOGLE_AUTH_AVAILABLE:

class _ApitoolsCredentialsAdapter:
Expand Down
60 changes: 60 additions & 0 deletions sdks/python/apache_beam/internal/gcp/auth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,65 @@ def raise_(scopes=None):
auth._LOGGER.removeHandler(loggerHandler)


@unittest.skipIf(gauth is None, 'Google Auth dependencies are not installed')
class WithQuotaProjectTest(unittest.TestCase):
  """Exercises auth.with_quota_project against mocked credentials."""
  def test_with_quota_project_returns_credentials_unchanged_when_none(self):
    """None credentials pass straight through."""
    self.assertIsNone(auth.with_quota_project(None, 'my-project'))

  def test_with_quota_project_returns_credentials_unchanged_when_no_quota(self):
    """A None quota_project_id leaves the credentials untouched."""
    creds = mock.MagicMock()

    returned = auth.with_quota_project(creds, None)

    self.assertEqual(returned, creds)
    creds.with_quota_project.assert_not_called()

  def test_with_quota_project_applies_quota_to_wrapped_credentials(self):
    """Wrapped credentials are unwrapped, re-quoted, and wrapped again."""
    inner_creds = mock.MagicMock()
    requoted_creds = mock.MagicMock()
    inner_creds.with_quota_project.return_value = requoted_creds

    wrapper = mock.MagicMock()
    wrapper.get_google_auth_credentials.return_value = inner_creds

    rewrapped = mock.MagicMock()
    with mock.patch(
        'apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter',
        return_value=rewrapped) as adapter_cls:
      returned = auth.with_quota_project(wrapper, 'my-billing-project')

    inner_creds.with_quota_project.assert_called_once_with(
        'my-billing-project')
    # The adapter must be rebuilt around the re-quoted credentials.
    adapter_cls.assert_called_once_with(requoted_creds)
    self.assertEqual(returned, rewrapped)

  def test_with_quota_project_applies_quota_to_direct_credentials(self):
    """Unwrapped credentials get the quota project applied directly."""
    creds = mock.MagicMock(spec=['with_quota_project'])
    requoted_creds = mock.MagicMock()
    creds.with_quota_project.return_value = requoted_creds

    returned = auth.with_quota_project(creds, 'my-billing-project')

    creds.with_quota_project.assert_called_once_with('my-billing-project')
    self.assertEqual(returned, requoted_creds)

  def test_with_quota_project_returns_original_when_not_supported(self):
    """Credentials lacking with_quota_project come back unchanged."""
    # An empty spec yields a mock that exposes no with_quota_project attribute.
    creds = mock.MagicMock(spec=[])

    returned = auth.with_quota_project(creds, 'my-billing-project')

    self.assertEqual(returned, creds)


if __name__ == '__main__':
unittest.main()
Loading
Loading