diff --git a/.changeset/blob-upload-off-do.md b/.changeset/blob-upload-off-do.md new file mode 100644 index 00000000..580ad9dd --- /dev/null +++ b/.changeset/blob-upload-off-do.md @@ -0,0 +1,7 @@ +--- +"@getcirrus/pds": patch +--- + +Fix blob uploads intermittently desyncing the PDS from the relay. + +Uploading a blob (commonly a link-card thumbnail) could occasionally fail and leave the relay no longer tracking the repo, so new posts stopped federating until a manual crawl request. Blob uploads are now reliable and no longer drop the firehose connection. diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index c15d8e73..3bfb5f92 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -28,7 +28,7 @@ import { type SeqIdentityEvent, type CommitData, } from "./sequencer"; -import { BlobStore, type BlobRef } from "./blobs"; +import { BlobStore } from "./blobs"; import { jsonToLex } from "@atproto/lex-json"; import type { PDSEnv } from "./types"; import { RecordAlreadyExistsError, type ValidationStatus } from "./validation"; @@ -1038,28 +1038,23 @@ export class AccountDurableObject extends DurableObject { } /** - * RPC method: Upload a blob to R2 + * RPC method: Record an already-stored blob's metadata. + * + * The blob bytes are written to R2 by the stateless Worker, not here. + * This DO is single-threaded and also holds the relay's firehose + * WebSocket; awaiting an R2 put inside it (R2 latency is independent of + * object size — even a small image can stall) pins the input gate, and + * Cloudflare resets the object when a storage op can't complete in time, + * dropping the firehose and desyncing the relay. Only the tiny tracking + * row needs the DO's SQLite. */ - async rpcUploadBlob(bytes: Uint8Array, mimeType: string): Promise { - if (!this.blobStore) { - throw new Error("Blob storage not configured"); - } - - // Enforce size limit (60MB) - const MAX_BLOB_SIZE = 60 * 1024 * 1024; - if (bytes.length > MAX_BLOB_SIZE) { - throw new Error( - `Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`, - ); - } - - const blobRef = await this.blobStore.putBlob(bytes, mimeType); - - // Track the imported blob for migration progress + async rpcTrackBlob( + cid: string, + size: number, + mimeType: string, + ): Promise { const storage = await this.getStorage(); - storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType); - - return blobRef; + storage.trackImportedBlob(cid, size, mimeType); } /** diff --git a/packages/pds/src/xrpc/repo.ts b/packages/pds/src/xrpc/repo.ts index e20206c6..f1c77479 100644 --- a/packages/pds/src/xrpc/repo.ts +++ b/packages/pds/src/xrpc/repo.ts @@ -10,6 +10,7 @@ import { type ValidationStatus, } from "../validation.js"; import { detectContentType } from "../format.js"; +import { BlobStore } from "../blobs.js"; import { buildScopeChecker, requireScope } from "../middleware/auth.js"; function invalidRecordError( @@ -680,24 +681,30 @@ export async function uploadBlob( ); } - try { - const blobRef = await accountDO.rpcUploadBlob(bytes, contentType); - return c.json({ blob: blobRef }); - } catch (err) { - if ( - err instanceof Error && - err.message.includes("Blob storage not configured") - ) { - return c.json( - { - error: "ServiceUnavailable", - message: "Blob storage is not configured", - }, - 503, - ); - } - throw err; + if (!c.env.BLOBS) { + return c.json( + { + error: "ServiceUnavailable", + message: "Blob storage is not configured", + }, + 503, + ); } + + // Store the blob from the stateless Worker, not the DO. The DO is + // single-threaded and holds the relay's firehose WebSocket; awaiting an + // R2 put inside it (R2 latency is independent of object size) pins the + // input gate, and Cloudflare resets the object when a storage op times + // out, dropping the firehose and desyncing the relay. The DO only + // records the tracking metadata. This mirrors sync.getBlob. + const blobStore = new BlobStore(c.env.BLOBS, c.env.DID); + const blobRef = await blobStore.putBlob(bytes, contentType); + await accountDO.rpcTrackBlob( + blobRef.ref.$link, + blobRef.size, + blobRef.mimeType, + ); + return c.json({ blob: blobRef }); } export async function importRepo(