diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 34a6e02150e7..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 4 + "modification": 1 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index a1f352d05309..bbd55fee2fc8 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -51,6 +51,8 @@ dependencies { implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" implementation "org.apache.parquet:parquet-hadoop:$parquet_version" + implementation "org.apache.parquet:parquet-common:$parquet_version" + implementation project(":sdks:java:io:parquet") implementation "org.apache.orc:orc-core:$orc_version" implementation "org.apache.iceberg:iceberg-core:$iceberg_version" implementation "org.apache.iceberg:iceberg-api:$iceberg_version" @@ -74,11 +76,13 @@ dependencies { testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.parquet:parquet-avro:$parquet_version" - testImplementation "org.apache.parquet:parquet-common:$parquet_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") testImplementation library.java.junit + testImplementation library.java.hamcrest + testImplementation project(path: ":sdks:java:extensions:avro") + testImplementation 'org.awaitility:awaitility:4.2.0' // Hive catalog test dependencies testImplementation project(path: ":sdks:java:io:iceberg:hive") @@ -95,6 +99,7 @@ dependencies { 
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.google_api_services_bigquery + testImplementation 'com.google.cloud:google-cloud-storage' testImplementation library.java.google_auth_library_oauth2_http testRuntimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java new file mode 100644 index 000000000000..4a164700099a --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.DATA_FILES; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.ERRORS; +import static org.apache.beam.sdk.metrics.Metrics.counter; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.BeamParquetInputFile; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import 
org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hasher; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import 
org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A transform that takes in a stream of file paths, converts them to Iceberg {@link DataFile}s with + * partition metadata and metrics, then commits them to an Iceberg {@link Table}. + */ +public class AddFiles extends PTransform, PCollectionRowTuple> { + static final String OUTPUT_TAG = "snapshots"; + static final String ERROR_TAG = "errors"; + private static final Duration DEFAULT_TRIGGER_INTERVAL = Duration.standardMinutes(10); + private static final Counter numFilesAdded = counter(AddFiles.class, "numFilesAdded"); + private static final Counter numErrorFiles = counter(AddFiles.class, "numErrorFiles"); + private static final Logger LOG = LoggerFactory.getLogger(AddFiles.class); + private static final int DEFAULT_FILES_TRIGGER = 1_000; + static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("file").addStringField("error").build(); + private final IcebergCatalogConfig catalogConfig; + private final String tableIdentifier; + private final Duration intervalTrigger; + private final int numFilesTrigger; + private final @Nullable String locationPrefix; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + + public AddFiles( + IcebergCatalogConfig catalogConfig, + String tableIdentifier, + @Nullable String locationPrefix, + @Nullable List partitionFields, + @Nullable Map tableProps, + @Nullable Integer numFilesTrigger, + @Nullable Duration intervalTrigger) { + this.catalogConfig = catalogConfig; + this.tableIdentifier = tableIdentifier; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + this.intervalTrigger = intervalTrigger != null ? intervalTrigger : DEFAULT_TRIGGER_INTERVAL; + this.numFilesTrigger = numFilesTrigger != null ? 
numFilesTrigger : DEFAULT_FILES_TRIGGER; + this.locationPrefix = locationPrefix; + } + + @Override + public PCollectionRowTuple expand(PCollection input) { + LOG.info( + "AddFiles configured to commit after accumulating {} files, or after {} seconds.", + numFilesTrigger, + intervalTrigger.getStandardSeconds()); + if (!Strings.isNullOrEmpty(locationPrefix)) { + LOG.info( + "AddFiles configured to build partition metadata after the prefix: '{}'", locationPrefix); + } + + PCollectionTuple dataFiles = + input.apply( + "ConvertToDataFiles", + ParDo.of( + new ConvertToDataFile( + catalogConfig, + tableIdentifier, + locationPrefix, + partitionFields, + tableProps)) + .withOutputTags(DATA_FILES, TupleTagList.of(ERRORS))); + SchemaCoder sdfSchema; + try { + sdfSchema = SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + + PCollection> keyedFiles = + dataFiles + .get(DATA_FILES) + .setCoder(sdfSchema) + .apply("AddStaticKey", WithKeys.of((Void) null)); + + PCollection>> groupedFiles = + keyedFiles.isBounded().equals(BOUNDED) + ? 
keyedFiles.apply(GroupByKey.create()) + : keyedFiles.apply( + GroupIntoBatches.ofSize(numFilesTrigger) + .withMaxBufferingDuration(intervalTrigger)); + + PCollection snapshots = + groupedFiles + .apply( + "CommitFilesToIceberg", + ParDo.of(new CommitFilesDoFn(catalogConfig, tableIdentifier))) + .setRowSchema(SnapshotInfo.getSchema()); + + return PCollectionRowTuple.of( + OUTPUT_TAG, snapshots, ERROR_TAG, dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA)); + } + + static class ConvertToDataFile extends DoFn { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + public static final TupleTag ERRORS = new TupleTag<>(); + public static final TupleTag DATA_FILES = new TupleTag<>(); + private final @Nullable String prefix; + private final @Nullable List partitionFields; + private final @Nullable Map tableProps; + private transient @MonotonicNonNull Table table; + // Limit open readers to avoid blowing up memory on one worker + private static final int MAX_READERS = 10; + private static final Semaphore ACTIVE_READERS = new Semaphore(MAX_READERS); + + public ConvertToDataFile( + IcebergCatalogConfig catalogConfig, + String identifier, + @Nullable String prefix, + @Nullable List partitionFields, + @Nullable Map tableProps) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + this.prefix = prefix; + this.partitionFields = partitionFields; + this.tableProps = tableProps; + } + + static final String PREFIX_ERROR = "File path did not start with the specified prefix"; + private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format"; + static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition: "; + + @ProcessElement + public void process(@Element String filePath, MultiOutputReceiver output) + throws IOException, InterruptedException { + FileFormat format; + try { + format = inferFormat(filePath); + } catch (UnknownFormatException e) { + output + .get(ERRORS) + 
.output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build()); + numErrorFiles.inc(); + return; + } + + if (table == null) { + table = getOrCreateTable(getSchema(filePath, format)); + } + + // Check if the file path contains the provided prefix + if (table.spec().isPartitioned() + && !Strings.isNullOrEmpty(prefix) + && !filePath.startsWith(checkStateNotNull(prefix))) { + output + .get(ERRORS) + .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build()); + numErrorFiles.inc(); + return; + } + + InputFile inputFile = table.io().newInputFile(filePath); + + Metrics metrics = + getFileMetrics( + inputFile, format, MetricsConfig.forTable(table), MappingUtil.create(table.schema())); + + // Figure out which partition this DataFile should go to + String partitionPath; + if (table.spec().isUnpartitioned()) { + partitionPath = ""; + } else if (!Strings.isNullOrEmpty(prefix)) { + // option 1: use directory structure to determine partition + // Note: we don't validate the DataFile content here + partitionPath = getPartitionFromFilePath(filePath); + } else { + try { + // option 2: examine DataFile min/max statistics to determine partition + partitionPath = getPartitionFromMetrics(metrics, inputFile, table); + } catch (UnknownPartitionException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, UNKNOWN_PARTITION_ERROR + e.getMessage()) + .build()); + numErrorFiles.inc(); + return; + } + } + + DataFile df = + DataFiles.builder(table.spec()) + .withPath(filePath) + .withFormat(format) + .withMetrics(metrics) + .withFileSizeInBytes(inputFile.getLength()) + .withPartitionPath(partitionPath) + .build(); + + output.get(DATA_FILES).output(SerializableDataFile.from(df, partitionPath)); + } + + static T transformValue(Transform transform, Type type, ByteBuffer bytes) { + return transform.bind(type).apply(Conversions.fromByteBuffer(type, bytes)); + } + + private static T transformValue(Transform 
transform, Type type, Object value) { + return transform.bind(type).apply((W) value); + } + + private Table getOrCreateTable(org.apache.iceberg.Schema schema) { + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + try { + return tableProps == null + ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + } catch (AlreadyExistsException e) { // if table already exists, just load it + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + /** + * We don't have a table yet, so we don't know which FileIO to use to read these files. Instead, + * we use Beam's FileSystem utilities to read the file and extract its schema to create the + * table + */ + private static org.apache.iceberg.Schema getSchema(String filePath, FileFormat format) + throws IOException { + Preconditions.checkArgument( + format.equals(FileFormat.PARQUET), "Table creation is only supported for Parquet files."); + try (ParquetFileReader reader = ParquetFileReader.open(getParquetInputFile(filePath))) { + MessageType messageType = reader.getFooter().getFileMetaData().getSchema(); + return ParquetSchemaUtil.convert(messageType); + } + } + + private String getPartitionFromFilePath(String filePath) { + if (checkStateNotNull(table).spec().isUnpartitioned()) { + return ""; + } + String partitionPath = filePath.substring(checkStateNotNull(prefix).length()); + int lastSlashIndex = partitionPath.lastIndexOf('/'); + + return lastSlashIndex > 0 ? partitionPath.substring(0, lastSlashIndex) : ""; + } + + /** + * Examines the min/max values of each partition column to determine the destination partition. + * + *

If the transformed min/max values are not equal for any given column, we won't be able to + * determine the partition. We also cannot fall back to a "null" partition, because that will + * also get skipped by most queries. + * + *

The Bucket partition transform is an exceptional case because it is not monotonic, meaning + * it's not enough to just compare the min and max values. There may be a middle value somewhere + * that gets hashed to a different value. For this transform, we'll need to read all the values + * in the column to ensure they all get transformed to the same partition value. + * + *

In these cases, we output the DataFile to the DLQ, because assigning an incorrect + * partition may lead to it being incorrectly ignored by downstream queries. + */ + static String getPartitionFromMetrics(Metrics metrics, InputFile inputFile, Table table) + throws UnknownPartitionException, IOException, InterruptedException { + List fields = table.spec().fields(); + List sourceIds = + fields.stream().map(PartitionField::sourceId).collect(Collectors.toList()); + Metrics partitionMetrics; + // Check if metrics already includes partition columns (configured by table properties): + if (metrics.lowerBounds().keySet().containsAll(sourceIds) + && metrics.upperBounds().keySet().containsAll(sourceIds)) { + partitionMetrics = metrics; + } else { + // Otherwise, recollect metrics and ensure it includes all partition fields. + // Note: we don't attach these additional metrics to the DataFile because we can't assume + // that's in the user's best interest. + // Some tables are very wide and users may not want to store excessive metadata. + List sourceNames = + fields.stream() + .map(pf -> table.schema().findColumnName(pf.sourceId())) + .collect(Collectors.toList()); + Map configProps = + sourceNames.stream() + .collect(Collectors.toMap(s -> "write.metadata.metrics.column." 
+ s, s -> "full")); + MetricsConfig configWithPartitionFields = MetricsConfig.fromProperties(configProps); + partitionMetrics = + getFileMetrics( + inputFile, + inferFormat(inputFile.location()), + configWithPartitionFields, + MappingUtil.create(table.schema())); + } + + PartitionKey pk = new PartitionKey(table.spec(), table.schema()); + + HashMap bucketPartitions = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + PartitionField field = fields.get(i); + Transform transform = field.transform(); + if (transform.toString().contains("bucket[")) { + bucketPartitions.put(i, field); + } + } + + // first, read only metadata for the non-bucket partition types + for (int i = 0; i < fields.size(); i++) { + PartitionField field = fields.get(i); + // skip bucket partitions (we will process them below) + if (bucketPartitions.containsKey(i)) { + continue; + } + Type type = table.schema().findType(field.sourceId()); + Transform transform = field.transform(); + + // Make a best effort estimate by comparing the lower and upper transformed values. + // If the transformed values are equal, assume that the DataFile's data safely + // aligns with the same partition. + ByteBuffer lowerBytes = partitionMetrics.lowerBounds().get(field.sourceId()); + ByteBuffer upperBytes = partitionMetrics.upperBounds().get(field.sourceId()); + if (lowerBytes == null && upperBytes == null) { + continue; + } else if (lowerBytes == null || upperBytes == null) { + throw new UnknownPartitionException( + "Only one of the min/max was null, for field " + + table.schema().findColumnName(field.sourceId())); + } + Object lowerTransformedValue = transformValue(transform, type, lowerBytes); + Object upperTransformedValue = transformValue(transform, type, upperBytes); + + if (!Objects.deepEquals(lowerTransformedValue, upperTransformedValue)) { + // The DataFile contains values that align to different partitions, so we cannot + // safely determine a partition. 
+ throw new UnknownPartitionException( + "Min and max transformed values were not equal, for column: " + field.name()); + } + + pk.set(i, lowerTransformedValue); + } + + // bucket transform needs extra processing (see java doc above) + if (!bucketPartitions.isEmpty()) { + // Optimize by only reading bucket-transformed columns into memory + org.apache.iceberg.Schema bucketCols = + TypeUtil.select( + table.schema(), + bucketPartitions.values().stream() + .map(PartitionField::sourceId) + .collect(Collectors.toSet())); + + // Keep one instance of transformed value per column. Use this to compare against each + // record's transformed value. + // Values in the same columns must yield the same transformed value, otherwise we cannot + // determine a partition + // from this file. + Map transformedValues = new HashMap<>(); + + // Do a one-time read of the file and compare all bucket-transformed columns + ACTIVE_READERS.acquire(); + try (CloseableIterable reader = ReadUtils.createReader(inputFile, bucketCols)) { + for (Record record : reader) { + for (Map.Entry entry : bucketPartitions.entrySet()) { + int partitionIndex = entry.getKey(); + PartitionField partitionField = entry.getValue(); + Transform transform = partitionField.transform(); + Types.NestedField field = table.schema().findField(partitionField.sourceId()); + Object value = record.getField(field.name()); + + // set initial transformed value for this column + @Nullable Object transformedValue = transformedValues.get(partitionIndex); + Object currentTransformedValue = transformValue(transform, field.type(), value); + if (transformedValue == null) { + transformedValues.put(partitionIndex, checkStateNotNull(currentTransformedValue)); + continue; + } + + if (!Objects.deepEquals(currentTransformedValue, transformedValue)) { + throw new UnknownPartitionException( + "Found records with conflicting transformed values, for column: " + + field.name()); + } + } + } + } finally { + ACTIVE_READERS.release(); + } + + for 
(Map.Entry partitionCol : transformedValues.entrySet()) { + pk.set(partitionCol.getKey(), partitionCol.getValue()); + } + } + return pk.toPath(); + } + } + + /** + * A stateful {@link DoFn} that commits batches of files to an Iceberg table. + * + *

Addresses two primary concerns: + * + *

    + *
  • Concurrency: Being stateful on a dummy {@code Void} key forces the runner to + * process batches sequentially, preventing concurrent commit conflicts on the Iceberg + * table. + *
  • Idempotency: Prevents duplicate commits during bundle failures by calculating a + * deterministic hash for the file set. This ID is stored in the Iceberg {@code Snapshot} + * summary, under the key {@code "beam.add-files-commit-id"}. Before committing, the DoFn + * traverses backwards through recent snapshots to check if the current batch's ID is + * already present. 
+ * + *

Outputs the resulting Iceberg {@link Snapshot} information. + */ + static class CommitFilesDoFn extends DoFn>, Row> { + private final IcebergCatalogConfig catalogConfig; + private final String identifier; + private transient @MonotonicNonNull Table table = null; + private static final String COMMIT_ID_KEY = "beam.add-files-commit-id"; + + @StateId("lastCommitTimestamp") + private final StateSpec> lastCommitTimestamp = + StateSpecs.value(VarLongCoder.of()); + + public CommitFilesDoFn(IcebergCatalogConfig catalogConfig, String identifier) { + this.catalogConfig = catalogConfig; + this.identifier = identifier; + } + + @StartBundle + public void start() { + if (table == null) { + table = catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } + } + + @ProcessElement + public void process( + @Element KV> files, + @AlwaysFetched @StateId("lastCommitTimestamp") ValueState lastCommitTimestamp, + OutputReceiver output) { + String commitId = commitHash(files.getValue()); + Table table = checkStateNotNull(this.table); + table.refresh(); + + if (shouldSkip(commitId, lastCommitTimestamp.read())) { + return; + } + + int numFiles = 0; + AppendFiles appendFiles = table.newFastAppend(); + for (SerializableDataFile file : files.getValue()) { + DataFile df = file.createDataFile(table.specs()); + appendFiles.appendFile(df); + numFiles++; + } + appendFiles.set(COMMIT_ID_KEY, commitId); + LOG.info("Committing {} files, with commit ID: {}", numFiles, commitId); + appendFiles.commit(); + + Snapshot snapshot = table.currentSnapshot(); + output.output(SnapshotInfo.fromSnapshot(snapshot).toRow()); + lastCommitTimestamp.write(snapshot.timestampMillis()); + numFilesAdded.inc(numFiles); + } + + private String commitHash(Iterable files) { + Hasher hasher = Hashing.sha256().newHasher(); + + // Extract, sort, and hash to ensure deterministic output + List paths = new ArrayList<>(); + for (SerializableDataFile file : files) { + paths.add(file.getPath()); + } + 
Collections.sort(paths); + + for (String path : paths) { + hasher.putString(path, StandardCharsets.UTF_8); + } + return hasher.hash().toString(); + } + + /** + * Performs a look-back through Iceberg table history to determine if this specific batch of + * files has already been successfully committed. + */ + private boolean shouldSkip(String commitUID, @Nullable Long lastCommitTimestamp) { + if (lastCommitTimestamp == null) { + return false; + } + Table table = checkStateNotNull(this.table); + + // check past snapshots to see if they contain the commit ID + @Nullable Snapshot current = table.currentSnapshot(); + while (current != null && current.timestampMillis() > lastCommitTimestamp) { + Map summary = current.summary(); + if (summary != null && commitUID.equals(summary.get(COMMIT_ID_KEY))) { + return true; // commit already happened, we should skip + } + if (current.parentId() == null) { + break; + } + current = table.snapshot(current.parentId()); + } + + return false; + } + } + + @SuppressWarnings("argument") + public static Metrics getFileMetrics( + InputFile file, FileFormat format, MetricsConfig config, NameMapping mapping) + throws IOException { + switch (format) { + case PARQUET: + try (ParquetFileReader reader = + ParquetFileReader.open(getParquetInputFile(file.location()))) { + ParquetMetadata footer = reader.getFooter(); + MessageType originalMessageType = footer.getFileMetaData().getSchema(); + if (!ParquetSchemaUtil.hasIds(originalMessageType)) { + footer = getFooterWithTypeIds(originalMessageType, footer, mapping); + } + + return ParquetUtil.footerMetrics(footer, Stream.empty(), config, mapping); + } + case ORC: + return OrcMetrics.fromInputFile(file, config, mapping); + case AVRO: + return new Metrics(Avro.rowCount(file), null, null, null, null); + default: + throw new UnsupportedOperationException("Unsupported format: " + format); + } + } + + /** Infers the file format from the file path's extension; throws UnknownFormatException if the extension is not recognized. 
*/ + public static FileFormat inferFormat(String path) { + String lowerPath = path.toLowerCase(); + + if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".pqt")) { + return FileFormat.PARQUET; + } else if (lowerPath.endsWith(".orc")) { + return FileFormat.ORC; + } else if (lowerPath.endsWith(".avro")) { + return FileFormat.AVRO; + } else { + throw new UnknownFormatException(); + } + } + + static ParquetMetadata getFooterWithTypeIds( + MessageType originalMessageType, ParquetMetadata footer, NameMapping mapping) { + originalMessageType = ParquetSchemaUtil.applyNameMapping(originalMessageType, mapping); + FileMetaData oldFileMeta = footer.getFileMetaData(); + FileMetaData newFileMeta = + new FileMetaData( + originalMessageType, oldFileMeta.getKeyValueMetaData(), oldFileMeta.getCreatedBy()); + return new ParquetMetadata(newFileMeta, footer.getBlocks()); + } + + static org.apache.parquet.io.InputFile getParquetInputFile(String filePath) throws IOException { + ResourceId resourceId = + Iterables.getOnlyElement(FileSystems.match(filePath).metadata()).resourceId(); + Compression compression = Compression.detect(checkStateNotNull(resourceId.getFilename())); + SeekableByteChannel channel = + (SeekableByteChannel) compression.readDecompressed(FileSystems.open(resourceId)); + return new BeamParquetInputFile(channel); + } + + static class UnknownFormatException extends IllegalArgumentException {} + + static class UnknownPartitionException extends IllegalStateException { + UnknownPartitionException(String msg) { + super(msg); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java new file mode 100644 index 000000000000..a04853c8ad96 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFilesSchemaTransformProvider.java @@ -0,0 +1,190 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ERROR_TAG; +import static org.apache.beam.sdk.io.iceberg.AddFiles.OUTPUT_TAG; +import static org.apache.beam.sdk.io.iceberg.AddFilesSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.TypeDescriptors; 
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +@AutoService(SchemaTransformProvider.class) +public class AddFilesSchemaTransformProvider extends TypedSchemaTransformProvider { + @Override + public AddFilesSchemaTransform from(Configuration configuration) { + return new AddFilesSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:iceberg_add_files:v1"; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_AddFilesSchemaTransformProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("A fully-qualified table identifier.") + public abstract String getTable(); + + @SchemaFieldDescription("Properties used to set up the Iceberg catalog.") + public abstract @Nullable Map getCatalogProperties(); + + @SchemaFieldDescription("Properties passed to the Hadoop Configuration.") + public abstract @Nullable Map getConfigProperties(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the frequency at which incoming files are appended. Defaults to 600 (10 minutes). " + + "A commit is triggered when either this or append batch size is reached.") + public abstract @Nullable Integer getTriggeringFrequencySeconds(); + + @SchemaFieldDescription( + "For a streaming pipeline, sets the desired number of appended files per commit. Defaults to 1,000 files. " + + "A commit is triggered when either this or append triggering interval is reached.") + public abstract @Nullable Integer getAppendBatchSize(); + + @SchemaFieldDescription( + "The prefix shared among all partitions. 
For example, a data file may have the following" + + " location:%n" + + "'gs://bucket/namespace/table/data/id=13/name=beam/data_file.parquet'%n%n" + + "The provided prefix should go up until the partition information:%n" + + "'gs://bucket/namespace/table/data/'.%n" + + "If not provided, will try determining each DataFile's partition from its metrics metadata.") + public abstract @Nullable String getLocationPrefix(); + + @SchemaFieldDescription( + "Fields used to create a partition spec that is applied when tables are created. For a field 'foo', " + + "the available partition transforms are:\n\n" + + "- `foo`\n" + + "- `truncate(foo, N)`\n" + + "- `bucket(foo, N)`\n" + + "- `hour(foo)`\n" + + "- `day(foo)`\n" + + "- `month(foo)`\n" + + "- `year(foo)`\n" + + "- `void(foo)`\n\n" + + "For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms.") + public abstract @Nullable List getPartitionFields(); + + @SchemaFieldDescription( + "Iceberg table properties to be set on the table when it is created.\n" + + "For more information on table properties," + + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") + public abstract @Nullable Map getTableProperties(); + + @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") + public abstract @Nullable ErrorHandling getErrorHandling(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String table); + + public abstract Builder setCatalogProperties(Map catalogProperties); + + public abstract Builder setConfigProperties(Map confProperties); + + public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + + public abstract Builder setAppendBatchSize(Integer size); + + public abstract Builder setLocationPrefix(String prefix); + + public abstract Builder setPartitionFields(List fields); + + public abstract Builder 
setTableProperties(Map props); + + public abstract Builder setErrorHandling(ErrorHandling errorHandling); + + public abstract Configuration build(); + } + + public IcebergCatalogConfig getIcebergCatalog() { + return IcebergCatalogConfig.builder() + .setCatalogProperties(getCatalogProperties()) + .setConfigProperties(getConfigProperties()) + .build(); + } + } + + public static class AddFilesSchemaTransform extends SchemaTransform { + private final Configuration configuration; + + public AddFilesSchemaTransform(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema inputSchema = input.getSinglePCollection().getSchema(); + Preconditions.checkState( + inputSchema.getFieldCount() == 1 + && inputSchema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING), + "Incoming Row Schema must contain only one field of type String. Instead, got schema: %s", + inputSchema); + + @Nullable Integer frequency = configuration.getTriggeringFrequencySeconds(); + + PCollectionRowTuple result = + input + .getSinglePCollection() + .apply("Filter empty paths", Filter.by(row -> row.getString(0) != null)) + .apply( + "ExtractPaths", + MapElements.into(TypeDescriptors.strings()) + .via(row -> checkStateNotNull(row.getString(0)))) + .apply( + new AddFiles( + configuration.getIcebergCatalog(), + configuration.getTable(), + configuration.getLocationPrefix(), + configuration.getPartitionFields(), + configuration.getTableProperties(), + configuration.getAppendBatchSize(), + frequency != null ? 
Duration.standardSeconds(frequency) : null)); + + PCollectionRowTuple output = PCollectionRowTuple.of("snapshots", result.get(OUTPUT_TAG)); + ErrorHandling errorHandling = configuration.getErrorHandling(); + if (errorHandling != null) { + output = output.and(errorHandling.getOutput(), result.get(ERROR_TAG)); + } + return output; + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java index 2b3117f8bf84..805cc0672940 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java @@ -68,10 +68,13 @@ class PartitionUtils { static PartitionSpec toPartitionSpec( @Nullable List fields, org.apache.beam.sdk.schemas.Schema beamSchema) { + return toPartitionSpec(fields, IcebergUtils.beamSchemaToIcebergSchema(beamSchema)); + } + + static PartitionSpec toPartitionSpec(@Nullable List fields, Schema schema) { if (fields == null) { return PartitionSpec.unpartitioned(); } - Schema schema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema); PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); for (String field : fields) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java index a4d95ca249b4..e7f50882f433 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java @@ -46,9 +46,11 @@ import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopInputFile; import 
org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.ParquetReader; @@ -112,6 +114,34 @@ static ParquetReader createReader(FileScanTask task, Table table, Schema true); } + static ParquetReader createReader(InputFile inputFile, Schema schema) { + ParquetReadOptions.Builder optionsBuilder; + if (inputFile instanceof HadoopInputFile) { + // remove read properties already set that may conflict with this read + Configuration conf = new Configuration(((HadoopInputFile) inputFile).getConf()); + for (String property : READ_PROPERTIES_TO_REMOVE) { + conf.unset(property); + } + optionsBuilder = HadoopReadOptions.builder(conf); + } else { + optionsBuilder = ParquetReadOptions.builder(); + } + optionsBuilder = + optionsBuilder + .withRange(0, inputFile.getLength()) + .withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE); + + return new ParquetReader<>( + inputFile, + schema, + optionsBuilder.build(), + fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema), + MappingUtil.create(schema), + Expressions.alwaysTrue(), + false, + true); + } + static Map constantsMap( FileScanTask task, BiFunction converter, diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java new file mode 100644 index 000000000000..a67707e3dbb7 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesIT.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static java.lang.String.format; +import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Notification; +import com.google.cloud.storage.NotificationInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import 
org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubOptions; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Deduplicate; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.hadoop.util.Lists; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.util.Pair; +import org.awaitility.Awaitility; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; 
+import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for {@link AddFiles}, using Pubsub notifications to pass newly created file + * paths downstream to add to an Iceberg table. + */ +public class AddFilesIT { + private static final Logger LOG = LoggerFactory.getLogger(AddFilesIT.class); + + private static final String WAREHOUSE = "gs://managed-iceberg-biglake-its"; + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + @Rule public TestName testName = new TestName(); + private final String notificationsTopic = + format( + "projects/%s/topics/%s-%s-%s", + PROJECT, + AddFilesIT.class.getSimpleName(), + testName.getMethodName(), + System.currentTimeMillis()); + private static final Schema NOTIFICATION_SCHEMA = + Schema.builder() + .addStringField("bucket") + .addStringField("name") + .addStringField("kind") + .build(); + private static final Map BIGLAKE_PROPS = + Map.of( + "type", "rest", + "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", + "warehouse", WAREHOUSE, + "header.x-goog-user-project", PROJECT, + "rest.auth.type", "google", + "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "rest-metrics-reporting-enabled", "false"); + private Storage storage; + private PubsubClient pubsub; + private Notification notification; + private final String namespace = getClass().getSimpleName(); + private String srcTableName; + private String destTableName; + private TableIdentifier srcTableId; + private TableIdentifier destTableId; + private long salt; + private String dirName; + private static final Schema ROW_SCHEMA = + Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build(); + private static final List PARTITION_FIELDS = + Arrays.asList("bucket(id, 16)", "truncate(name, 8)", "age"); + private static final PartitionSpec SPEC = + 
PartitionUtils.toPartitionSpec(PARTITION_FIELDS, ROW_SCHEMA); + private static final Map TABLE_PROPS = ImmutableMap.of("foo", "bar"); + private static final List TEST_ROWS = + IntStream.range(0, 20) + .mapToObj( + i -> Row.withSchema(ROW_SCHEMA).addValues((long) i, "name_" + i, i + 30).build()) + .collect(Collectors.toList()); + private final RESTCatalog catalog = new RESTCatalog(); + + @Before + public void setup() throws IOException { + storage = StorageOptions.newBuilder().build().getService(); + pubsub = + PubsubGrpcClient.FACTORY.newClient( + null, null, TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class)); + + pubsub.createTopic(PubsubClient.topicPathFromPath(notificationsTopic)); + + NotificationInfo notificationInfo = + NotificationInfo.newBuilder(notificationsTopic) + .setEventTypes(NotificationInfo.EventType.OBJECT_FINALIZE) + .setPayloadFormat(NotificationInfo.PayloadFormat.JSON_API_V1) + .build(); + try { + notification = storage.createNotification(WAREHOUSE.replace("gs://", ""), notificationInfo); + } catch (StorageException e) { + if (e.getMessage().contains("Too many overlapping notifications")) { + List existing = storage.listNotifications(WAREHOUSE.replace("gs://", "")); + LOG.warn( + "Too many notifications on bucket {}: {}. 
Deleting existing notifications to make room: {}", + WAREHOUSE, + e, + existing.stream() + .map(NotificationInfo::getNotificationId) + .collect(Collectors.toList())); + existing.forEach( + n -> storage.deleteNotification(WAREHOUSE.replace("gs://", ""), n.getNotificationId())); + + // try creating it again + notification = storage.createNotification(WAREHOUSE.replace("gs://", ""), notificationInfo); + } else { + throw e; + } + } + + salt = System.currentTimeMillis(); + dirName = format("%s-%s/%s", getClass().getSimpleName(), salt, testName.getMethodName()); + srcTableName = "src_" + testName.getMethodName() + "_" + salt; + destTableName = "dest_" + testName.getMethodName() + "_" + salt; + srcTableId = TableIdentifier.of(namespace, srcTableName); + destTableId = TableIdentifier.of(namespace, destTableName); + + catalog.initialize("test_catalog", BIGLAKE_PROPS); + cleanupCatalog(); + catalog.createNamespace(Namespace.of(namespace)); + } + + private void cleanupCatalog() { + Namespace ns = Namespace.of(namespace); + if (catalog.namespaceExists(ns)) { + catalog.listTables(ns).forEach(catalog::dropTable); + catalog.dropNamespace(ns); + } + } + + @After + public void cleanup() { + try { + pubsub.deleteTopic(PubsubClient.topicPathFromPath(notificationsTopic)); + pubsub.close(); + } catch (Exception e) { + LOG.warn("Failed to clean up PubSub", e); + } + + try { + storage.deleteNotification(WAREHOUSE.replace("gs://", ""), notification.getNotificationId()); + storage.close(); + } catch (Exception e) { + LOG.warn("Failed to clean up GCS notifications", e); + } + + try { + cleanupCatalog(); + } catch (Exception e) { + LOG.warn("Failed to clean up Iceberg catalog", e); + } + + try { + Iterable blobs = + storage + .list(WAREHOUSE.replace("gs://", ""), Storage.BlobListOption.prefix(dirName)) + .getValues(); + blobs.forEach(b -> storage.delete(b.getBlobId())); + } catch (Exception e) { + LOG.warn("Failed to clean up GCS bucket", e); + } + } + + @Test + public void 
testStreamingImportFromExistingIcebergTable() + throws IOException, InterruptedException { + // first create a source iceberg table + catalog.createTable(srcTableId, beamSchemaToIcebergSchema(ROW_SCHEMA), SPEC); + + String filter = format("%s/%s/data/", namespace, srcTableName); + + // build AddFiles pipeline and let it run in the background + PipelineResult addFilesPipeline = startAddFilesListener(filter); + + // before writing, confirm the destination table still does not exist + assertFalse(catalog.tableExists(destTableId)); + + // write some rows to the source table + LOG.info("Writing records to the source table"); + Pipeline q = Pipeline.create(); + q.apply(Create.of(TEST_ROWS)) + .setRowSchema(ROW_SCHEMA) + .apply( + Managed.write(Managed.ICEBERG) + .withConfig( + ImmutableMap.of( + "table", srcTableId.toString(), "catalog_properties", BIGLAKE_PROPS))); + q.run().waitUntilFinish(); + + // check that the destination table has been created + Awaitility.await() + .atMost(java.time.Duration.ofMinutes(5)) + .pollInterval(java.time.Duration.ofSeconds(10)) + .until(() -> catalog.tableExists(destTableId)); + LOG.info("Destination table has been created"); + + LOG.info("Checking if all source files have been registered in the destination table"); + Awaitility.await() + .atMost(java.time.Duration.ofMinutes(2)) + .pollInterval(java.time.Duration.ofSeconds(5)) + .until(() -> checkTableFiles() != null); + LOG.info("Destination table has registered all source files."); + Pair, Map> srcAndDestfiles = + checkStateNotNull(checkTableFiles()); + + for (Map.Entry srcFile : srcAndDestfiles.first().entrySet()) { + String location = srcFile.getKey(); + DataFile destFile = + checkStateNotNull( + srcAndDestfiles.second().get(location), + "Source file '%s' was not registered in the destination table", + location); + + // check that partition metadata was preserved + assertEquals(destFile.partition(), srcFile.getValue().partition()); + } + + // safe to cancel the AddFiles pipeline now 
+ LOG.info("Canceling AddFiles listener."); + addFilesPipeline.cancel(); + + // check all records are there + checkRecordsInDestinationTable(); + } + + /** + * Fetch all added files in both tables. Return null if some files have not yet propagated to + * destination table + */ + private @Nullable Pair, Map> checkTableFiles() { + Table destTable = catalog.loadTable(destTableId); + Table srcTable = catalog.loadTable(srcTableId); + + Map srcFiles = new HashMap<>(); + Map destFiles = new HashMap<>(); + for (Snapshot snapshot : srcTable.snapshots()) { + snapshot.addedDataFiles(srcTable.io()).forEach(df -> srcFiles.put(df.location(), df)); + } + for (Snapshot snapshot : destTable.snapshots()) { + snapshot.addedDataFiles(destTable.io()).forEach(df -> destFiles.put(df.location(), df)); + } + + LOG.info( + "Number of source files: {}, Number of registered destination files: {}", + srcFiles.size(), + destFiles.size()); + + if (srcFiles.size() != destFiles.size()) { + Set onlyInSrc = new HashSet<>(srcFiles.keySet()); + onlyInSrc.removeAll(destFiles.keySet()); + LOG.info("Missing source files: {}", onlyInSrc); + return null; + } + + return Pair.of(srcFiles, destFiles); + } + + @Test + public void testStreamingParquetImport() + throws InterruptedException, TimeoutException, IOException { + // start with a table that does not exist + + String parquetDir = format("%s/%s/", WAREHOUSE, dirName); + String tempDir = format("%s/%s-tmp/", WAREHOUSE, dirName); + + // let the add files pipeline run in the background + PipelineResult addFilesPipeline = startAddFilesListener(dirName); + + // before writing, confirm the destination table still does not exist + assertFalse(catalog.tableExists(destTableId)); + + // write some parquet files + LOG.info("Writing records to the parquet dir"); + Pipeline q = Pipeline.create(); + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA); + q.apply(Create.of(TEST_ROWS)) + .setRowSchema(ROW_SCHEMA) + .apply( + 
MapElements.into(TypeDescriptor.of(GenericRecord.class)) + .via(AvroUtils.getRowToGenericRecordFunction(avroSchema))) + .setCoder(AvroCoder.of(avroSchema)) + .apply( + FileIO.writeDynamic() + .by( + record -> + format("%s-%s-%s", record.get("id"), record.get("name"), record.get("age"))) + .via(ParquetIO.sink(avroSchema)) + .withNaming(name -> defaultNaming(name, ".parquet")) + .withTempDirectory(tempDir) + .to(parquetDir) + .withDestinationCoder(StringUtf8Coder.of())); + q.run().waitUntilFinish(); + + GcsUtil gcsUtil = TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil(); + + Iterable objects = + gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName, null).getItems(); + List writtenFilePaths = + Lists.newArrayList(objects).stream() + .map(o -> format("gs://%s/%s", o.getBucket(), o.getName())) + .collect(Collectors.toList()); + LOG.info("Written file paths: {}", writtenFilePaths); + + // check that the destination table has been created + Awaitility.await() + .atMost(java.time.Duration.ofMinutes(1)) + .pollInterval(java.time.Duration.ofSeconds(5)) + .until(() -> catalog.tableExists(destTableId)); + LOG.info("Destination table has been created"); + + LOG.info("Checking if all source files have been registered in the destination table"); + Awaitility.await() + .atMost(java.time.Duration.ofMinutes(2)) + .pollInterval(java.time.Duration.ofSeconds(5)) + .until(() -> checkTableHasRegisteredParquetFiles(writtenFilePaths)); + LOG.info( + "Destination table has registered all source files ({} files).", writtenFilePaths.size()); + + // safe to cancel the AddFiles pipeline now + LOG.info("Canceling AddFiles listener."); + addFilesPipeline.cancel(); + + // check all records are there + checkRecordsInDestinationTable(); + } + + @Test + public void testBatchParquetImport() throws IOException { + // start with a table that does not exist + + String parquetDir = format("%s/%s/", WAREHOUSE, dirName); + String tempDir = format("%s/%s-tmp/", WAREHOUSE, 
dirName); + + // write some parquet files + LOG.info("Writing records to the parquet dir"); + Pipeline q = Pipeline.create(); + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(ROW_SCHEMA); + q.apply(Create.of(TEST_ROWS)) + .setRowSchema(ROW_SCHEMA) + .apply( + MapElements.into(TypeDescriptor.of(GenericRecord.class)) + .via(AvroUtils.getRowToGenericRecordFunction(avroSchema))) + .setCoder(AvroCoder.of(avroSchema)) + .apply( + FileIO.writeDynamic() + .by( + record -> + format("%s-%s-%s", record.get("id"), record.get("name"), record.get("age"))) + .via(ParquetIO.sink(avroSchema)) + .withNaming(name -> defaultNaming(name, ".parquet")) + .withTempDirectory(tempDir) + .to(parquetDir) + .withDestinationCoder(StringUtf8Coder.of())); + q.run().waitUntilFinish(); + + GcsUtil gcsUtil = TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil(); + + Iterable objects = + gcsUtil.listObjects(WAREHOUSE.replace("gs://", ""), dirName, null).getItems(); + List writtenFilePaths = + Lists.newArrayList(objects).stream() + .map(o -> format("gs://%s/%s", o.getBucket(), o.getName())) + .collect(Collectors.toList()); + LOG.info("Written file paths: {}", writtenFilePaths); + + // before adding, confirm the destination table still does not exist + assertFalse(catalog.tableExists(destTableId)); + + // run batch AddFiles + Pipeline p = Pipeline.create(); + PCollectionRowTuple tuple = + p.apply(Create.of(writtenFilePaths)) + .apply( + new AddFiles( + IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(), + namespace + "." 
+ destTableName, + null, + PARTITION_FIELDS, + TABLE_PROPS, + 10, + Duration.standardSeconds(10))); + PAssert.that(tuple.get("errors")).empty(); + p.run().waitUntilFinish(); + + // check that the destination table has been created + assertTrue(catalog.tableExists(destTableId)); + LOG.info("Destination table has been created"); + + LOG.info("Checking if all source files have been registered in the destination table"); + assertTrue(checkTableHasRegisteredParquetFiles(writtenFilePaths)); + LOG.info( + "Destination table has registered all source files ({} files).", writtenFilePaths.size()); + + // check all records are there + checkRecordsInDestinationTable(); + } + + private void checkRecordsInDestinationTable() { + Pipeline s = Pipeline.create(); + PCollection destRows = + s.apply( + Managed.read(Managed.ICEBERG) + .withConfig( + ImmutableMap.of( + "table", destTableId.toString(), "catalog_properties", BIGLAKE_PROPS))) + .getSinglePCollection(); + PAssert.that(destRows).containsInAnyOrder(TEST_ROWS); + s.run().waitUntilFinish(); + } + + private boolean checkTableHasRegisteredParquetFiles(List parquetFiles) { + Table destTable = catalog.loadTable(destTableId); + + int numRegisteredFiles = 0; + for (Snapshot snapshot : destTable.snapshots()) { + numRegisteredFiles += Iterables.size(snapshot.addedDataFiles(destTable.io())); + } + LOG.info( + "Number of source files: {}, Number of registered destination files: {}", + parquetFiles.size(), + numRegisteredFiles); + return numRegisteredFiles == parquetFiles.size(); + } + + private PipelineResult startAddFilesListener(String filter) throws InterruptedException { + DirectOptions options = TestPipeline.testingPipelineOptions().as(DirectOptions.class); + options.setBlockOnRun(false); + Pipeline p = Pipeline.create(options); + + PCollectionRowTuple tuple = + p.apply(PubsubIO.readStrings().fromTopic(notificationsTopic)) + .apply(JsonToRow.withSchema(NOTIFICATION_SCHEMA)) + .apply(Filter.by(row -> 
row.getString("name").contains(filter))) + .apply( + MapElements.into(strings()) + .via( + row -> + format("gs://%s/%s", row.getString("bucket"), row.getString("name")))) + .apply(Deduplicate.values()) + .apply( + new AddFiles( + IcebergCatalogConfig.builder().setCatalogProperties(BIGLAKE_PROPS).build(), + namespace + "." + destTableName, + null, + PARTITION_FIELDS, + TABLE_PROPS, + 10, + Duration.standardSeconds(10))); + PAssert.that(tuple.get("errors")).empty(); + PipelineResult result = p.run(); + + LOG.info( + "Started running the AddFiles listener pipeline. Waiting for 10s to allow it enough time to setup"); + Thread.sleep(10_000); + return result; + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java new file mode 100644 index 000000000000..287b9140e000 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -0,0 +1,655 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.PREFIX_ERROR; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.UNKNOWN_PARTITION_ERROR; +import static org.apache.beam.sdk.io.iceberg.AddFiles.ConvertToDataFile.getPartitionFromMetrics; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionField; +import 
org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializableFunction; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AddFilesTest { + @Rule public TemporaryFolder temp = new TemporaryFolder(); + private String root; + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private HadoopCatalog catalog; + private TableIdentifier tableId; + private final org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "age", Types.IntegerType.get())); + private final List partitionFields = Arrays.asList("age", "truncate(name, 3)"); + private final PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, icebergSchema); + private final PartitionKey wrapper = new PartitionKey(spec, icebergSchema); + private final Map tableProps = + 
ImmutableMap.of("write.metadata.metrics.default", "full", "foo", "bar"); + private IcebergCatalogConfig catalogConfig; + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Rule public TestName testName = new TestName(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Before + public void setup() throws Exception { + // Root for existing data files: + root = temp.getRoot().getAbsolutePath() + "/"; + + // Set up a local Hadoop Catalog + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + tableId = TableIdentifier.of("default", testName.getMethodName()); + + catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogProperties( + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)) + .build(); + } + + @Test + public void testAddPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(true); + } + + @Test + public void testAddUnPartitionedFiles() throws Exception { + testAddFilesWithPartitionPath(false); + } + + public void testAddFilesWithPartitionPath(boolean isPartitioned) throws Exception { + // 1. Generate two local Parquet file. + // Include Hive-like partition path if testing partition case + String partitionPath1 = isPartitioned ? "age=20/name_trunc=Mar/" : ""; + String file1 = root + partitionPath1 + "data1.parquet"; + wrapper.wrap(record(-1, "Mar", 20)); + DataWriter writer = createWriter(file1, isPartitioned ? wrapper.copy() : null); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + + String partitionPath2 = isPartitioned ? "age=25/name_trunc=Sam/" : ""; + String file2 = root + partitionPath2 + "data2.parquet"; + wrapper.wrap(record(-1, "Sam", 25)); + DataWriter writer2 = createWriter(file2, isPartitioned ? 
wrapper.copy() : null); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + + // 2. Setup the input PCollection + PCollection inputFiles = pipeline.apply("Create Input", Create.of(file1, file2)); + + // 3. Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + inputFiles.apply( + new AddFiles( + catalogConfig, + tableId.toString(), + isPartitioned ? root : null, + isPartitioned ? partitionFields : null, + tableProps, + 2, // trigger at 2 files + Duration.standardSeconds(10))); + + // 4. Validate PCollection Outputs + PAssert.that(output.get("errors")).empty(); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table was created with the correct spec and properties + Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(isPartitioned ? spec : PartitionSpec.unpartitioned(), table.spec()); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + assertEquals(writtenDf1.lowerBounds(), 
addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + assertEquals(writtenDf1.partition(), addedDf1.partition()); + assertEquals(writtenDf2.partition(), addedDf2.partition()); + } + + @Test + public void testAddFilesWithPartitionFromMetrics() throws IOException { + // 1. Generate local Parquet files with no directory structure. + String file1 = root + "data1.parquet"; + DataWriter writer = createWriter(file1); + writer.write(record(1, "Mark", 20)); + writer.write(record(2, "Martin", 20)); + writer.close(); + PartitionData expectedPartition1 = new PartitionData(spec.partitionType()); + expectedPartition1.set(0, 20); + expectedPartition1.set(1, "Mar"); + + String file2 = root + "data2.parquet"; + DataWriter writer2 = createWriter(file2); + writer2.write(record(3, "Samantha", 25)); + writer2.write(record(4, "Sammy", 25)); + writer2.close(); + PartitionData expectedPartition2 = new PartitionData(spec.partitionType()); + expectedPartition2.set(0, 25); + expectedPartition2.set(1, "Sam"); + + // Also create a "bad" DataFile, containing values that correspond to different partitions + // This file should get output to the DLQ, because we cannot determine its partition + String file3 = root + "data3.parquet"; + DataWriter writer3 = createWriter(file3); + writer3.write(record(5, "Johnny", 25)); + writer3.write(record(6, "Yaseen", 32)); + writer3.close(); + + // 2. Setup the input PCollection + PCollection inputFiles = pipeline.apply("Create Input", Create.of(file1, file2, file3)); + + // 3. 
Apply the transform (Trigger aggressively for testing) + PCollectionRowTuple output = + inputFiles.apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, // no prefix, so determine partition from DF metrics + partitionFields, + tableProps, + 2, // trigger at 2 files + Duration.standardSeconds(10))); + + // 4. There should be an error for File3, because its partition could not be determined + PAssert.that(output.get("errors")) + .satisfies( + errorRows -> { + Row errorRow = Iterables.getOnlyElement(errorRows); + checkState( + errorRow.getSchema().equals(AddFiles.ERROR_SCHEMA) + && file3.equals(errorRow.getString(0)) + && checkStateNotNull(errorRow.getString(1)) + .startsWith(UNKNOWN_PARTITION_ERROR)); + return null; + }); + + // 5. Run the pipeline + pipeline.run().waitUntilFinish(); + + // 6. Validate the Iceberg Table was created with the correct spec and properties + Table table = catalog.loadTable(tableId); + tableProps.forEach((key, value) -> assertThat(table.properties(), hasEntry(key, value))); + assertEquals(spec, table.spec()); + + // Check that we have exactly 1 snapshot with 2 files + assertEquals(1, Iterables.size(table.snapshots())); + + List addedFiles = + Lists.newArrayList(table.currentSnapshot().addedDataFiles(table.io())); + assertEquals(2, addedFiles.size()); + + // Verify file paths + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data1.parquet"))); + assertTrue(addedFiles.stream().anyMatch(df -> df.location().contains("data2.parquet"))); + + // check metrics metadata is preserved + DataFile writtenDf1 = writer.toDataFile(); + DataFile writtenDf2 = writer2.toDataFile(); + DataFile addedDf1 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data1.parquet")) + .collect(Collectors.toList())); + DataFile addedDf2 = + Iterables.getOnlyElement( + addedFiles.stream() + .filter(df -> df.location().contains("data2.parquet")) + .collect(Collectors.toList())); + + 
assertEquals(writtenDf1.lowerBounds(), addedDf1.lowerBounds()); + assertEquals(writtenDf1.upperBounds(), addedDf1.upperBounds()); + assertEquals(writtenDf2.lowerBounds(), addedDf2.lowerBounds()); + assertEquals(writtenDf2.upperBounds(), addedDf2.upperBounds()); + + // check partition metadata is preserved + assertEquals(expectedPartition1, addedDf1.partition()); + assertEquals(expectedPartition2, addedDf2.partition()); + } + + @Test + public void testStreamingAdds() throws IOException { + List paths = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String file = String.format("%sdata_%s.parquet", root, i); + DataWriter writer = createWriter(file); + writer.write(record(1, "SomeName", 30)); + writer.close(); + paths.add(file); + } + + PCollection files = + pipeline.apply( + TestStream.create(StringUtf8Coder.of()) + .addElements( + paths.get(0), + paths.subList(1, 15).toArray(new String[] {})) // should commit twice + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(15), + paths.subList(16, 40).toArray(new String[] {})) // should commit 3 times + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements( + paths.get(40), + paths.subList(41, 45).toArray(new String[] {})) // should commit once + .advanceWatermarkToInfinity()); + + files.apply( + new AddFiles( + catalogConfig, + tableId.toString(), + null, + null, + null, + 10, // trigger at 10 files + Duration.standardSeconds(5))); + pipeline.run().waitUntilFinish(); + + Table table = catalog.loadTable(tableId); + + List snapshots = Lists.newArrayList(table.snapshots()); + snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis)); + + assertEquals(6, snapshots.size()); + assertEquals(10, Iterables.size(snapshots.get(0).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(1).addedDataFiles(table.io()))); + assertEquals(10, Iterables.size(snapshots.get(2).addedDataFiles(table.io()))); + assertEquals(10, 
Iterables.size(snapshots.get(3).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(4).addedDataFiles(table.io()))); + assertEquals(5, Iterables.size(snapshots.get(5).addedDataFiles(table.io()))); + } + + @Test + public void testUnknownFormatErrors() throws Exception { + catalog.createTable(tableId, icebergSchema); + // Create a dummy text file (unsupported extension) + File txtFile = temp.newFile("unsupported.txt"); + txtFile.createNewFile(); + + PCollection inputFiles = + pipeline.apply("Create Input", Create.of(txtFile.getAbsolutePath())); + + AddFiles addFiles = new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null); + PCollectionRowTuple outputTuple = inputFiles.apply(addFiles); + + // Validate the file ended up in the errors PCollection with the correct schema + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ERROR_SCHEMA) + .addValues(txtFile.getAbsolutePath(), "Could not determine the file's format") + .build()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testPartitionPrefixErrors() throws Exception { + // Drop unpartitioned table and create a partitioned one + catalog.dropTable(tableId); + PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("name").build(); + catalog.createTable(tableId, icebergSchema, spec); + + String file1 = root + "data1.parquet"; + wrapper.wrap(record(-1, "And", 30)); + DataWriter writer = createWriter(file1, wrapper.copy()); + writer.write(record(1, "Andrew", 30)); + writer.close(); + + PCollection inputFiles = pipeline.apply("Create Input", Create.of(file1)); + + // Notice locationPrefix is "some/prefix/" but the absolute path doesn't start with it + AddFiles addFiles = + new AddFiles(catalogConfig, tableId.toString(), "some/prefix/", null, null, 1, null); + PCollectionRowTuple outputTuple = inputFiles.apply(addFiles); + + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + 
Row.withSchema(AddFiles.ERROR_SCHEMA).addValues(file1, PREFIX_ERROR).build()); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testRecognizesBucketPartitionMismatch() throws IOException { + catalog.dropTable(tableId); + + String file1 = root + "data1.parquet"; + wrapper.wrap(record(-1, "And", 30)); + DataWriter writer = createWriter(file1, wrapper.copy()); + writer.write(record(1, "Andrew", 30)); + writer.write(record(5, "Sally", 30)); + writer.write(record(10, "Ahmed", 30)); + writer.close(); + + // 1 (min) and 10 (max) will transform to bucket=0 + // 5 (some middle value) transforms to bucket=1 + // To prove this transform value mapping^, below is a sanity check. + // We should recognize that we cannot assign a partition to such a file, and pass it to DLQ. + List partitionFields = Arrays.asList("bucket(id, 2)", "age"); + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, icebergSchema); + PartitionField bucketPartition = spec.fields().get(0); + assertEquals("id_bucket", bucketPartition.name()); + assertTrue(bucketPartition.transform().toString().contains("bucket[")); + SerializableFunction transformFunc = + (SerializableFunction) + bucketPartition.transform().bind(Types.LongType.get()); + assertEquals(0, (int) transformFunc.apply(1L)); + assertEquals(1, (int) transformFunc.apply(5L)); + assertEquals(0, (int) transformFunc.apply(10L)); + + AddFiles addFiles = + new AddFiles(catalogConfig, tableId.toString(), null, partitionFields, null, 1, null); + PCollection inputFiles = pipeline.apply("Create Input", Create.of(file1)); + PCollectionRowTuple outputTuple = inputFiles.apply(addFiles); + + PAssert.that(outputTuple.get("errors")) + .containsInAnyOrder( + Row.withSchema(AddFiles.ERROR_SCHEMA) + .addValues( + file1, + UNKNOWN_PARTITION_ERROR + + "Found records with conflicting transformed values, for column: id") + .build()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testGetPartitionFromMetrics() throws 
IOException, InterruptedException { + PartitionSpec partitionSpec = + PartitionSpec.builderFor(icebergSchema) + .bucket("id", 2) + .truncate("name", 4) + .identity("age") + .build(); + + List testCases = + Arrays.asList( + PartitionTestCase.of( + root + "data_1.parquet", + record(1, "aaaa", 10), + Arrays.asList( + record(1, "aaaa123", 10), + record(10, "aaaa789", 10), + record(100, "aaaa456", 10)), + Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10), + Arrays.asList(100, CharBuffer.wrap("aaaa789"), 10), + "id_bucket=0/name_trunc=aaaa/age=10"), + PartitionTestCase.of( + root + "data_2.parquet", + record(1, "bbbb", 30), + Arrays.asList( + record(5, "bbbb789", 30), + record(55, "bbbb456", 30), + record(500, "bbbb123", 30)), + Arrays.asList(5, CharBuffer.wrap("bbbb123"), 30), + Arrays.asList(500, CharBuffer.wrap("bbbb789"), 30), + "id_bucket=1/name_trunc=bbbb/age=30")); + + PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema); + MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps); + Table table = catalog.createTable(tableId, icebergSchema, partitionSpec); + + for (PartitionTestCase caze : testCases) { + List records = caze.records; + String fileName = caze.fileName; + pk.wrap(caze.partition); + DataWriter writer = createWriter(fileName, pk.copy()); + + for (Record record : records) { + writer.write(record); + } + writer.close(); + InputFile file = table.io().newInputFile(fileName); + + Metrics metrics = + AddFiles.getFileMetrics( + file, FileFormat.PARQUET, metricsConfig, MappingUtil.create(icebergSchema)); + for (int i = 0; i < partitionSpec.fields().size(); i++) { + PartitionField partitionField = partitionSpec.fields().get(i); + Types.NestedField field = icebergSchema.findField(partitionField.sourceId()); + ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId()); + ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId()); + + Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes); + Object upper = 
Conversions.fromByteBuffer(field.type(), upperBytes); + + assertEquals(caze.expectedLower.get(i), lower); + assertEquals(caze.expectedUpper.get(i), upper); + } + + String partitionPath = getPartitionFromMetrics(metrics, file, table); + assertEquals(caze.expectedPartition, partitionPath); + } + } + + @Test + public void testThrowPartitionMismatchError() throws IOException, InterruptedException { + PartitionSpec partitionSpec = + PartitionSpec.builderFor(icebergSchema) + .bucket("id", 2) + .truncate("name", 4) + .identity("age") + .build(); + + List testCases = + Arrays.asList( + PartitionTestCase.of( + root + "data_1.parquet", + record(1, "aaaa", 10), + Arrays.asList( + record(1, "aaaa123", 10), record(10, "abab", 10), record(100, "aaaa789", 10)), + Arrays.asList(1, CharBuffer.wrap("aaaa123"), 10), + Arrays.asList(100, CharBuffer.wrap("abab"), 10), + "error"), + PartitionTestCase.of( + root + "data_2.parquet", + record(1, "bbbb", 30), + Arrays.asList( + record(5, "bbbb", 30), record(55, "bbbb", 30), record(500, "bbbb", 50)), + Arrays.asList(5, CharBuffer.wrap("bbbb"), 30), + Arrays.asList(500, CharBuffer.wrap("bbbb"), 50), + "error")); + + PartitionKey pk = new PartitionKey(partitionSpec, icebergSchema); + MetricsConfig metricsConfig = MetricsConfig.fromProperties(tableProps); + Table table = catalog.createTable(tableId, icebergSchema, partitionSpec); + + for (PartitionTestCase caze : testCases) { + List records = caze.records; + String fileName = caze.fileName; + pk.wrap(caze.partition); + DataWriter writer = createWriter(fileName, pk.copy()); + + for (Record record : records) { + writer.write(record); + } + writer.close(); + InputFile file = table.io().newInputFile(fileName); + + Metrics metrics = + AddFiles.getFileMetrics( + file, FileFormat.PARQUET, metricsConfig, MappingUtil.create(icebergSchema)); + // check that lower/upper stats are still fetched correctly + for (int i = 0; i < partitionSpec.fields().size(); i++) { + PartitionField partitionField = 
partitionSpec.fields().get(i); + Types.NestedField field = icebergSchema.findField(partitionField.sourceId()); + ByteBuffer lowerBytes = metrics.lowerBounds().get(field.fieldId()); + ByteBuffer upperBytes = metrics.upperBounds().get(field.fieldId()); + + Object lower = Conversions.fromByteBuffer(field.type(), lowerBytes); + Object upper = Conversions.fromByteBuffer(field.type(), upperBytes); + + assertEquals(caze.expectedLower.get(i), lower); + assertEquals(caze.expectedUpper.get(i), upper); + } + + assertThrows( + AddFiles.UnknownPartitionException.class, + () -> getPartitionFromMetrics(metrics, file, table)); + } + } + + static class PartitionTestCase { + String fileName; + StructLike partition; + List records; + List expectedLower; + List expectedUpper; + String expectedPartition; + + PartitionTestCase( + String fileName, + StructLike partition, + List records, + List expectedLower, + List expectedUpper, + String expectedPartition) { + this.fileName = fileName; + this.partition = partition; + this.records = records; + this.expectedLower = expectedLower; + this.expectedUpper = expectedUpper; + this.expectedPartition = expectedPartition; + } + + static PartitionTestCase of( + String fileName, + StructLike partition, + List records, + List expectedLower, + List expectedUpper, + String expectedPartition) { + return new PartitionTestCase( + fileName, partition, records, expectedLower, expectedUpper, expectedPartition); + } + } + + private DataWriter createWriter(String file) throws IOException { + return createWriter(file, null); + } + + private DataWriter createWriter(String file, @Nullable StructLike partition) + throws IOException { + return Parquet.writeData(Files.localOutput(file)) + .schema(icebergSchema) + .withSpec(partition != null ? 
spec : PartitionSpec.unpartitioned()) + .withPartition(partition) + .createWriterFunc(GenericParquetWriter::create) + .build(); + } + + private Record record(int id, String name, int age) { + return GenericRecord.create(icebergSchema).copy("id", id, "name", name, "age", age); + } +} diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index e6e4e27d74f3..d365cd5f7c77 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -971,10 +971,10 @@ public Progress getProgress() { } } - private static class BeamParquetInputFile implements InputFile { + public static class BeamParquetInputFile implements InputFile { private final SeekableByteChannel seekableByteChannel; - BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { + public BeamParquetInputFile(SeekableByteChannel seekableByteChannel) { this.seekableByteChannel = seekableByteChannel; }