diff --git a/apps/web/src/scripts/db/kiloclaw-subscription-alignment.ts b/apps/web/src/scripts/db/kiloclaw-subscription-alignment.ts index 53b8c0980..c96b1a5bc 100644 --- a/apps/web/src/scripts/db/kiloclaw-subscription-alignment.ts +++ b/apps/web/src/scripts/db/kiloclaw-subscription-alignment.ts @@ -13,6 +13,8 @@ * pnpm script db kiloclaw-subscription-alignment apply-org * pnpm script db kiloclaw-subscription-alignment preview-changelog-baseline * pnpm script db kiloclaw-subscription-alignment apply-changelog-baseline + * pnpm script db kiloclaw-subscription-alignment preview-multi-row-all-destroyed + * pnpm script db kiloclaw-subscription-alignment apply-multi-row-all-destroyed * * Flags: * --confirm-sandboxes-destroyed Required for apply-duplicates to write @@ -23,11 +25,30 @@ * flag, apply-duplicates prints a manifest of duplicate sandbox IDs and * exits without writes. * + * --bulk Enables chunked bulk write path for low-risk, high-volume + * backfills. Used by: + * - apply-missing-personal for backfill_destroyed_terminal_personal rows + * (chunked bulk INSERT). + * - apply-multi-row-all-destroyed for chain collapses (chunked bulk + * UPDATE via FROM VALUES + chunked bulk INSERT of change-log rows; + * falls back per-user if a chunk transaction rolls back so that a + * single UQ_kiloclaw_subscriptions_transferred_to race doesn't + * poison the whole chunk). + * * Admin-panel workflow (recommended): operators destroy duplicate sandboxes * via the admin panel, which sets destroyed_at and tears down the resource. * Then apply-missing-personal picks up the now-destroyed instances via the * backfill_destroyed_terminal_personal path and inserts canceled terminal * subscription rows — no --confirm-sandboxes-destroyed flag required. + * + * multi-row-all-destroyed: collapses users whose `personalCurrentSubscriptionWhere` + * rows all point at destroyed personal instances. The guard in + * apps/web/src/lib/kiloclaw/current-personal-subscription.ts throws when + * activeRows === 0 AND rows.length > 1 — getBillingStatus returns 409 CONFLICT + * for the affected user. This mode is pure data: for each such user, pick the + * newest matching row (created_at DESC, id DESC) as the authoritative current + * row and point every older matching row's transferred_to_subscription_id at + * it. No instances are touched. Safe to run without flags. */ import { and, asc, desc, eq, inArray, isNull, notExists, or, sql } from 'drizzle-orm'; @@ -38,7 +59,10 @@ import { KILOCLAW_TRIAL_DURATION_DAYS, } from '@/lib/kiloclaw/constants'; import { db, type DrizzleTransaction } from '@/lib/drizzle'; -import { insertKiloClawSubscriptionChangeLog } from '@kilocode/db'; +import { + insertKiloClawSubscriptionChangeLog, + serializeKiloClawSubscriptionSnapshot, +} from '@kilocode/db'; import { kiloclaw_earlybird_purchases, kiloclaw_instances, @@ -63,7 +87,9 @@ type Mode = | 'preview-org' | 'apply-org' | 'preview-changelog-baseline' - | 'apply-changelog-baseline'; + | 'apply-changelog-baseline' + | 'preview-multi-row-all-destroyed' + | 'apply-multi-row-all-destroyed'; type PersonalInstanceWithoutRow = { instanceId: string; @@ -247,6 +273,36 @@ function printSection(label: string, rows: T[]) { } } +function logApplyProgress(params: { + label: string; + processed: number; + total: number; + startedAt: number; + every?: number; +}) { + const interval = params.every ?? 100; + if (params.processed !== params.total && params.processed % interval !== 0) { + return; + } + + const elapsedSeconds = Math.round((Date.now() - params.startedAt) / 1000); + console.log( + `[progress] ${params.label}: ${params.processed}/${params.total} processed (${elapsedSeconds}s elapsed)` + ); +} + +function chunkArray(rows: T[], size: number): T[][] { + if (size <= 0) { + return [rows]; + } + + const chunks: T[][] = []; + for (let index = 0; index < rows.length; index += size) { + chunks.push(rows.slice(index, index + size)); + } + return chunks; +} + function describeError(error: unknown): string { return error instanceof Error ? `${error.name}: ${error.message}` : String(error); } @@ -370,13 +426,13 @@ async function listDetachedSubscriptions(): Promise`( SELECT count(*)::int FROM ${kiloclaw_subscriptions} AS detached - WHERE detached.user_id = ${kiloclaw_subscriptions.user_id} + WHERE detached.user_id = "kiloclaw_subscriptions"."user_id" AND detached.instance_id IS NULL )`, activePersonalInstanceCount: sql`( SELECT count(*)::int FROM ${kiloclaw_instances} AS active_instance - WHERE active_instance.user_id = ${kiloclaw_subscriptions.user_id} + WHERE active_instance.user_id = "kiloclaw_subscriptions"."user_id" AND active_instance.organization_id IS NULL AND active_instance.destroyed_at IS NULL )`, @@ -385,14 +441,14 @@ async function listDetachedSubscriptions(): Promise`( SELECT active_instance.id FROM ${kiloclaw_instances} AS active_instance - WHERE active_instance.user_id = ${kiloclaw_subscriptions.user_id} + WHERE active_instance.user_id = "kiloclaw_subscriptions"."user_id" AND active_instance.organization_id IS NULL AND active_instance.destroyed_at IS NULL ORDER BY active_instance.created_at DESC @@ -914,23 +970,6 @@ async function listSubscriptionsMissingBaselineChangeLog(): Promise { - const rows = await executor - .select({ id: kiloclaw_subscription_change_log.id }) - .from(kiloclaw_subscription_change_log) - .where( - and( - eq(kiloclaw_subscription_change_log.subscription_id, subscriptionId), - isNull(kiloclaw_subscription_change_log.before_state) - ) - ) - .limit(1); - return rows.length > 0; -} - async function previewChangelogBaselineBackfill() { const rows = await listSubscriptionsMissingBaselineChangeLog(); printSection( @@ -952,72 +991,118 @@ async function applyChangelogBaselineBackfill() { let insertedFromCurrent = 0; let insertedFromMutation = 0; const failures: Array<{ subscriptionId: string; userId: string; error: string }> = []; + const startedAt = Date.now(); - for (const row of rows) { + const chunkSize = 500; + let processedCount = 0; + + for (const chunk of chunkArray(rows, chunkSize)) { try { - const result = await db.transaction(async tx => { - if (await hasBaselineChangeLogEntry(tx, row.id)) { - return 'skipped' as const; + // Counts are returned from the transaction callback and accumulated + // AFTER commit. Mutating the outer counters inside the callback would + // over-report on rollback: e.g. if the `currentStateRows` INSERT ran + // but the later `mutationStateRows` INSERT threw, the whole tx would + // roll back while the outer counters stayed incremented. + const chunkResult = await db.transaction(async tx => { + const chunkIds = chunk.map(row => row.id); + const existingBaselines = await tx + .select({ subscriptionId: kiloclaw_subscription_change_log.subscription_id }) + .from(kiloclaw_subscription_change_log) + .where( + and( + inArray(kiloclaw_subscription_change_log.subscription_id, chunkIds), + isNull(kiloclaw_subscription_change_log.before_state) + ) + ); + + const existingBaselineIds = new Set(existingBaselines.map(row => row.subscriptionId)); + const candidateRows = chunk.filter(row => !existingBaselineIds.has(row.id)); + + if (candidateRows.length === 0) { + return { insertedFromCurrent: 0, insertedFromMutation: 0 }; } - // When prior mutation logs exist, the oldest row's before_state holds - // the subscription's true initial state (captured before the first - // mutation was applied). Using it as the fabricated baseline's - // after_state preserves audit-replay integrity: replaying - // baseline -> mutation1 -> mutation2 ... reaches the current row. - // Falling back to the current row would inject a no-op delta before - // the first mutation and desync replay forever. - const [earliestMutation] = await tx - .select({ + const candidateIds = candidateRows.map(row => row.id); + const earliestMutations = await tx + .selectDistinctOn([kiloclaw_subscription_change_log.subscription_id], { + subscriptionId: kiloclaw_subscription_change_log.subscription_id, beforeState: kiloclaw_subscription_change_log.before_state, }) .from(kiloclaw_subscription_change_log) - .where(eq(kiloclaw_subscription_change_log.subscription_id, row.id)) - .orderBy(asc(kiloclaw_subscription_change_log.created_at)) - .limit(1); + .where(inArray(kiloclaw_subscription_change_log.subscription_id, candidateIds)) + .orderBy( + kiloclaw_subscription_change_log.subscription_id, + asc(kiloclaw_subscription_change_log.created_at) + ); + + const earliestMutationMap = new Map( + earliestMutations.map(row => [row.subscriptionId, row.beforeState ?? null]) + ); - const inheritedBaseline = earliestMutation?.beforeState ?? null; + const currentStateRows = candidateRows.filter(row => !earliestMutationMap.get(row.id)); + const mutationStateRows = candidateRows.filter(row => + Boolean(earliestMutationMap.get(row.id)) + ); - if (inheritedBaseline) { - await tx.insert(kiloclaw_subscription_change_log).values({ - subscription_id: row.id, - actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, - actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, - action: 'backfilled', - reason: 'baseline_subscription_snapshot_from_earliest_mutation', - before_state: null, - after_state: inheritedBaseline, - }); - return 'from_mutation' as const; + if (currentStateRows.length > 0) { + await tx.insert(kiloclaw_subscription_change_log).values( + currentStateRows.map(row => ({ + subscription_id: row.id, + actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, + actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, + action: 'backfilled' as const, + reason: 'baseline_subscription_snapshot', + before_state: null, + after_state: serializeKiloClawSubscriptionSnapshot(row), + })) + ); } - await insertAlignmentChangeLog(tx, { - subscriptionId: row.id, - action: 'backfilled', - reason: 'baseline_subscription_snapshot', - before: null, - after: row, - }); - return 'from_current' as const; + if (mutationStateRows.length > 0) { + await tx.insert(kiloclaw_subscription_change_log).values( + mutationStateRows.map(row => ({ + subscription_id: row.id, + actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, + actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, + action: 'backfilled' as const, + reason: 'baseline_subscription_snapshot_from_earliest_mutation', + before_state: null, + after_state: earliestMutationMap.get(row.id) ?? null, + })) + ); + } + + return { + insertedFromCurrent: currentStateRows.length, + insertedFromMutation: mutationStateRows.length, + }; }); - if (result === 'from_mutation') { - insertedFromMutation += 1; - } else if (result === 'from_current') { - insertedFromCurrent += 1; - } + insertedFromCurrent += chunkResult.insertedFromCurrent; + insertedFromMutation += chunkResult.insertedFromMutation; } catch (error) { - console.error('Changelog baseline backfill row failed', { - subscriptionId: row.id, - userId: row.user_id, - error: describeError(error), - }); - failures.push({ - subscriptionId: row.id, - userId: row.user_id, - error: describeError(error), + const errorMessage = describeError(error); + console.error('Changelog baseline backfill chunk failed', { + subscriptionIds: chunk.map(row => row.id), + error: errorMessage, }); + failures.push( + ...chunk.map(row => ({ + subscriptionId: row.id, + userId: row.user_id, + error: errorMessage, + })) + ); } + + processedCount += chunk.length; + logApplyProgress({ + label: 'apply-changelog-baseline', + processed: processedCount, + total: rows.length, + startedAt, + every: 500, + }); } console.log('\nChangelog baseline backfill results'); @@ -1344,26 +1429,24 @@ async function applyDuplicateActiveInstanceRow( // predecessor would block reassignment forever: preview classifies the // duplicate as safe, apply sees the transferred row as a current sub and // skips. Same applies to duplicate counts. - const [canonicalExisting, duplicateExisting] = await Promise.all([ - tx - .select() - .from(kiloclaw_subscriptions) - .where( - and( - eq(kiloclaw_subscriptions.instance_id, row.canonicalInstanceId), - isNull(kiloclaw_subscriptions.transferred_to_subscription_id) - ) - ), - tx - .select() - .from(kiloclaw_subscriptions) - .where( - and( - eq(kiloclaw_subscriptions.instance_id, row.duplicateInstanceId), - isNull(kiloclaw_subscriptions.transferred_to_subscription_id) - ) - ), - ]); + const canonicalExisting = await tx + .select() + .from(kiloclaw_subscriptions) + .where( + and( + eq(kiloclaw_subscriptions.instance_id, row.canonicalInstanceId), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) + ) + ); + const duplicateExisting = await tx + .select() + .from(kiloclaw_subscriptions) + .where( + and( + eq(kiloclaw_subscriptions.instance_id, row.duplicateInstanceId), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) + ) + ); if ( row.action === 'reassign_to_canonical_and_destroy_duplicate' && @@ -1486,6 +1569,7 @@ async function applyDuplicateActiveInstances(options: ApplyOptions) { userId: string; action: string; }> = []; + const startedAt = Date.now(); // Writing destroyed_at without destroying the underlying sandbox would make // an active sandbox invisible to lifecycle/access checks while it keeps @@ -1522,12 +1606,19 @@ async function applyDuplicateActiveInstances(options: ApplyOptions) { return; } - for (const row of rows) { + for (const [index, row] of rows.entries()) { if ( row.action !== 'backfill_destroy_duplicate_personal' && row.action !== 'backfill_destroy_duplicate_org' && row.action !== 'reassign_to_canonical_and_destroy_duplicate' ) { + logApplyProgress({ + label: 'apply-duplicates', + processed: index + 1, + total: rows.length, + startedAt, + every: 25, + }); continue; } @@ -1571,6 +1662,14 @@ async function applyDuplicateActiveInstances(options: ApplyOptions) { }); break; } + + logApplyProgress({ + label: 'apply-duplicates', + processed: index + 1, + total: rows.length, + startedAt, + every: 25, + }); } console.log('\nDuplicate active instance apply results'); @@ -1592,6 +1691,146 @@ type MissingPersonalOutcome = | 'skipped' | 'no_op'; +async function applyMissingPersonalDestroyedTerminalBulk(params: { + rows: MissingPersonalCandidate[]; + startedAt: number; + processedOffset: number; + totalRows: number; + progressEvery?: number; +}): Promise<{ + processedCount: number; + insertedCount: number; + skipped: Array<{ + instanceId: string; + userId: string; + action: string; + error?: string; + }>; +}> { + const chunkSize = 250; + const skipped: Array<{ + instanceId: string; + userId: string; + action: string; + error?: string; + }> = []; + let insertedCount = 0; + let processedCount = 0; + + for (const chunk of chunkArray(params.rows, chunkSize)) { + try { + await db.transaction(async tx => { + const rowsWithDestroyedAt = chunk.filter(row => Boolean(row.instanceDestroyedAt)); + const rowsWithoutDestroyedAt = chunk.filter(row => !row.instanceDestroyedAt); + + skipped.push( + ...rowsWithoutDestroyedAt.map(row => ({ + instanceId: row.instanceId, + userId: row.userId, + action: row.action, + })) + ); + + if (rowsWithDestroyedAt.length === 0) { + return; + } + + const instanceIds = rowsWithDestroyedAt.map(row => row.instanceId); + const existingRows = await tx + .select({ instanceId: kiloclaw_subscriptions.instance_id }) + .from(kiloclaw_subscriptions) + .where(inArray(kiloclaw_subscriptions.instance_id, instanceIds)); + + const existingInstanceIds = new Set( + existingRows + .map(row => row.instanceId) + .filter((instanceId): instanceId is string => Boolean(instanceId)) + ); + + const insertableRows = rowsWithDestroyedAt.filter( + row => !existingInstanceIds.has(row.instanceId) + ); + const skippedRows = rowsWithDestroyedAt.filter(row => + existingInstanceIds.has(row.instanceId) + ); + + skipped.push( + ...skippedRows.map(row => ({ + instanceId: row.instanceId, + userId: row.userId, + action: row.action, + })) + ); + + if (insertableRows.length === 0) { + return; + } + + const insertedRows = await tx + .insert(kiloclaw_subscriptions) + .values( + insertableRows.map(row => ({ + user_id: row.userId, + instance_id: row.instanceId, + plan: 'trial' as const, + status: 'canceled' as const, + payment_source: null, + cancel_at_period_end: false, + trial_started_at: row.instanceCreatedAt, + trial_ends_at: getPersonalTrialEndsAt(row.instanceCreatedAt), + created_at: row.instanceCreatedAt, + updated_at: row.instanceCreatedAt, + })) + ) + .returning(); + + insertedCount += insertedRows.length; + + if (insertedRows.length === 0) { + return; + } + + await tx.insert(kiloclaw_subscription_change_log).values( + insertedRows.map(insertedRow => ({ + subscription_id: insertedRow.id, + actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, + actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, + action: 'backfilled' as const, + reason: 'apply_missing_personal_backfill_destroyed_terminal', + before_state: null, + after_state: serializeKiloClawSubscriptionSnapshot(insertedRow), + })) + ); + }); + } catch (error) { + const errorMessage = describeError(error); + for (const row of chunk) { + skipped.push({ + instanceId: row.instanceId, + userId: row.userId, + action: row.action, + error: errorMessage, + }); + } + } + + processedCount += chunk.length; + logApplyProgress({ + label: 'apply-missing-personal', + processed: params.processedOffset + processedCount, + total: params.totalRows, + startedAt: params.startedAt, + every: params.progressEvery ?? 25, + }); + } + + return { + processedCount, + insertedCount, + skipped, + }; +} + async function applyMissingPersonalBackfillRow( row: MissingPersonalCandidate ): Promise { @@ -1644,19 +1883,17 @@ async function applyMissingPersonalBackfillRow( // Only current (non-transferred) rows block reassignment. A transferred // predecessor on this instance is runtime-invisible and must not block // the successor insert. - const [existing, hasLiveCurrent] = await Promise.all([ - tx - .select({ id: kiloclaw_subscriptions.id }) - .from(kiloclaw_subscriptions) - .where( - and( - eq(kiloclaw_subscriptions.instance_id, row.instanceId), - isNull(kiloclaw_subscriptions.transferred_to_subscription_id) - ) + const existing = await tx + .select({ id: kiloclaw_subscriptions.id }) + .from(kiloclaw_subscriptions) + .where( + and( + eq(kiloclaw_subscriptions.instance_id, row.instanceId), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) ) - .limit(1), - liveCurrentPersonalSubscriptionExistsForUser(tx, row.userId), - ]); + ) + .limit(1); + const hasLiveCurrent = await liveCurrentPersonalSubscriptionExistsForUser(tx, row.userId); if (existing.length > 0 || hasLiveCurrent) { return 'skipped'; @@ -1798,19 +2035,17 @@ async function applyMissingPersonalBackfillRow( // Only current (non-transferred) rows block bootstrap. Transferred // predecessor on this instance is runtime-invisible. - const [existingForInstance, hasPersonalForUser] = await Promise.all([ - tx - .select({ id: kiloclaw_subscriptions.id }) - .from(kiloclaw_subscriptions) - .where( - and( - eq(kiloclaw_subscriptions.instance_id, row.instanceId), - isNull(kiloclaw_subscriptions.transferred_to_subscription_id) - ) + const existingForInstance = await tx + .select({ id: kiloclaw_subscriptions.id }) + .from(kiloclaw_subscriptions) + .where( + and( + eq(kiloclaw_subscriptions.instance_id, row.instanceId), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) ) - .limit(1), - personalContextSubscriptionExistsForUser(tx, row.userId), - ]); + ) + .limit(1); + const hasPersonalForUser = await personalContextSubscriptionExistsForUser(tx, row.userId); if (existingForInstance.length > 0 || hasPersonalForUser) { return 'skipped'; @@ -1854,24 +2089,22 @@ async function applyMissingPersonalBackfillRow( // Only current (non-transferred) rows block earlybird backfill. // Transferred predecessor on this instance is runtime-invisible. - const [existingForInstance, hasPersonalForUser, earlybirdPurchase] = await Promise.all([ - tx - .select({ id: kiloclaw_subscriptions.id }) - .from(kiloclaw_subscriptions) - .where( - and( - eq(kiloclaw_subscriptions.instance_id, row.instanceId), - isNull(kiloclaw_subscriptions.transferred_to_subscription_id) - ) + const existingForInstance = await tx + .select({ id: kiloclaw_subscriptions.id }) + .from(kiloclaw_subscriptions) + .where( + and( + eq(kiloclaw_subscriptions.instance_id, row.instanceId), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) ) - .limit(1), - personalContextSubscriptionExistsForUser(tx, row.userId), - tx - .select({ createdAt: kiloclaw_earlybird_purchases.created_at }) - .from(kiloclaw_earlybird_purchases) - .where(eq(kiloclaw_earlybird_purchases.user_id, row.userId)) - .limit(1), - ]); + ) + .limit(1); + const hasPersonalForUser = await personalContextSubscriptionExistsForUser(tx, row.userId); + const earlybirdPurchase = await tx + .select({ createdAt: kiloclaw_earlybird_purchases.created_at }) + .from(kiloclaw_earlybird_purchases) + .where(eq(kiloclaw_earlybird_purchases.user_id, row.userId)) + .limit(1); if (existingForInstance.length > 0 || hasPersonalForUser || earlybirdPurchase.length === 0) { return 'skipped'; @@ -1966,7 +2199,7 @@ async function applyMissingPersonalBackfillRow( }); } -async function applyMissingPersonalBackfill() { +async function applyMissingPersonalBackfill(options: ApplyOptions) { const rows = await buildMissingPersonalCandidates(); let adopted = 0; let reassigned = 0; @@ -1979,8 +2212,33 @@ async function applyMissingPersonalBackfill() { action: string; error?: string; }> = []; + const startedAt = Date.now(); + let processedCount = 0; - for (const row of rows) { + const bulkDestroyedRows = options.bulk + ? rows.filter(row => row.action === 'backfill_destroyed_terminal_personal') + : []; + const rowByRowRows = options.bulk + ? rows.filter(row => row.action !== 'backfill_destroyed_terminal_personal') + : rows; + + if (bulkDestroyedRows.length > 0) { + console.log( + `[bulk] apply-missing-personal: processing ${bulkDestroyedRows.length} destroyed-terminal rows in chunked bulk mode` + ); + const bulkResult = await applyMissingPersonalDestroyedTerminalBulk({ + rows: bulkDestroyedRows, + startedAt, + processedOffset: processedCount, + totalRows: rows.length, + progressEvery: 25, + }); + processedCount += bulkResult.processedCount; + destroyedTerminalBackfilled += bulkResult.insertedCount; + skipped.push(...bulkResult.skipped); + } + + for (const row of rowByRowRows) { let outcome: MissingPersonalOutcome; try { outcome = await applyMissingPersonalBackfillRow(row); @@ -1997,6 +2255,14 @@ async function applyMissingPersonalBackfill() { action: row.action, error: describeError(error), }); + processedCount += 1; + logApplyProgress({ + label: 'apply-missing-personal', + processed: processedCount, + total: rows.length, + startedAt, + every: 25, + }); continue; } @@ -2022,6 +2288,15 @@ async function applyMissingPersonalBackfill() { case 'no_op': break; } + + processedCount += 1; + logApplyProgress({ + label: 'apply-missing-personal', + processed: processedCount, + total: rows.length, + startedAt, + every: 25, + }); } console.log('\nMissing personal backfill results'); @@ -2036,6 +2311,699 @@ async function applyMissingPersonalBackfill() { printSection('Missing personal rows skipped during apply', skipped); } +type MultiRowAllDestroyedPair = { + sourceId: string; + targetId: string; + sourceCreatedAt: string; + targetCreatedAt: string; +}; + +type MultiRowAllDestroyedCandidate = { + userId: string; + tailSubscriptionId: string; + tailCreatedAt: string; + currentAmbiguousRowCount: number; + pairs: MultiRowAllDestroyedPair[]; + // A candidate is `fullyCollapsible` when the planned pair set reduces the + // user's current ambiguous row count to exactly one. When a pre-existing + // partial chain (e.g. D→B in a history of [D, A, B, C]) claims an + // intermediate row's predecessor slot, the UQ_kiloclaw_subscriptions_transferred_to + // guard forces us to drop a pair, leaving more than one row ambiguous. + // Apply paths must skip these users — partial collapse would replace one + // `CurrentPersonalSubscriptionResolutionError`-firing shape with another + // and mask the problem in the change log. + fullyCollapsible: boolean; +}; + +type MultiRowAllDestroyedOutcome = + | 'collapsed' + | 'skipped_no_longer_ambiguous' + | 'skipped_race' + | 'skipped_not_collapsible'; + +// --------------------------------------------------------------------------- +// Multi-row-all-destroyed collapse +// +// Scope: users whose `personalCurrentSubscriptionWhere` predicate returns >1 rows +// AND NONE of the joined instances have `destroyed_at IS NULL`. The guard in +// apps/web/src/lib/kiloclaw/current-personal-subscription.ts filters rows to +// activeRows first; with zero alive rows and rows.length > 1 it throws at +// line 114 ("Expected at most one current personal subscription row"). +// +// Why this is separate from reassign_destroyed_access_row: that path requires +// a new live target instance to insert a successor onto and move the Stripe +// funding/schedule to. These users have no live personal instance at all — +// either they've moved to an org-owned instance (whose rows the guard filters +// via i.organization_id IS NULL) or they've fully destroyed their personal +// claw. There's nowhere to create a successor, so we just collapse the chain +// down to one tail row and let getBillingStatus return it. +// +// How the chain is formed: `kiloclaw_subscriptions` has a partial unique index +// UQ_kiloclaw_subscriptions_transferred_to on transferred_to_subscription_id, +// so each subscription can have at most ONE predecessor. We therefore cannot +// fan in all older rows onto the newest; we have to build a linked chain of +// (row_i → row_{i+1}) pairs within each user's personal-row history ordered +// oldest-to-newest. For every pair where the older row currently has +// transferred_to_subscription_id IS NULL, we set it to the immediately-next +// row's id. We skip any pair whose target already has a predecessor (to avoid +// UQ violations from pre-existing partial chains), so re-runs are idempotent. +// --------------------------------------------------------------------------- + +type MultiRowAllDestroyedSourceRow = { + subscriptionId: string; + userId: string; + subscriptionCreatedAt: string; + instanceDestroyedAt: string | null; + transferredToSubscriptionId: string | null; +}; + +async function buildMultiRowAllDestroyedCandidates(): Promise { + // Pulls ALL personal-instance subscription rows across the fleet, including + // rows that have already been transferred. We need the complete personal + // history per user to form a correct LEAD-chain (otherwise an older + // un-transferred row would be paired with a newer row whose predecessor + // slot is already occupied by some other row, violating + // UQ_kiloclaw_subscriptions_transferred_to). + const rows: MultiRowAllDestroyedSourceRow[] = await db + .select({ + subscriptionId: kiloclaw_subscriptions.id, + userId: kiloclaw_subscriptions.user_id, + subscriptionCreatedAt: kiloclaw_subscriptions.created_at, + instanceDestroyedAt: kiloclaw_instances.destroyed_at, + transferredToSubscriptionId: kiloclaw_subscriptions.transferred_to_subscription_id, + }) + .from(kiloclaw_subscriptions) + .innerJoin(kiloclaw_instances, eq(kiloclaw_instances.id, kiloclaw_subscriptions.instance_id)) + .where( + and( + eq(kiloclaw_instances.user_id, kiloclaw_subscriptions.user_id), + isNull(kiloclaw_instances.organization_id) + ) + ) + .orderBy( + asc(kiloclaw_subscriptions.user_id), + asc(kiloclaw_subscriptions.created_at), + asc(kiloclaw_subscriptions.id) + ); + + const byUser = new Map(); + for (const row of rows) { + const existing = byUser.get(row.userId); + if (existing) { + existing.push(row); + } else { + byUser.set(row.userId, [row]); + } + } + + const candidates: MultiRowAllDestroyedCandidate[] = []; + for (const [userId, userRows] of byUser) { + // `firing_all_destroyed_multi` per the 2026-04-18 incident analysis: + // >1 rows currently match personalCurrentSubscriptionWhere (transferred_to + // IS NULL, org-less) AND none of them are alive. If any current ambiguous + // row points at a live instance the guard picks it and returns safely, so + // there's nothing for us to collapse. + const currentAmbiguous = userRows.filter(row => row.transferredToSubscriptionId === null); + if (currentAmbiguous.length < 2) continue; + if (currentAmbiguous.some(row => row.instanceDestroyedAt === null)) continue; + + const pairs = planMultiRowAllDestroyedPairsFromSourceRows(userRows); + // `fullyCollapsible` === true means applying the plan reduces the + // ambiguous set to exactly one tail row. If every candidate target is + // already someone else's predecessor, the plan is empty and + // `fullyCollapsible` will be false — we still surface the user so + // operators and apply can count them as requiring manual review. + const fullyCollapsible = currentAmbiguous.length - pairs.length === 1; + + const tail = userRows[userRows.length - 1]; + if (!tail) continue; + + candidates.push({ + userId, + tailSubscriptionId: tail.subscriptionId, + tailCreatedAt: tail.subscriptionCreatedAt, + currentAmbiguousRowCount: currentAmbiguous.length, + pairs, + fullyCollapsible, + }); + } + + return candidates; +} + +// Sequential-chain pair planner shared by preview and both apply paths. +// Walks rows oldest-to-newest, pairing each non-transferred row with its +// immediately-next row as successor, skipping pairs whose target is already +// someone else's predecessor (UQ_kiloclaw_subscriptions_transferred_to). +// The plan collapses the user's ambiguous set when +// `currentAmbiguousCount - pairs.length === 1`. Callers must check that +// condition before applying — partial plans leave the user still ambiguous. +function planMultiRowAllDestroyedPairsFromSourceRows( + userRows: MultiRowAllDestroyedSourceRow[] +): MultiRowAllDestroyedPair[] { + const existingPredecessorTargets = new Set( + userRows.map(row => row.transferredToSubscriptionId).filter((id): id is string => id !== null) + ); + + const pairs: MultiRowAllDestroyedPair[] = []; + for (let index = 0; index < userRows.length - 1; index += 1) { + const source = userRows[index]; + const next = userRows[index + 1]; + if (!source || !next) continue; + if (source.transferredToSubscriptionId !== null) continue; + if (existingPredecessorTargets.has(next.subscriptionId)) continue; + pairs.push({ + sourceId: source.subscriptionId, + targetId: next.subscriptionId, + sourceCreatedAt: source.subscriptionCreatedAt, + targetCreatedAt: next.subscriptionCreatedAt, + }); + existingPredecessorTargets.add(next.subscriptionId); + } + return pairs; +} + +type MultiRowJoinedRow = { + subscription: KiloClawSubscription; + instanceDestroyedAt: string | null; +}; + +// Variant of the planner that works on full joined rows (subscription + +// instance destroyed_at) so apply paths can reuse each `before` row object +// directly for change-log snapshots without a per-pair SELECT. The +// collapsibility rule is the same: `ambiguous - pairs === 1` to fully +// collapse. +function planMultiRowAllDestroyedPairsFromJoinedRows( + joinedRows: MultiRowJoinedRow[] +): Array<{ before: KiloClawSubscription; targetId: string }> { + const existingPredecessorTargets = new Set( + joinedRows + .map(row => row.subscription.transferred_to_subscription_id) + .filter((id): id is string => id !== null) + ); + + const pairs: Array<{ before: KiloClawSubscription; targetId: string }> = []; + for (let index = 0; index < joinedRows.length - 1; index += 1) { + const source = joinedRows[index]?.subscription; + const next = joinedRows[index + 1]?.subscription; + if (!source || !next) continue; + if (source.transferred_to_subscription_id !== null) continue; + if (existingPredecessorTargets.has(next.id)) continue; + pairs.push({ before: source, targetId: next.id }); + existingPredecessorTargets.add(next.id); + } + return pairs; +} + +function summarizeMultiRowAllDestroyedCandidates(candidates: MultiRowAllDestroyedCandidate[]) { + const collapsible = candidates.filter(candidate => candidate.fullyCollapsible); + const nonCollapsible = candidates.filter(candidate => !candidate.fullyCollapsible); + const totalPairsToWrite = collapsible.reduce((acc, row) => acc + row.pairs.length, 0); + const buckets = new Map(); + for (const candidate of collapsible) { + const userRowCount = candidate.currentAmbiguousRowCount; + buckets.set(userRowCount, (buckets.get(userRowCount) ?? 0) + 1); + } + return { + collapsibleUsers: collapsible.length, + nonCollapsibleUsers: nonCollapsible.length, + totalPairsToWrite, + distribution: [...buckets.entries()] + .sort(([a], [b]) => a - b) + .map(([currentAmbiguousRowCount, users]) => ({ + currentAmbiguousRowCount, + users, + })), + }; +} + +async function previewMultiRowAllDestroyedCollapse() { + const candidates = await buildMultiRowAllDestroyedCandidates(); + const summary = summarizeMultiRowAllDestroyedCandidates(candidates); + const nonCollapsible = candidates.filter(candidate => !candidate.fullyCollapsible); + + console.log('\nmulti-row-all-destroyed preview'); + console.table([ + { metric: 'users_collapsible', count: summary.collapsibleUsers }, + { metric: 'users_non_collapsible', count: summary.nonCollapsibleUsers }, + { metric: 'pairs_to_write', count: summary.totalPairsToWrite }, + ]); + printSection( + 'Collapsible user ambiguous-row-count distribution (rows matching the guard before collapse)', + summary.distribution + ); + printSection( + 'Sample collapsible candidates (top 25 by pair count desc)', + candidates + .filter(candidate => candidate.fullyCollapsible) + .sort((a, b) => b.pairs.length - a.pairs.length) + .slice(0, 25) + .map(candidate => ({ + userId: candidate.userId, + tailSubscriptionId: candidate.tailSubscriptionId, + tailCreatedAt: candidate.tailCreatedAt, + currentAmbiguousRows: candidate.currentAmbiguousRowCount, + pairsToWrite: candidate.pairs.length, + oldestSourceCreatedAt: candidate.pairs[0]?.sourceCreatedAt ?? null, + })) + ); + if (nonCollapsible.length > 0) { + printSection( + 'Non-collapsible users (pre-existing partial chain blocks full collapse; require manual review)', + nonCollapsible.slice(0, 25).map(candidate => ({ + userId: candidate.userId, + currentAmbiguousRows: candidate.currentAmbiguousRowCount, + pairsPlanned: candidate.pairs.length, + ambiguousRowsRemainingAfterPlan: + candidate.currentAmbiguousRowCount - candidate.pairs.length, + })) + ); + } +} + +async function applyMultiRowAllDestroyedCollapseRow( + candidate: MultiRowAllDestroyedCandidate +): Promise<{ outcome: MultiRowAllDestroyedOutcome; pairsWritten: number }> { + return await db.transaction(async tx => { + // Re-read the user's full personal history inside the tx and rebuild the + // pair plan from scratch. This keeps us safe under reprovision/transfer + // races between preview and apply (new rows arrived, some rows got + // transferred out of band, etc.) without needing explicit row locks. The + // cost is a small duplication of the builder logic on a single user. + // + // The SELECT pulls all columns (not a narrow projection) so the same row + // objects serve both as the re-verification dataset and as the `before` + // snapshots for the change-log — no per-pair SELECT roundtrip needed. + const joinedRows = await tx + .select({ + subscription: kiloclaw_subscriptions, + instanceDestroyedAt: kiloclaw_instances.destroyed_at, + }) + .from(kiloclaw_subscriptions) + .innerJoin(kiloclaw_instances, eq(kiloclaw_instances.id, kiloclaw_subscriptions.instance_id)) + .where( + and( + eq(kiloclaw_subscriptions.user_id, candidate.userId), + eq(kiloclaw_instances.user_id, kiloclaw_subscriptions.user_id), + isNull(kiloclaw_instances.organization_id) + ) + ) + .orderBy(asc(kiloclaw_subscriptions.created_at), asc(kiloclaw_subscriptions.id)); + + const currentAmbiguous = joinedRows.filter( + row => row.subscription.transferred_to_subscription_id === null + ); + if (currentAmbiguous.length < 2) { + return { outcome: 'skipped_no_longer_ambiguous' as const, pairsWritten: 0 }; + } + if (currentAmbiguous.some(row => row.instanceDestroyedAt === null)) { + // A new live instance was provisioned between preview and apply — guard + // will pick that row at runtime. No collapse needed. + return { outcome: 'skipped_no_longer_ambiguous' as const, pairsWritten: 0 }; + } + + const plannedPairs = planMultiRowAllDestroyedPairsFromJoinedRows(joinedRows); + + // A partial plan would replace the guard-firing shape with a different + // guard-firing shape and obscure the broken state in the change log. + // Require that applying the plan reduces the ambiguous set to exactly one + // tail row before writing anything. This also covers the degenerate + // `plannedPairs.length === 0` case: `currentAmbiguous.length >= 2` by + // guard above, so zero pairs cannot satisfy `- plannedPairs.length === 1`. + if (currentAmbiguous.length - plannedPairs.length !== 1) { + return { outcome: 'skipped_not_collapsible' as const, pairsWritten: 0 }; + } + + // Apply each UPDATE individually but capture the after-row for batched + // change-log writes at the end of the user. Doing the UPDATEs in a + // single bulk-statement (UPDATE ... FROM VALUES) would save another + // N-1 roundtrips per user but would require raw SQL and a driver- + // specific RETURNING-row shape; the per-pair update is easy to reason + // about, keeps drizzle's types intact, and the big win here is already + // the dropped per-pair before-SELECT plus the batched change-log INSERT. + // + // `updated_at` is auto-updated by the schema's $onUpdateFn, so we never + // need to stamp it manually. + const appliedPairs: Array<{ before: KiloClawSubscription; after: KiloClawSubscription }> = []; + for (const pair of plannedPairs) { + const [after] = await tx + .update(kiloclaw_subscriptions) + .set({ transferred_to_subscription_id: pair.targetId }) + .where( + and( + eq(kiloclaw_subscriptions.id, pair.before.id), + isNull(kiloclaw_subscriptions.transferred_to_subscription_id) + ) + ) + .returning(); + + if (!after) { + // Source was concurrently transferred by another process. Fine — the + // row is no longer ambiguous. Move on. + continue; + } + + appliedPairs.push({ before: pair.before, after }); + } + + if (appliedPairs.length === 0) { + return { outcome: 'skipped_race' as const, pairsWritten: 0 }; + } + + // Single bulk INSERT for the whole user's change-log rows instead of + // one INSERT per pair. This mirrors the pattern in + // applyMissingPersonalDestroyedTerminalBulk. + await tx.insert(kiloclaw_subscription_change_log).values( + appliedPairs.map(({ before, after }) => ({ + subscription_id: after.id, + actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, + actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, + action: 'reassigned' as const, + reason: 'apply_multi_row_all_destroyed_collapse', + before_state: serializeKiloClawSubscriptionSnapshot(before), + after_state: serializeKiloClawSubscriptionSnapshot(after), + })) + ); + + return { outcome: 'collapsed' as const, pairsWritten: appliedPairs.length }; + }); +} + +async function applyMultiRowAllDestroyedCollapseChunkBulk(params: { + chunk: MultiRowAllDestroyedCandidate[]; +}): Promise<{ + usersCollapsed: number; + pairsWritten: number; + usersWithNoWork: number; + usersNotCollapsible: number; +}> { + return await db.transaction(async tx => { + const userIds = params.chunk.map(candidate => candidate.userId); + + // 1. Single multi-user SELECT: every personal-instance row for every user + // in the chunk. Grouped in memory below so pair-planning is identical + // to the per-user path. + const joinedRows = await tx + .select({ + subscription: kiloclaw_subscriptions, + instanceDestroyedAt: kiloclaw_instances.destroyed_at, + }) + .from(kiloclaw_subscriptions) + .innerJoin(kiloclaw_instances, eq(kiloclaw_instances.id, kiloclaw_subscriptions.instance_id)) + .where( + and( + inArray(kiloclaw_subscriptions.user_id, userIds), + eq(kiloclaw_instances.user_id, kiloclaw_subscriptions.user_id), + isNull(kiloclaw_instances.organization_id) + ) + ) + .orderBy( + asc(kiloclaw_subscriptions.user_id), + asc(kiloclaw_subscriptions.created_at), + asc(kiloclaw_subscriptions.id) + ); + + const byUser = new Map(); + for (const row of joinedRows) { + const uid = row.subscription.user_id; + const existing = byUser.get(uid); + if (existing) { + existing.push(row); + } else { + byUser.set(uid, [row]); + } + } + + // 2. Build every user's pair plan. Re-verifies the guard-firing invariant + // inside the tx (just like the per-user path). `allPlannedPairs` keeps + // the user id alongside each pair so we can tally per-user outcomes + // after the RETURNING set comes back. + type PlannedPair = { + userId: string; + before: KiloClawSubscription; + targetId: string; + }; + const allPlannedPairs: PlannedPair[] = []; + let usersWithNoWork = 0; + let usersNotCollapsible = 0; + + for (const candidate of params.chunk) { + const userRows = byUser.get(candidate.userId); + if (!userRows || userRows.length < 2) { + usersWithNoWork += 1; + continue; + } + const currentAmbiguous = userRows.filter( + row => row.subscription.transferred_to_subscription_id === null + ); + if (currentAmbiguous.length < 2) { + usersWithNoWork += 1; + continue; + } + if (currentAmbiguous.some(row => row.instanceDestroyedAt === null)) { + usersWithNoWork += 1; + continue; + } + + const plannedPairs = planMultiRowAllDestroyedPairsFromJoinedRows(userRows); + + // Skip non-collapsible users: writing their partial chain would replace + // one guard-firing shape with another and still leave the user broken. + // `plannedPairs.length === 0` is also caught here because + // `currentAmbiguous.length >= 2` above (zero pairs would leave `!== 1`). + if (currentAmbiguous.length - plannedPairs.length !== 1) { + usersNotCollapsible += 1; + continue; + } + + for (const pair of plannedPairs) { + allPlannedPairs.push({ + userId: candidate.userId, + before: pair.before, + targetId: pair.targetId, + }); + } + } + + if (allPlannedPairs.length === 0) { + return { usersCollapsed: 0, pairsWritten: 0, usersWithNoWork, usersNotCollapsible }; + } + + // 3. One raw-SQL bulk UPDATE for every pair in the chunk. FROM VALUES is + // the cleanest Postgres pattern for this — each `(source_id, target_id)` + // tuple drives one UPDATE row. The `transferred_to_subscription_id IS NULL` + // guard in the WHERE clause provides optimistic-concurrency against + // external writes; rows that raced are silently dropped from the + // RETURNING set. `updated_at` is auto-stamped by the schema's + // $onUpdateFn. + const valuesFragments = allPlannedPairs.map( + pair => sql`(${pair.before.id}::uuid, ${pair.targetId}::uuid)` + ); + + const { rows: updatedRows } = await tx.execute<{ + id: string; + updated_at: string; + }>(sql` + UPDATE ${kiloclaw_subscriptions} AS s + SET transferred_to_subscription_id = v.target_id + FROM (VALUES ${sql.join(valuesFragments, sql`, `)}) AS v(source_id, target_id) + WHERE s.id = v.source_id + AND s.transferred_to_subscription_id IS NULL + RETURNING s.id AS id, s.updated_at AS updated_at + `); + + if (updatedRows.length === 0) { + return { usersCollapsed: 0, pairsWritten: 0, usersWithNoWork, usersNotCollapsible }; + } + + // 4. Reconstruct the `after` snapshot for each updated row from the + // in-memory before + planned target id + fresh updated_at stamp. + // Cheaper than returning every column from Postgres. + const plannedById = new Map(allPlannedPairs.map(pair => [pair.before.id, pair])); + + const changeLogRows = updatedRows.flatMap(updated => { + const planned = plannedById.get(updated.id); + if (!planned) return []; + const after: KiloClawSubscription = { + ...planned.before, + transferred_to_subscription_id: planned.targetId, + updated_at: updated.updated_at, + }; + return [ + { + subscription_id: updated.id, + actor_type: ALIGNMENT_SCRIPT_ACTOR.actorType, + actor_id: ALIGNMENT_SCRIPT_ACTOR.actorId, + action: 'reassigned' as const, + reason: 'apply_multi_row_all_destroyed_collapse', + before_state: serializeKiloClawSubscriptionSnapshot(planned.before), + after_state: serializeKiloClawSubscriptionSnapshot(after), + }, + ]; + }); + + // 5. One bulk INSERT for every change-log row in the chunk. + if (changeLogRows.length > 0) { + await tx.insert(kiloclaw_subscription_change_log).values(changeLogRows); + } + + // A user counts as collapsed if at least one of their planned pairs + // actually landed in the RETURNING set. Users whose planned pairs all + // raced (should be vanishingly rare at this scale) are left uncounted + // here and will be re-checked by the final preview step. + const touchedSourceIds = new Set(updatedRows.map(row => row.id)); + const collapsedUserIds = new Set(); + for (const pair of allPlannedPairs) { + if (touchedSourceIds.has(pair.before.id)) { + collapsedUserIds.add(pair.userId); + } + } + + return { + usersCollapsed: collapsedUserIds.size, + pairsWritten: updatedRows.length, + usersWithNoWork, + usersNotCollapsible, + }; + }); +} + +async function applyMultiRowAllDestroyedCollapse(options: ApplyOptions) { + const allCandidates = await buildMultiRowAllDestroyedCandidates(); + const candidates = allCandidates.filter(candidate => candidate.fullyCollapsible); + const nonCollapsibleCandidates = allCandidates.filter(candidate => !candidate.fullyCollapsible); + const totalPairs = candidates.reduce((acc, row) => acc + row.pairs.length, 0); + + console.log( + `\nmulti-row-all-destroyed apply: ${candidates.length} collapsible users, ${totalPairs} pairs to write` + ); + if (nonCollapsibleCandidates.length > 0) { + console.log( + `[skip] ${nonCollapsibleCandidates.length} user(s) non-collapsible (pre-existing partial chain); require manual review` + ); + } + if (options.bulk) { + console.log( + '[bulk] enabled; processing ~250 users per transaction, per-user fallback on chunk failure' + ); + } + + let usersCollapsed = 0; + let pairsWritten = 0; + let usersWithNoWork = 0; + const skipped: Array<{ userId: string; reason: string; error?: string }> = [ + ...nonCollapsibleCandidates.map(candidate => ({ + userId: candidate.userId, + reason: 'skipped_not_collapsible', + })), + ]; + const perUserFallback: MultiRowAllDestroyedCandidate[] = []; + const startedAt = Date.now(); + + if (options.bulk) { + const bulkChunkSize = 250; + let processed = 0; + + for (const chunk of chunkArray(candidates, bulkChunkSize)) { + try { + const result = await applyMultiRowAllDestroyedCollapseChunkBulk({ chunk }); + usersCollapsed += result.usersCollapsed; + pairsWritten += result.pairsWritten; + usersWithNoWork += result.usersWithNoWork; + // `usersNotCollapsible` from the bulk result counts users whose + // apply-time re-plan came back non-collapsible after the preview + // showed them as collapsible (race with a concurrent chain write). + // Surface them as skipped without an individual user id, since the + // bulk path doesn't carry per-user identity out of the tx. + for (let i = 0; i < result.usersNotCollapsible; i += 1) { + skipped.push({ userId: '(bulk chunk)', reason: 'skipped_not_collapsible' }); + } + } catch (error) { + // One bad row (e.g. a UQ_kiloclaw_subscriptions_transferred_to race + // from a concurrent webhook / reprovision) poisons the entire chunk + // tx. Fall back to per-user for the chunk so the rest of the fleet + // isn't held hostage by one user. + console.error('[bulk-chunk-fail] falling back to per-user for chunk', { + chunkSize: chunk.length, + firstUserId: chunk[0]?.userId, + lastUserId: chunk[chunk.length - 1]?.userId, + error: describeError(error), + }); + perUserFallback.push(...chunk); + } + + processed += chunk.length; + logApplyProgress({ + label: 'apply-multi-row-all-destroyed-bulk', + processed, + total: candidates.length, + startedAt, + every: 1, + }); + } + } + + const perUserQueue = options.bulk ? perUserFallback : candidates; + if (options.bulk && perUserFallback.length > 0) { + console.log( + `[bulk] ${perUserFallback.length} user(s) routed to per-user fallback after chunk failure` + ); + } + + for (const [index, candidate] of perUserQueue.entries()) { + try { + const result = await applyMultiRowAllDestroyedCollapseRow(candidate); + if (result.outcome === 'collapsed') { + usersCollapsed += 1; + pairsWritten += result.pairsWritten; + } else { + skipped.push({ userId: candidate.userId, reason: result.outcome }); + } + } catch (error) { + console.error('multi-row-all-destroyed apply row failed', { + userId: candidate.userId, + tailSubscriptionId: candidate.tailSubscriptionId, + plannedPairs: candidate.pairs.length, + error: describeError(error), + }); + skipped.push({ + userId: candidate.userId, + reason: 'error', + error: describeError(error), + }); + } + + logApplyProgress({ + label: options.bulk + ? 'apply-multi-row-all-destroyed-per-user-fallback' + : 'apply-multi-row-all-destroyed', + processed: index + 1, + total: perUserQueue.length, + startedAt, + every: 50, + }); + } + + // Derive the non-collapsible metric from the skipped set so it includes + // preview-time detection, bulk in-tx re-plan races, and per-user in-tx + // re-plan races. Counting only `nonCollapsibleCandidates.length` would + // hide users whose plan was valid at preview but raced with a concurrent + // chain write during apply. + const usersSkippedNotCollapsible = skipped.filter( + row => row.reason === 'skipped_not_collapsible' + ).length; + + console.log('\nmulti-row-all-destroyed apply results'); + console.table([ + { metric: 'users_collapsed', count: usersCollapsed }, + { metric: 'pairs_written', count: pairsWritten }, + { metric: 'users_skipped_no_work', count: usersWithNoWork }, + { metric: 'users_skipped_not_collapsible', count: usersSkippedNotCollapsible }, + { metric: 'users_skipped_total', count: skipped.length }, + ]); + printSection('Users skipped during multi-row-all-destroyed apply', skipped.slice(0, 50)); +} + type OrgApplyOutcome = OrgBackfillAction | 'skipped'; const ORG_BACKFILL_REASON: Record = { @@ -2128,8 +3096,9 @@ async function applyOrgBackfill() { action: string; error?: string; }> = []; + const startedAt = Date.now(); - for (const row of rows) { + for (const [index, row] of rows.entries()) { let outcome: OrgApplyOutcome; try { outcome = await applyOrgBackfillRow(row); @@ -2148,6 +3117,13 @@ async function applyOrgBackfill() { action: row.action, error: describeError(error), }); + logApplyProgress({ + label: 'apply-org', + processed: index + 1, + total: rows.length, + startedAt, + every: 50, + }); continue; } @@ -2161,6 +3137,14 @@ async function applyOrgBackfill() { } else { counts[outcome] += 1; } + + logApplyProgress({ + label: 'apply-org', + processed: index + 1, + total: rows.length, + startedAt, + every: 50, + }); } console.log('\nOrg backfill results'); @@ -2179,6 +3163,7 @@ async function applyOrgBackfill() { type ApplyOptions = { confirmSandboxesDestroyed: boolean; + bulk: boolean; }; function parseMode(inputMode?: string): Mode { @@ -2194,6 +3179,8 @@ function parseMode(inputMode?: string): Mode { case 'apply-org': case 'preview-changelog-baseline': case 'apply-changelog-baseline': + case 'preview-multi-row-all-destroyed': + case 'apply-multi-row-all-destroyed': return mode; default: throw new Error(`Unsupported mode: ${inputMode}`); @@ -2203,6 +3190,7 @@ function parseMode(inputMode?: string): Mode { function parseApplyOptions(args: string[]): ApplyOptions { return { confirmSandboxesDestroyed: args.includes('--confirm-sandboxes-destroyed'), + bulk: args.includes('--bulk'), }; } @@ -2217,6 +3205,8 @@ const singleModeHandlers: Partial> = { 'apply-org': applyOrgBackfill, 'preview-changelog-baseline': previewChangelogBaselineBackfill, 'apply-changelog-baseline': applyChangelogBaselineBackfill, + 'preview-multi-row-all-destroyed': previewMultiRowAllDestroyedCollapse, + 'apply-multi-row-all-destroyed': applyMultiRowAllDestroyedCollapse, }; export async function run(...args: string[]) { @@ -2325,7 +3315,8 @@ export async function run(...args: string[]) { let repaired = 0; const failures: Array<{ subscriptionId: string; userId: string; error: string }> = []; - for (const row of repairable) { + const startedAt = Date.now(); + for (const [index, row] of repairable.entries()) { if (!row.targetInstanceId) continue; const targetInstanceId = row.targetInstanceId; try { @@ -2373,6 +3364,14 @@ export async function run(...args: string[]) { error: describeError(error), }); } + + logApplyProgress({ + label: 'repair-detached', + processed: index + 1, + total: repairable.length, + startedAt, + every: 25, + }); } console.log(`\nDetached subscriptions repaired: ${repaired}`);