Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/core/task-persistence/__tests__/apiMessages.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// cd src && npx vitest run core/task-persistence/__tests__/apiMessages.spec.ts

import * as os from "os"
import * as path from "path"
import * as fs from "fs/promises"

import { readApiMessages } from "../apiMessages"

let tmpBaseDir: string

beforeEach(async () => {
tmpBaseDir = await fs.mkdtemp(path.join(os.tmpdir(), "roo-test-api-"))
})

describe("apiMessages.readApiMessages", () => {
it("returns empty array when api_conversation_history.json contains invalid JSON", async () => {
const taskId = "task-corrupt-api"
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })
const filePath = path.join(taskDir, "api_conversation_history.json")
await fs.writeFile(filePath, "<<<corrupt data>>>", "utf8")

const result = await readApiMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])
})

it("returns empty array when claude_messages.json fallback contains invalid JSON", async () => {
const taskId = "task-corrupt-fallback"
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })

// Only write the old fallback file (claude_messages.json), NOT the new one
const oldPath = path.join(taskDir, "claude_messages.json")
await fs.writeFile(oldPath, "not json at all {[!", "utf8")

const result = await readApiMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])

// The corrupted fallback file should NOT be deleted
const stillExists = await fs
.access(oldPath)
.then(() => true)
.catch(() => false)
expect(stillExists).toBe(true)
})

it("returns [] when file contains valid JSON that is not an array", async () => {
const taskId = "task-non-array-api"
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })
const filePath = path.join(taskDir, "api_conversation_history.json")
await fs.writeFile(filePath, JSON.stringify("hello"), "utf8")

const result = await readApiMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])
})

it("returns [] when fallback file contains valid JSON that is not an array", async () => {
const taskId = "task-non-array-fallback"
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })

// Only write the old fallback file, NOT the new one
const oldPath = path.join(taskDir, "claude_messages.json")
await fs.writeFile(oldPath, JSON.stringify({ key: "value" }), "utf8")

const result = await readApiMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])
})
})
35 changes: 34 additions & 1 deletion src/core/task-persistence/__tests__/taskMessages.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ vi.mock("../../../utils/safeWriteJson", () => ({
}))

// Import after mocks
import { saveTaskMessages } from "../taskMessages"
import { saveTaskMessages, readTaskMessages } from "../taskMessages"

let tmpBaseDir: string

Expand Down Expand Up @@ -66,3 +66,36 @@ describe("taskMessages.saveTaskMessages", () => {
expect(persisted).toEqual(messages)
})
})

describe("taskMessages.readTaskMessages", () => {
it("returns empty array when file contains invalid JSON", async () => {
const taskId = "task-corrupt-json"
// Manually create the task directory and write corrupted JSON
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })
const filePath = path.join(taskDir, "ui_messages.json")
await fs.writeFile(filePath, "{not valid json!!!", "utf8")

const result = await readTaskMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])
})

it("returns [] when file contains valid JSON that is not an array", async () => {
const taskId = "task-non-array-json"
const taskDir = path.join(tmpBaseDir, "tasks", taskId)
await fs.mkdir(taskDir, { recursive: true })
const filePath = path.join(taskDir, "ui_messages.json")
await fs.writeFile(filePath, JSON.stringify("hello"), "utf8")

const result = await readTaskMessages({
taskId,
globalStoragePath: tmpBaseDir,
})

expect(result).toEqual([])
})
})
30 changes: 21 additions & 9 deletions src/core/task-persistence/apiMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,23 @@ export async function readApiMessages({
const fileContent = await fs.readFile(filePath, "utf8")
try {
const parsedData = JSON.parse(fileContent)
if (Array.isArray(parsedData) && parsedData.length === 0) {
if (!Array.isArray(parsedData)) {
console.warn(
`[readApiMessages] Parsed data is not an array (got ${typeof parsedData}), returning empty. TaskId: ${taskId}, Path: ${filePath}`,
)
return []
}
if (parsedData.length === 0) {
console.error(
`[Roo-Debug] readApiMessages: Found API conversation history file, but it's empty (parsed as []). TaskId: ${taskId}, Path: ${filePath}`,
)
}
return parsedData
} catch (error) {
console.error(
`[Roo-Debug] readApiMessages: Error parsing API conversation history file. TaskId: ${taskId}, Path: ${filePath}, Error: ${error}`,
console.warn(
`[readApiMessages] Error parsing API conversation history file, returning empty. TaskId: ${taskId}, Path: ${filePath}, Error: ${error}`,
)
throw error
return []
}
} else {
const oldPath = path.join(taskDir, "claude_messages.json")
Expand All @@ -70,19 +76,25 @@ export async function readApiMessages({
const fileContent = await fs.readFile(oldPath, "utf8")
try {
const parsedData = JSON.parse(fileContent)
if (Array.isArray(parsedData) && parsedData.length === 0) {
if (!Array.isArray(parsedData)) {
console.warn(
`[readApiMessages] Parsed OLD data is not an array (got ${typeof parsedData}), returning empty. TaskId: ${taskId}, Path: ${oldPath}`,
)
return []
}
if (parsedData.length === 0) {
console.error(
`[Roo-Debug] readApiMessages: Found OLD API conversation history file (claude_messages.json), but it's empty (parsed as []). TaskId: ${taskId}, Path: ${oldPath}`,
)
}
await fs.unlink(oldPath)
return parsedData
} catch (error) {
console.error(
`[Roo-Debug] readApiMessages: Error parsing OLD API conversation history file (claude_messages.json). TaskId: ${taskId}, Path: ${oldPath}, Error: ${error}`,
console.warn(
`[readApiMessages] Error parsing OLD API conversation history file (claude_messages.json), returning empty. TaskId: ${taskId}, Path: ${oldPath}, Error: ${error}`,
)
// DO NOT unlink oldPath if parsing failed, throw error instead.
throw error
// DO NOT unlink oldPath if parsing failed.
return []
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/core/task-persistence/taskMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,21 @@ export async function readTaskMessages({
const fileExists = await fileExistsAtPath(filePath)

if (fileExists) {
return JSON.parse(await fs.readFile(filePath, "utf8"))
try {
const parsedData = JSON.parse(await fs.readFile(filePath, "utf8"))
if (!Array.isArray(parsedData)) {
console.warn(
`[readTaskMessages] Parsed data is not an array (got ${typeof parsedData}), returning empty. TaskId: ${taskId}, Path: ${filePath}`,
)
return []
}
return parsedData
} catch (error) {
console.warn(
`[readTaskMessages] Failed to parse ${filePath} for task ${taskId}, returning empty: ${error instanceof Error ? error.message : String(error)}`,
)
return []
}
}

return []
Expand Down
57 changes: 46 additions & 11 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1203,10 +1203,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
* tools execute (added in recursivelyMakeClineRequests after streaming completes).
* So we usually only need to flush the pending user message with tool_results.
*/
public async flushPendingToolResultsToHistory(): Promise<void> {
public async flushPendingToolResultsToHistory(): Promise<boolean> {
// Only flush if there's actually pending content to save
if (this.userMessageContent.length === 0) {
return
return true
}

// CRITICAL: Wait for the assistant message to be saved to API history first.
Expand Down Expand Up @@ -1236,7 +1236,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {

// If task was aborted while waiting, don't flush
if (this.abort) {
return
return false
}

// Save the user message with tool_result blocks
Expand All @@ -1253,25 +1253,58 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
const userMessageWithTs = { ...validatedMessage, ts: Date.now() }
this.apiConversationHistory.push(userMessageWithTs as ApiMessage)

await this.saveApiConversationHistory()
const saved = await this.saveApiConversationHistory()

if (saved) {
// Clear the pending content since it's now saved
this.userMessageContent = []
} else {
console.warn(
`[Task#${this.taskId}] flushPendingToolResultsToHistory: save failed, retaining pending tool results in memory`,
)
}

// Clear the pending content since it's now saved
this.userMessageContent = []
return saved
}

private async saveApiConversationHistory() {
private async saveApiConversationHistory(): Promise<boolean> {
try {
await saveApiMessages({
messages: this.apiConversationHistory,
messages: structuredClone(this.apiConversationHistory),
taskId: this.taskId,
globalStoragePath: this.globalStoragePath,
})
return true
} catch (error) {
// In the off chance this fails, we don't want to stop the task.
console.error("Failed to save API conversation history:", error)
return false
}
}

/**
* Public wrapper to retry saving the API conversation history.
* Uses exponential backoff: up to 3 attempts with delays of 100 ms, 500 ms, 1500 ms.
* Used by delegation flow when flushPendingToolResultsToHistory reports failure.
*/
public async retrySaveApiConversationHistory(): Promise<boolean> {
const delays = [100, 500, 1500]

for (let attempt = 0; attempt < delays.length; attempt++) {
await new Promise<void>((resolve) => setTimeout(resolve, delays[attempt]))
console.warn(
`[Task#${this.taskId}] retrySaveApiConversationHistory: retry attempt ${attempt + 1}/${delays.length}`,
)

const success = await this.saveApiConversationHistory()

if (success) {
return true
}
}

return false
}

// Cline Messages

private async getSavedClineMessages(): Promise<ClineMessage[]> {
Expand Down Expand Up @@ -1333,10 +1366,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}
}

private async saveClineMessages() {
private async saveClineMessages(): Promise<boolean> {
try {
await saveTaskMessages({
messages: this.clineMessages,
messages: structuredClone(this.clineMessages),
taskId: this.taskId,
globalStoragePath: this.globalStoragePath,
})
Expand Down Expand Up @@ -1366,8 +1399,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
this.debouncedEmitTokenUsage(tokenUsage, this.toolUsage)

await this.providerRef.deref()?.updateTaskHistory(historyItem)
return true
} catch (error) {
console.error("Failed to save Roo messages:", error)
return false
}
}

Expand Down
Loading
Loading