diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..988bbf3ebc 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,9 +19,11 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; +use futures::{StreamExt, TryStreamExt}; use uuid::Uuid; use crate::error::Result; +use crate::runtime::spawn; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, @@ -175,17 +177,24 @@ impl<'a> SnapshotProducer<'a> { let manifest_list = current_snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry - .load_manifest(self.table.file_io()) - .await?; - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + + let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect(); + futures::stream::iter(entries) + .map(|entry| { + let file_io = self.table.file_io().clone(); + spawn(async move { entry.load_manifest(&file_io).await }) + }) + .buffer_unordered(32) + .try_for_each(|manifest| { + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } } - } - } + std::future::ready(Ok(())) + }) + .await?; } if !referenced_files.is_empty() {