diff --git a/Cargo.lock b/Cargo.lock index b65c61ca8..da131d79d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,9 +537,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.4" +version = "1.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ed8e8c52d2dc2390ad9f15647fe663f71e9780b4262c190fbb823a32721566" +checksum = "6c9b9de216a988dd54b754a82a7660cfe14cee4f6782ae4524470972fa0ccb39" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -599,6 +599,31 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sqs" +version = "1.102.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0246bf049cfc003ce44599dff955b9353758de3afa68a053da9b2c7de20a07d8" +dependencies = [ + "arc-swap", + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.1", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.106.0" @@ -682,9 +707,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.20" +version = "0.60.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf09d74e5e32f76b8762da505a3cd59303e367a664ca67295387baa8c1d7548" +checksum = "78d8391e65fcea47c586a22e1a41f173b38615b112b2c6b7a44e80cec3e6b706" dependencies = [ "aws-smithy-types", "bytes", @@ -835,9 +860,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.9" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f93074121a1be41317b9aa607143ae17900631f7f59a99f2b905d519d6783b" +checksum = "32b42fcf341259d85ca10fac9a2f6448a8ec691c6955a18e45bc3b71a85fab85" dependencies = [ "base64-simd", "bytes", @@ -1235,8 +1260,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2077,6 +2104,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "docs_rs_crates_io" +version = "0.1.0" +dependencies = [ + "chrono", + "semver", + "serde", + "serde_json", +] + [[package]] name = "docs_rs_database" version = "0.0.0" @@ -2416,12 +2453,15 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-config", + "aws-sdk-sqs", "clap", "crates-index", "crates-index-diff", "docs_rs_build_queue", "docs_rs_config", "docs_rs_context", + "docs_rs_crates_io", "docs_rs_database", "docs_rs_env_vars", "docs_rs_fastly", @@ -2438,10 +2478,12 @@ dependencies = [ "opentelemetry", "pretty_assertions", "rayon", + "serde_json", "sqlx", "test-case", "tokio", "tracing", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1d178d433..3f7d1a58b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,11 @@ edition = "2024" anyhow = { version = "1.0.42", features = ["backtrace"] } askama = "0.16.0" async-stream = "0.3.5" +# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 +# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable +# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by +# using the modern rustls 0.23 + hyper 1.x stack instead. +aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } axum-extra = { version = "0.12.0", features = ["middleware", "routing", "typed-header"] } base64 = "0.22" bon = { version = "3.8.1", features = ["experimental-overwritable"] } diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 0b0f9ba6a..4b24a2d55 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,8 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-config = { workspace = true } +aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } @@ -15,6 +17,7 @@ crates-index = { version = "3.0.0", default-features = false, features = ["git", crates-index-diff = { version = "30.0.0", default-features = false, features = ["http-reqwest", "max-performance", "semver"] } docs_rs_build_queue = { path = "../../lib/docs_rs_build_queue" } docs_rs_config = { path = "../../lib/docs_rs_config" } +docs_rs_crates_io = { path = "../../lib/docs_rs_crates_io" } docs_rs_context = { path = "../../lib/docs_rs_context" } docs_rs_database = { path = "../../lib/docs_rs_database" } docs_rs_env_vars = { path = "../../lib/docs_rs_env_vars" } @@ -27,11 +30,13 @@ docs_rs_types = { path = "../../lib/docs_rs_types" } docs_rs_utils = { path = "../../lib/docs_rs_utils" } futures-util = { workspace = true } itertools = { workspace = true } +serde_json = { workspace = true } opentelemetry = { workspace = true } rayon = "1.6.1" sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 7b5f17976..388eaa5db 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -2,11 +2,15 @@ use anyhow::Result; use docs_rs_config::AppConfig; use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{path::PathBuf, time::Duration}; +use url::Url; #[derive(Debug)] pub struct Config { pub registry_index_path: PathBuf, pub registry_url: Option, + pub sqs_queue_url: Option, + pub sqs_region: Option, + pub aws_sdk_max_retries: u32, /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, @@ -29,6 +33,11 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, + sqs_region: maybe_env("DOCSRS_SQS_REGION")?, + aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?, + delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index 29fbe3796..4b97be297 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -67,7 +67,13 @@ pub async fn delete_version( return Ok(()); }; - let is_library = delete_version_from_database(conn, config, name, crate_id, version).await?; + let Some(is_library) = + delete_version_from_database(conn, config, name, crate_id, version).await? + else { + // release doesn't exist + return Ok(()); + }; + let paths = if is_library { LIBRARY_STORAGE_PATHS_TO_DELETE } else { @@ -133,7 +139,18 @@ async fn delete_version_from_database( name: &KrateName, crate_id: CrateId, version: &Version, -) -> Result { +) -> Result> { + let Some(release_id) = sqlx::query_scalar!( + "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + crate_id as _, + version as _ + ) + .fetch_optional(&mut *conn) + .await? + else { + return Ok(None); + }; + let mut transaction = conn.begin().await?; let delete_lock_timeout = format!("{}ms", config.delete_lock_timeout.as_millis()); @@ -157,23 +174,23 @@ async fn delete_version_from_database( sqlx::query!( "DELETE FROM builds_logs bl USING builds b - JOIN releases r ON b.rid = r.id - WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - crate_id as _, - version as _ + WHERE bl.build_id = b.id AND b.rid = $1;", + release_id as _, ) .execute(&mut *transaction) .await?; for &(table, column) in METADATA { - sqlx::query(sqlx::AssertSqlSafe( - format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)"))) - .bind(crate_id).bind(version).execute(&mut *transaction).await?; + sqlx::query(sqlx::AssertSqlSafe(format!( + "DELETE FROM {table} WHERE {column} = $1" + ))) + .bind(release_id) + .execute(&mut *transaction) + .await?; } let is_library: bool = sqlx::query_scalar!( - "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", - crate_id.0, - version as _, + "DELETE FROM releases WHERE id = $1 RETURNING is_library", + release_id as _, ) .fetch_one(&mut *transaction) .await? @@ -190,7 +207,7 @@ async fn delete_version_from_database( update_latest_version_id(&mut transaction, crate_id).await?; transaction.commit().await?; - Ok(is_library) + Ok(Some(is_library)) } /// Returns whether any release in this crate was a library @@ -449,6 +466,13 @@ mod tests { ); } + // running delete-crate again doesn't error. + assert!( + delete_crate(&mut conn, storage, env.config(), &FOO) + .await + .is_ok() + ); + Ok(()) } @@ -613,6 +637,13 @@ mod tests { vec!["Peter Rabbit".to_string()] ); + // running delete-version again doesn't fail. + assert!( + delete_version(&mut conn, storage, env.config(), &KRATE, &V1) + .await + .is_ok() + ); + // FIXME: remove for now until test frontend is async // let web = env.frontend(); // assert_success("/a/2.0.0/a/", web)?; @@ -691,6 +722,32 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + env.fake_release() + .await + .name(&KRATE) + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name(&KRATE) + .version(V2) + .create() + .await?; + + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + + assert!(crate_exists(&mut conn, &KRATE).await?); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { let env = TestEnvironment::new().await?; diff --git a/crates/bin/docs_rs_watcher/src/index_watcher.rs b/crates/bin/docs_rs_watcher/src/index_watcher.rs index 68b964d79..87e00cd50 100644 --- a/crates/bin/docs_rs_watcher/src/index_watcher.rs +++ b/crates/bin/docs_rs_watcher/src/index_watcher.rs @@ -2,6 +2,7 @@ use crate::{ Config, db::{delete_crate, delete_version}, index::Index, + synchronization::CrateLocks, }; use anyhow::{Context as _, Result}; use crates_index_diff::Change; @@ -45,6 +46,30 @@ impl TryFrom for CrateVersion { } } +impl TryFrom<&docs_rs_crates_io::events::CrateVersion> for CrateVersion { + type Error = anyhow::Error; + + fn try_from(value: &docs_rs_crates_io::events::CrateVersion) -> Result { + Ok(Self { + name: value.name.parse()?, + version: value.version.clone().into(), + yanked: value.yanked, + }) + } +} + +impl TryFrom for CrateVersion { + type Error = anyhow::Error; + + fn try_from(value: docs_rs_crates_io::events::CrateVersion) -> Result { + Ok(Self { + name: value.name.parse()?, + version: value.version.into(), + yanked: value.yanked, + }) + } +} + #[cfg(test)] impl From for crates_index_diff::CrateVersion { fn from(value: CrateVersion) -> Self { @@ -92,6 +117,7 @@ async fn queue_crate_invalidation(krate: &KrateName, cdn: Option<&Cdn>) { /// Returns the number of crates added pub(crate) async fn get_new_crates( context: &Context, + locks: &CrateLocks, index: &Index, config: &Config, ) -> Result { @@ -115,7 +141,7 @@ pub(crate) async fn get_new_crates( debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - let crates_added = process_changes(context, &changes, config).await; + let crates_added = process_changes(context, locks, &changes, config).await; if let Err(err) = context.build_queue()?.deprioritize_workspaces().await { error!(?err, "error deprioritizing workspaces"); @@ -129,11 +155,16 @@ pub(crate) async fn get_new_crates( Ok(crates_added) } -async fn process_changes(context: &Context, changes: &Vec, config: &Config) -> usize { +async fn process_changes( + context: &Context, + locks: &CrateLocks, + changes: &Vec, + config: &Config, +) -> usize { let mut crates_added = 0; for change in changes { - match process_change(context, change, config).await { + match process_change(context, locks, change, config).await { Ok(added) => { if added { crates_added += 1; @@ -148,7 +179,12 @@ async fn process_changes(context: &Context, changes: &Vec, config: &Conf } /// Process a crate change, returning whether the change was a crate addition or not. -async fn process_change(context: &Context, change: &Change, config: &Config) -> Result { +pub(crate) async fn process_change( + context: &Context, + locks: &CrateLocks, + change: &Change, + config: &Config, +) -> Result { let crate_version: CrateVersion = change .versions() .first() @@ -156,6 +192,8 @@ async fn process_change(context: &Context, change: &Change, config: &Config) -> .clone() .try_into()?; + let _guard = locks.lock(crate_version.name.to_string()).await; + match change { Change::Added(_release) => process_version_added(context, &crate_version).await?, Change::AddedAndYanked(_release) => { @@ -177,7 +215,10 @@ async fn process_change(context: &Context, change: &Change, config: &Config) -> } /// Processes crate changes, whether they got yanked or unyanked. -async fn process_version_yank_status(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_yank_status( + context: &Context, + release: &CrateVersion, +) -> Result<()> { // FIXME: delay yanks of crates that have not yet finished building // https://github.com/rust-lang/docs.rs/issues/1934 set_yanked(context, &release.name, &release.version, release.yanked).await?; @@ -185,7 +226,7 @@ async fn process_version_yank_status(context: &Context, release: &CrateVersion) Ok(()) } -async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { let mut conn = context.pool()?.get_async().await?; let priority = get_crate_priority(&mut conn, &release.name).await?; context @@ -216,7 +257,7 @@ async fn process_version_added(context: &Context, release: &CrateVersion) -> Res Ok(()) } -async fn process_version_deleted( +pub(crate) async fn process_version_deleted( context: &Context, config: &Config, release: &CrateVersion, @@ -250,7 +291,7 @@ async fn process_version_deleted( Ok(()) } -async fn process_crate_deleted( +pub(crate) async fn process_crate_deleted( context: &Context, config: &Config, krate: &KrateName, @@ -517,8 +558,10 @@ mod tests { version: V2, ..Default::default() }; + let locks = CrateLocks::new(); let added = process_changes( &env, + &locks, &vec![ // Should be added correctly Change::Added(krate1.into()), diff --git a/crates/bin/docs_rs_watcher/src/lib.rs b/crates/bin/docs_rs_watcher/src/lib.rs index 833a6c688..05cf5cc68 100644 --- a/crates/bin/docs_rs_watcher/src/lib.rs +++ b/crates/bin/docs_rs_watcher/src/lib.rs @@ -5,6 +5,8 @@ mod index; pub mod index_watcher; mod rebuilds; mod service_metrics; +pub mod subscriber; +pub mod synchronization; #[cfg(test)] mod testing; @@ -13,7 +15,9 @@ pub use db::{delete_crate, delete_version}; pub use index::Index; pub use rebuilds::queue_rebuilds; -use crate::{index_watcher::get_new_crates, service_metrics::OtelServiceMetrics}; +use crate::{ + index_watcher::get_new_crates, service_metrics::OtelServiceMetrics, synchronization::CrateLocks, +}; use anyhow::Result; use docs_rs_context::Context; use docs_rs_utils::start_async_cron; @@ -24,7 +28,7 @@ use tracing::{debug, error, info, trace}; /// Run the registry watcher /// NOTE: this should only be run once, otherwise crates would be added /// to the queue multiple times. -pub async fn watch_registry(config: &Config, context: &Context) -> Result<()> { +pub async fn watch_registry(config: &Config, context: &Context, locks: &CrateLocks) -> Result<()> { let mut last_gc = Instant::now(); let queue = context.build_queue()?; @@ -36,7 +40,7 @@ pub async fn watch_registry(config: &Config, context: &Context) -> Result<()> { debug!("Checking new crates"); let index = Index::from_config(config).await?; - match get_new_crates(context, &index, config).await { + match get_new_crates(context, locks, &index, config).await { Ok(n) => debug!("{} crates added to queue", n), Err(e) => { error!(?e, "Failed to get new crates"); diff --git a/crates/bin/docs_rs_watcher/src/main.rs b/crates/bin/docs_rs_watcher/src/main.rs index ba133b8e1..9ee93819c 100644 --- a/crates/bin/docs_rs_watcher/src/main.rs +++ b/crates/bin/docs_rs_watcher/src/main.rs @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand}; use docs_rs_config::AppConfig as _; use docs_rs_context::Context; use docs_rs_types::{KrateName, Version}; -use docs_rs_watcher::{Config, Index, index_watcher}; +use docs_rs_watcher::{Config, Index, index_watcher, synchronization::CrateLocks}; use std::sync::Arc; #[tokio::main] @@ -81,7 +81,12 @@ impl CommandLine { // which should only run once, and all the time. docs_rs_watcher::start_background_service_metric_collector(&ctx).await?; - docs_rs_watcher::watch_registry(&config, &ctx).await?; + let locks = CrateLocks::new(); + // FIXME: we don't want to exit in error case, do we? + tokio::try_join!( + docs_rs_watcher::watch_registry(&config, &ctx, &locks), + docs_rs_watcher::subscriber::listen(&config, &ctx, &locks), + )?; } Self::Queue { subcommand } => subcommand.handle_args(config, ctx).await?, Self::Database { subcommand } => subcommand.handle_args(config, ctx).await?, diff --git a/crates/bin/docs_rs_watcher/src/subscriber.rs b/crates/bin/docs_rs_watcher/src/subscriber.rs new file mode 100644 index 000000000..78e76df5f --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/subscriber.rs @@ -0,0 +1,192 @@ +use crate::{ + Config, + index_watcher::{ + process_crate_deleted, process_version_added, process_version_deleted, + process_version_yank_status, + }, + synchronization::CrateLocks, +}; +use anyhow::{Context as _, Result, bail}; +use aws_config::{BehaviorVersion, Region, retry::RetryConfig}; +use aws_sdk_sqs::Client; +use docs_rs_context::Context; +use docs_rs_crates_io::events::{IndexChangeEventV1, IndexChangeV1}; +use docs_rs_types::KrateName; +use docs_rs_utils::retry_async; +use std::time::Duration; +use tokio::time; +use tracing::{debug, error, instrument, warn}; + +/// visibility timeout: +/// should be longer than the longest time our server takes to handle a message. +/// +/// if we fetch a message, and don't delete it in this time, it will be redelivered. +const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(60); + +/// wait-time (long polling): +/// +/// How long should the request be kept open when there are no messages. +const WAIT_TIME: Duration = Duration::from_secs(30); + +/// when one long-polling request is finished, how long to sleep before starting the next? +const SLEEP_BETWEEN_REQUESTS: Duration = Duration::from_secs(1); + +/// when we have an error handling a message, how long should SQS wait until +/// it redelivers this message. +const RETRY_DELAY: Duration = Duration::from_secs(30); + +pub async fn listen(config: &Config, context: &Context, locks: &CrateLocks) -> Result<()> { + let (Some(region), Some(queue_url)) = (&config.sqs_region, &config.sqs_queue_url) else { + bail!("missing sqs region or url, disabling crates.io subscriber"); + }; + let queue_url = queue_url.to_string(); + + let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let client = Client::from_conf( + aws_sdk_sqs::config::Builder::from(&shared_config) + .retry_config(RetryConfig::standard().with_max_attempts(config.aws_sdk_max_retries)) + .region(Region::new(region.clone())) + .build(), + ); + + let queue = context.build_queue()?; + + loop { + if queue.is_locked().await? { + debug!("Queue is locked, skipping checking new crates"); + time::sleep(WAIT_TIME).await; + continue; + } + + let response = match client + .receive_message() + .queue_url(queue_url.clone()) + .max_number_of_messages(10) + .wait_time_seconds(WAIT_TIME.as_secs() as i32) + .visibility_timeout(VISIBILITY_TIMEOUT.as_secs() as i32) + .send() + .await + { + Ok(response) => response, + Err(err) => { + error!( + ?err, + queue_url, "error receiving messages from sqs, retrying" + ); + time::sleep(WAIT_TIME).await; + continue; + } + }; + + for message in response.messages() { + let Some(body) = message.body() else { + continue; + }; + + match retry_async( + || async move { process_message(context, config, locks, body).await }, + 3, + ) + .await + { + Ok(_) => { + if let Some(receipt_handle) = message.receipt_handle() { + // mark the message as "done" + if let Err(err) = client + .delete_message() + .queue_url(queue_url.clone()) + .receipt_handle(receipt_handle) + .send() + .await + { + // sqs will redeliver the message after the visibility timeout passed + error!( + ?err, + receipt_handle, queue_url, "error deleting message from queue" + ); + } + } + } + Err(err) => { + error!( + ?err, + ?message, + ?RETRY_DELAY, + body, + "error handling message. Retrying." + ); + + if let Some(receipt_handle) = message.receipt_handle() { + // Don't delete the message. + // It will become visible again after the visibility timeout. + if let Err(err) = client + .change_message_visibility() + .queue_url(queue_url.clone()) + .receipt_handle(receipt_handle) + // retry after some time + .visibility_timeout(RETRY_DELAY.as_secs() as i32) // retry + .send() + .await + { + // this error doesn't really matter, without the changed visibility + // timeout sqs will redeliver after the default visibility timeout. + warn!( + ?err, + receipt_handle, + queue_url, + "error setting visibility_timeout for retry" + ); + } + } + } + } + } + + time::sleep(SLEEP_BETWEEN_REQUESTS).await; + } +} + +#[instrument(skip(context, config, locks))] +async fn process_message( + context: &Context, + config: &Config, + locks: &CrateLocks, + body: &str, +) -> Result<()> { + let event: IndexChangeEventV1 = + serde_json::from_str(body).context("error parsing event from json")?; + + debug!(?event, "received event from sqs"); + + let _guard = locks.lock(event.change.name()).await; + + process_change(context, &event.change, config) + .await + .context("error processing change")?; + + Ok(()) +} + +/// Process a crate change, returning whether the change was a crate addition or not. +pub(crate) async fn process_change( + context: &Context, + change: &IndexChangeV1, + config: &Config, +) -> Result { + match change { + IndexChangeV1::Added(crate_version) => { + process_version_added(context, &crate_version.try_into().unwrap()).await? + } + IndexChangeV1::Unyanked(crate_version) | IndexChangeV1::Yanked(crate_version) => { + process_version_yank_status(context, &crate_version.try_into().unwrap()).await? + } + IndexChangeV1::CrateDeleted { name, .. } => { + let name: KrateName = name.parse()?; + process_crate_deleted(context, config, &name).await? + } + IndexChangeV1::VersionDeleted(crate_version) => { + process_version_deleted(context, config, &crate_version.try_into().unwrap()).await? + } + }; + Ok(change.added().is_some()) +} diff --git a/crates/bin/docs_rs_watcher/src/synchronization.rs b/crates/bin/docs_rs_watcher/src/synchronization.rs new file mode 100644 index 000000000..d80be40c6 --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/synchronization.rs @@ -0,0 +1,27 @@ +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{Mutex, OwnedMutexGuard}; + +#[derive(Clone, Default)] +pub struct CrateLocks { + locks: Arc>>>>, +} + +impl CrateLocks { + pub fn new() -> Self { + Self { + locks: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn lock(&self, crate_name: impl Into) -> OwnedMutexGuard<()> { + let lock = { + let mut locks = self.locks.lock().await; + locks + .entry(crate_name.into()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + + lock.lock_owned().await + } +} diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..c6f9224b3 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +semver = { version = "1", features = ["serde"] } +serde = { version = "1", features = ["derive"] } + +[dev-dependencies] +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..4b9dc4ea3 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,259 @@ +#![allow(clippy::disallowed_types)] + +use chrono::{DateTime, Utc}; +use std::fmt; + +/// A change that can happen to a crate on our index. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum IndexChangeV1 { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. + VersionDeleted(CrateVersion), +} + +impl IndexChangeV1 { + /// Return the added crate, if this is this kind of change. + pub fn added(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Added(v) => Some(v), + _ => None, + } + } + + /// Return the yanked crate, if this is this kind of change. + pub fn yanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Yanked(v) => Some(v), + _ => None, + } + } + + /// Return the unyanked crate, if this is this kind of change. + pub fn unyanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Unyanked(v) => Some(v), + _ => None, + } + } + + /// Return the deleted crate, if this is this kind of change. + pub fn crate_deleted(&self) -> Option<&str> { + match self { + IndexChangeV1::CrateDeleted { name } => Some(name.as_str()), + _ => None, + } + } + + /// Return the deleted version crate, if this is this kind of change. + pub fn version_deleted(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::VersionDeleted(v) => Some(v), + _ => None, + } + } + + pub fn name(&self) -> &str { + match self { + IndexChangeV1::Added(crate_version) => &crate_version.name, + IndexChangeV1::Unyanked(crate_version) => &crate_version.name, + IndexChangeV1::Yanked(crate_version) => &crate_version.name, + IndexChangeV1::CrateDeleted { name } => name, + IndexChangeV1::VersionDeleted(crate_version) => &crate_version.name, + } + } +} + +impl fmt::Display for IndexChangeV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match *self { + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. } => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", + } + ) + } +} + +/// A conventional event envelope for our events between crates.io & docs.rs +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the event occured + pub occurred_at: DateTime, + /// The typed payload. + #[serde(flatten)] + pub change: T, +} + +/// The first version of the public event wire format. +pub type IndexChangeEventV1 = Event; + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, i.e. `clap`. + pub name: String, + /// is the release yanked? + pub yanked: bool, + /// The semantic version of the crate. + #[serde(rename = "vers")] + pub version: semver::Version, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + yanked: false, + version: semver::Version::new(4, 5, 0), + } + } + + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { + id: "evt_123".into(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + change, + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "yanked": false, + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + IndexChangeV1::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + IndexChangeV1::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }) + ); + } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: IndexChangeEventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) + ); + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events; diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index 153de2489..6b0fb85ea 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -16,11 +16,7 @@ testing = [ anyhow = { workspace = true } async-compression = { version = "0.4.32", features = ["bzip2", "deflate", "gzip", "tokio", "zstd"] } async-stream = { workspace = true } -# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 -# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable -# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by -# using the modern rustls 0.23 + hyper 1.x stack instead. -aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } +aws-config = { workspace = true } aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } base64 = { workspace = true }