diff --git a/.changeset/lexicon-validation-parity.md b/.changeset/lexicon-validation-parity.md new file mode 100644 index 00000000..831458d2 --- /dev/null +++ b/.changeset/lexicon-validation-parity.md @@ -0,0 +1,19 @@ +--- +"@getcirrus/pds": minor +--- + +Lexicon validation now matches the reference PDS more closely: + +- `createRecord`, `putRecord`, and `applyWrites` honor the `validate` flag from the request body. `true` requires a known schema, `false` skips schema validation, and omitting it (`undefined`) validates known schemas while accepting unknown collections. +- Responses include `validationStatus` (`"valid"` for known collections, `"unknown"` for unknown ones; omitted when `validate: false`). Per-write `validationStatus` is returned in `applyWrites` results. +- The record's `$type` is filled in from `collection` when missing and rejected on mismatch. +- Generic record-key shape (`isRecordKey`) is enforced for any provided rkey, regardless of the `validate` flag. This closes a hole where empty-string and path-traversal-style rkeys could reach the repo. +- Schema-specific record keys are validated against the schema's `keySchema` for known collections (e.g. `app.bsky.feed.post` requires a TID, `app.bsky.actor.profile` requires `self`). +- Legacy `{ cid, mimeType }` blob refs are rejected, even when `validate: false`. +- The bundled schema set is broadened to include `com.atproto.lexicon.schema`, `app.bsky.actor.status`, `app.bsky.notification.declaration`, and `chat.bsky.actor.declaration`. +- The Durable Object is now the authoritative rkey allocator: when the client doesn't supply an rkey, the worker validates against a candidate TID (so restrictive `keySchema`s still reject early) and the DO picks the final rkey against its MST state, with a small retry loop (up to 5 attempts) to defeat any worker-isolate clockid collisions. +- Client-supplied rkey collisions return `409 RecordAlreadyExists` instead of a generic 500. +- Intra-batch duplicate rkeys in `applyWrites` return `400 InvalidRequest` (distinguished from the 409 above). 
+- Missing rkey for `applyWrites#update`/`#delete` returns `400 InvalidRequest`. +- Non-boolean `validate` flag values return `400 InvalidRequest`. +- Non-string `rkey` values (including `null`) return `400 InvalidRequest`. diff --git a/packages/pds/e2e/helpers.ts b/packages/pds/e2e/helpers.ts index c31b0aaf..d5d4e4ff 100644 --- a/packages/pds/e2e/helpers.ts +++ b/packages/pds/e2e/helpers.ts @@ -1,4 +1,5 @@ import { AtpAgent } from "@atproto/api"; +import { now as tidNow } from "@atcute/tid"; export function getPort(): number { return ( @@ -15,10 +16,12 @@ export function createAgent(): AtpAgent { } /** - * Generate a unique rkey for test isolation + * Generate a unique TID-format rkey for test isolation. Most app.bsky.* + * record collections constrain the rkey to TID format, so tests can't use + * arbitrary strings. */ export function uniqueRkey(): string { - return `test-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + return tidNow(); } export const TEST_DID = "did:web:test.local"; diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 0559514e..136bfcb4 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -31,6 +31,7 @@ import { import { BlobStore, type BlobRef } from "./blobs"; import { jsonToLex } from "@atproto/lex-json"; import type { PDSEnv } from "./types"; +import { RecordAlreadyExistsError, type ValidationStatus } from "./validation"; /** * Account Durable Object - manages a single user's AT Protocol repository. 
@@ -322,16 +323,42 @@ export class AccountDurableObject extends DurableObject { collection: string, rkey: string | undefined, record: unknown, + validationStatus?: ValidationStatus, ): Promise<{ uri: string; cid: string; commit: { cid: string; rev: string }; + validationStatus?: ValidationStatus; }> { await this.ensureActive(); const repo = await this.getRepo(); const keypair = await this.getKeypair(); - const actualRkey = rkey || tidNow(); + // Auto-generate rkey here (not in the worker) so the candidate is + // chosen against this DO's authoritative MST state, eliminating the + // 1/1024 collision risk between worker isolates picking the same + // timestamp+clockid in the same ms. For client-supplied rkeys, throw + // a structured RecordAlreadyExistsError if it collides. Use the + // MST's CID lookup (data.get) instead of repo.getRecord to avoid + // fetching and decoding the full record block on every write. + const autoGenerated = rkey === undefined; + let actualRkey = rkey ?? tidNow(); + for (let attempt = 0; attempt < 5; attempt++) { + const existingCid = await repo.data.get(`${collection}/${actualRkey}`); + if (!existingCid) break; + if (!autoGenerated) { + throw new RecordAlreadyExistsError( + `${collection}/${actualRkey}`, + ); + } + if (attempt === 4) { + throw new Error( + `Failed to allocate unique rkey for ${collection} after 5 attempts`, + ); + } + actualRkey = tidNow(); + } + const createOp: RecordCreateOp = { action: WriteOpAction.Create, collection, @@ -394,6 +421,7 @@ export class AccountDurableObject extends DurableObject { cid: this.repo.cid.toString(), rev: this.repo.commit.rev, }, + ...(validationStatus !== undefined ? 
{ validationStatus } : {}), }; } @@ -466,11 +494,12 @@ export class AccountDurableObject extends DurableObject { collection: string, rkey: string, record: unknown, + validationStatus?: ValidationStatus, ): Promise<{ uri: string; cid: string; commit: { cid: string; rev: string }; - validationStatus: string; + validationStatus?: ValidationStatus; }> { await this.ensureActive(); const repo = await this.getRepo(); @@ -548,7 +577,7 @@ export class AccountDurableObject extends DurableObject { cid: this.repo.cid.toString(), rev: this.repo.commit.rev, }, - validationStatus: "valid", + ...(validationStatus !== undefined ? { validationStatus } : {}), }; } @@ -561,6 +590,7 @@ export class AccountDurableObject extends DurableObject { collection: string; rkey?: string; value?: unknown; + validationStatus?: ValidationStatus; }>, ): Promise<{ commit: { cid: string; rev: string }; @@ -568,7 +598,7 @@ export class AccountDurableObject extends DurableObject { $type: string; uri?: string; cid?: string; - validationStatus?: string; + validationStatus?: ValidationStatus; }>; }> { await this.ensureActive(); @@ -581,15 +611,38 @@ export class AccountDurableObject extends DurableObject { $type: string; uri?: string; cid?: string; - validationStatus?: string; + validationStatus?: ValidationStatus; collection: string; rkey: string; action: WriteOpAction; }> = []; + // Track rkeys this batch will write so two auto-generated creates in + // the same batch don't pick the same rkey. + const reservedRkeys = new Set(); + for (const write of writes) { if (write.$type === "com.atproto.repo.applyWrites#create") { - const rkey = write.rkey || tidNow(); + const autoGenerated = write.rkey === undefined; + let rkey = write.rkey ?? 
tidNow(); + for (let attempt = 0; attempt < 5; attempt++) { + const composite = `${write.collection}/${rkey}`; + const collidesInBatch = reservedRkeys.has(composite); + const collidesInRepo = + !collidesInBatch && + (await repo.data.get(composite)) !== null; + if (!collidesInBatch && !collidesInRepo) break; + if (!autoGenerated) { + throw new RecordAlreadyExistsError(composite); + } + if (attempt === 4) { + throw new Error( + `Failed to allocate unique rkey for ${write.collection} after 5 attempts`, + ); + } + rkey = tidNow(); + } + reservedRkeys.add(`${write.collection}/${rkey}`); const op: RecordCreateOp = { action: WriteOpAction.Create, collection: write.collection, @@ -602,6 +655,7 @@ export class AccountDurableObject extends DurableObject { collection: write.collection, rkey, action: WriteOpAction.Create, + validationStatus: write.validationStatus, }); } else if (write.$type === "com.atproto.repo.applyWrites#update") { if (!write.rkey) { @@ -619,6 +673,7 @@ export class AccountDurableObject extends DurableObject { collection: write.collection, rkey: write.rkey, action: WriteOpAction.Update, + validationStatus: write.validationStatus, }); } else if (write.$type === "com.atproto.repo.applyWrites#delete") { if (!write.rkey) { @@ -657,7 +712,7 @@ export class AccountDurableObject extends DurableObject { $type: string; uri?: string; cid?: string; - validationStatus?: string; + validationStatus?: ValidationStatus; }> = []; const opsWithCids: Array = []; @@ -678,7 +733,9 @@ export class AccountDurableObject extends DurableObject { $type: result.$type, uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`, cid: recordCid?.toString(), - validationStatus: "valid", + ...(result.validationStatus !== undefined + ? 
{ validationStatus: result.validationStatus } + : {}), }); // Include the record CID in the op for the firehose opsWithCids.push({ ...op, cid: recordCid }); diff --git a/packages/pds/src/validation.ts b/packages/pds/src/validation.ts index a97911f8..90b66810 100644 --- a/packages/pds/src/validation.ts +++ b/packages/pds/src/validation.ts @@ -1,12 +1,20 @@ import { parse, + safeParse, ValidationError, - type BaseSchema, } from "@atcute/lexicons/validations"; +import type { + BaseSchema, + RecordSchema, + ObjectSchema, + RecordKeySchema, +} from "@atcute/lexicons/validations"; +import { isRecordKey } from "@atcute/lexicons/syntax"; +import { isLegacyBlobRef } from "@atproto/lex-data"; -// Import record schemas from @atcute/bluesky import { AppBskyActorProfile, + AppBskyActorStatus, AppBskyFeedGenerator, AppBskyFeedLike, AppBskyFeedPost, @@ -21,97 +29,191 @@ import { AppBskyGraphStarterpack, AppBskyGraphVerification, AppBskyLabelerService, + AppBskyNotificationDeclaration, + ChatBskyActorDeclaration, } from "@atcute/bluesky"; +import { ComAtprotoLexiconSchema } from "@atcute/atproto"; -/** - * Map of collection NSID to validation schema. - * Only includes record types that can be created in repositories. 
- */ -const recordSchemas: Record = { - "app.bsky.actor.profile": AppBskyActorProfile.mainSchema, - "app.bsky.feed.generator": AppBskyFeedGenerator.mainSchema, - "app.bsky.feed.like": AppBskyFeedLike.mainSchema, - "app.bsky.feed.post": AppBskyFeedPost.mainSchema, - "app.bsky.feed.postgate": AppBskyFeedPostgate.mainSchema, - "app.bsky.feed.repost": AppBskyFeedRepost.mainSchema, - "app.bsky.feed.threadgate": AppBskyFeedThreadgate.mainSchema, - "app.bsky.graph.block": AppBskyGraphBlock.mainSchema, - "app.bsky.graph.follow": AppBskyGraphFollow.mainSchema, - "app.bsky.graph.list": AppBskyGraphList.mainSchema, - "app.bsky.graph.listblock": AppBskyGraphListblock.mainSchema, - "app.bsky.graph.listitem": AppBskyGraphListitem.mainSchema, - "app.bsky.graph.starterpack": AppBskyGraphStarterpack.mainSchema, - "app.bsky.graph.verification": AppBskyGraphVerification.mainSchema, - "app.bsky.labeler.service": AppBskyLabelerService.mainSchema, +type AnyRecordSchema = RecordSchema; + +const recordSchemas: Record = { + "app.bsky.actor.profile": AppBskyActorProfile.mainSchema as AnyRecordSchema, + "app.bsky.actor.status": AppBskyActorStatus.mainSchema as AnyRecordSchema, + "app.bsky.feed.generator": AppBskyFeedGenerator.mainSchema as AnyRecordSchema, + "app.bsky.feed.like": AppBskyFeedLike.mainSchema as AnyRecordSchema, + "app.bsky.feed.post": AppBskyFeedPost.mainSchema as AnyRecordSchema, + "app.bsky.feed.postgate": AppBskyFeedPostgate.mainSchema as AnyRecordSchema, + "app.bsky.feed.repost": AppBskyFeedRepost.mainSchema as AnyRecordSchema, + "app.bsky.feed.threadgate": + AppBskyFeedThreadgate.mainSchema as AnyRecordSchema, + "app.bsky.graph.block": AppBskyGraphBlock.mainSchema as AnyRecordSchema, + "app.bsky.graph.follow": AppBskyGraphFollow.mainSchema as AnyRecordSchema, + "app.bsky.graph.list": AppBskyGraphList.mainSchema as AnyRecordSchema, + "app.bsky.graph.listblock": + AppBskyGraphListblock.mainSchema as AnyRecordSchema, + "app.bsky.graph.listitem": + 
AppBskyGraphListitem.mainSchema as AnyRecordSchema, + "app.bsky.graph.starterpack": + AppBskyGraphStarterpack.mainSchema as AnyRecordSchema, + "app.bsky.graph.verification": + AppBskyGraphVerification.mainSchema as AnyRecordSchema, + "app.bsky.labeler.service": + AppBskyLabelerService.mainSchema as AnyRecordSchema, + "app.bsky.notification.declaration": + AppBskyNotificationDeclaration.mainSchema as AnyRecordSchema, + "chat.bsky.actor.declaration": + ChatBskyActorDeclaration.mainSchema as AnyRecordSchema, + "com.atproto.lexicon.schema": + ComAtprotoLexiconSchema.mainSchema as AnyRecordSchema, }; +export type ValidationStatus = "valid" | "unknown"; + +export class InvalidRecordError extends Error { + override readonly name = "InvalidRecordError"; +} + /** - * Record validator for AT Protocol records. - * - * Validates records against official Bluesky lexicon schemas from @atcute/bluesky. - * Uses optimistic validation strategy: - * - If a schema is loaded for the collection, validate the record - * - If no schema is loaded, allow the record (fail-open) - * - * This allows the PDS to accept records for new or unknown collection types - * while still validating known types. + * Thrown when a write targets an rkey that already exists in the repo. + * Uses a `name` field rather than relying on class identity, so the error + * survives a Cloudflare DO RPC boundary (RPC preserves message + name + stack + * but not the prototype chain). */ -export class RecordValidator { - private strictMode: boolean; +export class RecordAlreadyExistsError extends Error { + override readonly name = "RecordAlreadyExistsError"; +} - constructor(options: { strict?: boolean } = {}) { - this.strictMode = options.strict ?? false; - } +export function isRecordAlreadyExistsError(err: unknown): err is Error { + if (!(err instanceof Error)) return false; + // Cloudflare DO RPC reconstructs errors as plain Error and folds the + // original `${name}: ${message}` into the wrapper's `.message`. 
So check + // both the preserved name (DO-local throws) and the message prefix + // (errors that crossed the RPC boundary). + return ( + err.name === "RecordAlreadyExistsError" || + err.message.startsWith("RecordAlreadyExistsError:") + ); +} +export type ValidateOptions = { + collection: string; + record: unknown; + rkey?: string; /** - * Validate a record against its lexicon schema. - * - * @param collection - The NSID of the record type (e.g., "app.bsky.feed.post") - * @param record - The record object to validate - * @throws {Error} If validation fails and schema is loaded + * `true` requires a known schema and rejects unknown collections. + * `false` skips schema validation but still reconciles `$type` and + * rejects legacy blob refs. + * `undefined` (default) validates known schemas optimistically and + * accepts unknown collections with status `"unknown"`. */ - validateRecord(collection: string, record: unknown): void { - const schema = recordSchemas[collection]; + validate?: boolean; +}; + +export type ValidationResult = { + record: Record; + status?: ValidationStatus; +}; + +export class RecordValidator { + validate(opts: ValidateOptions): ValidationResult { + const reconciled = reconcileType(opts.record, opts.collection); + rejectLegacyBlobRefs(reconciled); + // Generic rkey shape is a repo-structural concern, not a lex schema + // concern: it must hold even when the client passes `validate: false`, + // otherwise empty-string or path-traversal-style rkeys could reach the + // repo. Schema-specific key rules (e.g. "self") are checked further down. 
+ if (opts.rkey !== undefined && !isRecordKey(opts.rkey)) { + throw new InvalidRecordError(`Invalid record key: ${opts.rkey}`); + } + + if (opts.validate === false) { + return { record: reconciled }; + } + + const schema = recordSchemas[opts.collection]; if (!schema) { - // Optimistic validation: if we don't have the schema, allow it - if (this.strictMode) { - throw new Error( - `No lexicon schema loaded for collection: ${collection}. Enable optimistic validation or add the schema.`, + if (opts.validate === true) { + throw new InvalidRecordError( + `Unknown lexicon type: ${opts.collection}`, + ); + } + return { record: reconciled, status: "unknown" }; + } + + if (opts.rkey !== undefined) { + const keyResult = safeParse(schema.key as BaseSchema, opts.rkey); + if (!keyResult.ok) { + throw new InvalidRecordError( + `Invalid record key for ${opts.collection}: ${keyResult.message}`, ); } - return; } try { - parse(schema, record); - } catch (error) { - if (error instanceof ValidationError) { - throw new Error( - `Lexicon validation failed for ${collection}: ${error.message}`, + parse(schema as BaseSchema, reconciled); + } catch (err) { + if (err instanceof ValidationError) { + throw new InvalidRecordError( + `Invalid ${opts.collection} record: ${err.message}`, ); } - throw error; + throw err; } + + return { record: reconciled, status: "valid" }; } - /** - * Check if a schema is loaded for a collection. - */ hasSchema(collection: string): boolean { return collection in recordSchemas; } - /** - * Get list of all loaded schema NSIDs. - */ getLoadedSchemas(): string[] { return Object.keys(recordSchemas); } } -/** - * Shared validator instance (singleton pattern). - * Uses optimistic validation by default (strict: false). 
- */ -export const validator = new RecordValidator({ strict: false }); +export const validator = new RecordValidator(); + +function reconcileType( + record: unknown, + collection: string, +): Record { + if (record === null || typeof record !== "object" || Array.isArray(record)) { + throw new InvalidRecordError("Record must be an object"); + } + const obj = record as Record; + const declared = obj.$type; + if (declared === undefined) { + return { ...obj, $type: collection }; + } + if (declared !== collection) { + throw new InvalidRecordError( + `Invalid $type: expected ${collection}, got ${String(declared)}`, + ); + } + return obj; +} + +function rejectLegacyBlobRefs(value: unknown): void { + const stack: unknown[] = [value]; + const visited = new WeakSet(); + while (stack.length > 0) { + const current = stack.pop(); + if (current === null || typeof current !== "object") continue; + if (visited.has(current)) continue; + visited.add(current); + if (Array.isArray(current)) { + for (const item of current) stack.push(item); + continue; + } + if (isLegacyBlobRef(current)) { + throw new InvalidRecordError( + `Legacy blobs are not allowed (${(current as { cid: string }).cid})`, + ); + } + for (const v of Object.values(current as Record)) { + if (v !== null && typeof v === "object") stack.push(v); + } + } +} diff --git a/packages/pds/src/xrpc/repo.ts b/packages/pds/src/xrpc/repo.ts index 426654ff..e20206c6 100644 --- a/packages/pds/src/xrpc/repo.ts +++ b/packages/pds/src/xrpc/repo.ts @@ -1,21 +1,47 @@ import type { Context } from "hono"; import { isDid } from "@atcute/lexicons/syntax"; +import { now as tidNow } from "@atcute/tid"; import { AccountDurableObject } from "../account-do.js"; import type { AppEnv, AuthedAppEnv } from "../types.js"; -import { validator } from "../validation.js"; +import { + InvalidRecordError, + isRecordAlreadyExistsError, + validator, + type ValidationStatus, +} from "../validation.js"; import { detectContentType } from "../format.js"; import { 
buildScopeChecker, requireScope } from "../middleware/auth.js"; function invalidRecordError( c: Context, - err: unknown, + err: InvalidRecordError, prefix?: string, ): Response { - const message = err instanceof Error ? err.message : String(err); return c.json( { error: "InvalidRecord", - message: prefix ? `${prefix}: ${message}` : message, + message: prefix ? `${prefix}: ${err.message}` : err.message, + }, + 400, + ); +} + +/** + * Validate the optional `validate` field from a write request body. + * Lexicon defines it as `boolean?`; reject anything else so clients can't + * silently fall through to optimistic mode by sending `"true"` or `1`. + */ +function checkValidateFlag( + c: Context, + value: unknown, +): boolean | undefined | Response { + if (value === undefined || typeof value === "boolean") { + return value; + } + return c.json( + { + error: "InvalidRequest", + message: "Parameter 'validate' must be a boolean if provided", }, 400, ); @@ -45,6 +71,27 @@ function checkAccountDeactivatedError( return null; } +function checkRecordAlreadyExistsError( + c: Context, + err: unknown, +): Response | null { + if (!isRecordAlreadyExistsError(err)) return null; + // Strip the "RecordAlreadyExistsError: " prefix that Cloudflare DO RPC + // folds into the wrapper Error's `.message` so the client only sees the + // rkey identifier, not the internal class name. + const prefix = "RecordAlreadyExistsError:"; + const message = err.message.startsWith(prefix) + ? 
err.message.slice(prefix.length).trimStart() + : err.message; + return c.json( + { + error: "RecordAlreadyExists", + message, + }, + 409, + ); +} + export async function describeRepo( c: Context, accountDO: DurableObjectStub, @@ -212,7 +259,7 @@ export async function createRecord( accountDO: DurableObjectStub, ): Promise { const body = await c.req.json(); - const { repo, collection, rkey, record } = body; + const { repo, collection, rkey, record, validate } = body; if (!repo || !collection || !record) { return c.json( @@ -239,19 +286,51 @@ export async function createRecord( ); if (scopeError) return scopeError; - // Validate record against lexicon schema + const validateChecked = checkValidateFlag(c, validate); + if (validateChecked instanceof Response) return validateChecked; + + // Lexicon types rkey as `string?` (omitted = auto-generate). Reject + // anything else, including `null`, so the contract matches across all + // three write handlers. + if (rkey !== undefined && typeof rkey !== "string") { + return c.json( + { error: "InvalidRequest", message: "rkey must be a string" }, + 400, + ); + } + + // Validate against a candidate TID so schemas with restrictive keySchemas + // (e.g. literal('self')) still reject unrkeyed requests. The DO picks the + // final rkey when the client didn't supply one, against authoritative MST + // state, to avoid worker-isolate clockid collisions. + const candidateRkey = rkey ?? 
tidNow(); + + let validated; try { - validator.validateRecord(collection, record); + validated = validator.validate({ + collection, + record, + rkey: candidateRkey, + validate: validateChecked, + }); } catch (err) { - return invalidRecordError(c, err); + if (err instanceof InvalidRecordError) return invalidRecordError(c, err); + throw err; } try { - const result = await accountDO.rpcCreateRecord(collection, rkey, record); + const result = await accountDO.rpcCreateRecord( + collection, + rkey, + validated.record, + validated.status, + ); return c.json(result); } catch (err) { const deactivatedError = checkAccountDeactivatedError(c, err); if (deactivatedError) return deactivatedError; + const conflictError = checkRecordAlreadyExistsError(c, err); + if (conflictError) return conflictError; throw err; } @@ -316,7 +395,7 @@ export async function putRecord( accountDO: DurableObjectStub, ): Promise { const body = await c.req.json(); - const { repo, collection, rkey, record } = body; + const { repo, collection, rkey, record, validate } = body; if (!repo || !collection || !rkey || !record) { return c.json( @@ -346,27 +425,36 @@ export async function putRecord( ); if (scopeError) return scopeError; - // Validate record against lexicon schema + const validateChecked = checkValidateFlag(c, validate); + if (validateChecked instanceof Response) return validateChecked; + + let validated; try { - validator.validateRecord(collection, record); + validated = validator.validate({ + collection, + record, + rkey, + validate: validateChecked, + }); } catch (err) { - return invalidRecordError(c, err); + if (err instanceof InvalidRecordError) return invalidRecordError(c, err); + throw err; } try { - const result = await accountDO.rpcPutRecord(collection, rkey, record); + const result = await accountDO.rpcPutRecord( + collection, + rkey, + validated.record, + validated.status, + ); return c.json(result); } catch (err) { const deactivatedError = checkAccountDeactivatedError(c, err); if 
(deactivatedError) return deactivatedError; - - return c.json( - { - error: "InvalidRequest", - message: err instanceof Error ? err.message : String(err), - }, - 400, - ); + const conflictError = checkRecordAlreadyExistsError(c, err); + if (conflictError) return conflictError; + throw err; } } @@ -375,7 +463,7 @@ export async function applyWrites( accountDO: DurableObjectStub, ): Promise { const body = await c.req.json(); - const { repo, writes } = body; + const { repo, writes, validate } = body; if (!repo || !writes || !Array.isArray(writes)) { return c.json( @@ -407,10 +495,28 @@ export async function applyWrites( ); } + const validateChecked = checkValidateFlag(c, validate); + if (validateChecked instanceof Response) return validateChecked; + // Build the scope checker once outside the loop — for a 200-write batch // this avoids re-parsing the token's scope string on every iteration. const checkScope = buildScopeChecker(c); + // Validate every write up-front so the DO either applies the whole batch + // or none of it. The reconciled record (with $type filled in) replaces + // the client-supplied value before being sent to the DO. + const preparedWrites: Array<{ + $type: string; + collection: string; + rkey?: string; + value?: unknown; + validationStatus?: ValidationStatus; + }> = []; + // Detect intra-batch rkey duplicates here (client error → 400) so they + // don't surface from the DO as 409 RecordAlreadyExists, which is reserved + // for collisions against existing repo state. 
+ const seenRkeys = new Set(); + for (let i = 0; i < writes.length; i++) { const write = writes[i]; const action: "create" | "update" | "delete" | null = @@ -438,29 +544,87 @@ export async function applyWrites( if (scopeError) return scopeError; } - if (action !== "delete") { - try { - validator.validateRecord(write.collection, write.value); - } catch (err) { + if (action === "delete" || action === "update") { + if (typeof write.rkey !== "string" || write.rkey.length === 0) { + return c.json( + { + error: "InvalidRequest", + message: `Write ${i}: ${action} requires rkey`, + }, + 400, + ); + } + } else if (write.rkey !== undefined) { + if (typeof write.rkey !== "string" || write.rkey.length === 0) { + return c.json( + { + error: "InvalidRequest", + message: `Write ${i}: rkey must be a non-empty string`, + }, + 400, + ); + } + } + + if (typeof write.rkey === "string") { + const composite = `${write.collection}/${write.rkey}`; + if (seenRkeys.has(composite)) { + return c.json( + { + error: "InvalidRequest", + message: `Write ${i}: duplicate rkey in batch (${composite})`, + }, + 400, + ); + } + seenRkeys.add(composite); + } + + if (action === "delete") { + preparedWrites.push({ + $type: write.$type, + collection: write.collection, + rkey: write.rkey, + }); + continue; + } + + // Worker validates against a candidate TID so restrictive keySchemas + // reject unrkeyed creates here. DO picks the final rkey when none + // was supplied (see rpcApplyWrites collision-retry logic). + const candidateRkey = write.rkey ?? 
tidNow(); + + try { + const validated = validator.validate({ + collection: write.collection, + record: write.value, + rkey: candidateRkey, + validate: validateChecked, + }); + preparedWrites.push({ + $type: write.$type, + collection: write.collection, + rkey: write.rkey, + value: validated.record, + validationStatus: validated.status, + }); + } catch (err) { + if (err instanceof InvalidRecordError) { return invalidRecordError(c, err, `Write ${i}`); } + throw err; } } try { - const result = await accountDO.rpcApplyWrites(writes); + const result = await accountDO.rpcApplyWrites(preparedWrites); return c.json(result); } catch (err) { const deactivatedError = checkAccountDeactivatedError(c, err); if (deactivatedError) return deactivatedError; - - return c.json( - { - error: "InvalidRequest", - message: err instanceof Error ? err.message : String(err), - }, - 400, - ); + const conflictError = checkRecordAlreadyExistsError(c, err); + if (conflictError) return conflictError; + throw err; } } diff --git a/packages/pds/test/blob-ref-normalization.test.ts b/packages/pds/test/blob-ref-normalization.test.ts index d0248c24..cd040f47 100644 --- a/packages/pds/test/blob-ref-normalization.test.ts +++ b/packages/pds/test/blob-ref-normalization.test.ts @@ -1,4 +1,5 @@ import { describe, it, expect } from "vitest"; +import { now as genTid } from "@atcute/tid"; import { env, worker, runInDurableObject } from "./helpers"; import { asCid, isBlobRef } from "@atproto/lex-data"; import type { AccountDurableObject } from "../src/account-do"; @@ -31,6 +32,7 @@ describe("Blob Reference Normalization", () => { }; const blobCid = uploadData.blob.ref.$link; + const rkey = genTid(); // Step 2: Create a record with the blob ref in JSON wire format // This is exactly what clients send — { "$link": "bafk..." 
} objects, // not actual CID instances @@ -44,7 +46,7 @@ describe("Blob Reference Normalization", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "blob-ref-test", + rkey, record: { $type: "app.bsky.feed.post", text: "Test post with image", @@ -73,7 +75,7 @@ describe("Blob Reference Normalization", () => { // Step 3: Read it back via the API — should round-trip correctly const getResponse = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=blob-ref-test`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -108,7 +110,7 @@ describe("Blob Reference Normalization", () => { const repo = await instance.getRepo(); const rawRecord = (await repo.getRecord( "app.bsky.feed.post", - "blob-ref-test", + rkey, )) as any; expect(rawRecord).toBeDefined(); @@ -147,6 +149,7 @@ describe("Blob Reference Normalization", () => { const blobCid = uploadData.blob.ref.$link; // Use putRecord (upsert) with a blob ref + const rkey = genTid(); const putResponse = await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.putRecord", { method: "POST", @@ -157,7 +160,7 @@ describe("Blob Reference Normalization", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "blob-ref-put-test", + rkey, record: { $type: "app.bsky.feed.post", text: "Put record with image", @@ -191,7 +194,7 @@ describe("Blob Reference Normalization", () => { const repo = await instance.getRepo(); const rawRecord = (await repo.getRecord( "app.bsky.feed.post", - "blob-ref-put-test", + rkey, )) as any; const rawImage = rawRecord.embed.images[0].image; @@ -219,6 +222,7 @@ describe("Blob Reference Normalization", () => { }; const blobCid = uploadData.blob.ref.$link; + const rkey = genTid(); // Use applyWrites with a blob ref const applyResponse = await worker.fetch( new 
Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { @@ -233,7 +237,7 @@ describe("Blob Reference Normalization", () => { { $type: "com.atproto.repo.applyWrites#create", collection: "app.bsky.feed.post", - rkey: "blob-ref-batch-test", + rkey, value: { $type: "app.bsky.feed.post", text: "Batch write with image", @@ -269,7 +273,7 @@ describe("Blob Reference Normalization", () => { const repo = await instance.getRepo(); const rawRecord = (await repo.getRecord( "app.bsky.feed.post", - "blob-ref-batch-test", + rkey, )) as any; const rawImage = rawRecord.embed.images[0].image; diff --git a/packages/pds/test/bluesky-validation.test.ts b/packages/pds/test/bluesky-validation.test.ts index e2017367..546cd6bc 100644 --- a/packages/pds/test/bluesky-validation.test.ts +++ b/packages/pds/test/bluesky-validation.test.ts @@ -17,121 +17,142 @@ describe("Bluesky Schema Validation", () => { describe("app.bsky.feed.post", () => { it("validates valid posts", () => { - expect(() => { - validator.validateRecord("app.bsky.feed.post", { + const result = validator.validate({ + collection: "app.bsky.feed.post", + record: { $type: "app.bsky.feed.post", text: "Hello, Bluesky!", createdAt: new Date().toISOString(), - }); - }).not.toThrow(); + }, + }); + expect(result.status).toBe("valid"); }); it("rejects posts with missing required fields", () => { expect(() => { - validator.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: "Hello", - // Missing createdAt + validator.validate({ + collection: "app.bsky.feed.post", + record: { + $type: "app.bsky.feed.post", + text: "Hello", + }, }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid app\.bsky\.feed\.post record/i); }); it("rejects posts with text exceeding maxLength", () => { - const longText = "x".repeat(3001); // maxLength is 3000 + const longText = "x".repeat(3001); expect(() => { - validator.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: longText, - createdAt: new 
Date().toISOString(), + validator.validate({ + collection: "app.bsky.feed.post", + record: { + $type: "app.bsky.feed.post", + text: longText, + createdAt: new Date().toISOString(), + }, }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid app\.bsky\.feed\.post record/i); }); it("allows posts with optional fields", () => { - expect(() => { - validator.validateRecord("app.bsky.feed.post", { + const result = validator.validate({ + collection: "app.bsky.feed.post", + record: { $type: "app.bsky.feed.post", text: "Post with langs", createdAt: new Date().toISOString(), langs: ["en"], - }); - }).not.toThrow(); + }, + }); + expect(result.status).toBe("valid"); }); }); describe("app.bsky.actor.profile", () => { it("validates valid profiles", () => { - expect(() => { - validator.validateRecord("app.bsky.actor.profile", { + const result = validator.validate({ + collection: "app.bsky.actor.profile", + record: { $type: "app.bsky.actor.profile", displayName: "Alice", description: "A test user", - }); - }).not.toThrow(); + }, + rkey: "self", + }); + expect(result.status).toBe("valid"); }); it("allows empty profiles", () => { - expect(() => { - validator.validateRecord("app.bsky.actor.profile", { - $type: "app.bsky.actor.profile", - }); - }).not.toThrow(); + const result = validator.validate({ + collection: "app.bsky.actor.profile", + record: { $type: "app.bsky.actor.profile" }, + rkey: "self", + }); + expect(result.status).toBe("valid"); }); }); describe("app.bsky.feed.like", () => { it("rejects likes without required fields", () => { expect(() => { - validator.validateRecord("app.bsky.feed.like", { - $type: "app.bsky.feed.like", - createdAt: new Date().toISOString(), - // Missing subject + validator.validate({ + collection: "app.bsky.feed.like", + record: { + $type: "app.bsky.feed.like", + createdAt: new Date().toISOString(), + }, }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid app\.bsky\.feed\.like record/i); expect(() => { - 
validator.validateRecord("app.bsky.feed.like", { - $type: "app.bsky.feed.like", - subject: { - uri: "at://did:plc:abc123/app.bsky.feed.post/xyz", - cid: "invalid-cid-format", + validator.validate({ + collection: "app.bsky.feed.like", + record: { + $type: "app.bsky.feed.like", + subject: { + uri: "at://did:plc:abc123/app.bsky.feed.post/xyz", + cid: "invalid-cid-format", + }, }, - // Missing createdAt }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid app\.bsky\.feed\.like record/i); }); }); describe("app.bsky.graph.follow", () => { it("validates valid follows", () => { - expect(() => { - validator.validateRecord("app.bsky.graph.follow", { + const result = validator.validate({ + collection: "app.bsky.graph.follow", + record: { $type: "app.bsky.graph.follow", subject: "did:plc:abc123", createdAt: new Date().toISOString(), - }); - }).not.toThrow(); + }, + }); + expect(result.status).toBe("valid"); }); it("rejects follows without subject", () => { expect(() => { - validator.validateRecord("app.bsky.graph.follow", { - $type: "app.bsky.graph.follow", - createdAt: new Date().toISOString(), - // Missing subject DID + validator.validate({ + collection: "app.bsky.graph.follow", + record: { + $type: "app.bsky.graph.follow", + createdAt: new Date().toISOString(), + }, }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid app\.bsky\.graph\.follow record/i); }); }); describe("unknown schemas (optimistic validation)", () => { - it("allows records for unknown schemas", () => { - expect(() => { - validator.validateRecord("com.example.custom", { - customField: "value", - }); - }).not.toThrow(); + it("allows records for unknown schemas with status 'unknown'", () => { + const result = validator.validate({ + collection: "com.example.custom", + record: { customField: "value" }, + }); + expect(result.status).toBe("unknown"); }); }); }); diff --git a/packages/pds/test/migration.test.ts b/packages/pds/test/migration.test.ts index d4c7275c..7d5ecf9e 100644 --- 
a/packages/pds/test/migration.test.ts +++ b/packages/pds/test/migration.test.ts @@ -1,4 +1,5 @@ import { describe, it, expect, afterEach } from "vitest"; +import { now as genTid } from "@atcute/tid"; import { env, worker } from "./helpers"; describe("Account Migration", () => { @@ -164,7 +165,7 @@ describe("Account Migration", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "test-import-1", + rkey: genTid(), record: { $type: "app.bsky.feed.post", text: "Test post for import", @@ -410,7 +411,7 @@ describe("Account Migration", () => { }); it("returns CAR file for existing record", async () => { - // First create a record + const rkey = genTid(); const createResponse = await worker.fetch( new Request(`http://pds.test/xrpc/com.atproto.repo.createRecord`, { method: "POST", @@ -421,7 +422,7 @@ describe("Account Migration", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "sync-test-record", + rkey, record: { $type: "app.bsky.feed.post", text: "Test post for sync.getRecord", @@ -436,7 +437,7 @@ describe("Account Migration", () => { // Now get the record proof const response = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.sync.getRecord?did=${env.DID}&collection=app.bsky.feed.post&rkey=sync-test-record`, + `http://pds.test/xrpc/com.atproto.sync.getRecord?did=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -594,7 +595,7 @@ describe("Account Migration", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "migration-test", + rkey: genTid(), record: { $type: "app.bsky.feed.post", text: "Post to be migrated", diff --git a/packages/pds/test/validation.test.ts b/packages/pds/test/validation.test.ts index 3beec5c5..7c6fd764 100644 --- a/packages/pds/test/validation.test.ts +++ b/packages/pds/test/validation.test.ts @@ -1,129 +1,221 @@ import { describe, it, expect } from "vitest"; -import { RecordValidator, validator } from 
"../src/validation"; +import { + InvalidRecordError, + RecordValidator, + validator, +} from "../src/validation"; + +const post = (overrides: Record<string, unknown> = {}) => ({ + $type: "app.bsky.feed.post", + text: "Hello world", + createdAt: new Date().toISOString(), + ...overrides, +}); describe("RecordValidator", () => { - describe("optimistic validation (default)", () => { - it("allows records for unknown schemas", () => { - const v = new RecordValidator({ strict: false }); - // Should not throw for unknown collection + describe("default (optimistic) mode", () => { + const v = new RecordValidator(); + + it("accepts records for unknown collections with status 'unknown'", () => { + const result = v.validate({ + collection: "com.example.unknown", + record: { text: "test" }, + }); + expect(result.status).toBe("unknown"); + }); + + it("validates known collections and reports 'valid'", () => { + const result = v.validate({ + collection: "app.bsky.feed.post", + record: post(), + }); + expect(result.status).toBe("valid"); + }); + + it("throws InvalidRecordError when a known record is invalid", () => { expect(() => { - v.validateRecord("com.example.unknown", { - text: "test", - createdAt: new Date().toISOString(), + v.validate({ + collection: "app.bsky.feed.post", + record: { $type: "app.bsky.feed.post", text: "no createdAt" }, }); - }).not.toThrow(); + }).toThrow(InvalidRecordError); }); + }); - it("validates records when schema is loaded", () => { - const v = new RecordValidator({ strict: false }); - // Valid post should pass + describe("validate flag", () => { + const v = new RecordValidator(); + + it("validate=true rejects unknown collections", () => { expect(() => { - v.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: "Hello world", - createdAt: new Date().toISOString(), + v.validate({ + collection: "com.example.unknown", + record: { text: "test" }, + validate: true, }); - }).not.toThrow(); + }).toThrow(/unknown lexicon type/i); + }); - // Invalid 
post (missing createdAt) should fail + it("validate=false skips schema validation and round-trips the record", () => { + const record = { + $type: "app.bsky.feed.post", + text: "no createdAt", + }; + const result = v.validate({ + collection: "app.bsky.feed.post", + record, + validate: false, + }); + expect(result.status).toBeUndefined(); + // Confirm a record that *would* fail schema validation passes through + // when validate=false (the schema requires createdAt). + expect(result.record).toMatchObject({ + $type: "app.bsky.feed.post", + text: "no createdAt", + }); + expect(result.record.createdAt).toBeUndefined(); + }); + + it("validate=false still rejects legacy blob refs", () => { expect(() => { - v.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: "Hello world", + v.validate({ + collection: "app.bsky.feed.post", + record: post({ + embed: { cid: "bafyreib2rxk3rybk3aobmv5cjuql3bm2twh4jo5uxgf5kkkqg5jkvqg5va", mimeType: "image/png" }, + }), + validate: false, }); - }).toThrow(/validation failed/i); + }).toThrow(/legacy blobs/i); }); - it("checks if schema is loaded", () => { - const v = new RecordValidator(); - expect(v.hasSchema("app.bsky.feed.post")).toBe(true); - expect(v.hasSchema("app.bsky.actor.profile")).toBe(true); - expect(v.hasSchema("com.example.unknown")).toBe(false); + it("validate=false still reconciles $type with collection", () => { + const { record } = v.validate({ + collection: "app.bsky.feed.post", + record: { text: "no type", createdAt: new Date().toISOString() }, + validate: false, + }); + expect(record.$type).toBe("app.bsky.feed.post"); }); + }); + + describe("$type reconciliation", () => { + const v = new RecordValidator(); - it("lists loaded schemas", () => { - const v = new RecordValidator(); - const schemas = v.getLoadedSchemas(); - expect(schemas).toContain("app.bsky.feed.post"); - expect(schemas).toContain("app.bsky.actor.profile"); - expect(schemas).toContain("app.bsky.graph.follow"); - 
expect(schemas.length).toBeGreaterThanOrEqual(10); + it("fills in missing $type from collection", () => { + const result = v.validate({ + collection: "app.bsky.feed.post", + record: { text: "Hi", createdAt: new Date().toISOString() }, + }); + expect(result.record.$type).toBe("app.bsky.feed.post"); + expect(result.status).toBe("valid"); }); - }); - describe("strict mode", () => { - it("rejects records for unknown schemas", () => { - const v = new RecordValidator({ strict: true }); + it("rejects mismatched $type", () => { expect(() => { - v.validateRecord("com.example.unknown", { - text: "test", + v.validate({ + collection: "app.bsky.feed.post", + record: { $type: "app.bsky.feed.like", text: "Hi" }, }); - }).toThrow(/no lexicon schema loaded/i); + }).toThrow(/expected app\.bsky\.feed\.post/); }); + }); + + describe("rkey validation", () => { + const v = new RecordValidator(); - it("validates records when schema is loaded", () => { - const v = new RecordValidator({ strict: true }); - // Valid record should pass + it("rejects an invalid rkey for app.bsky.actor.profile (must be 'self')", () => { expect(() => { - v.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: "Hello world", - createdAt: new Date().toISOString(), + v.validate({ + collection: "app.bsky.actor.profile", + record: { $type: "app.bsky.actor.profile" }, + rkey: "not-self", }); - }).not.toThrow(); + }).toThrow(/invalid record key/i); }); - }); - - describe("maxLength validation", () => { - it("rejects strings exceeding maxLength", () => { - const v = new RecordValidator(); - const longText = "x".repeat(3001); // post maxLength is 3000 - // Short text should pass + it("rejects empty-string rkey unconditionally (even with validate=false)", () => { expect(() => { - v.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: "short", - createdAt: new Date().toISOString(), + v.validate({ + collection: "com.example.unknown", + record: { foo: "bar" }, + rkey: "", + 
validate: false, }); - }).not.toThrow(); + }).toThrow(/invalid record key/i); + }); - // Long text should fail + it("rejects rkey containing path-traversal chars unconditionally", () => { expect(() => { - v.validateRecord("app.bsky.feed.post", { - $type: "app.bsky.feed.post", - text: longText, - createdAt: new Date().toISOString(), + v.validate({ + collection: "com.example.unknown", + record: { foo: "bar" }, + rkey: "../etc", + validate: false, }); - }).toThrow(/validation failed/i); + }).toThrow(/invalid record key/i); + }); + + it("accepts the literal 'self' rkey for profile", () => { + const result = v.validate({ + collection: "app.bsky.actor.profile", + record: { $type: "app.bsky.actor.profile" }, + rkey: "self", + }); + expect(result.status).toBe("valid"); }); }); - describe("required fields validation", () => { - it("rejects records missing required fields", () => { - const v = new RecordValidator(); + describe("legacy blob rejection", () => { + const v = new RecordValidator(); - // Complete record should pass + it("rejects records containing a legacy blob ref", () => { expect(() => { - v.validateRecord("app.bsky.graph.follow", { - $type: "app.bsky.graph.follow", - subject: "did:plc:abc123", - createdAt: new Date().toISOString(), + v.validate({ + collection: "com.example.unknown", + record: { + avatar: { + cid: "bafyreib2rxk3rybk3aobmv5cjuql3bm2twh4jo5uxgf5kkkqg5jkvqg5va", + mimeType: "image/png", + }, + }, }); - }).not.toThrow(); + }).toThrow(/legacy blobs/i); + }); + }); - // Incomplete record should fail (missing subject) + describe("required fields", () => { + const v = new RecordValidator(); + + it("rejects records missing required fields", () => { expect(() => { - v.validateRecord("app.bsky.graph.follow", { - $type: "app.bsky.graph.follow", - createdAt: new Date().toISOString(), + v.validate({ + collection: "app.bsky.graph.follow", + record: { + $type: "app.bsky.graph.follow", + createdAt: new Date().toISOString(), + }, }); - }).toThrow(/validation 
failed/i); + }).toThrow(InvalidRecordError); + }); + }); + + describe("schema set", () => { + const v = new RecordValidator(); + + it("includes app.bsky and com.atproto record schemas", () => { + expect(v.hasSchema("app.bsky.feed.post")).toBe(true); + expect(v.hasSchema("app.bsky.actor.profile")).toBe(true); + expect(v.hasSchema("com.atproto.lexicon.schema")).toBe(true); + expect(v.hasSchema("chat.bsky.actor.declaration")).toBe(true); + expect(v.hasSchema("com.example.unknown")).toBe(false); + }); + + it("loads at least the expected number of record types", () => { + expect(v.getLoadedSchemas().length).toBeGreaterThanOrEqual(18); }); }); describe("shared validator instance", () => { - it("exports a shared validator instance", () => { + it("exports a default validator", () => { expect(validator).toBeInstanceOf(RecordValidator); expect(validator.hasSchema("app.bsky.feed.post")).toBe(true); }); diff --git a/packages/pds/test/xrpc.test.ts b/packages/pds/test/xrpc.test.ts index 9ab93493..c01d311a 100644 --- a/packages/pds/test/xrpc.test.ts +++ b/packages/pds/test/xrpc.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; import { env, worker } from "./helpers"; +import { now as genTid } from "@atcute/tid"; import { SignJWT, base64url, @@ -352,7 +353,7 @@ describe("XRPC Endpoints", () => { ); expect(response.status).toBe(200); - const data = await response.json(); + const data = (await response.json()) as any; expect(data).toMatchObject({ uri: expect.stringMatching( new RegExp( @@ -360,11 +361,109 @@ describe("XRPC Endpoints", () => { ), ), cid: expect.any(String), + validationStatus: "valid", + }); + }); + + it("returns 409 RecordAlreadyExists on duplicate client-supplied rkey", async () => { + const rkey = genTid(); + const body = JSON.stringify({ + repo: env.DID, + collection: "app.bsky.feed.post", + rkey, + record: { + $type: "app.bsky.feed.post", + text: "first", + createdAt: new Date().toISOString(), + }, }); + + const first = await worker.fetch( + 
new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body, + }), + env, + ); + expect(first.status).toBe(200); + + const second = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body, + }), + env, + ); + expect(second.status).toBe(409); + const data = (await second.json()) as any; + expect(data.error).toBe("RecordAlreadyExists"); + expect(data.message).toContain(rkey); + }); + + it("rejects createRecord with non-boolean validate flag as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + collection: "app.bsky.feed.post", + validate: 1, + record: { + $type: "app.bsky.feed.post", + text: "Hi", + createdAt: new Date().toISOString(), + }, + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/validate.*boolean/i); + }); + + it("rejects an auto-generated rkey that violates the schema's keySchema", async () => { + // app.bsky.actor.profile requires rkey "self". Without a rkey, the + // worker generates a TID, which fails the literal('self') check. 
+ const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + collection: "app.bsky.actor.profile", + record: { + $type: "app.bsky.actor.profile", + displayName: "Alice", + }, + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRecord"); + expect(data.message).toMatch(/record key/i); }); it("should get a record", async () => { - // First create a record + const rkey = genTid(); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -375,7 +474,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "test-post-1", + rkey, record: { $type: "app.bsky.feed.post", text: "Test post", @@ -389,7 +488,7 @@ describe("XRPC Endpoints", () => { // Now get it const response = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=test-post-1`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -397,7 +496,7 @@ describe("XRPC Endpoints", () => { const data = await response.json(); expect(data).toMatchObject({ - uri: `at://${env.DID}/app.bsky.feed.post/test-post-1`, + uri: `at://${env.DID}/app.bsky.feed.post/${rkey}`, cid: expect.any(String), value: { text: "Test post", @@ -406,8 +505,10 @@ describe("XRPC Endpoints", () => { }); it("should list records", async () => { - // Create a few records + const rkeys: string[] = []; for (let i = 1; i <= 3; i++) { + const rkey = genTid(); + rkeys.push(rkey); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -418,7 +519,7 @@ 
describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: `post-${i}`, + rkey, record: { $type: "app.bsky.feed.post", text: `Post ${i}`, @@ -440,11 +541,9 @@ describe("XRPC Endpoints", () => { expect(response.status).toBe(200); const data = (await response.json()) as any; - // Records persist across tests, so we have at least 3 expect(data.records.length).toBeGreaterThanOrEqual(3); - // Verify our specific records are present const ourRecords = data.records.filter((r: any) => - r.uri.match(/\/post-[123]$/), + rkeys.some((k) => r.uri.endsWith(`/${k}`)), ); expect(ourRecords).toHaveLength(3); expect(ourRecords[0]).toMatchObject({ @@ -460,7 +559,7 @@ describe("XRPC Endpoints", () => { }); it("should delete a record", async () => { - // Create a record + const rkey = genTid(); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -471,7 +570,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "to-delete", + rkey, record: { $type: "app.bsky.feed.post", text: "Delete me", @@ -493,7 +592,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "to-delete", + rkey, }), }), env, @@ -503,7 +602,7 @@ describe("XRPC Endpoints", () => { // Verify it's gone const getResponse = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=to-delete`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -627,7 +726,7 @@ describe("XRPC Endpoints", () => { }); it("should handle concurrent read operations", async () => { - // Create a record + const rkey = genTid(); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -638,7 +737,7 @@ describe("XRPC 
Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "concurrent-read-test", + rkey, record: { $type: "app.bsky.feed.post", text: "Read me concurrently", @@ -655,7 +754,7 @@ describe("XRPC Endpoints", () => { promises.push( worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=concurrent-read-test`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ), @@ -675,7 +774,7 @@ describe("XRPC Endpoints", () => { }); it("should handle create and delete race conditions", async () => { - // Create a record + const rkey = genTid(); const createResponse = await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -686,7 +785,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "race-test", + rkey, record: { $type: "app.bsky.feed.post", text: "Race test", @@ -702,7 +801,7 @@ describe("XRPC Endpoints", () => { const [readResponse, deleteResponse] = await Promise.all([ worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=race-test`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ), @@ -716,7 +815,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "race-test", + rkey, }), }), env, @@ -733,6 +832,8 @@ describe("XRPC Endpoints", () => { describe("applyWrites", () => { it("should create multiple records in batch", async () => { + const rkey1 = genTid(); + const rkey2 = genTid(); const response = await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { method: "POST", @@ -746,7 +847,7 @@ describe("XRPC Endpoints", () => { { $type: 
"com.atproto.repo.applyWrites#create", collection: "app.bsky.feed.post", - rkey: "batch-1", + rkey: rkey1, value: { $type: "app.bsky.feed.post", text: "Batch post 1", @@ -756,7 +857,7 @@ describe("XRPC Endpoints", () => { { $type: "com.atproto.repo.applyWrites#create", collection: "app.bsky.feed.post", - rkey: "batch-2", + rkey: rkey2, value: { $type: "app.bsky.feed.post", text: "Batch post 2", @@ -777,12 +878,12 @@ describe("XRPC Endpoints", () => { expect(data.results[0].$type).toBe( "com.atproto.repo.applyWrites#createResult", ); - expect(data.results[0].uri).toContain("batch-1"); - expect(data.results[1].uri).toContain("batch-2"); + expect(data.results[0].uri).toContain(rkey1); + expect(data.results[1].uri).toContain(rkey2); }); it("should update a record", async () => { - // First create a record + const rkey = genTid(); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -793,7 +894,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "to-update", + rkey, record: { $type: "app.bsky.feed.post", text: "Original text", @@ -818,7 +919,7 @@ describe("XRPC Endpoints", () => { { $type: "com.atproto.repo.applyWrites#update", collection: "app.bsky.feed.post", - rkey: "to-update", + rkey, value: { $type: "app.bsky.feed.post", text: "Updated text", @@ -840,7 +941,7 @@ describe("XRPC Endpoints", () => { // Verify the update const getResponse = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=to-update`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -849,7 +950,7 @@ describe("XRPC Endpoints", () => { }); it("should delete a record via applyWrites", async () => { - // First create a record + const rkey = genTid(); await worker.fetch( new 
Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -860,7 +961,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "to-delete-batch", + rkey, record: { $type: "app.bsky.feed.post", text: "Delete me via batch", @@ -885,7 +986,7 @@ describe("XRPC Endpoints", () => { { $type: "com.atproto.repo.applyWrites#delete", collection: "app.bsky.feed.post", - rkey: "to-delete-batch", + rkey, }, ], }), @@ -902,7 +1003,7 @@ describe("XRPC Endpoints", () => { // Verify deletion const getResponse = await worker.fetch( new Request( - `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=to-delete-batch`, + `http://pds.test/xrpc/com.atproto.repo.getRecord?repo=${env.DID}&collection=app.bsky.feed.post&rkey=${rkey}`, ), env, ); @@ -910,7 +1011,9 @@ describe("XRPC Endpoints", () => { }); it("should handle mixed operations", async () => { - // Create records to work with + const rkeyUpdate = genTid(); + const rkeyDelete = genTid(); + const rkeyNew = genTid(); await worker.fetch( new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", { method: "POST", @@ -921,7 +1024,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "mixed-update", + rkey: rkeyUpdate, record: { $type: "app.bsky.feed.post", text: "Will be updated", @@ -942,7 +1045,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "mixed-delete", + rkey: rkeyDelete, record: { $type: "app.bsky.feed.post", text: "Will be deleted", @@ -967,7 +1070,7 @@ describe("XRPC Endpoints", () => { { $type: "com.atproto.repo.applyWrites#create", collection: "app.bsky.feed.post", - rkey: "mixed-new", + rkey: rkeyNew, value: { $type: "app.bsky.feed.post", text: "New from batch", @@ -977,7 +1080,7 @@ describe("XRPC Endpoints", () => { { $type: 
"com.atproto.repo.applyWrites#update", collection: "app.bsky.feed.post", - rkey: "mixed-update", + rkey: rkeyUpdate, value: { $type: "app.bsky.feed.post", text: "Updated from batch", @@ -987,7 +1090,7 @@ describe("XRPC Endpoints", () => { { $type: "com.atproto.repo.applyWrites#delete", collection: "app.bsky.feed.post", - rkey: "mixed-delete", + rkey: rkeyDelete, }, ], }), @@ -1026,6 +1129,230 @@ describe("XRPC Endpoints", () => { expect(response.status).toBe(401); }); + it("rejects update without rkey as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + writes: [ + { + $type: "com.atproto.repo.applyWrites#update", + collection: "app.bsky.feed.post", + value: { + $type: "app.bsky.feed.post", + text: "no rkey", + createdAt: new Date().toISOString(), + }, + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/update requires rkey/i); + }); + + it("rejects delete without rkey as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + writes: [ + { + $type: "com.atproto.repo.applyWrites#delete", + collection: "app.bsky.feed.post", + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/delete requires rkey/i); + }); + + it("returns per-result validationStatus on each create/update in a batch", async () => { + const knownRkey = 
genTid(); + const unknownRkey = genTid(); + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + writes: [ + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: knownRkey, + value: { + $type: "app.bsky.feed.post", + text: "known", + createdAt: new Date().toISOString(), + }, + }, + { + $type: "com.atproto.repo.applyWrites#create", + collection: "com.example.unknown", + rkey: unknownRkey, + value: { foo: "bar" }, + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(200); + const data = (await response.json()) as any; + expect(data.results).toHaveLength(2); + expect(data.results[0].validationStatus).toBe("valid"); + expect(data.results[1].validationStatus).toBe("unknown"); + }); + + it("rejects intra-batch duplicate rkey as 400 InvalidRequest (not 409)", async () => { + const dupRkey = genTid(); + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + writes: [ + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: dupRkey, + value: { + $type: "app.bsky.feed.post", + text: "first", + createdAt: new Date().toISOString(), + }, + }, + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: dupRkey, + value: { + $type: "app.bsky.feed.post", + text: "dup", + createdAt: new Date().toISOString(), + }, + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/duplicate rkey in batch/i); + 
}); + + it("rejects validate=true with unknown collection in applyWrites as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + validate: true, + writes: [ + { + $type: "com.atproto.repo.applyWrites#create", + collection: "com.example.unknown", + rkey: genTid(), + value: { foo: "bar" }, + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRecord"); + expect(data.message).toMatch(/unknown lexicon type/i); + }); + + it("rejects empty-string rkey on applyWrites#create as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + writes: [ + { + $type: "com.atproto.repo.applyWrites#create", + collection: "app.bsky.feed.post", + rkey: "", + value: { + $type: "app.bsky.feed.post", + text: "hi", + createdAt: new Date().toISOString(), + }, + }, + ], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/rkey must be a non-empty string/i); + }); + + it("rejects non-boolean validate flag as 400", async () => { + const response = await worker.fetch( + new Request("http://pds.test/xrpc/com.atproto.repo.applyWrites", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + validate: "true", + writes: [], + }), + }), + env, + ); + expect(response.status).toBe(400); + const data = (await 
response.json()) as any; + expect(data.error).toBe("InvalidRequest"); + expect(data.message).toMatch(/validate.*boolean/i); + }); + it("should reject too many writes", async () => { const writes = Array.from({ length: 201 }, (_, i) => ({ $type: "com.atproto.repo.applyWrites#create", @@ -1211,7 +1538,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "car-test", + rkey: genTid(), record: { $type: "app.bsky.feed.post", text: "CAR export test", @@ -1289,7 +1616,7 @@ describe("XRPC Endpoints", () => { body: JSON.stringify({ repo: env.DID, collection: "app.bsky.feed.post", - rkey: "stream-test", + rkey: genTid(), record: { $type: "app.bsky.feed.post", text: "Streaming test",