Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl CompactionPipeline {
}
}

/// Returns the current status of this pipeline (e.g. in progress, completed,
/// or failed), as tracked in `self.status`.
pub fn status(&self) -> &PipelineStatus {
    &self.status
}

fn supervisables(&self) -> Vec<&dyn Supervisable> {
let Some(handles) = &self.handles else {
return Vec::new();
Expand Down
306 changes: 292 additions & 14 deletions quickwit/quickwit-compaction/src/compactor_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, SpawnContext};
use quickwit_common::io::Limiter;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_indexing::IndexingSplitStore;
use quickwit_common::uri::Uri;
use quickwit_config::{IndexingSettings, RetentionPolicy, SearchSettings, build_doc_mapper};
use quickwit_doc_mapper::DocMapping;
use quickwit_indexing::merge_policy::{MergeOperation, merge_policy_from_settings};
use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore};
use quickwit_metastore::SplitMetadata;
use quickwit_proto::compaction::{
CompactionFailure, CompactionInProgress, CompactionPlannerServiceClient, CompactionSuccess,
ReportStatusRequest,
CompactionFailure, CompactionInProgress, CompactionPlannerService,
CompactionPlannerServiceClient, CompactionSuccess, MergeTaskAssignment, ReportStatusRequest,
};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::types::NodeId;
use quickwit_storage::StorageResolver;
use tracing::info;
use tracing::{error, info, warn};

use crate::compaction_pipeline::{CompactionPipeline, PipelineStatus, PipelineStatusUpdate};

Expand All @@ -47,7 +55,7 @@ pub struct CompactorSupervisor {

// Shared resources distributed to pipelines when spawning actor chains.
io_throughput_limiter: Option<Limiter>,
split_store: IndexingSplitStore,
split_cache: Arc<IndexingSplitCache>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
max_concurrent_split_uploads: usize,
Expand All @@ -64,7 +72,7 @@ impl CompactorSupervisor {
planner_client: CompactionPlannerServiceClient,
num_pipeline_slots: usize,
io_throughput_limiter: Option<Limiter>,
split_store: IndexingSplitStore,
split_cache: Arc<IndexingSplitCache>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
max_concurrent_split_uploads: usize,
Expand All @@ -77,7 +85,7 @@ impl CompactorSupervisor {
planner_client,
pipelines,
io_throughput_limiter,
split_store,
split_cache,
metastore,
storage_resolver,
max_concurrent_split_uploads,
Expand All @@ -97,6 +105,100 @@ impl CompactorSupervisor {
statuses
}

/// Spawns a compaction pipeline for each assignment handed out by the
/// planner. A failure to spawn one task is logged and does not stop the
/// processing of the remaining assignments.
async fn process_new_tasks(
    &mut self,
    assignments: Vec<MergeTaskAssignment>,
    spawn_ctx: &SpawnContext,
) {
    for assignment in assignments {
        // `spawn_task` consumes the assignment, so keep the id for logging.
        let task_id = assignment.task_id.clone();
        match self.spawn_task(assignment, spawn_ctx).await {
            Ok(()) => {}
            Err(err) => {
                error!(task_id=%task_id, error=%err, "failed to spawn compaction task");
            }
        }
    }
}

async fn spawn_task(
&mut self,
assignment: MergeTaskAssignment,
spawn_ctx: &SpawnContext,
) -> anyhow::Result<()> {
let slot_idx = self
.pipelines
.iter()
.position(|slot| match slot {
None => true,
Some(p) => matches!(
p.status(),
PipelineStatus::Completed | PipelineStatus::Failed { .. }
),
})
.ok_or_else(|| anyhow::anyhow!("no free pipeline slot"))?;
let scratch_directory = self
.compaction_root_directory
.named_temp_child(&assignment.task_id)?;
let mut pipeline = self
.build_compaction_pipeline(assignment, scratch_directory)
.await?;
pipeline.spawn_pipeline(spawn_ctx)?;
self.pipelines[slot_idx] = Some(pipeline);
Ok(())
}

async fn build_compaction_pipeline(
&self,
assignment: MergeTaskAssignment,
scratch_directory: TempDirectory,
) -> anyhow::Result<CompactionPipeline> {
let splits: Vec<SplitMetadata> = assignment
.splits_metadata_json
.iter()
.map(|json| serde_json::from_str(json))
.collect::<Result<Vec<SplitMetadata>, serde_json::Error>>()?;
let doc_mapping: DocMapping = serde_json::from_str(&assignment.doc_mapping_json)?;
let search_settings: SearchSettings =
serde_json::from_str(&assignment.search_settings_json)?;
let indexing_settings: IndexingSettings =
serde_json::from_str(&assignment.indexing_settings_json)?;
let retention_policy: Option<RetentionPolicy> =
if assignment.retention_policy_json.is_empty() {
None
} else {
Some(serde_json::from_str(&assignment.retention_policy_json)?)
};
let index_uid = assignment
.index_uid
.ok_or_else(|| anyhow::anyhow!("missing index_uid in MergeTaskAssignment"))?;

let index_storage_uri = Uri::from_str(&assignment.index_storage_uri)?;
let index_storage = self.storage_resolver.resolve(&index_storage_uri).await?;
let split_store = IndexingSplitStore::new(index_storage, self.split_cache.clone());

let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)?;
let merge_policy = merge_policy_from_settings(&indexing_settings);
let merge_operation = MergeOperation::new_merge_operation(splits);
let pipeline_id = MergePipelineId {
node_id: self.node_id.clone(),
index_uid,
source_id: assignment.source_id,
};

Ok(CompactionPipeline::new(
assignment.task_id,
scratch_directory,
merge_operation,
pipeline_id,
doc_mapper,
merge_policy,
retention_policy,
self.metastore.clone(),
split_store,
self.io_throughput_limiter.clone(),
self.max_concurrent_split_uploads,
self.event_broker.clone(),
))
}

fn build_report_status_request(
&self,
statuses: &[PipelineStatusUpdate],
Expand Down Expand Up @@ -176,8 +278,16 @@ impl Handler<CheckPipelineStatuses> for CompactorSupervisor {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let statuses = self.check_pipeline_statuses();
let _request = self.build_report_status_request(&statuses);
// TODO: send request to planner via gRPC, clear completed/failed slots on success.
let request = self.build_report_status_request(&statuses);
match self.planner_client.report_status(request).await {
Ok(response) => {
self.process_new_tasks(response.new_tasks, ctx.spawn_ctx())
.await;
}
Err(err) => {
warn!(error=%err, "failed to report status to compaction planner");
}
}
ctx.schedule_self_msg(CHECK_PIPELINE_STATUSES_INTERVAL, CheckPipelineStatuses);
Ok(())
}
Expand All @@ -194,14 +304,12 @@ mod tests {
};
use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
use quickwit_proto::types::NodeId;
use quickwit_storage::{RamStorage, StorageResolver};
use quickwit_storage::StorageResolver;

use super::*;
use crate::compaction_pipeline::tests::test_pipeline;

fn test_supervisor(num_slots: usize) -> CompactorSupervisor {
let storage = Arc::new(RamStorage::default());
let split_store = IndexingSplitStore::create_without_local_store_for_test(storage);
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
let compaction_client =
CompactionPlannerServiceClient::from_mock(MockCompactionPlannerService::new());
Expand All @@ -210,7 +318,7 @@ mod tests {
compaction_client,
num_slots,
None,
split_store,
Arc::new(IndexingSplitCache::no_caching()),
metastore,
StorageResolver::for_test(),
2,
Expand Down Expand Up @@ -282,6 +390,176 @@ mod tests {
assert!(request.failures.is_empty());
}

/// Builds a `MergeTaskAssignment` over a fresh test index ("test-index" on
/// RAM storage) whose splits carry the given split ids. The retention policy
/// is left empty (i.e. "none").
fn test_assignment(task_id: &str, split_ids: &[&str]) -> MergeTaskAssignment {
    let index_metadata =
        quickwit_metastore::IndexMetadata::for_test("test-index", "ram:///test-index");
    let config = &index_metadata.index_config;
    let splits_metadata_json: Vec<String> = split_ids
        .iter()
        .map(|split_id| {
            let split = quickwit_metastore::SplitMetadata::for_test(split_id.to_string());
            serde_json::to_string(&split).unwrap()
        })
        .collect();
    MergeTaskAssignment {
        task_id: task_id.to_string(),
        splits_metadata_json,
        doc_mapping_json: serde_json::to_string(&config.doc_mapping).unwrap(),
        search_settings_json: serde_json::to_string(&config.search_settings).unwrap(),
        indexing_settings_json: serde_json::to_string(&config.indexing_settings).unwrap(),
        retention_policy_json: String::new(),
        index_uid: Some(index_metadata.index_uid.clone()),
        source_id: "test-source".to_string(),
        index_storage_uri: config.index_uri.to_string(),
    }
}

#[tokio::test]
async fn test_build_compaction_pipeline_deserialization_errors() {
let supervisor = test_supervisor(4);
let scratch = TempDirectory::for_test;

// Bad splits JSON.
let mut assignment = test_assignment("t", &["s1"]);
assignment.splits_metadata_json = vec!["not json".to_string()];
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);

// Bad doc mapping JSON.
let mut assignment = test_assignment("t", &["s1"]);
assignment.doc_mapping_json = "not json".to_string();
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);

// Bad search settings JSON.
let mut assignment = test_assignment("t", &["s1"]);
assignment.search_settings_json = "not json".to_string();
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);

// Bad indexing settings JSON.
let mut assignment = test_assignment("t", &["s1"]);
assignment.indexing_settings_json = "not json".to_string();
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);

// Bad retention policy JSON (non-empty but invalid).
let mut assignment = test_assignment("t", &["s1"]);
assignment.retention_policy_json = "not json".to_string();
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);

// Missing index_uid.
let mut assignment = test_assignment("t", &["s1"]);
assignment.index_uid = None;
assert!(
supervisor
.build_compaction_pipeline(assignment, scratch())
.await
.is_err()
);
}

#[tokio::test]
async fn test_spawn_task_fails_when_all_slots_occupied() {
    let universe = Universe::new();
    let mut supervisor = test_supervisor(2);

    // Fill both slots; each spawn succeeds while capacity remains.
    for (task_id, split_id) in [("task-1", "s1"), ("task-2", "s2")] {
        supervisor
            .spawn_task(test_assignment(task_id, &[split_id]), universe.spawn_ctx())
            .await
            .unwrap();
    }

    // Both slots InProgress — no room.
    let third_spawn = supervisor
        .spawn_task(test_assignment("task-3", &["s3"]), universe.spawn_ctx())
        .await;
    assert!(third_spawn.is_err());

    universe.assert_quit().await;
}

#[tokio::test]
async fn test_end_to_end_report_status_and_spawn() {
    let universe = Universe::new();

    // Planner mock that hands out a single assignment on its one expected call.
    let planner_tasks = vec![test_assignment("planner-task-1", &["s1", "s2"])];
    let mut mock = quickwit_proto::compaction::MockCompactionPlannerService::new();
    {
        let planner_tasks = planner_tasks.clone();
        mock.expect_report_status().times(1).returning(move |_req| {
            Ok(quickwit_proto::compaction::ReportStatusResponse {
                new_tasks: planner_tasks.clone(),
            })
        });
    }

    let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
    let planner_client = CompactionPlannerServiceClient::from_mock(mock);
    let mut supervisor = CompactorSupervisor::new(
        NodeId::from("test-node"),
        planner_client,
        3,
        None,
        Arc::new(IndexingSplitCache::no_caching()),
        metastore,
        StorageResolver::for_test(),
        2,
        EventBroker::default(),
        TempDirectory::for_test(),
    );

    // Mirror the handler logic: collect statuses, report them to the planner,
    // then process the assignments returned in the response.
    let statuses = supervisor.check_pipeline_statuses();
    let request = supervisor.build_report_status_request(&statuses);
    assert_eq!(request.available_slots, 3);

    let response = supervisor
        .planner_client
        .report_status(request)
        .await
        .unwrap();
    supervisor
        .process_new_tasks(response.new_tasks, universe.spawn_ctx())
        .await;

    // The assignment now occupies exactly one slot, reported as in-progress.
    let statuses = supervisor.check_pipeline_statuses();
    let request = supervisor.build_report_status_request(&statuses);
    assert_eq!(request.in_progress.len(), 1);
    assert_eq!(request.in_progress[0].task_id, "planner-task-1");
    assert_eq!(request.in_progress[0].split_ids.len(), 2);
    assert_eq!(request.available_slots, 2);

    universe.assert_quit().await;
}

#[test]
fn test_build_report_status_request_mixed_statuses() {
let supervisor = test_supervisor(4);
Expand Down
Loading
Loading