Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
119a951
fix: Introduce dev sign in bypass
vdekrijger Apr 14, 2026
25780fa
fix: Introduce dev sign in bypass
vdekrijger Apr 14, 2026
5fe491b
feat: Hide the posthog AI tab by default (unless enabled in settings)
vdekrijger Apr 15, 2026
7519b45
feat: Better represent the other AI types
vdekrijger Apr 15, 2026
60a5167
feat: Add pref store
vdekrijger Apr 15, 2026
3b88870
WIP COMMIT
vdekrijger Apr 15, 2026
c72eff7
feat: Show cloud / local rns
vdekrijger Apr 15, 2026
830db27
WIP COMMIT
vdekrijger Apr 15, 2026
307915a
WIP COMMIT
vdekrijger Apr 15, 2026
513f0d7
WIP - Add queue and some thinking optimisations
vdekrijger Apr 15, 2026
09ced13
syncing
aspicer Apr 15, 2026
6dc6a75
WIP - Add archivable tasks and make it so that we can improve resilie…
vdekrijger Apr 15, 2026
f7b5aae
WIP - Add retry for completed and failed cloud runs
vdekrijger Apr 15, 2026
407bb86
code
aspicer Apr 16, 2026
637583c
Merge branch 'aspicer/syncing' into hackathon-lisbon-code-updates
aspicer Apr 16, 2026
be180d4
WIP - Added improvmeents for the thinking and connecting messages
vdekrijger Apr 16, 2026
00cfbc2
WIP - Added network detector / Better error handling in error cases a…
vdekrijger Apr 16, 2026
304bb41
Merge remote-tracking branch 'origin/hackathon-lisbon-code-updates' i…
aspicer Apr 16, 2026
5e021ce
WIP - Add highlighting and timestamps to messages
vdekrijger Apr 16, 2026
c62483a
Merge remote-tracking branch 'origin/hackathon-lisbon-code-updates' i…
aspicer Apr 16, 2026
94abba5
fix: Introduce dev sign in bypass
vdekrijger Apr 14, 2026
71b78fc
WIP - readded user filter, added more hapctics and improved markdown …
vdekrijger Apr 16, 2026
2795bf4
WIP - Add access for photo library things
vdekrijger Apr 16, 2026
f3ef56f
subscription resuem
aspicer Apr 16, 2026
2cbc239
meep
aspicer Apr 16, 2026
736d6e2
Merge remote-tracking branch 'origin/hackathon-lisbon-code-updates' i…
aspicer Apr 16, 2026
5b9e731
session id
aspicer Apr 16, 2026
be674e7
fix: Generate title for attachment-only messages (#1674)
charlesvien Apr 16, 2026
56ed7b2
feat: auto recover on disconnect (#1687)
jonathanlab Apr 16, 2026
46df7e3
feat: Add task search and sidebar context menu integration to command…
charlesvien Apr 16, 2026
c866b6c
fix: Prevent auto-generated title from overwriting manual rename (#1676)
charlesvien Apr 16, 2026
7e7ff37
feat: Implement a background to the installer dmg (#1662)
charlesvien Apr 16, 2026
79730b7
feat: Revamped update banner (#1659)
charlesvien Apr 16, 2026
a076c1e
feat(sig): Add user autonomy config, refactor API (#1672)
oliverb123 Apr 16, 2026
92f845a
WIP - Address some beta testing feedback and add an overlay for local…
vdekrijger Apr 16, 2026
0bcf56f
WIP - Add stoppable tasks annddddd some retries imporvoement
vdekrijger Apr 16, 2026
e44934f
fix: REmove queued and fix the task filtering
vdekrijger Apr 16, 2026
6f55019
wip: Small UX work to wipe button won msg send and error handling imp…
vdekrijger Apr 16, 2026
389bc1f
wip: Add cloud polling when the app regains focus
vdekrijger Apr 16, 2026
591172c
camera
aspicer Apr 16, 2026
47a94ce
wip: improvmenets to the show where to run feature
vdekrijger Apr 16, 2026
d17e998
Merge branch 'hackathon-lisbon-code-updates' of github.com:PostHog/co…
vdekrijger Apr 16, 2026
450df6d
wip: Handle continue in the cloud thing better
vdekrijger Apr 16, 2026
d299931
feat: fix identifier
vdekrijger Apr 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .claude/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
"deny": [
"Read(./apps/cli/**)",
"Edit(./apps/cli/**)",
"Write(./apps/cli/**)",
"Read(./apps/mobile/**)",
"Edit(./apps/mobile/**)",
"Write(./apps/mobile/**)"
"Write(./apps/cli/**)"
]
}
}
Binary file added apps/code/build/dmg-background.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions apps/code/forge.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ const config: ForgeConfig = {
new MakerDMG({
icon: "./build/app-icon.icns",
format: "ULFO",
background: "./build/dmg-background.png",
iconSize: 80,
window: { size: { width: 540, height: 380 } },
contents: (opts) => [
{ x: 135, y: 225, type: "file", path: opts.appPath },
{ x: 405, y: 225, type: "link", path: "/Applications" },
],
...(shouldSignMacApp && appleCodesignIdentity
? {
"code-sign": {
Expand Down
2 changes: 2 additions & 0 deletions apps/code/src/main/di/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { WorkspaceRepository } from "../db/repositories/workspace-repository";
import { WorktreeRepository } from "../db/repositories/worktree-repository";
import { DatabaseService } from "../db/service";
import { AgentAuthAdapter } from "../services/agent/auth-adapter";
import { LocalCommandReceiver } from "../services/agent/local-command-receiver";
import { AgentService } from "../services/agent/service";
import { AppLifecycleService } from "../services/app-lifecycle/service";
import { ArchiveService } from "../services/archive/service";
Expand Down Expand Up @@ -64,6 +65,7 @@ container.bind(MAIN_TOKENS.ArchiveRepository).to(ArchiveRepository);
container.bind(MAIN_TOKENS.SuspensionRepository).to(SuspensionRepositoryImpl);
container.bind(MAIN_TOKENS.AgentAuthAdapter).to(AgentAuthAdapter);
container.bind(MAIN_TOKENS.AgentService).to(AgentService);
container.bind(MAIN_TOKENS.LocalCommandReceiver).to(LocalCommandReceiver);
container.bind(MAIN_TOKENS.AuthService).to(AuthService);
container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService);
container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService);
Expand Down
1 change: 1 addition & 0 deletions apps/code/src/main/di/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const MAIN_TOKENS = Object.freeze({
// Services
AgentAuthAdapter: Symbol.for("Main.AgentAuthAdapter"),
AgentService: Symbol.for("Main.AgentService"),
LocalCommandReceiver: Symbol.for("Main.LocalCommandReceiver"),
AuthService: Symbol.for("Main.AuthService"),
AuthProxyService: Symbol.for("Main.AuthProxyService"),
ArchiveService: Symbol.for("Main.ArchiveService"),
Expand Down
288 changes: 288 additions & 0 deletions apps/code/src/main/services/agent/local-command-receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
import { inject, injectable, preDestroy } from "inversify";
import { MAIN_TOKENS } from "../../di/tokens";
import { logger } from "../../utils/logger";
import { localCommandCursorStore } from "../../utils/store";
import type { AuthService } from "../auth/service";

const log = logger.scope("local-command-receiver");
const INITIAL_RECONNECT_DELAY_MS = 2000;
const MAX_RECONNECT_DELAY_MS = 30_000;
// After this many consecutive failures, assume Last-Event-ID is stale (event
// trimmed from the backend buffer) and fall back to a fresh connect with
// start=latest. Accepts that we may drop commands issued during the outage.
const STALE_EVENT_ID_THRESHOLD = 3;

/**
* JSON-RPC envelope carried inside an `incoming_command` SSE event. The
* backend repackages whatever mobile POSTs to /command/ into this shape
* (see products/tasks/backend/api.py :: command).
*/
export interface IncomingCommandPayload {
jsonrpc: string;
method: string;
params?: { content?: string } & Record<string, unknown>;
id?: string | number;
}

interface SubscribeParams {
taskId: string;
taskRunId: string;
projectId: number;
apiHost: string;
onCommand: (payload: IncomingCommandPayload) => Promise<void>;
}

interface Subscription {
taskRunId: string;
controller: AbortController;
}

/**
* Subscribes to the PostHog task-run SSE stream for a local run and
* delivers `incoming_command` events (published by the backend when mobile
* POSTs to /command/ on a run with environment=local) to a caller-supplied
* callback.
*
* One SSE connection per subscribed run. Reconnects with backoff on failure.
* Uses the `Last-Event-ID` header to resume from the last processed event
* so brief network blips don't drop commands.
*/
@injectable()
export class LocalCommandReceiver {
private readonly subs = new Map<string, Subscription>();

constructor(
@inject(MAIN_TOKENS.AuthService)
private readonly auth: AuthService,
) {}

subscribe(params: SubscribeParams): void {
if (this.subs.has(params.taskRunId)) {
log.debug("Already subscribed", { taskRunId: params.taskRunId });
return;
}
const controller = new AbortController();
this.subs.set(params.taskRunId, {
taskRunId: params.taskRunId,
controller,
});
log.info("Subscribing to SSE stream", { taskRunId: params.taskRunId });
void this.connectLoop(params, controller).catch((err) => {
if (controller.signal.aborted) return;
log.error("Connect loop exited unexpectedly", {
taskRunId: params.taskRunId,
error: err instanceof Error ? err.message : String(err),
});
});
}

unsubscribe(taskRunId: string): void {
const sub = this.subs.get(taskRunId);
if (!sub) return;
sub.controller.abort();
this.subs.delete(taskRunId);
log.info("Unsubscribed", { taskRunId });
}

@preDestroy()
async shutdown(): Promise<void> {
// Abort before awaiting teardown — per async-cleanup-ordering guidance.
for (const sub of this.subs.values()) sub.controller.abort();
this.subs.clear();
}

private async connectLoop(
params: SubscribeParams,
controller: AbortController,
): Promise<void> {
// Seed from persisted cursor so we resume without replay across reconnects
// AND across app restarts. On first-ever subscribe the cursor is undefined;
// we send no start param and no Last-Event-ID, which lets the backend read
// from the beginning of the stream. That is intentional — it catches
// mobile-originated commands published while this desktop was offline.
let lastEventId = this.loadCursor(params.taskRunId);
let consecutiveFailures = 0;
// Set after we drop a stale Last-Event-ID so the next fetch asks for
// start=latest — we accept dropping commands issued during the outage
// rather than looping forever on an un-resumable cursor.
let droppedStaleCursor = false;

while (!controller.signal.aborted) {
let streamOpened = false;
try {
const { accessToken } = await this.auth.getValidAccessToken();
const url = new URL(
`${params.apiHost}/api/projects/${params.projectId}/tasks/${params.taskId}/runs/${params.taskRunId}/stream/`,
);
if (droppedStaleCursor) {
url.searchParams.set("start", "latest");
}

const headers: Record<string, string> = {
Authorization: `Bearer ${accessToken}`,
Accept: "text/event-stream",
};
if (lastEventId) headers["Last-Event-ID"] = lastEventId;

const response = await fetch(url.toString(), {
headers,
signal: controller.signal,
});
if (!response.ok) {
const body = await response.text().catch(() => "");
throw new Error(
`SSE HTTP ${response.status}${body ? `: ${body.slice(0, 200)}` : ""}`,
);
}

streamOpened = true;
consecutiveFailures = 0;
droppedStaleCursor = false;
lastEventId = await this.readEventStream(
response.body,
params.onCommand,
controller.signal,
lastEventId,
params.taskRunId,
);
log.info("SSE stream ended cleanly", {
taskRunId: params.taskRunId,
});
} catch (err) {
if (controller.signal.aborted) return;
if (!streamOpened) consecutiveFailures++;
if (
consecutiveFailures >= STALE_EVENT_ID_THRESHOLD &&
lastEventId !== undefined
) {
log.warn(
"Dropping possibly-stale Last-Event-ID after repeated failures",
{
taskRunId: params.taskRunId,
consecutiveFailures,
},
);
lastEventId = undefined;
droppedStaleCursor = true;
}
log.warn("SSE disconnected, will reconnect", {
taskRunId: params.taskRunId,
consecutiveFailures,
error: err instanceof Error ? err.message : String(err),
});
}
if (controller.signal.aborted) return;
const delay = Math.min(
MAX_RECONNECT_DELAY_MS,
INITIAL_RECONNECT_DELAY_MS * 2 ** Math.max(0, consecutiveFailures - 1),
);
await this.sleep(delay, controller.signal);
}
}

private loadCursor(taskRunId: string): string | undefined {
const cursors = localCommandCursorStore.get("cursors");
return cursors[taskRunId];
}

private saveCursor(taskRunId: string, eventId: string): void {
const cursors = localCommandCursorStore.get("cursors");
cursors[taskRunId] = eventId;
localCommandCursorStore.set("cursors", cursors);
}

private async readEventStream(
body: ReadableStream<Uint8Array> | null,
onCommand: SubscribeParams["onCommand"],
signal: AbortSignal,
seedLastEventId: string | undefined,
taskRunId: string,
): Promise<string | undefined> {
if (!body) throw new Error("Missing SSE response body");
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let lastEventId = seedLastEventId;

try {
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) return lastEventId;
buffer += decoder.decode(value, { stream: true });

// SSE event blocks are separated by a blank line (\n\n).
while (true) {
const separator = buffer.indexOf("\n\n");
if (separator === -1) break;
const rawEvent = buffer.slice(0, separator);
buffer = buffer.slice(separator + 2);

let dataChunks = "";
let eventId: string | undefined;
for (const line of rawEvent.split("\n")) {
if (line.startsWith("data: ")) {
dataChunks += line.slice(6);
} else if (line.startsWith("id: ")) {
eventId = line.slice(4);
}
// `event:` and comments are ignored — we route on data.type.
}
if (!dataChunks) continue;

let parsed: unknown;
try {
parsed = JSON.parse(dataChunks);
} catch {
log.warn("Failed to parse SSE data chunk", {
preview: dataChunks.slice(0, 120),
});
continue;
}

if (
typeof parsed === "object" &&
parsed !== null &&
(parsed as { type?: unknown }).type === "incoming_command"
) {
const payload = (parsed as { payload?: unknown }).payload;
if (payload && typeof payload === "object") {
try {
await onCommand(payload as IncomingCommandPayload);
} catch (err) {
log.error("Incoming command handler threw", {
error: err instanceof Error ? err.message : String(err),
});
}
}
}

if (eventId) {
lastEventId = eventId;
this.saveCursor(taskRunId, eventId);
}
}
}
return lastEventId;
} finally {
try {
await reader.cancel();
} catch {
// Reader already closed or cancelled; nothing to do.
}
}
}

private sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms);
signal.addEventListener(
"abort",
() => {
clearTimeout(timer);
resolve();
},
{ once: true },
);
});
}
}
30 changes: 12 additions & 18 deletions apps/code/src/main/services/agent/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@ export const credentialsSchema = z.object({

export type Credentials = z.infer<typeof credentialsSchema>;

// Session config schema
export const sessionConfigSchema = z.object({
taskId: z.string(),
taskRunId: z.string(),
repoPath: z.string(),
credentials: credentialsSchema,
logUrl: z.string().optional(),
/** The agent's session ID (for resume - SDK session ID for Claude, Codex's session ID for Codex) */
sessionId: z.string().optional(),
adapter: z.enum(["claude", "codex"]).optional(),
/** Additional directories Claude can access beyond cwd (for worktree support) */
additionalDirectories: z.array(z.string()).optional(),
/** Permission mode to use for the session (e.g. "default", "acceptEdits", "plan", "bypassPermissions") */
permissionMode: z.string().optional(),
});

export type SessionConfig = z.infer<typeof sessionConfigSchema>;

// Start session input/output

export const startSessionInput = z.object({
Expand Down Expand Up @@ -175,6 +157,7 @@ export const reconnectSessionInput = z.object({
customInstructions: z.string().max(2000).optional(),
effort: effortLevelSchema.optional(),
jsonSchema: z.record(z.string(), z.unknown()).nullish(),
runMode: z.enum(["local", "cloud"]).optional(),
});

export type ReconnectSessionInput = z.infer<typeof reconnectSessionInput>;
Expand All @@ -200,6 +183,11 @@ export const recordActivityInput = z.object({
export const AgentServiceEvent = {
SessionEvent: "session-event",
PermissionRequest: "permission-request",
// Fires when a pending permission is resolved by anything other than the
// Electron UI (e.g. a mobile client calling permission_response). Renderer
// uses this to clear its own pendingPermissions copy in lockstep with the
// main-process map.
PermissionResolved: "permission-resolved",
SessionsIdle: "sessions-idle",
SessionIdleKilled: "session-idle-killed",
AgentFileActivity: "agent-file-activity",
Expand Down Expand Up @@ -228,9 +216,15 @@ export interface AgentFileActivityPayload {
branchName: string | null;
}

export interface PermissionResolvedPayload {
taskRunId: string;
toolCallId: string;
}

export interface AgentServiceEvents {
[AgentServiceEvent.SessionEvent]: AgentSessionEventPayload;
[AgentServiceEvent.PermissionRequest]: PermissionRequestPayload;
[AgentServiceEvent.PermissionResolved]: PermissionResolvedPayload;
[AgentServiceEvent.SessionsIdle]: undefined;
[AgentServiceEvent.SessionIdleKilled]: SessionIdleKilledPayload;
[AgentServiceEvent.AgentFileActivity]: AgentFileActivityPayload;
Expand Down
Loading
Loading