5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions crates/catalog/s3tables/Cargo.toml
@@ -39,6 +39,11 @@ iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }


[dev-dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
futures = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
parquet = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
105 changes: 105 additions & 0 deletions crates/catalog/s3tables/src/catalog.rs
@@ -707,6 +707,7 @@ where T: std::fmt::Debug {

#[cfg(test)]
mod tests {
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};

@@ -1175,4 +1176,108 @@ mod tests {
assert_eq!(err.message(), "Catalog name cannot be empty");
}
}

/// Verify that an S3 Tables catalog can create a table, write data, load the same table, and read the data back.
#[tokio::test]
async fn test_s3tables_create_table_write_load_table_read() {
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

let catalog = match load_s3tables_catalog_from_env().await {
Ok(Some(c)) => c,
Ok(None) => return,
Err(e) => panic!("Error loading catalog: {e}"),
};

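// Use a random suffix so repeated or concurrent runs don't collide on the namespace name.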
let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

let table_name = String::from("table");

let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap();
let creation = TableCreation::builder()
.name(table_name.clone())
.schema(schema)
.build();

let table = catalog.create_table(&ns, creation).await.unwrap();

// Write one row.
let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
table
.metadata()
.current_schema()
.as_ref()
.try_into()
.unwrap(),
);
let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
arrow_array::Int32Array::from(vec![42]),
)])
.unwrap();

// Locations are generated from the table metadata, which uses the `s3://` scheme for Amazon S3 Tables.
let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
let file_name_generator = DefaultFileNameGenerator::new(
"test".to_string(),
None,
iceberg::spec::DataFileFormat::Parquet,
);
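// Stack the writers: a Parquet file writer, wrapped in a rolling writer that starts a new file
// once the default target size is reached, wrapped in a data file writer that emits Iceberg `DataFile` entries.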
let parquet_writer_builder = ParquetWriterBuilder::new(
parquet::file::properties::WriterProperties::default(),
table.metadata().current_schema().clone(),
);
let rw = RollingFileWriterBuilder::new_with_default_file_size(
parquet_writer_builder,
table.file_io().clone(),
location_generator,
file_name_generator,
);
let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
writer.write(batch.clone()).await.unwrap();
let data_files = writer.close().await.unwrap();

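// Commit the written data files to the table with a fast-append transaction.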
let tx = Transaction::new(&table);
let tx = tx
.fast_append()
.add_data_files(data_files)
.apply(tx)
.unwrap();
tx.commit(&catalog).await.unwrap();

// Reload from catalog and read back.
let table_ident = TableIdent::new(ns.clone(), table_name.clone());
let reloaded = catalog.load_table(&table_ident).await.unwrap();
let batches: Vec<arrow_array::RecordBatch> = reloaded
.scan()
.select_all()
.build()
.expect("scan to be valid (snapshot exists, schema is OK)")
.to_arrow()
.await
.expect("scan tasks should be OK")
.try_collect()
.await
.expect("scan should complete successfully");

assert_eq!(batches.len(), 1);
assert_eq!(
batches[0], batch,
"read records should match records written earlier"
);

// Clean up.
catalog.purge_table(&table_ident).await.ok();
catalog.drop_namespace(&ns).await.ok();
}
}