diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs
index 62bb040e0ba..021b3fa27c5 100644
--- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs
+++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs
@@ -113,6 +113,10 @@ impl CompactionPipeline {
         }
     }
 
+    pub fn status(&self) -> &PipelineStatus {
+        &self.status
+    }
+
     fn supervisables(&self) -> Vec<&dyn Supervisable> {
         let Some(handles) = &self.handles else {
             return Vec::new();
diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
index e1b27eb4a30..292c861972c 100644
--- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs
+++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
@@ -12,22 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::str::FromStr;
+use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
+use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext};
 use quickwit_common::io::Limiter;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
-use quickwit_indexing::IndexingSplitStore;
+use quickwit_common::uri::Uri;
+use quickwit_config::{IndexingSettings, RetentionPolicy, SearchSettings, build_doc_mapper};
+use quickwit_doc_mapper::DocMapping;
+use quickwit_indexing::merge_policy::{MergeOperation, merge_policy_from_settings};
+use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore};
+use quickwit_metastore::SplitMetadata;
 use quickwit_proto::compaction::{
-    CompactionFailure, CompactionInProgress, CompactionPlannerServiceClient, CompactionSuccess,
-    ReportStatusRequest,
+    CompactionFailure, CompactionInProgress, CompactionPlannerService,
+    CompactionPlannerServiceClient, CompactionSuccess, MergeTaskAssignment, ReportStatusRequest,
 };
+use quickwit_proto::indexing::MergePipelineId;
 use quickwit_proto::metastore::MetastoreServiceClient;
 use quickwit_proto::types::NodeId;
 use quickwit_storage::StorageResolver;
-use tracing::info;
+use tracing::{error, info, warn};
 
 use crate::compaction_pipeline::{CompactionPipeline, PipelineStatus, PipelineStatusUpdate};
 
@@ -47,7 +55,7 @@ pub struct CompactorSupervisor {
 
     // Shared resources distributed to pipelines when spawning actor chains.
     io_throughput_limiter: Option<Limiter>,
-    split_store: IndexingSplitStore,
+    split_cache: Arc<IndexingSplitCache>,
     metastore: MetastoreServiceClient,
     storage_resolver: StorageResolver,
     max_concurrent_split_uploads: usize,
@@ -64,7 +72,7 @@ impl CompactorSupervisor {
         planner_client: CompactionPlannerServiceClient,
         num_pipeline_slots: usize,
         io_throughput_limiter: Option<Limiter>,
-        split_store: IndexingSplitStore,
+        split_cache: Arc<IndexingSplitCache>,
         metastore: MetastoreServiceClient,
         storage_resolver: StorageResolver,
         max_concurrent_split_uploads: usize,
@@ -77,7 +85,7 @@ impl CompactorSupervisor {
             planner_client,
             pipelines,
             io_throughput_limiter,
-            split_store,
+            split_cache,
             metastore,
             storage_resolver,
             max_concurrent_split_uploads,
@@ -97,6 +105,100 @@ impl CompactorSupervisor {
         statuses
     }
 
+    async fn process_new_tasks(
+        &mut self,
+        assignments: Vec<MergeTaskAssignment>,
+        spawn_ctx: &SpawnContext,
+    ) {
+        for assignment in assignments {
+            let task_id = assignment.task_id.clone();
+            if let Err(err) = self.spawn_task(assignment, spawn_ctx).await {
+                error!(task_id=%task_id, error=%err, "failed to spawn compaction task");
+            }
+        }
+    }
+
+    async fn spawn_task(
+        &mut self,
+        assignment: MergeTaskAssignment,
+        spawn_ctx: &SpawnContext,
+    ) -> anyhow::Result<()> {
+        let slot_idx = self
+            .pipelines
+            .iter()
+            .position(|slot| match slot {
+                None => true,
+                Some(p) => matches!(
+                    p.status(),
+                    PipelineStatus::Completed | PipelineStatus::Failed { .. }
+                ),
+            })
+            .ok_or_else(|| anyhow::anyhow!("no free pipeline slot"))?;
+        let scratch_directory = self
+            .compaction_root_directory
+            .named_temp_child(&assignment.task_id)?;
+        let mut pipeline = self
+            .build_compaction_pipeline(assignment, scratch_directory)
+            .await?;
+        pipeline.spawn_pipeline(spawn_ctx)?;
+        self.pipelines[slot_idx] = Some(pipeline);
+        Ok(())
+    }
+
+    async fn build_compaction_pipeline(
+        &self,
+        assignment: MergeTaskAssignment,
+        scratch_directory: TempDirectory,
+    ) -> anyhow::Result<CompactionPipeline> {
+        let splits: Vec<SplitMetadata> = assignment
+            .splits_metadata_json
+            .iter()
+            .map(|json| serde_json::from_str(json))
+            .collect::<Result<Vec<_>, serde_json::Error>>()?;
+        let doc_mapping: DocMapping = serde_json::from_str(&assignment.doc_mapping_json)?;
+        let search_settings: SearchSettings =
+            serde_json::from_str(&assignment.search_settings_json)?;
+        let indexing_settings: IndexingSettings =
+            serde_json::from_str(&assignment.indexing_settings_json)?;
+        let retention_policy: Option<RetentionPolicy> =
+            if assignment.retention_policy_json.is_empty() {
+                None
+            } else {
+                Some(serde_json::from_str(&assignment.retention_policy_json)?)
+            };
+        let index_uid = assignment
+            .index_uid
+            .ok_or_else(|| anyhow::anyhow!("missing index_uid in MergeTaskAssignment"))?;
+
+        let index_storage_uri = Uri::from_str(&assignment.index_storage_uri)?;
+        let index_storage = self.storage_resolver.resolve(&index_storage_uri).await?;
+        let split_store = IndexingSplitStore::new(index_storage, self.split_cache.clone());
+
+        let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)?;
+        let merge_policy = merge_policy_from_settings(&indexing_settings);
+        let merge_operation = MergeOperation::new_merge_operation(splits);
+        let pipeline_id = MergePipelineId {
+            node_id: self.node_id.clone(),
+            index_uid,
+            source_id: assignment.source_id,
+        };
+
+        Ok(CompactionPipeline::new(
+            assignment.task_id,
+            scratch_directory,
+            merge_operation,
+            pipeline_id,
+            doc_mapper,
+            merge_policy,
+            retention_policy,
+            self.metastore.clone(),
+            split_store,
+            self.io_throughput_limiter.clone(),
+            self.max_concurrent_split_uploads,
+            self.event_broker.clone(),
+        ))
+    }
+
     fn build_report_status_request(
         &self,
         statuses: &[PipelineStatusUpdate],
@@ -176,8 +278,16 @@ impl Handler<CheckPipelineStatuses> for CompactorSupervisor {
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
         let statuses = self.check_pipeline_statuses();
-        let _request = self.build_report_status_request(&statuses);
-        // TODO: send request to planner via gRPC, clear completed/failed slots on success.
+        let request = self.build_report_status_request(&statuses);
+        match self.planner_client.report_status(request).await {
+            Ok(response) => {
+                self.process_new_tasks(response.new_tasks, ctx.spawn_ctx())
+                    .await;
+            }
+            Err(err) => {
+                warn!(error=%err, "failed to report status to compaction planner");
+            }
+        }
         ctx.schedule_self_msg(CHECK_PIPELINE_STATUSES_INTERVAL, CheckPipelineStatuses);
         Ok(())
     }
@@ -194,14 +304,12 @@ mod tests {
     };
     use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
    use quickwit_proto::types::NodeId;
-    use quickwit_storage::{RamStorage, StorageResolver};
+    use quickwit_storage::StorageResolver;
 
     use super::*;
     use crate::compaction_pipeline::tests::test_pipeline;
 
     fn test_supervisor(num_slots: usize) -> CompactorSupervisor {
-        let storage = Arc::new(RamStorage::default());
-        let split_store = IndexingSplitStore::create_without_local_store_for_test(storage);
         let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
         let compaction_client =
             CompactionPlannerServiceClient::from_mock(MockCompactionPlannerService::new());
@@ -210,7 +318,7 @@ mod tests {
             compaction_client,
             num_slots,
             None,
-            split_store,
+            Arc::new(IndexingSplitCache::no_caching()),
             metastore,
             StorageResolver::for_test(),
             2,
@@ -282,6 +390,176 @@
         assert!(request.failures.is_empty());
     }
 
+    fn test_assignment(task_id: &str, split_ids: &[&str]) -> MergeTaskAssignment {
+        let index_metadata =
+            quickwit_metastore::IndexMetadata::for_test("test-index", "ram:///test-index");
+        let config = &index_metadata.index_config;
+        let splits: Vec<quickwit_metastore::SplitMetadata> = split_ids
+            .iter()
+            .map(|id| quickwit_metastore::SplitMetadata::for_test(id.to_string()))
+            .collect();
+        MergeTaskAssignment {
+            task_id: task_id.to_string(),
+            splits_metadata_json: splits
+                .iter()
+                .map(|s| serde_json::to_string(s).unwrap())
+                .collect(),
+            doc_mapping_json: serde_json::to_string(&config.doc_mapping).unwrap(),
+            search_settings_json: serde_json::to_string(&config.search_settings).unwrap(),
+            indexing_settings_json: serde_json::to_string(&config.indexing_settings).unwrap(),
+            retention_policy_json: String::new(),
+            index_uid: Some(index_metadata.index_uid.clone()),
+            source_id: "test-source".to_string(),
+            index_storage_uri: config.index_uri.to_string(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_build_compaction_pipeline_deserialization_errors() {
+        let supervisor = test_supervisor(4);
+        let scratch = TempDirectory::for_test;
+
+        // Bad splits JSON.
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.splits_metadata_json = vec!["not json".to_string()];
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+
+        // Bad doc mapping JSON.
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.doc_mapping_json = "not json".to_string();
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+
+        // Bad search settings JSON.
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.search_settings_json = "not json".to_string();
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+
+        // Bad indexing settings JSON.
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.indexing_settings_json = "not json".to_string();
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+
+        // Bad retention policy JSON (non-empty but invalid).
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.retention_policy_json = "not json".to_string();
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+
+        // Missing index_uid.
+        let mut assignment = test_assignment("t", &["s1"]);
+        assignment.index_uid = None;
+        assert!(
+            supervisor
+                .build_compaction_pipeline(assignment, scratch())
+                .await
+                .is_err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_spawn_task_fails_when_all_slots_occupied() {
+        let universe = Universe::new();
+        let mut supervisor = test_supervisor(2);
+
+        supervisor
+            .spawn_task(test_assignment("task-1", &["s1"]), universe.spawn_ctx())
+            .await
+            .unwrap();
+        supervisor
+            .spawn_task(test_assignment("task-2", &["s2"]), universe.spawn_ctx())
+            .await
+            .unwrap();
+
+        // Both slots InProgress — no room.
+        assert!(
+            supervisor
+                .spawn_task(test_assignment("task-3", &["s3"]), universe.spawn_ctx())
+                .await
+                .is_err()
+        );
+
+        universe.assert_quit().await;
+    }
+
+    #[tokio::test]
+    async fn test_end_to_end_report_status_and_spawn() {
+        let universe = Universe::new();
+
+        // Mock planner that returns one assignment on the first call.
+        let assignment = test_assignment("planner-task-1", &["s1", "s2"]);
+        let assignments = vec![assignment];
+        let assignments_clone = assignments.clone();
+        let mut mock = quickwit_proto::compaction::MockCompactionPlannerService::new();
+        mock.expect_report_status().times(1).returning(move |_req| {
+            Ok(quickwit_proto::compaction::ReportStatusResponse {
+                new_tasks: assignments_clone.clone(),
+            })
+        });
+
+        let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
+        let client = CompactionPlannerServiceClient::from_mock(mock);
+        let mut supervisor = CompactorSupervisor::new(
+            NodeId::from("test-node"),
+            client,
+            3,
+            None,
+            Arc::new(IndexingSplitCache::no_caching()),
+            metastore,
+            StorageResolver::for_test(),
+            2,
+            EventBroker::default(),
+            TempDirectory::for_test(),
+        );
+
+        // Simulate what the handler does: collect statuses, report, process response.
+        let statuses = supervisor.check_pipeline_statuses();
+        let request = supervisor.build_report_status_request(&statuses);
+        assert_eq!(request.available_slots, 3);
+
+        let response = supervisor
+            .planner_client
+            .report_status(request)
+            .await
+            .unwrap();
+        supervisor
+            .process_new_tasks(response.new_tasks, universe.spawn_ctx())
+            .await;
+
+        // Verify the pipeline was spawned.
+        let statuses = supervisor.check_pipeline_statuses();
+        let request = supervisor.build_report_status_request(&statuses);
+        assert_eq!(request.in_progress.len(), 1);
+        assert_eq!(request.in_progress[0].task_id, "planner-task-1");
+        assert_eq!(request.in_progress[0].split_ids.len(), 2);
+        assert_eq!(request.available_slots, 2);
+
+        universe.assert_quit().await;
+    }
+
     #[test]
     fn test_build_report_status_request_mixed_statuses() {
         let supervisor = test_supervisor(4);
diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs
index 05f79d83f73..98e5fc9a3d1 100644
--- a/quickwit/quickwit-compaction/src/lib.rs
+++ b/quickwit/quickwit-compaction/src/lib.rs
@@ -20,13 +20,15 @@ mod compaction_pipeline;
 mod compactor_supervisor;
 pub mod planner;
 
+use std::sync::Arc;
+
 pub use compactor_supervisor::CompactorSupervisor;
 use quickwit_actors::{Mailbox, Universe};
 use quickwit_common::io;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_config::CompactorConfig;
-use quickwit_indexing::IndexingSplitStore;
+use quickwit_indexing::IndexingSplitCache;
 use quickwit_proto::compaction::CompactionPlannerServiceClient;
 use quickwit_proto::metastore::MetastoreServiceClient;
 use quickwit_proto::types::NodeId;
@@ -39,7 +41,7 @@ pub async fn start_compactor_service(
     node_id: NodeId,
     compaction_client: CompactionPlannerServiceClient,
     compactor_config: &CompactorConfig,
-    split_store: IndexingSplitStore,
+    split_cache: Arc<IndexingSplitCache>,
     metastore: MetastoreServiceClient,
     storage_resolver: StorageResolver,
     event_broker: EventBroker,
@@ -53,7 +55,7 @@ pub async fn start_compactor_service(
         compaction_client,
         compactor_config.max_concurrent_pipelines.get(),
         io_throughput_limiter,
-        split_store,
+        split_cache,
         metastore,
         storage_resolver,
         compactor_config.max_concurrent_split_uploads,
diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs
index 8adde285bf3..66c50d4d150 100644
--- a/quickwit/quickwit-indexing/src/lib.rs
+++ b/quickwit/quickwit-indexing/src/lib.rs
@@ -31,7 +31,9 @@ pub use crate::actors::{
 };
 pub use crate::controlled_directory::ControlledDirectory;
 use crate::models::IndexingStatistics;
-pub use crate::split_store::{IndexingSplitStore, get_tantivy_directory_from_split_bundle};
+pub use crate::split_store::{
+    IndexingSplitCache, IndexingSplitStore, get_tantivy_directory_from_split_bundle,
+};
 
 pub mod actors;
 mod controlled_directory;
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index bc6db8835e0..4c2e8d8499c 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -81,7 +81,7 @@ use quickwit_control_plane::{IndexerNodeInfo, IndexerPool};
 use quickwit_index_management::{IndexService as IndexManager, IndexServiceError};
 use quickwit_indexing::actors::{IndexingService, MergeSchedulerService};
 use quickwit_indexing::models::ShardPositionsService;
-use quickwit_indexing::{IndexingSplitStore, start_indexing_service};
+use quickwit_indexing::start_indexing_service;
 use quickwit_ingest::{
     GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool,
     IngesterPoolEntry, LocalShardsUpdate, get_idle_shard_timeout,
@@ -114,7 +114,7 @@ use quickwit_search::{
     SearchJobPlacer, SearchService, SearchServiceClient, SearcherContext, SearcherPool,
     create_search_client_from_channel, start_searcher_service,
 };
-use quickwit_storage::{RamStorage, SplitCache, StorageResolver};
+use quickwit_storage::{SplitCache, StorageResolver};
 use tcp_listener::TcpListenerResolver;
 use tokio::sync::oneshot;
 use tonic::codec::CompressionEncoding;
@@ -795,10 +795,7 @@ pub async fn serve_quickwit(
     let compaction_root_directory = quickwit_common::temp_dir::Builder::default()
         .tempdir_in(&compaction_dir)
         .context("failed to create compaction temp directory")?;
-    // TODO: Real split store
-    let split_store = IndexingSplitStore::create_without_local_store_for_test(Arc::new(
-        RamStorage::default(),
-    ));
+    let split_cache = Arc::new(quickwit_indexing::IndexingSplitCache::no_caching());
    let compaction_client = compaction_service_client_opt
         .clone()
         .expect("compactor service enabled but no compaction client available");
@@ -807,7 +804,7 @@ pub async fn serve_quickwit(
         cluster.self_node_id().into(),
         compaction_client,
         &node_config.compactor_config,
-        split_store,
+        split_cache,
         metastore_client.clone(),
         storage_resolver.clone(),
         event_broker.clone(),