diff --git a/README.md b/README.md index 2d6e5e6..320c069 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Use at your own risk. - **Await events** - Pause until external events arrive (with optional timeouts) - **Retry on failure** - Configurable retry strategies with exponential backoff - **Scale horizontally** - Multiple workers can process tasks concurrently +- **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration Unlike exception-based durable execution systems (Python, TypeScript), this SDK uses Rust's `Result` type for suspension control flow, making it idiomatic and type-safe. @@ -260,6 +261,101 @@ client.emit_event( ).await?; ``` +### Cron Scheduling + +Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering. + +**Setup** - Enable pg_cron once at startup: + +```rust +use durable::setup_pgcron; + +// Enable the pg_cron extension (requires superuser or appropriate privileges) +setup_pgcron(client.pool()).await?; +``` + +**Create a schedule:** + +```rust +use durable::ScheduleOptions; + +let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(HashMap::from([ + ("team".to_string(), json!("payments")), + ("env".to_string(), json!("production")), + ])), +}; + +// Creates or updates the schedule (upsert semantics) +client.create_schedule("payment-schedule", options).await?; +``` + +**List and filter schedules:** + +```rust +use durable::ScheduleFilter; + +// List all schedules on this queue +let all = client.list_schedules(None).await?; + +// Filter by task name +let filter = ScheduleFilter { + task_name: Some("process-payments".to_string()), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; + +// Filter by metadata (JSONB containment) +let filter = ScheduleFilter { + metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; +``` + +**Delete a schedule:** + +```rust +// Removes the schedule and its pg_cron job. In-flight tasks are not cancelled. +client.delete_schedule("payment-schedule").await?; +``` + +**Key behaviors:** + +- **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally. +- **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place. +- **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned. +- **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries. +- **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs. + +### Polling Task Results + +You can poll for the result of a spawned task without running a worker in the same process: + +```rust +use durable::TaskStatus; + +let spawned = client.spawn::(params).await?; + +// Poll for the result +let result = client.get_task_result(spawned.task_id).await?; + +if let Some(poll) = result { + match poll.status { + TaskStatus::Completed => println!("Output: {:?}", poll.output), + TaskStatus::Failed => println!("Error: {:?}", poll.error), + TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => { + println!("Still in progress...") + } + TaskStatus::Cancelled => println!("Task was cancelled"), + } +} +``` + ### Transactional Spawning You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does: @@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` | | [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration | +### Cron Scheduling + +| Type | Description | +|------|-------------| +| [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) | +| [`ScheduleInfo`] | Information about an existing schedule | +| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) | +| [`setup_pgcron()`] | Initialize the pg_cron extension | + ### Results | Type | Description | |------|-------------| | [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) | +| [`TaskPollResult`] | Result of polling a task (status, output, error) | +| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` | | [`ControlFlow`] | Signals for suspension and cancellation | ## Environment Variables diff --git a/sql/schema.sql b/sql/schema.sql index 33917f7..fc678bc 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -58,6 +58,26 @@ create table if not exists durable.queues ( created_at timestamptz not null default durable.current_time() ); +create table if not exists durable.cron_schedules ( + schedule_name text not null, + queue_name text not null, + task_name text not null, + cron_expression text not null, + params jsonb not null default '{}'::jsonb, + spawn_options jsonb not null default '{}'::jsonb, + metadata jsonb not null default '{}'::jsonb, + pgcron_job_name text not null, + created_at timestamptz not null default durable.current_time(), + updated_at timestamptz not null default durable.current_time(), + primary key (queue_name, schedule_name) +); + +create index if not exists idx_cron_schedules_metadata + on durable.cron_schedules using gin (metadata); + +create index if not exists idx_cron_schedules_task_name + on durable.cron_schedules (queue_name, task_name); + create function durable.ensure_queue_tables (p_queue_name text) returns void language plpgsql @@ -252,12 +272,15 @@ end; $$; -- Drop a queue if it exists. +-- Also cleans up any cron schedules and their pg_cron jobs for the queue. create function durable.drop_queue (p_queue_name text) returns void language plpgsql as $$ declare v_existing_queue text; + v_rec record; + v_jobid bigint; begin select queue_name into v_existing_queue from durable.queues @@ -267,6 +290,28 @@ begin return; end if; + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); diff --git a/src/client.rs b/src/client.rs index ad8d8a2..0fae58d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -55,7 +55,7 @@ impl CancellationPolicyDb { use crate::worker::Worker; /// Validates that user-provided headers don't use reserved prefixes. -fn validate_headers(headers: &Option>) -> DurableResult<()> { +pub(crate) fn validate_headers(headers: &Option>) -> DurableResult<()> { if let Some(headers) = headers { for key in headers.keys() { if key.starts_with("durable::") { @@ -334,6 +334,10 @@ where &self.state } + pub(crate) fn spawn_defaults(&self) -> &SpawnDefaults { + &self.spawn_defaults + } + /// Register a task type. Required before spawning or processing. /// /// Returns an error if a task with the same name is already registered. @@ -578,7 +582,7 @@ where }) } - fn serialize_spawn_options( + pub(crate) fn serialize_spawn_options( options: &SpawnOptions, max_attempts: u32, ) -> serde_json::Result { diff --git a/src/cron.rs b/src/cron.rs new file mode 100644 index 0000000..aeff843 --- /dev/null +++ b/src/cron.rs @@ -0,0 +1,784 @@ +use chrono::{DateTime, Utc}; +use serde_json::Value as JsonValue; +use sqlx::{PgPool, Postgres, QueryBuilder}; +use std::collections::HashMap; + +use crate::client::{Durable, validate_headers}; +use crate::error::{DurableError, DurableResult}; +use crate::types::SpawnOptions; + +/// Options for creating a cron schedule. +#[derive(Debug, Clone)] +pub struct ScheduleOptions { + /// The task name to spawn on each cron tick. + pub task_name: String, + /// Standard 5-field cron expression (minute hour day-of-month month day-of-week). + pub cron_expression: String, + /// Parameters to pass to the spawned task (serialized as JSON). + pub params: JsonValue, + /// Spawn options (max_attempts, retry_strategy, cancellation, headers). + pub spawn_options: SpawnOptions, + /// Arbitrary user-defined metadata for categorization/filtering. + pub metadata: Option>, +} + +/// Information about an existing cron schedule. +#[derive(Debug, Clone)] +pub struct ScheduleInfo { + /// The schedule name (unique within the queue). + pub name: String, + /// The cron expression. + pub cron_expression: String, + /// The task name that gets spawned. + pub task_name: String, + /// The parameters passed to the spawned task. + pub params: JsonValue, + /// The serialized spawn options. + pub spawn_options: JsonValue, + /// User-defined metadata. + pub metadata: HashMap, + /// The pg_cron job name. + pub pgcron_job_name: String, + /// When the schedule was created. + pub created_at: DateTime, + /// When the schedule was last updated. + pub updated_at: DateTime, +} + +/// Filter for listing schedules. +#[derive(Debug, Clone, Default)] +pub struct ScheduleFilter { + /// Filter by task name (exact match). + pub task_name: Option, + /// Filter by metadata (JSONB `@>` containment). + /// e.g. `{"team": "payments"}` matches schedules whose metadata contains that key-value. + pub metadata: Option>, +} + +/// Set up the pg_cron extension in the database. +/// +/// Attempts to create the extension and verifies it is available. +/// Call this once during application startup before using cron scheduling. +/// +/// # Errors +/// +/// Returns [`DurableError::PgCronUnavailable`] if the extension cannot be created +/// or is not available. +pub async fn setup_pgcron(pool: &PgPool) -> DurableResult<()> { + // Attempt to create the extension, ignoring errors (user may not have privileges) + let _ = sqlx::query( + "DO $$ BEGIN CREATE EXTENSION IF NOT EXISTS pg_cron; EXCEPTION WHEN OTHERS THEN NULL; END $$" + ) + .execute(pool) + .await; + + // Verify it exists + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(pool) + .await?; + + if !exists.0 { + return Err(DurableError::PgCronUnavailable { + reason: "pg_cron extension is not installed and could not be created".to_string(), + }); + } + + Ok(()) +} + +impl Durable +where + State: Clone + Send + Sync + 'static, +{ + /// Create or update a cron schedule. + /// + /// If a schedule with the same name already exists in this queue, it will be updated + /// (upsert semantics). The pg_cron job is created/updated and the schedule metadata + /// is stored in `durable.cron_schedules`. + /// + /// # Arguments + /// + /// * `schedule_name` - Unique name for this schedule within the queue + /// (alphanumeric, hyphens, and underscores only). + /// * `options` - Schedule configuration including task name, cron expression, + /// params, spawn options, and metadata. + /// + /// # Errors + /// + /// Returns an error if: + /// - The schedule name is invalid + /// - The cron expression is invalid + /// - Headers use the reserved `durable::` prefix + /// - pg_cron is not available + pub async fn create_schedule( + &self, + schedule_name: &str, + options: ScheduleOptions, + ) -> DurableResult<()> { + // Validate inputs + validate_schedule_name(schedule_name)?; + validate_cron_expression(&options.cron_expression)?; + validate_headers(&options.spawn_options.headers)?; + + let pgcron_job_name = format!("durable::{}::{}", self.queue_name(), schedule_name); + + // Build spawn options with injected durable:: headers + let mut spawn_options = options.spawn_options.clone(); + let headers = spawn_options.headers.get_or_insert_with(HashMap::new); + headers.insert( + "durable::scheduled_by".to_string(), + JsonValue::String(schedule_name.to_string()), + ); + headers.insert( + "durable::cron".to_string(), + JsonValue::String(options.cron_expression.clone()), + ); + + let max_attempts = spawn_options + .max_attempts + .unwrap_or(self.spawn_defaults().max_attempts); + let spawn_options = SpawnOptions { + retry_strategy: spawn_options + .retry_strategy + .or_else(|| self.spawn_defaults().retry_strategy.clone()), + cancellation: spawn_options + .cancellation + .or_else(|| self.spawn_defaults().cancellation.clone()), + ..spawn_options + }; + let db_options = Self::serialize_spawn_options(&spawn_options, max_attempts) + .map_err(DurableError::Serialization)?; + + // Build the SQL command that pg_cron will execute + let spawn_sql = build_pgcron_spawn_sql( + self.queue_name(), + &options.task_name, + &options.params, + &db_options, + )?; + + let metadata_value = match &options.metadata { + Some(m) => serde_json::to_value(m).map_err(DurableError::Serialization)?, + None => serde_json::json!({}), + }; + + // Check pg_cron availability + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(self.pool()) + .await?; + + if !exists.0 { + return Err(DurableError::PgCronUnavailable { + reason: "pg_cron extension is not installed".to_string(), + }); + } + + // Execute in a transaction + let mut tx = self.pool().begin().await?; + + // Schedule the pg_cron job (has built-in upsert semantics) + sqlx::query("SELECT cron.schedule($1, $2, $3)") + .bind(&pgcron_job_name) + .bind(&options.cron_expression) + .bind(&spawn_sql) + .execute(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "create"))?; + + // Upsert into our schedule registry + sqlx::query( + "INSERT INTO durable.cron_schedules + (schedule_name, queue_name, task_name, cron_expression, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, durable.current_time(), durable.current_time()) + ON CONFLICT (queue_name, schedule_name) + DO UPDATE SET + task_name = EXCLUDED.task_name, + cron_expression = EXCLUDED.cron_expression, + params = EXCLUDED.params, + spawn_options = EXCLUDED.spawn_options, + metadata = EXCLUDED.metadata, + pgcron_job_name = EXCLUDED.pgcron_job_name, + updated_at = durable.current_time()" + ) + .bind(schedule_name) + .bind(self.queue_name()) + .bind(&options.task_name) + .bind(&options.cron_expression) + .bind(&options.params) + .bind(&db_options) + .bind(&metadata_value) + .bind(&pgcron_job_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// Delete a cron schedule. + /// + /// Removes the pg_cron job and the schedule registry entry. Any in-flight + /// tasks that were already spawned by this schedule are not cancelled. + /// + /// # Errors + /// + /// Returns [`DurableError::ScheduleNotFound`] if the schedule does not exist. + pub async fn delete_schedule(&self, schedule_name: &str) -> DurableResult<()> { + // Look up the pgcron_job_name + let row: Option<(String,)> = sqlx::query_as( + "SELECT pgcron_job_name FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .fetch_optional(self.pool()) + .await?; + + let (pgcron_job_name,) = row.ok_or_else(|| DurableError::ScheduleNotFound { + schedule_name: schedule_name.to_string(), + queue_name: self.queue_name().to_string(), + })?; + + let mut tx = self.pool().begin().await?; + + // Look up the jobid from cron.job and unschedule it + let job_row: Option<(i64,)> = + sqlx::query_as("SELECT jobid FROM cron.job WHERE jobname = $1") + .bind(&pgcron_job_name) + .fetch_optional(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "delete"))?; + + if let Some((jobid,)) = job_row { + sqlx::query("SELECT cron.unschedule($1)") + .bind(jobid) + .execute(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "delete"))?; + } + + // Delete from our registry + sqlx::query( + "DELETE FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// List cron schedules, optionally filtered. + /// + /// Returns all schedules for this queue. If a filter is provided, results + /// are narrowed by task name and/or metadata containment. + /// + /// This only queries the `durable.cron_schedules` table (no pg_cron queries), + /// so it works even if pg_cron is not installed. + pub async fn list_schedules( + &self, + filter: Option, + ) -> DurableResult> { + let filter = filter.unwrap_or_default(); + + let mut qb: QueryBuilder = QueryBuilder::new( + "SELECT schedule_name, cron_expression, task_name, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at + FROM durable.cron_schedules + WHERE queue_name = ", + ); + qb.push_bind(self.queue_name()); + + if let Some(ref task_name) = filter.task_name { + qb.push(" AND task_name = ").push_bind(task_name.clone()); + } + + if let Some(ref metadata) = filter.metadata { + let metadata_json = + serde_json::to_value(metadata).map_err(DurableError::Serialization)?; + qb.push(" AND metadata @> ") + .push_bind(metadata_json) + .push("::jsonb"); + } + + qb.push(" ORDER BY schedule_name"); + + let rows: Vec = qb + .build_query_as::() + .fetch_all(self.pool()) + .await?; + + rows.into_iter() + .map(|row| { + let metadata: HashMap = + serde_json::from_value(row.metadata).unwrap_or_default(); + Ok(ScheduleInfo { + name: row.schedule_name, + cron_expression: row.cron_expression, + task_name: row.task_name, + params: row.params, + spawn_options: row.spawn_options, + metadata, + pgcron_job_name: row.pgcron_job_name, + created_at: row.created_at, + updated_at: row.updated_at, + }) + }) + .collect() + } +} + +// --- Internal types --- + +#[derive(Debug, sqlx::FromRow)] +struct ScheduleRow { + schedule_name: String, + cron_expression: String, + task_name: String, + params: JsonValue, + spawn_options: JsonValue, + metadata: JsonValue, + pgcron_job_name: String, + created_at: DateTime, + updated_at: DateTime, +} + +// --- Validation helpers --- + +/// Validate a 5-field standard cron expression. +fn validate_cron_expression(expr: &str) -> DurableResult<()> { + let fields: Vec<&str> = expr.split_whitespace().collect(); + if fields.len() != 5 { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("expected 5 fields, got {}", fields.len()), + }); + } + + let field_names = ["minute", "hour", "day-of-month", "month", "day-of-week"]; + let field_ranges: [(u32, u32); 5] = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]; + + for (i, field) in fields.iter().enumerate() { + validate_cron_field( + expr, + field, + field_names[i], + field_ranges[i].0, + field_ranges[i].1, + )?; + } + + Ok(()) +} + +fn validate_cron_field( + expr: &str, + field: &str, + name: &str, + min: u32, + max: u32, +) -> DurableResult<()> { + if field.is_empty() { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} field is empty"), + }); + } + + // Split by comma for lists (e.g., "1,15,30") + for part in field.split(',') { + validate_cron_part(expr, part, name, min, max)?; + } + + Ok(()) +} + +fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> DurableResult<()> { + // Handle step values (e.g., "*/5" or "1-30/5") + let (range_part, _step) = if let Some((range, step_str)) = part.split_once('/') { + let step: u32 = step_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid step value `{step_str}` in {name} field"), + })?; + if step == 0 { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("step value cannot be 0 in {name} field"), + }); + } + (range, Some(step)) + } else { + (part, None) + }; + + // Handle wildcard + if range_part == "*" { + return Ok(()); + } + + // Handle range (e.g., "1-30") + if let Some((start_str, end_str)) = range_part.split_once('-') { + let start: u32 = start_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid range start `{start_str}` in {name} field"), + })?; + let end: u32 = end_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid range end `{end_str}` in {name} field"), + })?; + + if start < min || start > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range start {start} out of range {min}-{max}"), + }); + } + if end < min || end > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range end {end} out of range {min}-{max}"), + }); + } + if start > end { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range start {start} is greater than end {end}"), + }); + } + + return Ok(()); + } + + // Handle single value + let value: u32 = range_part + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid value `{range_part}` in {name} field"), + })?; + + if value < min || value > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} value {value} out of range {min}-{max}"), + }); + } + + Ok(()) +} + +/// Validate a schedule name (alphanumeric, hyphens, underscores only; non-empty). +fn validate_schedule_name(name: &str) -> DurableResult<()> { + if name.is_empty() { + return Err(DurableError::InvalidScheduleName { + name: String::new(), + reason: "schedule name cannot be empty".to_string(), + }); + } + + if !name + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err(DurableError::InvalidScheduleName { + name: name.to_string(), + reason: + "contains invalid characters (only alphanumeric, hyphens, and underscores allowed)" + .to_string(), + }); + } + + Ok(()) +} + +// --- SQL escaping --- + +/// Dollar-quote a string using `$durable$` as the delimiter. +/// Returns an error if the content contains `$durable`, which would break the delimiter. +fn pg_literal(s: &str) -> Result { + if s.contains("$durable") { + return Err(DurableError::InvalidConfiguration { + reason: format!( + "string contains reserved delimiter sequence '$durable': {s}" + ), + }); + } + Ok(format!("$durable${s}$durable$")) +} + +/// Build the SQL command that pg_cron will execute to spawn a task. +fn build_pgcron_spawn_sql( + queue_name: &str, + task_name: &str, + params: &JsonValue, + spawn_options: &JsonValue, +) -> Result { + let params_str = params.to_string(); + let options_str = spawn_options.to_string(); + + Ok(format!( + "SELECT durable.spawn_task({}, {}, {}::jsonb, {}::jsonb)", + pg_literal(queue_name)?, + pg_literal(task_name)?, + pg_literal(¶ms_str)?, + pg_literal(&options_str)?, + )) +} + +/// Map pgcron-related SQL errors to more descriptive error messages. +fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { + let err_str = err.to_string(); + if err_str.contains("cron") || err_str.contains("pg_cron") { + DurableError::PgCronUnavailable { + reason: format!("pg_cron error during {operation}: {err_str}"), + } + } else { + DurableError::Database(err) + } +} + +#[cfg(test)] +#[expect(clippy::unwrap_used, clippy::panic)] +mod tests { + use super::*; + + // --- Cron expression validation tests --- + + #[test] + fn test_valid_cron_every_minute() { + assert!(validate_cron_expression("* * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_every_5_minutes() { + assert!(validate_cron_expression("*/5 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_range_with_step() { + assert!(validate_cron_expression("0-30/5 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_list() { + assert!(validate_cron_expression("1,15,30 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_specific_time() { + assert!(validate_cron_expression("0 9 * * 1").is_ok()); + } + + #[test] + fn test_valid_cron_complex() { + assert!(validate_cron_expression("30 2 1,15 * 0-5").is_ok()); + } + + #[test] + fn test_valid_cron_weekday_7() { + // 7 is valid for day-of-week (Sunday in some implementations) + assert!(validate_cron_expression("0 0 * * 7").is_ok()); + } + + #[test] + fn test_invalid_cron_too_few_fields() { + let result = validate_cron_expression("* * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("expected 5 fields")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_too_many_fields() { + let result = validate_cron_expression("* * * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("expected 5 fields")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_6_field_seconds() { + let result = validate_cron_expression("0 */5 * * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_minute() { + let result = validate_cron_expression("60 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("out of range")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_out_of_range_hour() { + let result = validate_cron_expression("0 24 * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_day() { + let result = validate_cron_expression("0 0 32 * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_month() { + let result = validate_cron_expression("0 0 * 13 *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_weekday() { + let result = validate_cron_expression("0 0 * * 8"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_empty() { + let result = validate_cron_expression(""); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_letters() { + let result = validate_cron_expression("abc * * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_zero_step() { + let result = validate_cron_expression("*/0 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("step value cannot be 0")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_reversed_range() { + let result = validate_cron_expression("30-10 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("greater than end")); + } + _ => panic!("unexpected error type"), + } + } + + // --- pg_literal tests --- + + #[test] + fn test_pg_literal_simple() { + assert_eq!(pg_literal("hello").unwrap(), "$durable$hello$durable$"); + } + + #[test] + fn test_pg_literal_with_single_quotes() { + assert_eq!( + pg_literal("it's a test").unwrap(), + "$durable$it's a test$durable$" + ); + } + + #[test] + fn test_pg_literal_with_json() { + let json = r#"{"key": "value"}"#; + assert_eq!(pg_literal(json).unwrap(), format!("$durable${json}$durable$")); + } + + #[test] + fn test_pg_literal_rejects_delimiter() { + assert!(pg_literal("contains $durable$ in it").is_err()); + } + + #[test] + fn test_pg_literal_rejects_partial_delimiter() { + assert!(pg_literal("test$durable").is_err()); + assert!(pg_literal("$durablefoo").is_err()); + assert!(pg_literal("mid$durablemid").is_err()); + } + + // --- Schedule name validation tests --- + + #[test] + fn test_valid_schedule_names() { + assert!(validate_schedule_name("my-schedule").is_ok()); + assert!(validate_schedule_name("task_1").is_ok()); + assert!(validate_schedule_name("DailyReport").is_ok()); + assert!(validate_schedule_name("a").is_ok()); + assert!(validate_schedule_name("test-123_abc").is_ok()); + } + + #[test] + fn test_invalid_schedule_name_empty() { + let err = validate_schedule_name("").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + #[test] + fn test_invalid_schedule_name_spaces() { + let err = validate_schedule_name("my schedule").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + #[test] + fn test_invalid_schedule_name_semicolons() { + let err = validate_schedule_name("drop;table").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + #[test] + fn test_invalid_schedule_name_special_chars() { + let err = validate_schedule_name("name@here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name.here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name/here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + } + + // --- build_pgcron_spawn_sql tests --- + + #[test] + fn test_build_pgcron_spawn_sql() { + let params = serde_json::json!({"key": "value"}); + let options = serde_json::json!({"max_attempts": 3}); + let sql = build_pgcron_spawn_sql("my_queue", "my_task", ¶ms, &options).unwrap(); + assert!(sql.contains("durable.spawn_task")); + assert!(sql.contains("my_queue")); + assert!(sql.contains("my_task")); + assert!(sql.contains("::jsonb")); + } +} diff --git a/src/error.rs b/src/error.rs index 7c4a20d..06392c2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -437,6 +437,40 @@ pub enum DurableError { /// The unrecognized state string. state: String, }, + + /// pg_cron extension is not available in the database. + #[error("pg_cron is not available: {reason}")] + PgCronUnavailable { + /// Why pg_cron is not available. + reason: String, + }, + + /// Cron expression failed validation. + #[error("invalid cron expression `{expression}`: {reason}")] + InvalidCronExpression { + /// The invalid cron expression. + expression: String, + /// Why the expression is invalid. + reason: String, + }, + + /// Schedule name failed validation. + #[error("invalid schedule name `{name}`: {reason}")] + InvalidScheduleName { + /// The invalid schedule name. + name: String, + /// Why the name is invalid. + reason: String, + }, + + /// Schedule not found. + #[error("schedule `{schedule_name}` not found in queue `{queue_name}`")] + ScheduleNotFound { + /// The schedule name that was not found. + schedule_name: String, + /// The queue name that was searched. + queue_name: String, + }, } /// Result type alias for Client API operations. diff --git a/src/lib.rs b/src/lib.rs index eef201c..fed9b2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ mod client; mod context; +mod cron; mod error; mod task; #[cfg(feature = "telemetry")] @@ -106,6 +107,7 @@ mod worker; // Re-export public API pub use client::{Durable, DurableBuilder}; pub use context::TaskContext; +pub use cron::{ScheduleFilter, ScheduleInfo, ScheduleOptions, setup_pgcron}; pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult}; pub use task::{ErasedTask, Task, TaskWrapper}; pub use types::{ diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql new file mode 100644 index 0000000..223b0fd --- /dev/null +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -0,0 +1,73 @@ +-- Cron schedule registry table. +-- Stores metadata for schedules managed by pg_cron via the Durable client API. +-- This table always exists (even without pg_cron installed), so list_schedules() works regardless. + +CREATE TABLE IF NOT EXISTS durable.cron_schedules ( + schedule_name TEXT NOT NULL, + queue_name TEXT NOT NULL, + task_name TEXT NOT NULL, + cron_expression TEXT NOT NULL, + params JSONB NOT NULL DEFAULT '{}'::jsonb, + spawn_options JSONB NOT NULL DEFAULT '{}'::jsonb, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + pgcron_job_name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), + PRIMARY KEY (queue_name, schedule_name) +); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_metadata + ON durable.cron_schedules USING gin (metadata); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_task_name + ON durable.cron_schedules (queue_name, task_name); + +-- Override drop_queue to clean up cron schedules and their pg_cron jobs. +CREATE OR REPLACE FUNCTION durable.drop_queue (p_queue_name text) + returns void + language plpgsql +as $$ +declare + v_existing_queue text; + v_rec record; + v_jobid bigint; +begin + select queue_name into v_existing_queue + from durable.queues + where queue_name = p_queue_name; + + if v_existing_queue is null then + return; + end if; + + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'r_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 't_' || p_queue_name); + + delete from durable.queues where queue_name = p_queue_name; +end; +$$; diff --git a/tests/cron_test.rs b/tests/cron_test.rs new file mode 100644 index 0000000..856490e --- /dev/null +++ b/tests/cron_test.rs @@ -0,0 +1,489 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] + +mod common; + +use durable::{ + Durable, DurableError, MIGRATOR, ScheduleFilter, ScheduleOptions, SpawnOptions, setup_pgcron, +}; +use serde_json::json; +use sqlx::PgPool; +use std::collections::HashMap; + +/// Check if pg_cron is available in this database. Tests that require pg_cron +/// should call this and return early if it's not installed. +async fn pgcron_available(pool: &PgPool) -> bool { + let result: Result<(bool,), _> = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(pool) + .await; + match result { + Ok((exists,)) => exists, + Err(_) => false, + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_setup_pgcron(pool: PgPool) { + // If pg_cron extension is available, setup should succeed + // If not, it should return PgCronUnavailable + let result = setup_pgcron(&pool).await; + if pgcron_available(&pool).await { + result.unwrap(); + } else { + let err = result.unwrap_err(); + assert!(matches!(err, DurableError::PgCronUnavailable { .. })); + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_and_list_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_create_and_list_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_create_list") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let mut metadata = HashMap::new(); + metadata.insert("team".to_string(), json!("payments")); + metadata.insert("env".to_string(), json!("production")); + + let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(metadata), + }; + + durable + .create_schedule("payment-schedule", options) + .await + .unwrap(); + + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + let schedule = &schedules[0]; + assert_eq!(schedule.name, "payment-schedule"); + assert_eq!(schedule.cron_expression, "*/5 * * * *"); + assert_eq!(schedule.task_name, "process-payments"); + assert_eq!(schedule.params, json!({"batch_size": 100})); + assert_eq!(schedule.metadata.get("team"), Some(&json!("payments"))); + assert_eq!(schedule.metadata.get("env"), Some(&json!("production"))); + assert_eq!( + schedule.pgcron_job_name, + "durable::test_cron_create_list::payment-schedule" + ); + + // Cleanup + durable.delete_schedule("payment-schedule").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_schedule_upsert(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_create_schedule_upsert: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_upsert") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + // Create initial schedule + let options = ScheduleOptions { + task_name: "task-v1".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({"version": 1}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("my-schedule", options) + .await + .unwrap(); + + // Update with same name + let options2 = ScheduleOptions { + task_name: "task-v2".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({"version": 2}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("my-schedule", options2) + .await + .unwrap(); + + // Should still be just 1 schedule, but updated + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].task_name, "task-v2"); + assert_eq!(schedules[0].cron_expression, "*/10 * * * *"); + assert_eq!(schedules[0].params, json!({"version": 2})); + + // Cleanup + durable.delete_schedule("my-schedule").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_delete_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_delete_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_delete") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "cleanup-task".to_string(), + cron_expression: "0 0 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("daily-cleanup", options) + .await + .unwrap(); + + // Verify it exists + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + // Delete it + durable.delete_schedule("daily-cleanup").await.unwrap(); + + // Verify it's gone + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 0); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_delete_nonexistent_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_delete_nonexistent_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_delete_missing") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let result = durable.delete_schedule("nonexistent").await; + assert!(result.is_err()); + match result.unwrap_err() { + DurableError::ScheduleNotFound { + schedule_name, + queue_name, + } => { + assert_eq!(schedule_name, "nonexistent"); + assert_eq!(queue_name, "test_cron_delete_missing"); + } + other => panic!("expected ScheduleNotFound, got: {other:?}"), + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_schedule_invalid_cron(pool: PgPool) { + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_invalid") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "my-task".to_string(), + cron_expression: "invalid cron".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + + let result = durable.create_schedule("bad-cron", options).await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + DurableError::InvalidCronExpression { .. } + )); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_schedule_injects_metadata_headers(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_schedule_injects_metadata_headers: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_headers") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "header-task".to_string(), + cron_expression: "0 0 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("header-test", options) + .await + .unwrap(); + + // Verify the spawn_options in the registry contain the durable:: headers + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + let spawn_opts = &schedules[0].spawn_options; + let headers = spawn_opts.get("headers").expect("should have headers"); + assert_eq!( + headers.get("durable::scheduled_by"), + Some(&json!("header-test")) + ); + assert_eq!(headers.get("durable::cron"), Some(&json!("0 0 * * *"))); + + // Cleanup + durable.delete_schedule("header-test").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_filter_by_metadata(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_filter_by_metadata: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_meta") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + // Create schedules with different metadata + let mut meta_payments = HashMap::new(); + meta_payments.insert("team".to_string(), json!("payments")); + + let mut meta_billing = HashMap::new(); + meta_billing.insert("team".to_string(), json!("billing")); + + let options1 = ScheduleOptions { + task_name: "task-a".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("schedule-a", options1) + .await + .unwrap(); + + let options2 = ScheduleOptions { + task_name: "task-b".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_billing), + }; + durable + .create_schedule("schedule-b", options2) + .await + .unwrap(); + + // Filter by payments team + let filter = ScheduleFilter { + metadata: Some(meta_payments), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "schedule-a"); + + // No filter returns both + let all = durable.list_schedules(None).await.unwrap(); + assert_eq!(all.len(), 2); + + // Cleanup + durable.delete_schedule("schedule-a").await.unwrap(); + durable.delete_schedule("schedule-b").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_filter_by_task_name(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_filter_by_task_name: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_task") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options1 = ScheduleOptions { + task_name: "process-orders".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("orders-1", options1).await.unwrap(); + + let options2 = ScheduleOptions { + task_name: "process-orders".to_string(), + cron_expression: "*/15 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("orders-2", options2).await.unwrap(); + + let options3 = ScheduleOptions { + task_name: "send-reports".to_string(), + cron_expression: "0 9 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("reports", options3).await.unwrap(); + + // Filter by task name + let filter = ScheduleFilter { + task_name: Some("process-orders".to_string()), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 2); + assert!(schedules.iter().all(|s| s.task_name == "process-orders")); + + // Filter by different task name + let filter = ScheduleFilter { + task_name: Some("send-reports".to_string()), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "reports"); + + // Cleanup + durable.delete_schedule("orders-1").await.unwrap(); + durable.delete_schedule("orders-2").await.unwrap(); + durable.delete_schedule("reports").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_combined_filter(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_combined_filter: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_combo") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let mut meta_payments = HashMap::new(); + meta_payments.insert("team".to_string(), json!("payments")); + + let mut meta_billing = HashMap::new(); + meta_billing.insert("team".to_string(), json!("billing")); + + // Same task, different metadata + let options1 = ScheduleOptions { + task_name: "process-data".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("data-payments", options1) + .await + .unwrap(); + + let options2 = ScheduleOptions { + task_name: "process-data".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_billing.clone()), + }; + durable + .create_schedule("data-billing", options2) + .await + .unwrap(); + + // Different task, same metadata + let options3 = ScheduleOptions { + task_name: "send-alerts".to_string(), + cron_expression: "0 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("alerts-payments", options3) + .await + .unwrap(); + + // Filter by task + metadata + let filter = ScheduleFilter { + task_name: Some("process-data".to_string()), + metadata: Some(meta_payments), + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "data-payments"); + + // Cleanup + durable.delete_schedule("data-payments").await.unwrap(); + durable.delete_schedule("data-billing").await.unwrap(); + durable.delete_schedule("alerts-payments").await.unwrap(); +}