Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/blob-upload-off-do.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@getcirrus/pds": patch
---

Fix blob uploads intermittently desyncing the PDS from the relay.

Uploading a blob (commonly a link-card thumbnail) could occasionally fail and leave the relay no longer tracking the repo, so new posts stopped federating until a manual crawl request. Blob uploads are now reliable and no longer drop the firehose connection.
37 changes: 16 additions & 21 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
type SeqIdentityEvent,
type CommitData,
} from "./sequencer";
import { BlobStore, type BlobRef } from "./blobs";
import { BlobStore } from "./blobs";
import { jsonToLex } from "@atproto/lex-json";
import type { PDSEnv } from "./types";
import { RecordAlreadyExistsError, type ValidationStatus } from "./validation";
Expand Down Expand Up @@ -1038,28 +1038,23 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
}

/**
* RPC method: Upload a blob to R2
* RPC method: Record an already-stored blob's metadata.
*
* The blob bytes are written to R2 by the stateless Worker, not here.
* This DO is single-threaded and also holds the relay's firehose
* WebSocket; awaiting an R2 put inside it (R2 latency is independent of
* object size — even a small image can stall) pins the input gate, and
* Cloudflare resets the object when a storage op can't complete in time,
* dropping the firehose and desyncing the relay. Only the tiny tracking
* row needs the DO's SQLite.
*/
async rpcUploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
if (!this.blobStore) {
throw new Error("Blob storage not configured");
}

// Enforce size limit (60MB)
const MAX_BLOB_SIZE = 60 * 1024 * 1024;
if (bytes.length > MAX_BLOB_SIZE) {
throw new Error(
`Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`,
);
}

const blobRef = await this.blobStore.putBlob(bytes, mimeType);

// Track the imported blob for migration progress
async rpcTrackBlob(
cid: string,
size: number,
mimeType: string,
): Promise<void> {
const storage = await this.getStorage();
storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType);

return blobRef;
storage.trackImportedBlob(cid, size, mimeType);
}

/**
Expand Down
41 changes: 24 additions & 17 deletions packages/pds/src/xrpc/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
type ValidationStatus,
} from "../validation.js";
import { detectContentType } from "../format.js";
import { BlobStore } from "../blobs.js";
import { buildScopeChecker, requireScope } from "../middleware/auth.js";

function invalidRecordError(
Expand Down Expand Up @@ -680,24 +681,30 @@ export async function uploadBlob(
);
}

try {
const blobRef = await accountDO.rpcUploadBlob(bytes, contentType);
return c.json({ blob: blobRef });
} catch (err) {
if (
err instanceof Error &&
err.message.includes("Blob storage not configured")
) {
return c.json(
{
error: "ServiceUnavailable",
message: "Blob storage is not configured",
},
503,
);
}
throw err;
if (!c.env.BLOBS) {
return c.json(
{
error: "ServiceUnavailable",
message: "Blob storage is not configured",
},
503,
);
}

// Store the blob from the stateless Worker, not the DO. The DO is
// single-threaded and holds the relay's firehose WebSocket; awaiting an
// R2 put inside it (R2 latency is independent of object size) pins the
// input gate, and Cloudflare resets the object when a storage op times
// out, dropping the firehose and desyncing the relay. The DO only
// records the tracking metadata. This mirrors sync.getBlob.
const blobStore = new BlobStore(c.env.BLOBS, c.env.DID);
const blobRef = await blobStore.putBlob(bytes, contentType);
await accountDO.rpcTrackBlob(
blobRef.ref.$link,
blobRef.size,
blobRef.mimeType,
);
return c.json({ blob: blobRef });
}

export async function importRepo(
Expand Down
Loading