-
Notifications
You must be signed in to change notification settings - Fork 0
add pgcron + scheduling primitives to Durable #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
422b50a
45fbd3b
67d5cbc
521d757
d6dbf75
c468c46
5f45860
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ Use at your own risk. | |
| - **Await events** - Pause until external events arrive (with optional timeouts) | ||
| - **Retry on failure** - Configurable retry strategies with exponential backoff | ||
| - **Scale horizontally** - Multiple workers can process tasks concurrently | ||
| - **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration | ||
|
|
||
| Unlike exception-based durable execution systems (Python, TypeScript), this SDK uses Rust's `Result` type for suspension control flow, making it idiomatic and type-safe. | ||
|
|
||
|
|
@@ -260,6 +261,101 @@ client.emit_event( | |
| ).await?; | ||
| ``` | ||
|
|
||
| ### Cron Scheduling | ||
|
|
||
| Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering. | ||
|
|
||
| **Setup** - Enable pg_cron once at startup: | ||
|
|
||
| ```rust | ||
| use durable::setup_pgcron; | ||
|
|
||
| // Enable the pg_cron extension (requires superuser or appropriate privileges) | ||
| setup_pgcron(client.pool()).await?; | ||
| ``` | ||
|
|
||
| **Create a schedule:** | ||
|
|
||
| ```rust | ||
| use durable::ScheduleOptions; | ||
|
|
||
| let options = ScheduleOptions { | ||
| task_name: "process-payments".to_string(), | ||
| cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes | ||
| params: json!({"batch_size": 100}), | ||
| spawn_options: SpawnOptions::default(), | ||
| metadata: Some(HashMap::from([ | ||
| ("team".to_string(), json!("payments")), | ||
| ("env".to_string(), json!("production")), | ||
| ])), | ||
| }; | ||
|
|
||
| // Creates or updates the schedule (upsert semantics) | ||
| client.create_schedule("payment-schedule", options).await?; | ||
| ``` | ||
|
|
||
| **List and filter schedules:** | ||
|
|
||
| ```rust | ||
| use durable::ScheduleFilter; | ||
|
|
||
| // List all schedules on this queue | ||
| let all = client.list_schedules(None).await?; | ||
|
|
||
| // Filter by task name | ||
| let filter = ScheduleFilter { | ||
| task_name: Some("process-payments".to_string()), | ||
| ..Default::default() | ||
| }; | ||
| let filtered = client.list_schedules(Some(filter)).await?; | ||
|
|
||
| // Filter by metadata (JSONB containment) | ||
| let filter = ScheduleFilter { | ||
| metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])), | ||
| ..Default::default() | ||
| }; | ||
| let filtered = client.list_schedules(Some(filter)).await?; | ||
| ``` | ||
|
|
||
| **Delete a schedule:** | ||
|
|
||
| ```rust | ||
| // Removes the schedule and its pg_cron job. In-flight tasks are not cancelled. | ||
| client.delete_schedule("payment-schedule").await?; | ||
| ``` | ||
|
|
||
| **Key behaviors:** | ||
|
|
||
| - **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. README uses "PostgreSQL" instead of "Postgres"Low Severity The newly added line uses "PostgreSQL's pg_cron extension" instead of the preferred "Postgres's pg_cron extension", violating the project convention to prefer "Postgres" over "PostgreSQL" in documentation. Triggered by team rule: Prefer "Postgres" instead of PostgreSQL" |
||
| - **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place. | ||
| - **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned. | ||
| - **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries. | ||
| - **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs. | ||
|
|
||
| ### Polling Task Results | ||
|
|
||
| You can poll for the result of a spawned task without running a worker in the same process: | ||
|
|
||
| ```rust | ||
| use durable::TaskStatus; | ||
|
|
||
| let spawned = client.spawn::<MyTask>(params).await?; | ||
|
|
||
| // Poll for the result | ||
| let result = client.get_task_result(spawned.task_id).await?; | ||
|
|
||
| if let Some(poll) = result { | ||
| match poll.status { | ||
| TaskStatus::Completed => println!("Output: {:?}", poll.output), | ||
| TaskStatus::Failed => println!("Error: {:?}", poll.error), | ||
| TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => { | ||
| println!("Still in progress...") | ||
| } | ||
| TaskStatus::Cancelled => println!("Task was cancelled"), | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ### Transactional Spawning | ||
|
|
||
| You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does: | ||
|
|
@@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate | |
| | [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` | | ||
| | [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration | | ||
|
|
||
| ### Cron Scheduling | ||
|
|
||
| | Type | Description | | ||
| |------|-------------| | ||
| | [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) | | ||
| | [`ScheduleInfo`] | Information about an existing schedule | | ||
| | [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) | | ||
| | [`setup_pgcron()`] | Initialize the pg_cron extension | | ||
|
|
||
| ### Results | ||
|
|
||
| | Type | Description | | ||
| |------|-------------| | ||
| | [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) | | ||
| | [`TaskPollResult`] | Result of polling a task (status, output, error) | | ||
| | [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` | | ||
| | [`ControlFlow`] | Signals for suspension and cancellation | | ||
|
|
||
| ## Environment Variables | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to leave pgcron optional? we could make it mandatory later