diff --git a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts index 10151894b..b7f98c5c2 100644 --- a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts +++ b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts @@ -36,6 +36,7 @@ import type { ExecutionMetadata, AddExecutionParams, UpdateExecutionStatusParams, + LatestAssistantMessage, } from '../session/types.js'; import type { ExecutionStatus } from '../core/execution.js'; import type { Result } from '../lib/result.js'; @@ -581,6 +582,13 @@ export class CloudAgentSession extends DurableObject { return metadata || null; } + async getLatestAssistantMessage(): Promise { + const sessionId = await this.requireSessionId(); + const metadata = await this.getMetadata(); + if (!metadata?.kiloSessionId) return null; + return this.eventQueries.getLatestAssistantMessage(sessionId, metadata.kiloSessionId); + } + /** * Update session metadata with validation. * Throws an error if validation fails. diff --git a/services/cloud-agent-next/src/router.test.ts b/services/cloud-agent-next/src/router.test.ts index 131fbfcd5..2a6301968 100644 --- a/services/cloud-agent-next/src/router.test.ts +++ b/services/cloud-agent-next/src/router.test.ts @@ -55,8 +55,12 @@ import type { TRPCContext, SessionId } from './types.js'; import type { CloudAgentSessionState } from './persistence/types.js'; type MockSessionStub = { - deleteSession: ReturnType; - markAsInterrupted: ReturnType; + deleteSession?: ReturnType; + markAsInterrupted?: ReturnType; + getMetadata?: ReturnType; + getActiveExecutionId?: ReturnType; + getExecution?: ReturnType; + getLatestAssistantMessage?: ReturnType; }; type MockCAS = { @@ -892,5 +896,147 @@ describe('router sessionId validation', () => { }); }); }); + + describe('getLatestAssistantMessage procedure', () => { + let mockContext: TRPCContext; + let caller: ReturnType; + let cloudAgentSession: MockCAS; + let mockGetMetadata: ReturnType; + let mockGetLatestAssistantMessage: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + + mockGetMetadata = vi.fn(); + mockGetLatestAssistantMessage = vi.fn(); + + mockContext = { + userId: 'test-user-123', + authToken: 'test-token', + botId: undefined, + request: {} as Request, + env: { + Sandbox: {} as TRPCContext['env']['Sandbox'], + SandboxSmall: {} as TRPCContext['env']['SandboxSmall'], + CLOUD_AGENT_SESSION: { + idFromName: vi.fn((id: string) => ({ id })), + get: vi.fn(() => ({ + getMetadata: mockGetMetadata, + getLatestAssistantMessage: mockGetLatestAssistantMessage, + })), + } as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'], + SESSION_INGEST: { + fetch: vi.fn(), + } as unknown as TRPCContext['env']['SESSION_INGEST'], + R2_BUCKET: {} as TRPCContext['env']['R2_BUCKET'], + NEXTAUTH_SECRET: 'test-secret', + INTERNAL_API_SECRET_PROD: { + get: vi.fn().mockResolvedValue('test-secret'), + } as unknown as TRPCContext['env']['INTERNAL_API_SECRET_PROD'], + }, + }; + cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS; + caller = appRouter.createCaller(mockContext); + }); + + it('should return the latest assistant message for the owner', async () => { + const sessionId: SessionId = 'agent_55555555-5555-5555-5555-555555555555'; + mockGetMetadata.mockResolvedValue({ + version: 123456789, + sessionId, + userId: 'test-user-123', + timestamp: 123456789, + kiloSessionId: 'ses_00000000000000000000000001', + } satisfies CloudAgentSessionState); + mockGetLatestAssistantMessage.mockResolvedValue({ + eventId: 12, + timestamp: 1700000000000, + info: { + id: 'msg_00000000000000000000000001', + role: 'assistant', + sessionID: 'ses_00000000000000000000000001', + }, + parts: [ + { + id: 'part_00000000000000000000000001', + messageID: 'msg_00000000000000000000000001', + type: 'text', + text: 'Done', + }, + ], + }); + + const result = await caller.getLatestAssistantMessage({ cloudAgentSessionId: sessionId }); + + expect(result).toEqual({ + cloudAgentSessionId: sessionId, + message: { + eventId: 12, + timestamp: 1700000000000, + info: { + id: 'msg_00000000000000000000000001', + role: 'assistant', + sessionID: 'ses_00000000000000000000000001', + }, + parts: [ + { + id: 'part_00000000000000000000000001', + messageID: 'msg_00000000000000000000000001', + type: 'text', + text: 'Done', + }, + ], + }, + }); + expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`); + expect(mockGetLatestAssistantMessage).toHaveBeenCalled(); + }); + + it('should return null when the session has no assistant messages', async () => { + const sessionId: SessionId = 'agent_66666666-6666-6666-6666-666666666666'; + mockGetMetadata.mockResolvedValue({ + version: 123456789, + sessionId, + userId: 'test-user-123', + timestamp: 123456789, + kiloSessionId: 'ses_00000000000000000000000001', + } satisfies CloudAgentSessionState); + mockGetLatestAssistantMessage.mockResolvedValue(null); + + await expect( + caller.getLatestAssistantMessage({ cloudAgentSessionId: sessionId }) + ).resolves.toEqual({ + cloudAgentSessionId: sessionId, + message: null, + }); + }); + + it('should return NOT_FOUND for a missing session', async () => { + const sessionId: SessionId = 'agent_77777777-7777-7777-7777-777777777777'; + mockGetMetadata.mockResolvedValue(null); + + await expect( + caller.getLatestAssistantMessage({ cloudAgentSessionId: sessionId }) + ).rejects.toThrow('Session not found'); + expect(mockGetLatestAssistantMessage).not.toHaveBeenCalled(); + }); + + it('should require authentication', async () => { + const unauthenticatedContext: TRPCContext = { + userId: undefined, + authToken: undefined, + botId: undefined, + env: mockContext.env, + } as unknown as TRPCContext; + + const unauthenticatedCaller = appRouter.createCaller(unauthenticatedContext); + + await expect( + unauthenticatedCaller.getLatestAssistantMessage({ + cloudAgentSessionId: 'agent_12345678-1234-1234-1234-123456789abc', + }) + ).rejects.toThrow('Authentication required'); + }); + }); }); }); diff --git a/services/cloud-agent-next/src/router/handlers/session-management.ts b/services/cloud-agent-next/src/router/handlers/session-management.ts index 2e140e177..ddfbab045 100644 --- a/services/cloud-agent-next/src/router/handlers/session-management.ts +++ b/services/cloud-agent-next/src/router/handlers/session-management.ts @@ -14,7 +14,13 @@ import { import { cleanupWorkspace, getSessionWorkspacePath, getSessionHomePath } from '../../workspace.js'; import { withDORetry } from '../../utils/do-retry.js'; import { protectedProcedure, publicProcedure, internalApiProtectedProcedure } from '../auth.js'; -import { sessionIdSchema, GetSessionInput, GetSessionOutput } from '../schemas.js'; +import { + sessionIdSchema, + GetSessionInput, + GetSessionOutput, + GetLatestAssistantMessageInput, + GetLatestAssistantMessageOutput, +} from '../schemas.js'; import { computeExecutionHealth } from '../../core/execution.js'; /** @@ -448,6 +454,43 @@ export function createSessionManagementHandlers() { }); }), + getLatestAssistantMessage: protectedProcedure + .input(GetLatestAssistantMessageInput) + .output(GetLatestAssistantMessageOutput) + .query(async ({ input, ctx }) => { + return withLogTags({ source: 'getLatestAssistantMessage' }, async () => { + const sessionId = input.cloudAgentSessionId as SessionId; + const { userId, env } = ctx; + + logger.setTags({ userId, sessionId }); + logger.info('Fetching latest assistant message'); + + const doKey = `${userId}:${sessionId}`; + const getStub = () => + env.CLOUD_AGENT_SESSION.get(env.CLOUD_AGENT_SESSION.idFromName(doKey)); + + const metadata = await withDORetry(getStub, s => s.getMetadata(), 'getMetadata'); + if (!metadata) { + logger.info('Session not found'); + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Session not found', + }); + } + + const message = await withDORetry( + getStub, + s => s.getLatestAssistantMessage(), + 'getLatestAssistantMessage' + ); + + return { + cloudAgentSessionId: sessionId, + message, + }; + }); + }), + /** * Get all log files and running processes for a session's sandbox. * diff --git a/services/cloud-agent-next/src/router/schemas.ts b/services/cloud-agent-next/src/router/schemas.ts index 1f74da288..9bdb23f8c 100644 --- a/services/cloud-agent-next/src/router/schemas.ts +++ b/services/cloud-agent-next/src/router/schemas.ts @@ -407,6 +407,38 @@ export const GetSessionOutput = z.object({ export type GetSessionResponse = z.infer; +export const GetLatestAssistantMessageInput = z.object({ + cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID to inspect'), +}); + +export const AssistantMessageInfoSchema = z + .object({ + id: z.string().describe('Assistant message ID'), + role: z.literal('assistant'), + }) + .passthrough(); + +export const AssistantMessagePartSchema = z + .object({ + id: z.string().describe('Message part ID'), + messageID: z.string().describe('Parent message ID'), + }) + .passthrough(); + +export const LatestAssistantMessageSchema = z.object({ + eventId: z.number().describe('Stored event ID for the message.updated event'), + timestamp: z.number().describe('Stored event timestamp in milliseconds'), + info: AssistantMessageInfoSchema, + parts: z.array(AssistantMessagePartSchema), +}); + +export const GetLatestAssistantMessageOutput = z.object({ + cloudAgentSessionId: sessionIdSchema, + message: LatestAssistantMessageSchema.nullable(), +}); + +export type GetLatestAssistantMessageResponse = z.infer; + /** * Response schema for V2 execution endpoints. * Returns acknowledgment when execution has started. diff --git a/services/cloud-agent-next/src/session/queries/events.ts b/services/cloud-agent-next/src/session/queries/events.ts index c88743f34..661167d45 100644 --- a/services/cloud-agent-next/src/session/queries/events.ts +++ b/services/cloud-agent-next/src/session/queries/events.ts @@ -1,12 +1,71 @@ import { count, max, eq, and, gt, gte, lte, lt, inArray, asc } from 'drizzle-orm'; import type { DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'; +import * as z from 'zod'; import type { StoredEvent } from '../../websocket/types.js'; import type { EventId } from '../../types/ids.js'; +import type { AssistantMessagePart, LatestAssistantMessage } from '../types.js'; import { events } from '../../db/sqlite-schema.js'; import type { SQL } from 'drizzle-orm'; type SqlStorage = DurableObjectState['storage']['sql']; +const storedEventRowSchema = z.object({ + id: z.number(), + execution_id: z.string(), + session_id: z.string(), + stream_event_type: z.string(), + payload: z.string(), + timestamp: z.number(), +}); + +const assistantMessageInfoSchema = z + .object({ + id: z.string(), + role: z.literal('assistant'), + }) + .passthrough(); + +const assistantMessageUpdatedPayloadSchema = z + .object({ + event: z.literal('message.updated'), + properties: z + .object({ + info: assistantMessageInfoSchema, + }) + .passthrough(), + }) + .passthrough(); + +const assistantMessagePartSchema = z + .object({ + id: z.string(), + messageID: z.string(), + }) + .passthrough(); + +const assistantMessagePartUpdatedPayloadSchema = z + .object({ + event: z.literal('message.part.updated'), + properties: z + .object({ + part: assistantMessagePartSchema, + }) + .passthrough(), + }) + .passthrough(); + +const assistantMessagePartRemovedPayloadSchema = z + .object({ + event: z.literal('message.part.removed'), + properties: z + .object({ + messageID: z.string(), + partID: z.string(), + }) + .passthrough(), + }) + .passthrough(); + // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- @@ -64,6 +123,21 @@ function buildConditions(filters: Omit): SQL[] { return conditions; } +function parseJsonPayload(payload: string, schema: z.ZodType): T | null { + try { + const raw: unknown = JSON.parse(payload); + const parsed = schema.safeParse(raw); + return parsed.success ? parsed.data : null; + } catch { + return null; + } +} + +function parseStoredEventRow(row: unknown): StoredEvent | null { + const parsed = storedEventRowSchema.safeParse(row); + return parsed.success ? parsed.data : null; +} + // --------------------------------------------------------------------------- // Factory Function // --------------------------------------------------------------------------- @@ -131,6 +205,99 @@ export function createEventQueries(db: DrizzleSqliteDODatabase, rawSql: SqlStora return query.all() satisfies StoredEvent[]; }, + getLatestAssistantMessage( + sessionId: string, + kiloSessionId: string + ): LatestAssistantMessage | null { + const messageRow = parseStoredEventRow( + rawSql + .exec( + ` + SELECT id, execution_id, session_id, stream_event_type, payload, timestamp + FROM events + WHERE session_id = ? + AND stream_event_type = 'kilocode' + AND entity_id IS NOT NULL + AND substr(entity_id, 1, 8) = 'message/' + AND json_extract(payload, '$.event') = 'message.updated' + AND json_extract(payload, '$.properties.info.role') = 'assistant' + AND json_extract(payload, '$.properties.info.sessionID') = ? + ORDER BY entity_id DESC + LIMIT 1 + `, + sessionId, + kiloSessionId + ) + .toArray()[0] + ); + if (!messageRow) return null; + + const messagePayload = parseJsonPayload( + messageRow.payload, + assistantMessageUpdatedPayloadSchema + ); + if (!messagePayload) return null; + + const messageId = messagePayload.properties.info.id; + const partPrefix = `part/${messageId}/`; + const partsById = new Map(); + + for (const rawPartRow of rawSql.exec( + ` + SELECT id, execution_id, session_id, stream_event_type, payload, timestamp + FROM events + WHERE session_id = ? + AND stream_event_type = 'kilocode' + AND ( + ( + entity_id IS NOT NULL + AND substr(entity_id, 1, ?) = ? + AND json_extract(payload, '$.event') = 'message.part.updated' + ) + OR ( + json_extract(payload, '$.event') = 'message.part.removed' + AND json_extract(payload, '$.properties.messageID') = ? + ) + ) + ORDER BY timestamp ASC, id ASC + `, + sessionId, + partPrefix.length, + partPrefix, + messageId + )) { + const partRow = parseStoredEventRow(rawPartRow); + if (!partRow) continue; + + const updatedPartPayload = parseJsonPayload( + partRow.payload, + assistantMessagePartUpdatedPayloadSchema + ); + if (updatedPartPayload?.properties.part.messageID === messageId) { + const part = updatedPartPayload.properties.part; + partsById.set(part.id, part); + continue; + } + + const removedPartPayload = parseJsonPayload( + partRow.payload, + assistantMessagePartRemovedPayloadSchema + ); + if (removedPartPayload?.properties.messageID === messageId) { + partsById.delete(removedPartPayload.properties.partID); + } + } + + const parts = [...partsById.values()].sort((a, b) => a.id.localeCompare(b.id)); + + return { + eventId: messageRow.id, + timestamp: messageRow.timestamp, + info: messagePayload.properties.info, + parts, + } satisfies LatestAssistantMessage; + }, + // Uses toSQL() + raw exec() for true lazy cursor-based iteration. // Drizzle's durable-sqlite .all() materializes everything; the raw // SqlStorageCursor lets callers break early without loading all rows. diff --git a/services/cloud-agent-next/src/session/types.ts b/services/cloud-agent-next/src/session/types.ts index 7893059a4..d3acef2b3 100644 --- a/services/cloud-agent-next/src/session/types.ts +++ b/services/cloud-agent-next/src/session/types.ts @@ -5,7 +5,7 @@ * the CloudAgentSession Durable Object's key-value storage. */ -import type { ExecutionId } from '../types/ids.js'; +import type { EventId, ExecutionId } from '../types/ids.js'; import type { ExecutionStatus } from '../core/execution.js'; import type { ExecutionMode, StreamingMode } from '../execution/types.js'; @@ -34,6 +34,27 @@ export type ExecutionMetadata = { ingestToken?: string; }; +// --------------------------------------------------------------------------- +// Latest Assistant Message +// --------------------------------------------------------------------------- + +export type AssistantMessageInfo = Record & { + id: string; + role: 'assistant'; +}; + +export type AssistantMessagePart = Record & { + id: string; + messageID: string; +}; + +export type LatestAssistantMessage = { + eventId: EventId; + timestamp: number; + info: AssistantMessageInfo; + parts: AssistantMessagePart[]; +}; + // --------------------------------------------------------------------------- // Session State Extension // --------------------------------------------------------------------------- diff --git a/services/cloud-agent-next/test/integration/session/events.test.ts b/services/cloud-agent-next/test/integration/session/events.test.ts index cbaefc527..4a165c127 100644 --- a/services/cloud-agent-next/test/integration/session/events.test.ts +++ b/services/cloud-agent-next/test/integration/session/events.test.ts @@ -11,9 +11,18 @@ import { env, runInDurableObject, listDurableObjectIds } from 'cloudflare:test'; import { describe, it, expect, beforeEach } from 'vitest'; import { drizzle } from 'drizzle-orm/durable-sqlite'; +import * as z from 'zod'; import { createEventQueries } from '../../../src/session/queries/events.js'; import type { EventId } from '../../../src/types/ids.js'; +const messageUpdatedPayloadSchema = z.object({ + properties: z.object({ + info: z.object({ + text: z.string(), + }), + }), +}); + describe('Event Storage', () => { beforeEach(async () => { // Verify previous test's DOs are automatically removed (isolation) @@ -273,7 +282,8 @@ describe('Event Storage', () => { // Only one row in the table expect(result.allEvents).toHaveLength(1); // Payload should be the latest version - expect(JSON.parse(result.allEvents[0].payload).properties.info.text).toBe('hello world'); + const payload: unknown = JSON.parse(result.allEvents[0].payload); + expect(messageUpdatedPayloadSchema.parse(payload).properties.info.text).toBe('hello world'); // Timestamp should be updated expect(result.allEvents[0].stream_event_type).toBe('kilocode'); }); @@ -333,4 +343,159 @@ describe('Event Storage', () => { // entity_id should not appear in the projected results (StoredEvent type) expect(result.allEvents[0]).not.toHaveProperty('entity_id'); }); + + it('should return latest assistant message by sortable message ID with current parts', async () => { + const id = env.CLOUD_AGENT_SESSION.idFromName('user_1:sess_7'); + const stub = env.CLOUD_AGENT_SESSION.get(id); + + const result = await runInDurableObject(stub, async (_instance, state) => { + const db = drizzle(state.storage, { logger: false }); + const events = createEventQueries(db, state.storage.sql); + const now = Date.now(); + const latestAssistantId = 'msg_00000000000000000000000002'; + const olderAssistantId = 'msg_00000000000000000000000001'; + const newerUserId = 'msg_00000000000000000000000003'; + + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { info: { id: latestAssistantId, role: 'assistant', sessionID: 'ses_root' } }, + }), + timestamp: now, + entityId: `message/${latestAssistantId}`, + }); + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.part.updated', + properties: { + part: { + id: 'part_00000000000000000000000002', + messageID: latestAssistantId, + sessionID: 'ses_root', + type: 'text', + text: 'latest answer', + }, + }, + }), + timestamp: now + 1, + entityId: `part/${latestAssistantId}/part_00000000000000000000000002`, + }); + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.part.updated', + properties: { + part: { + id: 'part_00000000000000000000000003', + messageID: latestAssistantId, + sessionID: 'ses_root', + type: 'text', + text: 'removed answer', + }, + }, + }), + timestamp: now + 2, + entityId: `part/${latestAssistantId}/part_00000000000000000000000003`, + }); + events.insert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.part.removed', + properties: { + sessionID: 'ses_root', + messageID: latestAssistantId, + partID: 'part_00000000000000000000000003', + }, + }), + timestamp: now + 3, + }); + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { info: { id: olderAssistantId, role: 'assistant', sessionID: 'ses_root' } }, + }), + timestamp: now + 2, + entityId: `message/${olderAssistantId}`, + }); + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { info: { id: newerUserId, role: 'user', sessionID: 'ses_root' } }, + }), + timestamp: now + 3, + entityId: `message/${newerUserId}`, + }); + + return events.getLatestAssistantMessage('sess_1', 'ses_root'); + }); + + expect(result?.info.id).toBe('msg_00000000000000000000000002'); + expect(result?.parts).toEqual([ + expect.objectContaining({ + id: 'part_00000000000000000000000002', + messageID: 'msg_00000000000000000000000002', + text: 'latest answer', + }), + ]); + }); + + it('should require root-session assistant messages', async () => { + const id = env.CLOUD_AGENT_SESSION.idFromName('user_1:sess_8'); + const stub = env.CLOUD_AGENT_SESSION.get(id); + + const result = await runInDurableObject(stub, async (_instance, state) => { + const db = drizzle(state.storage, { logger: false }); + const events = createEventQueries(db, state.storage.sql); + const now = Date.now(); + const rootMessageId = 'msg_00000000000000000000000002'; + const childMessageId = 'msg_00000000000000000000000003'; + + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { info: { id: rootMessageId, role: 'assistant', sessionID: 'ses_root' } }, + }), + timestamp: now, + entityId: `message/${rootMessageId}`, + }); + events.upsert({ + executionId: 'exc_1', + sessionId: 'sess_1', + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { info: { id: childMessageId, role: 'assistant', sessionID: 'ses_child' } }, + }), + timestamp: now + 1, + entityId: `message/${childMessageId}`, + }); + + return { + root: events.getLatestAssistantMessage('sess_1', 'ses_root'), + missingRoot: events.getLatestAssistantMessage('sess_1', 'ses_missing'), + }; + }); + + expect(result.root?.info.id).toBe('msg_00000000000000000000000002'); + expect(result.missingRoot).toBeNull(); + }); });