Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -81,6 +82,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
Expand Down Expand Up @@ -240,7 +242,18 @@ public void process(@Element String filePath, MultiOutputReceiver output)
}

if (table == null) {
table = getOrCreateTable(getSchema(filePath, format));
try {
table = getOrCreateTable(filePath, format);
} catch (FileNotFoundException e) {
output
.get(ERRORS)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
.build());
numErrorFiles.inc();
return;
}
}

// Check if the file path contains the provided prefix
Expand All @@ -256,9 +269,24 @@ public void process(@Element String filePath, MultiOutputReceiver output)

InputFile inputFile = table.io().newInputFile(filePath);

Metrics metrics =
getFileMetrics(
inputFile, format, MetricsConfig.forTable(table), MappingUtil.create(table.schema()));
Metrics metrics;
try {
metrics =
getFileMetrics(
inputFile,
format,
MetricsConfig.forTable(table),
MappingUtil.create(table.schema()));
} catch (FileNotFoundException e) {
output
.get(ERRORS)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
.build());
numErrorFiles.inc();
return;
}

// Figure out which partition this DataFile should go to
String partitionPath;
Expand Down Expand Up @@ -304,16 +332,23 @@ private static <W, T> T transformValue(Transform<W, T> transform, Type type, Obj
return transform.bind(type).apply((W) value);
}

private Table getOrCreateTable(org.apache.iceberg.Schema schema) {
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);
private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
TableIdentifier tableId = TableIdentifier.parse(identifier);
try {
return tableProps == null
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
: catalogConfig
.catalog()
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
} catch (AlreadyExistsException e) { // if table already exists, just load it
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
return catalogConfig.catalog().loadTable(tableId);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema schema = getSchema(filePath, format);
PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema);

return tableProps == null
? catalogConfig.catalog().createTable(TableIdentifier.parse(identifier), schema, spec)
: catalogConfig
.catalog()
.createTable(TableIdentifier.parse(identifier), schema, spec, tableProps);
} catch (AlreadyExistsException e2) { // if table already exists, just load it
return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -422,8 +423,6 @@ public void testPartitionPrefixErrors() throws Exception {

@Test
public void testRecognizesBucketPartitionMismatch() throws IOException {
catalog.dropTable(tableId);

String file1 = root + "data1.parquet";
wrapper.wrap(record(-1, "And", 30));
DataWriter<Record> writer = createWriter(file1, wrapper.copy());
Expand Down Expand Up @@ -464,6 +463,30 @@ public void testRecognizesBucketPartitionMismatch() throws IOException {
pipeline.run().waitUntilFinish();
}

/**
 * Checks that a file path for which no file was written in this test ends up as a single row on
 * the "errors" output of {@code AddFiles}, carrying the offending path and a descriptive message.
 */
@Test
public void testCatchFileNotFoundException() throws IOException {
// No writer is created for this path in this test, so the file is expected to be missing.
String file = root + "non-existent.parquet";

// Feed the single path through AddFiles and keep the tuple so the error output can be inspected.
PCollectionRowTuple outputTuple =
pipeline
.apply("Create Input", Create.of(file))
.apply(new AddFiles(catalogConfig, tableId.toString(), null, null, null, 1, null));

PAssert.that(outputTuple.get("errors"))
.satisfies(
rows -> {
// getOnlyElement fails unless the error output holds exactly one row.
Row error = Iterables.getOnlyElement(rows);
String errorFile = error.getString("file");
String message = error.getString("error");

// The error row must name the input path and explain that no such file was found.
assertEquals(file, errorFile);
assertThat(message, containsString("No files found"));
assertThat(message, containsString(errorFile));
return null;
});
pipeline.run().waitUntilFinish();
}

@Test
public void testGetPartitionFromMetrics() throws IOException, InterruptedException {
PartitionSpec partitionSpec =
Expand Down
Loading