diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 98f61a8528aa..f85f15a6d8c6 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -54,7 +54,15 @@ impl TableProviderFactory for ListingTableFactory { cmd: &CreateExternalTable, ) -> Result> { // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here. Should file format factory be an extension to session state? - let session_state = state.as_any().downcast_ref::().unwrap(); + let session_state = + state + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion_common::internal_datafusion_err!( + "ListingTableFactory requires SessionState" + ) + })?; let file_format = session_state .get_file_format_factory(cmd.file_type.as_str()) .ok_or(config_datafusion_err!( @@ -546,4 +554,103 @@ mod tests { "Statistics cache should not be pre-warmed when collect_statistics is disabled" ); } + + #[tokio::test] + async fn test_create_with_invalid_session() { + use async_trait::async_trait; + use datafusion_catalog::Session; + use datafusion_common::Result; + use datafusion_common::config::TableOptions; + use datafusion_execution::TaskContext; + use datafusion_execution::config::SessionConfig; + use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_plan::ExecutionPlan; + use std::any::Any; + use std::collections::HashMap; + use std::sync::Arc; + + // A mock Session that is NOT SessionState + #[derive(Debug)] + struct MockSession; + + #[async_trait] + impl Session for MockSession { + fn session_id(&self) -> &str { + "mock_session" + } + fn config(&self) -> &SessionConfig { + unimplemented!() + } + async fn create_physical_plan( + &self, + _logical_plan: &datafusion_expr::LogicalPlan, + ) -> Result> { + unimplemented!() + } + fn create_physical_expr( + &self, + _expr: datafusion_expr::Expr, + _df_schema: &DFSchema, + ) -> Result> { + unimplemented!() + } + fn scalar_functions( + &self, + ) -> &HashMap> { + unimplemented!() + } + fn aggregate_functions( + &self, + ) -> &HashMap> { + unimplemented!() + } + fn window_functions( + &self, + ) -> &HashMap> { + unimplemented!() + } + fn runtime_env(&self) -> &Arc { + unimplemented!() + } + fn execution_props( + &self, + ) -> &datafusion_expr::execution_props::ExecutionProps { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { + self + } + fn table_options(&self) -> &TableOptions { + unimplemented!() + } + fn table_options_mut(&mut self) -> &mut TableOptions { + unimplemented!() + } + fn task_ctx(&self) -> Arc { + unimplemented!() + } + } + + let factory = ListingTableFactory::new(); + let mock_session = MockSession; + + let name = TableReference::bare("foo"); + let cmd = CreateExternalTable::builder( + name, + "foo.csv".to_string(), + "csv", + Arc::new(DFSchema::empty()), + ) + .build(); + + // This should return an error, not panic + let result = factory.create(&mock_session, &cmd).await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .strip_backtrace() + .contains("Internal error: ListingTableFactory requires SessionState") + ); + } }