Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,14 @@ Status ParquetReader::_next_row_group_reader() {
// process page index and generate the ranges to read
auto& row_group = _t_metadata->row_groups[_current_row_group_index.row_group_id];

// Lazily load position delete rows now that we know at least one row group
// survived min/max + bloom filter filtering. This avoids reading delete files
// when all row groups are filtered out (matching Spark/Iceberg's lazy behavior).
if (_position_delete_loader) {
RETURN_IF_ERROR(_position_delete_loader());
_position_delete_loader = nullptr;
}

RowGroupReader::PositionDeleteContext position_delete_ctx =
_get_position_delete_ctx(row_group, _current_row_group_index);
io::FileReaderSPtr group_file_reader;
Expand Down
17 changes: 13 additions & 4 deletions be/src/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <stddef.h>
#include <stdint.h>

#include <list>
#include <functional>
#include <memory>
#include <string>
#include <tuple>
Expand All @@ -32,7 +32,6 @@
#include "common/status.h"
#include "format/generic_reader.h"
#include "format/parquet/parquet_common.h"
#include "format/parquet/parquet_predicate.h"
#include "format/parquet/vparquet_column_reader.h"
#include "format/parquet/vparquet_group_reader.h"
#include "format/table/table_format_reader.h"
Expand All @@ -41,7 +40,6 @@
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "runtime/runtime_profile.h"
#include "storage/olap_scan_common.h"
#include "util/obj_lru_cache.h"

namespace cctz {
Expand Down Expand Up @@ -144,6 +142,14 @@ class ParquetReader : public GenericReader {
// Record the delete-row positions for the current parquet file.
// Stores a non-owning pointer; the vector is owned by the caller.
void set_delete_rows(const std::vector<int64_t>* delete_rows) {
    _delete_rows = delete_rows;
}

// Register a deferred loader for position delete rows. The loader is invoked
// at most once, right before delete rows are first needed (i.e. when a row
// group survives min/max + bloom filter pruning), so delete files are never
// read when every row group is pruned away.
void set_position_delete_loader(std::function<Status()> loader) {
    _position_delete_loader = std::move(loader);
}

int64_t size() const { return _file_reader->size(); }

Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
Expand Down Expand Up @@ -185,7 +191,8 @@ class ParquetReader : public GenericReader {
int64_t get_total_rows() const override;

// True when this reader must apply deletes: either delete rows are already
// materialized, or a lazy position-delete loader is still pending (deletes
// exist but have not been loaded yet).
//
// NOTE(review): the diff rendering left the pre-change return statement in
// place above the new one, producing two consecutive returns (the first
// unconditionally taken, hiding the lazy-loader check). Collapsed to the
// intended post-change body.
bool has_delete_operations() const override {
    return (_delete_rows != nullptr && !_delete_rows->empty()) ||
           _position_delete_loader != nullptr;
}

protected:
Expand Down Expand Up @@ -334,6 +341,8 @@ class ParquetReader : public GenericReader {
// Deleted rows will be marked by Iceberg/Paimon. So we should filter deleted rows when reading it.
const std::vector<int64_t>* _delete_rows = nullptr;
int64_t _delete_rows_index = 0;
// Lazy loader for position delete rows. Called once before first use.
std::function<Status()> _position_delete_loader;

// Used for column lazy read.
RowGroupReader::LazyReadContext _lazy_read_ctx;
Expand Down
45 changes: 38 additions & 7 deletions be/src/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@ Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows,
return _shrink_block_if_need(block);
}

// For Parquet, defer delete-file loading until the first row group survives
// min/max + bloom filter pruning; for every other format, load immediately.
// In both paths the num_delete_files profile counter is bumped only when the
// loader actually runs.
Status IcebergTableReader::_execute_or_defer_delete_loading(std::function<Status()> loader,
                                                            size_t num_files) {
    // Non-Parquet readers cannot defer: run the loader right away.
    if (_file_format != Fileformat::PARQUET) {
        RETURN_IF_ERROR(loader());
        COUNTER_UPDATE(_iceberg_profile.num_delete_files, num_files);
        return Status::OK();
    }

    // Parquet: hand the wrapped loader to the reader so it fires lazily, and
    // remember that deletes exist but are not loaded yet.
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
    parquet_reader->set_position_delete_loader(
            [this, deferred = std::move(loader), num_files]() -> Status {
                RETURN_IF_ERROR(deferred());
                COUNTER_UPDATE(_iceberg_profile.num_delete_files, num_files);
                return Status::OK();
            });
    _has_deferred_delete_files = true;
    return Status::OK();
}

Status IcebergTableReader::init_row_filters() {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
Expand Down Expand Up @@ -249,11 +267,17 @@ Status IcebergTableReader::init_row_filters() {
}
}

// Equality deletes must be loaded eagerly: they determine which additional
// columns are needed for the data file read schema.
if (!equality_delete_files.empty()) {
RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
COUNTER_UPDATE(_iceberg_profile.num_delete_files, equality_delete_files.size());
}

// Position deletes and deletion vectors are deferred for Parquet: they are
// only loaded when the first row group survives min/max + bloom filter
// filtering. This avoids wasting I/O when all row groups are filtered out.
if (!deletion_vector_files.empty()) {
if (deletion_vector_files.size() != 1) [[unlikely]] {
/*
Expand All @@ -263,18 +287,25 @@ Status IcebergTableReader::init_row_filters() {
*/
return Status::DataQualityError("This iceberg data file has multiple DVs.");
}
RETURN_IF_ERROR(
read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));

auto data_file_path = table_desc.original_file_path;
auto dv_file = deletion_vector_files[0];
RETURN_IF_ERROR(_execute_or_defer_delete_loading(
[this, data_file_path = std::move(data_file_path), dv_file = std::move(dv_file)]()
-> Status { return read_deletion_vector(data_file_path, dv_file); },
deletion_vector_files.size()));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
// Readers can safely ignore position delete files if there is a DV for a data file.
} else if (!position_delete_files.empty()) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path, position_delete_files));
auto data_file_path = table_desc.original_file_path;
auto num_files = position_delete_files.size();
RETURN_IF_ERROR(_execute_or_defer_delete_loading(
[this, data_file_path = std::move(data_file_path),
pos_delete_files = std::move(position_delete_files)]() mutable -> Status {
return _position_delete_base(data_file_path, pos_delete_files);
},
num_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
return Status::OK();
}

Expand Down
15 changes: 14 additions & 1 deletion be/src/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -95,7 +96,8 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel
virtual void set_delete_rows() = 0;

// True when any delete mechanism applies to this data file: loaded equality
// deletes, deferred (not-yet-loaded) position deletes / deletion vectors, or
// whatever the base table-format reader reports.
//
// NOTE(review): the diff rendering left the pre-change return statement in
// place above the new one (two consecutive returns, the first unconditionally
// taken). Collapsed to the intended post-change body; also replaced
// `.size() > 0` with the idiomatic `!empty()`.
bool has_delete_operations() const override {
    return !_equality_delete_impls.empty() || _has_deferred_delete_files ||
           TableFormatReader::has_delete_operations();
}

Status read_deletion_vector(const std::string& data_file_path,
Expand Down Expand Up @@ -142,6 +144,13 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel
void _generate_equality_delete_block(Block* block,
const std::vector<std::string>& equality_delete_col_names,
const std::vector<DataTypePtr>& equality_delete_col_types);

// For Parquet format, defers the delete file loading to be triggered lazily
// when the first row group survives min/max + bloom filter filtering.
// For other formats, executes the loader immediately.
// In both cases, updates the num_delete_files profile counter accordingly.
Status _execute_or_defer_delete_loading(std::function<Status()> loader, size_t num_files);

// Equality delete should read the primary columns. Add the missing columns
Status _expand_block_if_need(Block* block);
// Remove the added delete columns
Expand Down Expand Up @@ -185,6 +194,10 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel
std::unordered_map<int, std::string> _id_to_block_column_name;

std::shared_ptr<RowLineageColumns> _row_lineage_columns;

// Whether position delete or deletion vector files are deferred for lazy loading.
// Used by has_delete_operations() to correctly report delete state before loading.
bool _has_deferred_delete_files = false;
};

class IcebergParquetReader final : public IcebergTableReader {
Expand Down
Loading