Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Use at your own risk.
- **Await events** - Pause until external events arrive (with optional timeouts)
- **Retry on failure** - Configurable retry strategies with exponential backoff
- **Scale horizontally** - Multiple workers can process tasks concurrently
- **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration

Unlike exception-based durable execution systems (Python, TypeScript), this SDK uses Rust's `Result` type for suspension control flow, making it idiomatic and type-safe.

Expand Down Expand Up @@ -260,6 +261,101 @@ client.emit_event(
).await?;
```

### Cron Scheduling

Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering.

**Setup** - Enable pg_cron once at startup:

```rust
use durable::setup_pgcron;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to leave pgcron optional? we could make it mandatory later


// Enable the pg_cron extension (requires superuser or appropriate privileges)
setup_pgcron(client.pool()).await?;
```

**Create a schedule:**

```rust
use durable::ScheduleOptions;

let options = ScheduleOptions {
task_name: "process-payments".to_string(),
cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes
params: json!({"batch_size": 100}),
spawn_options: SpawnOptions::default(),
metadata: Some(HashMap::from([
("team".to_string(), json!("payments")),
("env".to_string(), json!("production")),
])),
};

// Creates or updates the schedule (upsert semantics)
client.create_schedule("payment-schedule", options).await?;
```

**List and filter schedules:**

```rust
use durable::ScheduleFilter;

// List all schedules on this queue
let all = client.list_schedules(None).await?;

// Filter by task name
let filter = ScheduleFilter {
task_name: Some("process-payments".to_string()),
..Default::default()
};
let filtered = client.list_schedules(Some(filter)).await?;

// Filter by metadata (JSONB containment)
let filter = ScheduleFilter {
metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])),
..Default::default()
};
let filtered = client.list_schedules(Some(filter)).await?;
```

**Delete a schedule:**

```rust
// Removes the schedule and its pg_cron job. In-flight tasks are not cancelled.
client.delete_schedule("payment-schedule").await?;
```

**Key behaviors:**

- **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

README uses "PostgreSQL" instead of "Postgres"

Low Severity

The newly added line uses "PostgreSQL's pg_cron extension" instead of the preferred "Postgres's pg_cron extension", violating the project convention to prefer "Postgres" over "PostgreSQL" in documentation.

Fix in Cursor Fix in Web

Triggered by team rule: Prefer "Postgres" instead of PostgreSQL"

- **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place.
- **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned.
- **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries.
- **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs.

### Polling Task Results

You can poll for the result of a spawned task without running a worker in the same process:

```rust
use durable::TaskStatus;

let spawned = client.spawn::<MyTask>(params).await?;

// Poll for the result
let result = client.get_task_result(spawned.task_id).await?;

if let Some(poll) = result {
match poll.status {
TaskStatus::Completed => println!("Output: {:?}", poll.output),
TaskStatus::Failed => println!("Error: {:?}", poll.error),
TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => {
println!("Still in progress...")
}
TaskStatus::Cancelled => println!("Task was cancelled"),
}
}
```

### Transactional Spawning

You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does:
Expand Down Expand Up @@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate
| [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` |
| [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration |

### Cron Scheduling

| Type | Description |
|------|-------------|
| [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) |
| [`ScheduleInfo`] | Information about an existing schedule |
| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) |
| [`setup_pgcron()`] | Initialize the pg_cron extension |

### Results

| Type | Description |
|------|-------------|
| [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) |
| [`TaskPollResult`] | Result of polling a task (status, output, error) |
| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` |
| [`ControlFlow`] | Signals for suspension and cancellation |

## Environment Variables
Expand Down
45 changes: 45 additions & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ create table if not exists durable.queues (
created_at timestamptz not null default durable.current_time()
);

create table if not exists durable.cron_schedules (
schedule_name text not null,
queue_name text not null,
task_name text not null,
cron_expression text not null,
params jsonb not null default '{}'::jsonb,
spawn_options jsonb not null default '{}'::jsonb,
metadata jsonb not null default '{}'::jsonb,
pgcron_job_name text not null,
created_at timestamptz not null default durable.current_time(),
updated_at timestamptz not null default durable.current_time(),
primary key (queue_name, schedule_name)
);

create index if not exists idx_cron_schedules_metadata
on durable.cron_schedules using gin (metadata);

create index if not exists idx_cron_schedules_task_name
on durable.cron_schedules (queue_name, task_name);

create function durable.ensure_queue_tables (p_queue_name text)
returns void
language plpgsql
Expand Down Expand Up @@ -252,12 +272,15 @@ end;
$$;

-- Drop a queue if it exists.
-- Also cleans up any cron schedules and their pg_cron jobs for the queue.
create function durable.drop_queue (p_queue_name text)
returns void
language plpgsql
as $$
declare
v_existing_queue text;
v_rec record;
v_jobid bigint;
begin
select queue_name into v_existing_queue
from durable.queues
Expand All @@ -267,6 +290,28 @@ begin
return;
end if;

-- Clean up any cron schedules associated with this queue
for v_rec in
select pgcron_job_name
from durable.cron_schedules
where queue_name = p_queue_name
loop
begin
select jobid into v_jobid
from cron.job
where jobname = v_rec.pgcron_job_name;

if v_jobid is not null then
perform cron.unschedule(v_jobid);
end if;
exception when others then
-- pg_cron may not be installed; ignore errors
null;
end;
end loop;

delete from durable.cron_schedules where queue_name = p_queue_name;

execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name);
execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name);
execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name);
Expand Down
8 changes: 6 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl CancellationPolicyDb {
use crate::worker::Worker;

/// Validates that user-provided headers don't use reserved prefixes.
fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
pub(crate) fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> DurableResult<()> {
if let Some(headers) = headers {
for key in headers.keys() {
if key.starts_with("durable::") {
Expand Down Expand Up @@ -334,6 +334,10 @@ where
&self.state
}

pub(crate) fn spawn_defaults(&self) -> &SpawnDefaults {
&self.spawn_defaults
}

/// Register a task type. Required before spawning or processing.
///
/// Returns an error if a task with the same name is already registered.
Expand Down Expand Up @@ -578,7 +582,7 @@ where
})
}

fn serialize_spawn_options(
pub(crate) fn serialize_spawn_options(
options: &SpawnOptions,
max_attempts: u32,
) -> serde_json::Result<JsonValue> {
Expand Down
Loading
Loading