
Cache parquet pruning setup across files with the same physical schema #21554

@alamb

Description


Is your feature request related to a problem or challenge?

In the parquet opener, DataFusion currently performs schema adaptation and pruning setup separately for every file, including predicate rewrites and pruning predicate construction:

```rust
// Adapt the projection & filter predicate to the physical file schema.
// This evaluates missing columns and inserts any necessary casts.
// After rewriting to the file schema, further simplifications may be possible.
// For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE`
// and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.).
// Additionally, if any casts were inserted we can move casts from the column to the literal side:
// `CAST(col AS INT) = 5` can become `col = CAST(5 AS <col type>)`, which can be evaluated statically.
//
// When the schemas are identical and there is no predicate, the
// rewriter is a no-op: column indices already match (partition
// columns are appended after file columns in the table schema),
// types are the same, and there are no missing columns. Skip the
// tree walk entirely in that case.
let needs_rewrite = prepared.predicate.is_some()
    || prepared.logical_file_schema != physical_file_schema;
if needs_rewrite {
    let rewriter = prepared.expr_adapter_factory.create(
        Arc::clone(&prepared.logical_file_schema),
        Arc::clone(&physical_file_schema),
    )?;
    let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
    prepared.predicate = prepared
        .predicate
        .map(|p| simplifier.simplify(rewriter.rewrite(p)?))
        .transpose()?;
    prepared.projection = prepared
        .projection
        .try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?;
}
prepared.physical_file_schema = Arc::clone(&physical_file_schema);

// Build predicates for this specific file
let pruning_predicate = build_pruning_predicates(
    prepared.predicate.as_ref(),
    &physical_file_schema,
    &prepared.predicate_creation_errors,
);

// Only build page pruning predicate if page index is enabled
let page_pruning_predicate = if prepared.enable_page_index {
    prepared.predicate.as_ref().and_then(|predicate| {
        let p = build_page_pruning_predicate(predicate, &physical_file_schema);
        (p.filter_number() > 0).then_some(p)
    })
} else {
    None
```

```rust
) -> Option<Arc<PruningPredicate>> {
    let predicate = predicate.as_ref()?;
    build_pruning_predicate(
        Arc::clone(predicate),
        file_schema,
        predicate_creation_errors,
    )
}

/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
/// it from the underlying `AsyncFileReader` if necessary.
async fn load_page_index<T: AsyncFileReader>(
    reader_metadata: ArrowReaderMetadata,
    input: &mut T,
    options: ArrowReaderOptions,
) -> Result<ArrowReaderMetadata> {
    let parquet_metadata = reader_metadata.metadata();
    let missing_column_index = parquet_metadata.column_index().is_none();
    let missing_offset_index = parquet_metadata.offset_index().is_none();
    // You may ask yourself: why are we even checking if the page index is already loaded here?
    // Didn't we explicitly *not* load it above?
    // Well it's possible that a custom implementation of `AsyncFileReader` gives you
    // the page index even if you didn't ask for it (e.g. because it's cached)
    // so it's important to check that here to avoid extra work.
    if missing_column_index || missing_offset_index {
```
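To make the rewrite-then-simplify step concrete, here is a minimal, self-contained sketch using a toy expression type (not DataFusion's actual `PhysicalExpr` machinery; all names here are illustrative). It shows how rewriting a predicate against the physical file schema turns a missing column into `NULL`, which then lets the whole predicate collapse to `FALSE` so the file can be skipped:

```rust
use std::collections::HashSet;

/// Toy predicate AST (hypothetical; DataFusion uses `Arc<dyn PhysicalExpr>`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(Option<i64>), // None models SQL NULL
    Eq(Box<Expr>, Box<Expr>),
}

/// Rewrite against the physical file schema: columns missing from the file
/// evaluate to NULL, and `x = NULL` simplifies to FALSE (here Literal(Some(0))).
fn rewrite_and_simplify(expr: Expr, file_columns: &HashSet<String>) -> Expr {
    match expr {
        Expr::Column(name) if !file_columns.contains(&name) => Expr::Literal(None),
        Expr::Eq(l, r) => {
            let l = rewrite_and_simplify(*l, file_columns);
            let r = rewrite_and_simplify(*r, file_columns);
            // Equality with NULL is never true, so the file can be skipped
            // without touching bloom filters or the page index.
            if l == Expr::Literal(None) || r == Expr::Literal(None) {
                Expr::Literal(Some(0)) // FALSE
            } else {
                Expr::Eq(Box::new(l), Box::new(r))
            }
        }
        other => other,
    }
}

fn main() {
    let file_columns: HashSet<String> = ["a".to_string()].into_iter().collect();
    // `'a' = col_that_is_missing` collapses to FALSE for this file
    let pred = Expr::Eq(
        Box::new(Expr::Literal(Some(1))),
        Box::new(Expr::Column("col_that_is_missing".to_string())),
    );
    println!("{:?}", rewrite_and_simplify(pred, &file_columns));
}
```

The point of the issue is that this walk (and the pruning predicate construction that follows it) produces the same result for every file with the same physical schema, so its output is a natural candidate for caching.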

As @adriangb noted on #21480 (comment), many deployments only have a small number of physical schemas, often just one, so repeating the same work across many files is wasteful.

PR #21480 from @fpetkovski improved this area by skipping page pruning predicate construction unless page indexes are enabled, but we can go further and cache the equivalent pruning setup across files that share a physical schema.

Describe the solution you'd like

Cache parquet pruning setup across files when the physical schema and other correctness-relevant inputs are the same.

This likely includes:

  • expression/schema rewrite results
  • normal and pruning predicate construction
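One possible shape for such a cache, as a minimal sketch with hypothetical names (a real implementation would key on the physical `SchemaRef`, e.g. by pointer identity or a fingerprint, plus whatever else affects correctness such as the predicate and page-index settings):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical cache key: everything that determines the pruning setup.
#[derive(Hash, PartialEq, Eq, Clone)]
struct PruningSetupKey {
    schema_fingerprint: String,
    predicate_fingerprint: String,
    enable_page_index: bool,
}

/// Hypothetical cached result of the per-file setup work
/// (stand-in for the rewritten predicate and pruning predicates).
#[derive(Clone)]
struct PruningSetup {
    rewritten_predicate: String,
}

#[derive(Default)]
struct PruningSetupCache {
    inner: Mutex<HashMap<PruningSetupKey, Arc<PruningSetup>>>,
}

impl PruningSetupCache {
    /// Return the cached setup for `key`, computing it at most once per key.
    /// Note: this sketch holds the lock while building, which is fine for
    /// illustration but not necessarily for a production implementation.
    fn get_or_build(
        &self,
        key: PruningSetupKey,
        build: impl FnOnce() -> PruningSetup,
    ) -> Arc<PruningSetup> {
        let mut map = self.inner.lock().unwrap();
        Arc::clone(map.entry(key).or_insert_with(|| Arc::new(build())))
    }
}

fn main() {
    let cache = PruningSetupCache::default();
    let key = PruningSetupKey {
        schema_fingerprint: "a:Int64,b:Utf8".into(),
        predicate_fingerprint: "a = 5".into(),
        enable_page_index: true,
    };
    let mut builds = 0;
    // Three files with the same physical schema: the setup is built once.
    for _ in 0..3 {
        cache.get_or_build(key.clone(), || {
            builds += 1;
            PruningSetup { rewritten_predicate: "a = 5".into() }
        });
    }
    println!("built {builds} time(s) for 3 files");
}
```

With a single physical schema across a table (the common case noted above), this reduces the rewrite and pruning predicate construction from once per file to once per scan.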

Describe alternatives you've considered

Do nothing

Additional context

Relevant links:

  • Tracking comment from @adriangb:
    Conditionally build page pruning predicates #21480 (comment)
  • Original PR from @fpetkovski:
    Conditionally build page pruning predicates #21480
  • Page index loading / page pruning setup:
```rust
            prepared,
            reader_metadata,
            options,
        },
        pruning_predicate,
        page_pruning_predicate,
    })
}
}

impl FiltersPreparedParquetOpen {
    /// Load the page index if pruning requires it and metadata did not include it.
    async fn load_page_index(mut self) -> Result<Self> {
        // The page index is not stored inline in the parquet footer so the
        // metadata load above may not have read the page index structures yet.
        // If we need them for reading and they aren't yet loaded, we need to
        // load them now.
        if self.page_pruning_predicate.is_some() {
            self.loaded.reader_metadata = load_page_index(
                self.loaded.reader_metadata,
                &mut self.loaded.prepared.async_file_reader,
                self.loaded
                    .options
                    .clone()
                    .with_page_index_policy(PageIndexPolicy::Optional),
            )
            .await?;
        }
        Ok(self)
    }

    /// Prune row groups using file ranges and parquet metadata.
    fn prune_row_groups(self) -> Result<RowGroupsPrunedParquetOpen> {
        let loaded = &self.loaded;
        let prepared = &loaded.prepared;
        let file_metadata = Arc::clone(loaded.reader_metadata.metadata());
        let rg_metadata = file_metadata.row_groups();
        // Determine which row groups to actually read. The idea is to skip
        // as many row groups as possible based on the metadata and query
        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
            &prepared.file_name,
            prepared.extensions.clone(),
            rg_metadata.len(),
        )?);
```
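For context, the essence of the row-group pruning that follows this setup can be sketched with toy types (hypothetical names; real code reads min/max statistics from the parquet metadata and evaluates the full `PruningPredicate` against them). A row group is skipped only when its statistics prove the predicate cannot match:

```rust
/// Hypothetical per-row-group statistics for one column.
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Keep only the row groups whose [min, max] range could satisfy
/// `col > threshold`; all others are provably empty of matches.
fn prune_row_groups(stats: &[RowGroupStats], threshold: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter(|(_, s)| s.max > threshold) // otherwise no row can exceed threshold
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let stats = [
        RowGroupStats { min: 0, max: 10 },
        RowGroupStats { min: 11, max: 20 },
        RowGroupStats { min: 21, max: 30 },
    ];
    // Predicate `col > 15`: row group 0 is skipped entirely.
    println!("{:?}", prune_row_groups(&stats, 15)); // [1, 2]
}
```

Unlike the pruning *setup* (rewritten predicate, `PruningPredicate` construction), this evaluation step is inherently per-file, since it depends on each file's statistics; the proposal here is to cache only the schema-dependent setup that feeds it.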

Labels: performance (Make DataFusion faster)
