Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .changeset/lexicon-validation-parity.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"@getcirrus/pds": minor
---

Lexicon validation now matches the reference PDS more closely:

- `createRecord`, `putRecord`, and `applyWrites` honor the `validate` flag from the request body. `true` requires a known schema, `false` skips schema validation, `undefined` validates known schemas optimistically.
- Responses include `validationStatus` (`"valid"` for known, `"unknown"` for unknown collections; omitted when `validate: false`). Per-write `validationStatus` is returned in `applyWrites` results.
- The record's `$type` is filled in from `collection` when missing and rejected on mismatch.
- Generic record-key shape (`isRecordKey`) is enforced for any provided rkey, regardless of `validate` flag — closes a hole where empty-string and path-traversal-style rkeys could reach the repo.
- Schema-specific record keys are validated against the schema's `keySchema` for known collections (e.g. `app.bsky.feed.post` requires a TID, `app.bsky.actor.profile` requires `self`).
- Legacy `{ cid, mimeType }` blob refs are rejected.
- Bundled schema set broadened to include `com.atproto.lexicon.schema`, `app.bsky.actor.status`, `app.bsky.notification.declaration`, and `chat.bsky.actor.declaration`.
- The Durable Object is now the authoritative rkey allocator: when the client doesn't supply an rkey, the worker validates against a candidate (so restrictive `keySchema`s still reject early) and the DO picks the final rkey against its MST state, with a small retry loop to defeat any worker-isolate clockid collisions.
- Client-supplied rkey collisions return `409 RecordAlreadyExists` instead of a generic 500.
- Intra-batch duplicate rkeys in `applyWrites` return `400 InvalidRequest` (distinguished from the 409 above).
- Missing rkey for `applyWrites#update`/`#delete` returns `400 InvalidRequest`.
- Non-boolean `validate` flag values return `400 InvalidRequest`.
- Non-string `rkey` values (including `null`) return `400 InvalidRequest`.
7 changes: 5 additions & 2 deletions packages/pds/e2e/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AtpAgent } from "@atproto/api";
import { now as tidNow } from "@atcute/tid";

export function getPort(): number {
return (
Expand All @@ -15,10 +16,12 @@ export function createAgent(): AtpAgent {
}

/**
* Generate a unique rkey for test isolation
* Generate a unique TID-format rkey for test isolation. Most app.bsky.*
* record collections constrain the rkey to TID format, so tests can't use
* arbitrary strings.
*/
export function uniqueRkey(): string {
return `test-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
return tidNow();
}

export const TEST_DID = "did:web:test.local";
Expand Down
73 changes: 65 additions & 8 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
import { BlobStore, type BlobRef } from "./blobs";
import { jsonToLex } from "@atproto/lex-json";
import type { PDSEnv } from "./types";
import { RecordAlreadyExistsError, type ValidationStatus } from "./validation";

/**
* Account Durable Object - manages a single user's AT Protocol repository.
Expand Down Expand Up @@ -322,16 +323,42 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
collection: string,
rkey: string | undefined,
record: unknown,
validationStatus?: ValidationStatus,
): Promise<{
uri: string;
cid: string;
commit: { cid: string; rev: string };
validationStatus?: ValidationStatus;
}> {
await this.ensureActive();
const repo = await this.getRepo();
const keypair = await this.getKeypair();

const actualRkey = rkey || tidNow();
// Auto-generate rkey here (not in the worker) so the candidate is
// chosen against this DO's authoritative MST state, eliminating the
// 1/1024 collision risk between worker isolates picking the same
// timestamp+clockid in the same ms. For client-supplied rkeys, throw
// a structured RecordAlreadyExistsError if it collides. Use the
// MST's CID lookup (data.get) instead of repo.getRecord to avoid
// fetching and decoding the full record block on every write.
const autoGenerated = rkey === undefined;
let actualRkey = rkey ?? tidNow();
for (let attempt = 0; attempt < 5; attempt++) {
const existingCid = await repo.data.get(`${collection}/${actualRkey}`);
if (!existingCid) break;
if (!autoGenerated) {
throw new RecordAlreadyExistsError(
`${collection}/${actualRkey}`,
);
}
if (attempt === 4) {
throw new Error(
`Failed to allocate unique rkey for ${collection} after 5 attempts`,
);
}
actualRkey = tidNow();
}

const createOp: RecordCreateOp = {
action: WriteOpAction.Create,
collection,
Expand Down Expand Up @@ -394,6 +421,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
cid: this.repo.cid.toString(),
rev: this.repo.commit.rev,
},
...(validationStatus !== undefined ? { validationStatus } : {}),
};
}

Expand Down Expand Up @@ -466,11 +494,12 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
collection: string,
rkey: string,
record: unknown,
validationStatus?: ValidationStatus,
): Promise<{
uri: string;
cid: string;
commit: { cid: string; rev: string };
validationStatus: string;
validationStatus?: ValidationStatus;
}> {
await this.ensureActive();
const repo = await this.getRepo();
Expand Down Expand Up @@ -548,7 +577,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
cid: this.repo.cid.toString(),
rev: this.repo.commit.rev,
},
validationStatus: "valid",
...(validationStatus !== undefined ? { validationStatus } : {}),
};
}

Expand All @@ -561,14 +590,15 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
collection: string;
rkey?: string;
value?: unknown;
validationStatus?: ValidationStatus;
}>,
): Promise<{
commit: { cid: string; rev: string };
results: Array<{
$type: string;
uri?: string;
cid?: string;
validationStatus?: string;
validationStatus?: ValidationStatus;
}>;
}> {
await this.ensureActive();
Expand All @@ -581,15 +611,38 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
$type: string;
uri?: string;
cid?: string;
validationStatus?: string;
validationStatus?: ValidationStatus;
collection: string;
rkey: string;
action: WriteOpAction;
}> = [];

// Track rkeys this batch will write so two auto-generated creates in
// the same batch don't pick the same rkey.
const reservedRkeys = new Set<string>();

for (const write of writes) {
if (write.$type === "com.atproto.repo.applyWrites#create") {
const rkey = write.rkey || tidNow();
const autoGenerated = write.rkey === undefined;
let rkey = write.rkey ?? tidNow();
for (let attempt = 0; attempt < 5; attempt++) {
const composite = `${write.collection}/${rkey}`;
const collidesInBatch = reservedRkeys.has(composite);
const collidesInRepo =
!collidesInBatch &&
(await repo.data.get(composite)) !== null;
if (!collidesInBatch && !collidesInRepo) break;
if (!autoGenerated) {
throw new RecordAlreadyExistsError(composite);
}
if (attempt === 4) {
throw new Error(
`Failed to allocate unique rkey for ${write.collection} after 5 attempts`,
);
}
rkey = tidNow();
}
reservedRkeys.add(`${write.collection}/${rkey}`);
const op: RecordCreateOp = {
action: WriteOpAction.Create,
collection: write.collection,
Expand All @@ -602,6 +655,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
collection: write.collection,
rkey,
action: WriteOpAction.Create,
validationStatus: write.validationStatus,
});
} else if (write.$type === "com.atproto.repo.applyWrites#update") {
if (!write.rkey) {
Expand All @@ -619,6 +673,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
collection: write.collection,
rkey: write.rkey,
action: WriteOpAction.Update,
validationStatus: write.validationStatus,
});
} else if (write.$type === "com.atproto.repo.applyWrites#delete") {
if (!write.rkey) {
Expand Down Expand Up @@ -657,7 +712,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
$type: string;
uri?: string;
cid?: string;
validationStatus?: string;
validationStatus?: ValidationStatus;
}> = [];
const opsWithCids: Array<RecordWriteOp & { cid?: CID | null }> = [];

Expand All @@ -678,7 +733,9 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
$type: result.$type,
uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`,
cid: recordCid?.toString(),
validationStatus: "valid",
...(result.validationStatus !== undefined
? { validationStatus: result.validationStatus }
: {}),
});
// Include the record CID in the op for the firehose
opsWithCids.push({ ...op, cid: recordCid });
Expand Down
Loading
Loading