diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index b73af5e61a43..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index 4a164700099a..e250536382ed 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; @@ -81,6 +82,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.MappingUtil; @@ -240,7 +242,18 @@ public void process(@Element String filePath, MultiOutputReceiver output) } if (table == null) { - table = getOrCreateTable(getSchema(filePath, format)); + try { + table = getOrCreateTable(filePath, format); + } catch (FileNotFoundException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, checkStateNotNull(e.getMessage())) + .build()); + numErrorFiles.inc(); + return; + } } // Check if the file path contains the provided prefix @@ -256,9 +269,24 @@ public void process(@Element String filePath, MultiOutputReceiver output) InputFile inputFile = table.io().newInputFile(filePath); - Metrics metrics = - getFileMetrics( - inputFile, format, MetricsConfig.forTable(table), MappingUtil.create(table.schema())); + Metrics metrics; + try { + metrics = + getFileMetrics( + inputFile, + format, + MetricsConfig.forTable(table), + MappingUtil.create(table.schema())); + } catch (FileNotFoundException e) { + output + .get(ERRORS) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(filePath, checkStateNotNull(e.getMessage())) + .build()); + numErrorFiles.inc(); + return; + } // Figure out which partition this DataFile should go to String partitionPath; @@ -304,16 +332,23 @@ private static T transformValue(Transform transform, Type type, Obj return transform.bind(type).apply((W) value); } - private Table getOrCreateTable(org.apache.iceberg.Schema schema) { - PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + private Table getOrCreateTable(String filePath, FileFormat format) throws IOException { + TableIdentifier tableId = TableIdentifier.parse(identifier); try { - return tableProps == null - ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) - : catalogConfig - .catalog() - .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); - } catch (AlreadyExistsException e) { // if table already exists, just load it - return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + return catalogConfig.catalog().loadTable(tableId); + } catch (NoSuchTableException e) { + try { + org.apache.iceberg.Schema schema = getSchema(filePath, format); + PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); + + return tableProps == null + ? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec) + : catalogConfig + .catalog() + .createTable(TableIdentifier.parse(identifier), schema, spec, tableProps); + } catch (AlreadyExistsException e2) { // if table already exists, just load it + return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); + } } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java index 287b9140e000..56ba36919e51 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AddFilesTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasEntry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -422,8 +423,6 @@ public void testPartitionPrefixErrors() throws Exception { @Test public void testRecognizesBucketPartitionMismatch() throws IOException { - catalog.dropTable(tableId); - String file1 = root + "data1.parquet"; wrapper.wrap(record(-1, "And", 30)); DataWriter writer = createWriter(file1, wrapper.copy()); @@ -464,6 +463,30 @@ public void testRecognizesBucketPartitionMismatch() throws IOException { pipeline.run().waitUntilFinish(); } + @Test + public void testCatchFileNotFoundException() throws IOException { + String file = root + "non-existent.parquet"; + + PCollectionRowTuple outputTuple = + pipeline + .apply("Create Input", Create.of(file)) + .apply(new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null)); + + PAssert.that(outputTuple.get("errors")) + .satisfies( + rows -> { + Row error = Iterables.getOnlyElement(rows); + String errorFile = error.getString("file"); + String message = error.getString("error"); + + assertEquals(file, errorFile); + assertThat(message, containsString("No files found")); + assertThat(message, containsString(errorFile)); + return null; + }); + pipeline.run().waitUntilFinish(); + } + @Test public void testGetPartitionFromMetrics() throws IOException, InterruptedException { PartitionSpec partitionSpec =