Skip to content

Add IO runtime support to IcebergPartitionedScan#3

Open
toutane wants to merge 4 commits intocharlesantoine.leger/partitioned-scanfrom
charlesantoine.leger/support-io-runtime
Open

Add IO runtime support to IcebergPartitionedScan#3
toutane wants to merge 4 commits intocharlesantoine.leger/partitioned-scanfrom
charlesantoine.leger/support-io-runtime

Conversation

@toutane
Copy link
Copy Markdown

@toutane toutane commented Apr 2, 2026

Summary

This PR implements Option 3 as described in this document and is linked to Jira ticket QECO-1015.

The upstream PR apache#2308 is also related to this feature and would replace the proposed implementation when merged if relevant.

Adds optional IO runtime segregation to IcebergPartitionedScan and IcebergPartitionedTableProvider, mirroring the IOExec pattern from dd-datafusion.

What

IcebergPartitionedScan

  • New optional io_handle: Option<tokio::runtime::Handle> field, set via with_io_handle(Handle).
  • When set, execute() spawns the Parquet read (opendal) on the IO runtime and bridges results back to the CPU runtime via an mpsc::channel + ChannelRecordBatchStream.

IcebergPartitionedTableProvider

  • Same optional handle, set via with_io_handle(Handle).
  • When set, scan()'s network I/O (load_table + plan_files) is spawned on the IO runtime via Handle::spawn(...).await, preventing DNS/HTTP calls from running on the CPU runtime during DataFusion's physical planning phase.

Why

When DataFusion runs on a dedicated CPU runtime, opendal and iceberg async I/O must not execute on its threads,doing so causes latency spikes and, in some configurations, a panic (nested block_on). This is the same problem solved by dd-datafusion's IOExec wrapper, and this implementation mirrors it directly.

Design notes

  • Both types default to io_handle: None; the feature is opt-in via the builder method, no behavior change for existing callers.
  • CHANNEL_BUFFER_SIZE = 32 provides backpressure between runtimes (mirrors dd-datafusion's IOExec).
  • The JoinHandle from the spawned Parquet-read task is intentionally dropped (detached). Errors are forwarded via the channel; if the task panics, tx is dropped and the consumer sees end-of-stream. This matches dd-datafusion's IOExec behaviour.
  • fetch_tasks is extracted as a static method so it can be passed to Handle::spawn without capturing &self.

@toutane toutane marked this pull request as ready for review April 7, 2026 08:52
@toutane toutane changed the title iceberg-datafusion: add IO runtime support to IcebergPartitionedScan Add IO runtime support to IcebergPartitionedScan Apr 7, 2026
Copy link
Copy Markdown
Member

@notfilippo notfilippo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks very good! great job

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants