Skip to content

Conversation

@joelhooks
Copy link
Contributor

    ╔═══════════════════════════════════════════════════════════════╗
    ║                                                               ║
    ║   ██████╗ ██╗   ██╗██████╗  █████╗ ██████╗ ██╗     ███████╗   ║
    ║   ██╔══██╗██║   ██║██╔══██╗██╔══██╗██╔══██╗██║     ██╔════╝   ║
    ║   ██║  ██║██║   ██║██████╔╝███████║██████╔╝██║     █████╗     ║
    ║   ██║  ██║██║   ██║██╔══██╗██╔══██║██╔══██╗██║     ██╔══╝     ║
    ║   ██████╔╝╚██████╔╝██║  ██║██║  ██║██████╔╝███████╗███████╗   ║
    ║   ╚═════╝  ╚═════╝ ╚═╝  ╚═╝╚═╝  ╚═╝╚═════╝ ╚══════╝╚══════╝   ║
    ║                                                               ║
    ║   ███████╗████████╗██████╗ ███████╗ █████╗ ███╗   ███╗███████╗║
    ║   ██╔════╝╚══██╔══╝██╔══██╗██╔════╝██╔══██╗████╗ ████║██╔════╝║
    ║   ███████╗   ██║   ██████╔╝█████╗  ███████║██╔████╔██║███████╗║
    ║   ╚════██║   ██║   ██╔══██╗██╔══╝  ██╔══██║██║╚██╔╝██║╚════██║║
    ║   ███████║   ██║   ██║  ██║███████╗██║  ██║██║ ╚═╝ ██║███████║║
    ║   ╚══════╝   ╚═╝   ╚═╝  ╚═╝╚══════╝╚═╝  ╚═╝╚═╝     ╚═╝╚══════╝║
    ║                                                               ║
    ╚═══════════════════════════════════════════════════════════════╝

Why

SSE connections drop. Networks flake. Clients reconnect. When they do, they miss events.

Durable streams solve this: persist events with monotonic offsets, let clients catch up from where they left off. No lost messages, no polling, no complexity on the client side.

How

┌─────────────────────────────────────────────────────────────┐
│                        DATA FLOW                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Bus.publish(event)                                        │
│         │                                                   │
│         ▼                                                   │
│   EventStore.append() ──► SQLite + ULID offset              │
│         │                                                   │
│         ▼                                                   │
│   GET /stream/events?offset=X                               │
│         │                                                   │
│         ├──► Catch-up: JSON array of missed events          │
│         └──► Live: SSE stream from offset onwards           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Config-gated. Zero behavior change when disabled:

// opencode.jsonc
{
  "experimental": {
    "durableStreams": true
  }
}

What's in the box

Component File Purpose
Config flag config.ts experimental.durableStreams (default: false)
EventStore event-store/ SQLite persistence, ULID offsets, catch-up queries
ServerRegistry server/registry.ts JSON file-based multi-server discovery
Stream endpoint server/stream.ts GET /stream/events (catch-up + live SSE)
Discovery server/discovery.ts GET/POST/DELETE /servers
Persistence hook bus/index.ts Auto-persist on Bus.publish()
Self-registration cli/cmd/serve.ts 15s heartbeat, graceful shutdown

Offset semantics

  • Offsets are opaque ULID strings (lexicographically sortable)
  • offset="-1" = start from beginning
  • Response includes Stream-Next-Offset header for resumption
  • Client stores offset, reconnects with it, gets exactly what they missed

Tests

407 pass | 0 fail

Full coverage on EventStore and ServerRegistry with edge cases for catch-up semantics, payload robustness, and ULID monotonicity.

- Add experimental.durableStreams boolean to Config schema (default: false)
- Add test verifying flag parsing
- Gates all durable streaming endpoints

Cell: opencode-c802w7-mjrev8a9v97
- EventStore class with append/query/getLatestOffset methods
- Drizzle schema with composite primary key (session_id, offset)
- Monotonic ULID generation for lexicographic sorting
- Prepared statement caching for performance
- Full test coverage

Cell: opencode-c802w7-mjrevag9nfq
- JSON file-based registry at ~/.opencode/servers.json
- Methods: register, unregister, list, heartbeat, pruneStale
- Atomic writes via temp file + rename pattern
- 30s heartbeat TTL with stale pruning
- Full test coverage

Cell: opencode-c802w7-mjrevcadmjq
…ds, and ULID monotonicity

- Catch-up semantics: inclusive offset behavior, non-existent offsets
- Payload edge cases: large nested objects, null, arrays, special chars
- Session isolation: special chars in sessionId, similar ID patterns
- ULID monotonicity: rapid appends, cross-session ordering

Ref: opencode-c802w7-mjrer9jnqr4
- Test ID overwrite behavior on duplicate register
- Test no-op behavior for unregister/heartbeat on nonexistent servers
- Test corrupted JSON recovery (fallback to empty state)
- Test sequential multi-server registration
- Test pruneStale on empty registry
- Test pruneStale preserves fresh servers

All tests pass. 100% coverage on ServerRegistry implementation.

Refs: opencode-c802w7-mjrer9jyhkc
- Create packages/opencode/src/server/stream.ts with durable streaming
- Config gate: returns 400 if experimental.durableStreams not enabled
- Catch-up mode: returns JSON array of events from offset onwards
- Live mode: SSE stream with catch-up events + heartbeat (real-time subscription TODO)
- Protocol: offset=-1 for stream start, Stream-Next-Offset header
- Mount at GET /stream/events in server.ts

Part of opencode-c802w7-mjrer9jcoqb (Durable Streaming)
- GET /servers: list registered servers with auto-prune
- POST /servers: manual server registration
- DELETE /servers/:serverId: unregister server
- Config gate: requires experimental.durableStreams

Related: opencode-c802w7-mjrer9k245p
- Register server on startup if experimental.durableStreams enabled
- Send heartbeat every 15 seconds to ServerRegistry
- Unregister on SIGINT/SIGTERM shutdown
- Use random hex ID for server identification

Related: opencode-c802w7-mjrer9k4mtq
- Add experimental.durableStreams config flag documentation
- Document /stream/events endpoint (catch-up + live SSE)
- Document server discovery endpoints (/servers)
- Explain catch-up semantics, offset usage, and auto-registration
- Mark as experimental and opt-in

Cell: opencode-c802w7-mjrer9k72c1
- Comprehensive guide for experimental durable streaming feature
- Documents EventStore (SQLite + ULID offsets)
- Documents ServerRegistry (multi-server discovery)
- Documents API endpoints (GET /stream/events, GET/POST/DELETE /servers)
- Explains catch-up semantics and offset usage
- Includes client implementation example
- Notes limitations and roadmap
- Marks feature as EXPERIMENTAL

Cell: opencode-c802w7-mjrer9kafjn
- Add lazy singleton EventStore initialization in Bus module
- Persist events to EventStore when experimental.durableStreams is enabled
- Use Instance.directory as sessionId for event persistence
- Close EventStore on instance disposal for cleanup
- Maintain zero-config behavior when flag is disabled

Part of opencode-c802w7-mjrer9jcoqb (durable streaming epic)
@joelhooks joelhooks force-pushed the feat/durable-streaming-experimental branch from ce30754 to 9abe049 Compare December 29, 2025 18:55
Also add .mise.toml for consistent bun version
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant