Iceberg Add Files #37701
Merged
Changes from all commits (17 commits)
All commits by ahmedabu98:

- `9b9d553` add files transform and schematransform
- `98328b8` minor fixes
- `49936ab` add tests
- `096f9de` Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- `3b2e86d` create table if needed; determine partition spec from metrics
- `f507d0a` spotless
- `9045996` add batch route as well
- `d4a0db1` spotless
- `e9e55b1` extract SchemaTransform logic out
- `194fbfd` Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
- `e983676` add integration tests
- `f5ba17e` spotless
- `f5ace5c` spotless
- `15df0e7` fix deps
- `1f9272f` add a few more tests; add error handling output
- `e7f949d` clarify comments
- `a4e4ec9` add comment
```diff
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 4
+  "modification": 1
 }
```
671 changes: 671 additions & 0 deletions
`sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java`
Large diffs are not rendered by default.
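The `AddFiles.java` diff is not rendered above, but the configuration documented in this PR describes its commit policy: a commit is triggered when *either* the triggering frequency (default 600 seconds) *or* the append batch size (default 100,000 files) is reached. The either-threshold check can be sketched as a small self-contained class; the `CommitTrigger` name and method signatures here are hypothetical, not the PR's actual code:

```java
// Hypothetical sketch (not the PR's implementation) of the commit trigger
// described in the configuration: commit when either the elapsed time or
// the number of buffered files reaches its threshold.
class CommitTrigger {
    private final long triggeringFrequencyMillis;
    private final int appendBatchSize;

    CommitTrigger(long triggeringFrequencyMillis, int appendBatchSize) {
        this.triggeringFrequencyMillis = triggeringFrequencyMillis;
        this.appendBatchSize = appendBatchSize;
    }

    /** True when either the time threshold or the batch-size threshold is hit. */
    static boolean shouldCommit(
            CommitTrigger t, long millisSinceLastCommit, int bufferedFileCount) {
        return millisSinceLastCommit >= t.triggeringFrequencyMillis
                || bufferedFileCount >= t.appendBatchSize;
    }

    public static void main(String[] args) {
        // Defaults from the configuration docs: 600 seconds, 100,000 files.
        CommitTrigger trigger = new CommitTrigger(600_000L, 100_000);
        System.out.println(shouldCommit(trigger, 5_000L, 10));       // false: neither threshold hit
        System.out.println(shouldCommit(trigger, 700_000L, 10));     // true: time threshold hit
        System.out.println(shouldCommit(trigger, 5_000L, 100_000));  // true: batch size hit
    }
}
```

Either condition alone suffices, which matches the doc strings on `getTriggeringFrequencySeconds` and `getAppendBatchSize` below ("A commit is triggered when either this or … is reached").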
190 changes: 190 additions & 0 deletions
`...iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java`
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.AddFiles.ERROR_TAG;
import static org.apache.beam.sdk.io.iceberg.AddFiles.OUTPUT_TAG;
import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

@AutoService(SchemaTransformProvider.class)
public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider<Configuration> {
  @Override
  public AddFilesSchemaTransform from(Configuration configuration) {
    return new AddFilesSchemaTransform(configuration);
  }

  @Override
  public String identifier() {
    return "beam:schematransform:iceberg_add_files:v1";
  }

  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class Configuration {
    public static Builder builder() {
      return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder();
    }

    @SchemaFieldDescription("A fully-qualified table identifier.")
    public abstract String getTable();

    @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
    public abstract @Nullable Map<String, String> getCatalogProperties();

    @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
    public abstract @Nullable Map<String, String> getConfigProperties();

    @SchemaFieldDescription(
        "For a streaming pipeline, sets the frequency at which incoming files are appended. Defaults to 600 (10 minutes). "
            + "A commit is triggered when either this or append batch size is reached.")
    public abstract @Nullable Integer getTriggeringFrequencySeconds();

    @SchemaFieldDescription(
        "For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 100,000 files. "
            + "A commit is triggered when either this or append triggering interval is reached.")
    public abstract @Nullable Integer getAppendBatchSize();

    @SchemaFieldDescription(
        "The prefix shared among all partitions. For example, a data file may have the following"
            + " location:%n"
            + "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n"
            + "The provided prefix should go up until the partition information:%n"
            + "'gs://bucket/namespace/table/data/'.%n"
            + "If not provided, will try determining each DataFile's partition from its metrics metadata.")
    public abstract @Nullable String getLocationPrefix();

    @SchemaFieldDescription(
        "Fields used to create a partition spec that is applied when tables are created. For a field 'foo', "
            + "the available partition transforms are:\n\n"
            + "- `foo`\n"
            + "- `truncate(foo, N)`\n"
            + "- `bucket(foo, N)`\n"
            + "- `hour(foo)`\n"
            + "- `day(foo)`\n"
            + "- `month(foo)`\n"
            + "- `year(foo)`\n"
            + "- `void(foo)`\n\n"
            + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.")
    public abstract @Nullable List<String> getPartitionFields();

    @SchemaFieldDescription(
        "Iceberg table properties to be set on the table when it is created.\n"
            + "For more information on table properties,"
            + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
    public abstract @Nullable Map<String, String> getTableProperties();

    @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
    public abstract @Nullable ErrorHandling getErrorHandling();

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setTable(String table);

      public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

      public abstract Builder setConfigProperties(Map<String, String> confProperties);

      public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);

      public abstract Builder setAppendBatchSize(Integer size);

      public abstract Builder setLocationPrefix(String prefix);

      public abstract Builder setPartitionFields(List<String> fields);

      public abstract Builder setTableProperties(Map<String, String> props);

      public abstract Builder setErrorHandling(ErrorHandling errorHandling);

      public abstract Configuration build();
    }

    public IcebergCatalogConfig getIcebergCatalog() {
      return IcebergCatalogConfig.builder()
          .setCatalogProperties(getCatalogProperties())
          .setConfigProperties(getConfigProperties())
          .build();
    }
  }

  public static class AddFilesSchemaTransform extends SchemaTransform {
    private final Configuration configuration;

    public AddFilesSchemaTransform(Configuration configuration) {
      this.configuration = configuration;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      Schema inputSchema = input.getSinglePCollection().getSchema();
      Preconditions.checkState(
          inputSchema.getFieldCount() == 1
              && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING),
          "Incoming Row Schema must contain only one field of type String. Instead, got schema: %s",
          inputSchema);

      @Nullable Integer frequency = configuration.getTriggeringFrequencySeconds();

      PCollectionRowTuple result =
          input
              .getSinglePCollection()
              .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null))
              .apply(
                  "ExtractPaths",
                  MapElements.into(TypeDescriptors.strings())
                      .via(row -> checkStateNotNull(row.getString(0))))
              .apply(
                  new AddFiles(
                      configuration.getIcebergCatalog(),
                      configuration.getTable(),
                      configuration.getLocationPrefix(),
                      configuration.getPartitionFields(),
                      configuration.getTableProperties(),
                      configuration.getAppendBatchSize(),
                      frequency != null ? Duration.standardSeconds(frequency) : null));

      PCollectionRowTuple output = PCollectionRowTuple.of("snapshots", result.get(OUTPUT_TAG));
      ErrorHandling errorHandling = configuration.getErrorHandling();
      if (errorHandling != null) {
        output = output.and(errorHandling.getOutput(), result.get(ERROR_TAG));
      }
      return output;
    }
  }
}
```
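The `partition_fields` option above accepts strings like `foo`, `truncate(foo, N)`, `bucket(foo, N)`, or `day(foo)`, which the transform maps onto an Iceberg partition spec at table-creation time. A minimal self-contained sketch of splitting such a string into its transform name, source field, and optional numeric argument (the `PartitionFieldParser` name and its string-array return shape are hypothetical, not the PR's code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of tokenizing one partition-field string, e.g.
// "bucket(foo, 16)" -> {"bucket", "foo", "16"}; a bare field name is an
// identity partition. The real PR builds an Iceberg PartitionSpec instead.
class PartitionFieldParser {
    private static final Pattern TRANSFORM =
            Pattern.compile("(\\w+)\\((\\w+)(?:,\\s*(\\d+))?\\)");

    /** Returns {transform, sourceField, numericArgOrNull}. */
    static String[] parse(String field) {
        Matcher m = TRANSFORM.matcher(field.trim());
        if (!m.matches()) {
            // No parentheses: treat as an identity partition on that column.
            return new String[] {"identity", field.trim(), null};
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        System.out.println(String.join("/", parse("bucket(foo, 16)"))); // bucket/foo/16
        System.out.println(parse("day(ts)")[0]);                        // day
        System.out.println(parse("foo")[0]);                            // identity
    }
}
```

Per the doc string in the diff, the full set of transforms follows Iceberg's partition-transform spec (`truncate`, `bucket`, `hour`, `day`, `month`, `year`, `void`).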