Skip to content

Default to Zenoh transport on macOS and document replay workflow#1906

Open
bogwi wants to merge 32 commits intodimensionalOS:devfrom
bogwi:feat/integrate-zenoh
Open

Default to Zenoh transport on macOS and document replay workflow#1906
bogwi wants to merge 32 commits intodimensionalOS:devfrom
bogwi:feat/integrate-zenoh

Conversation

@bogwi
Copy link
Copy Markdown

@bogwi bogwi commented Apr 23, 2026

Supersedes #1787.

This PR carries forward the Zenoh transport integration from #1787 and wraps it into a merge-ready branch that fixes the remaining macOS Big Office replay gap.

Validation

  • manual macOS replay validation:
    • dimos --transport=zenoh --dtop --replay --replay-dir=unitree_go2_bigoffice run unitree-go2 works smoothly
    • after this change, the plain command dimos --dtop --replay --replay-dir=unitree_go2_bigoffice run unitree-go2 also works on macOS by resolving to Zenoh

Notes

  • this PR is intended as the wrapped successor to Feat/integrate zenoh #1787, not a separate redesign
  • Linux behavior remains unchanged by default: explicit Zenoh still works, and the default transport remains LCM

vrinek and others added 28 commits April 24, 2026 00:41
Prepare the codebase for Zenoh integration without changing behavior.
All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs).

- Add `transport` field to GlobalConfig (default: "lcm")
- Add ZENOH_AVAILABLE guard in transport.py
- Branch _get_transport_for() on global_config.transport
- Gate LCM configurators to only run when transport is "lcm"
- Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard
- Add zenohpubsub.py stub (raises NotImplementedError)
- Add `zenoh` optional dependency group in pyproject.toml
- Add test_zenoh_transport.py covering all new conditional branches

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TDD: tests written first, then implementation.
Follows DDSService pattern — module-level session dict with lock.

- ZenohConfig with mode/connect/listen fields and session_key
- ZenohService.start() opens session if not exists for config
- ZenohService.stop() does NOT close shared session
- session property raises RuntimeError if not started
- Two services with same config share one session (8 tests pass)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TDD: tests written first, then implementation.

- ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes])
- Publisher caching per key expression (avoids re-declaring)
- Subscriber tracking for cleanup on stop()
- Idempotent unsubscribe (guards against Zenoh ZError)
- subscribe_all() via dimos/** wildcard
- Zenoh and PickleZenoh composed classes (encoder mixins)
- 7 unit tests pass

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both encoder-composed variants pass all spec conformance tests:
- test_store, test_multiple_subscribers, test_unsubscribe
- test_multiple_messages, test_async_iterator
- 25 total tests pass (10 new Zenoh tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove xfail markers — Phase 2 stubs are now real implementations.
Add transport wrapper integration tests for broadcast/subscribe.

- ZenohTransport wraps Zenoh (LCM-encoded) with DDSTransport pattern
- pZenohTransport wraps PickleZenoh with Topic wrapping for pubsub layer
- Auto-start on first broadcast, stop/restart lifecycle
- 16 tests pass (4 new wrapper tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Zenoh appears alongside LCM, SHM in benchmark heatmaps.
Results: competitive with LCM for localhost — 82-149k msgs/sec
for small messages, 0% message loss, <1ms latency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Transport-level errors (session closed, invalid key expression) are
logged but not raised. Delivery guarantees are handled by Zenoh's
reliability protocol, not by exception propagation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully
  removed the subscriber from the list. If stop() already cleared the
  list, unsubscribe() returns without double-undeclaring.
- Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except
  to prevent malformed payloads from crashing Zenoh's internal thread.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Check membership before removing instead of catching ValueError.
Reads more clearly and avoids using exceptions for control flow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Two issues prevented the Rerun bridge from showing data over Zenoh:

1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at
   start() using self.config.g.transport from the worker's GlobalConfig.

2. Zenoh key expressions cannot contain '#' (forbidden character).
   Type info is now embedded as a '/' segment in the key expression
   (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic
   reconstructs the Topic with lcm_type for subscribe_all decoding.

Also fixes entity path mapping to strip the dimos/ prefix so Zenoh
entity paths match LCM paths in the Rerun viewer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- typed_out/untyped_out → typed_data/untyped_data
- Use TypedMsg instead of Image for blueprint integration tests
- Image still used in transport wrapper test (real LCM round-trip)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace raw time.sleep() calls with named helpers that document intent.
wait_for_subscribers() explains Zenoh has no "subscriber ready" signal.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace manual if-both-received check with threading.Barrier(2).
The previous approach could miss the event if both callbacks ran
concurrently and checked the other's list before it was populated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review findings dimensionalOS#2 and dimensionalOS#4:
- Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved
  lazily at start() from global_config.transport
- Remove _zenoh_topic field from pZenohTransport — construct on demand
  like pLCMTransport does, avoiding dual state

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
8 new tests covering:
- Typed/untyped topic → key expression conversion
- Key expression → topic with known/unknown/missing type
- Default lcm_type fallback
- Round-trip typed and untyped

Also documents known limitation: if a topic's base path ends with a
segment matching a registered DimosMsg type name, _key_expr_to_topic
will incorrectly split it. In practice this doesn't happen because
stream names (cmd_vel, lidar) don't match type names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule.
Removing the field caused a Pydantic ValidationError (extra_forbidden).
Keep the field but document that it's ignored — start() resolves
the pubsub backend from global_config.transport instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TF (transform frames) is hardcoded to LCM in the Module base class.
When transport=zenoh, module streams use Zenoh but TF stays on LCM.
The bridge now listens on both so the robot pose updates in the viewer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Zenoh tests used time.sleep() to wait for subscriber propagation,
which is either too slow or too flaky in CI. Replace with _retry_until()
that re-publishes in a tight loop until the subscriber's Event fires.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of
the local project, uninstalling other dependencies. The zenoh extra
only needs eclipse-zenoh — base deps are already installed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… missing

Align with module_coordinator._get_transport_for: raise RuntimeError instead
of silently falling back to LCM when transport is zenoh and eclipse-zenoh is
not installed.
@bogwi bogwi changed the title Wrap Zenoh integration for macOS Big Office replay Default to Zenoh transport on macOS and document replay workflow Apr 23, 2026
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 23, 2026

Greptile Summary

This PR adds a Zenoh pub/sub transport as an alternative to LCM and defaults to it on macOS when eclipse-zenoh is installed, fixing reliability issues with UDP multicast during heavy replay workloads. All three concerns from the previous review round (import-collection safety, silent pubsubs discard, and missing importorskip guard in test_zenohservice) are addressed in the latest commits.

Confidence Score: 5/5

Safe to merge; all previous P0/P1 concerns are resolved and only minor style/test-reliability suggestions remain.

All three blocking issues from the prior review round (import-collection failures, silent pubsubs discard, missing importorskip) are addressed. Remaining findings are P2: a class-variable style nit on _started, a module-level side-effect ordering in zenohservice, and a potential Barrier deadlock in one test. None of these affect production correctness.

No files require special attention for merge readiness.

Important Files Changed

Filename Overview
dimos/core/global_config.py Adds transport field with a platform+availability-aware default factory and validate_assignment=True; clean and well-tested.
dimos/core/transport.py Adds ZENOH_AVAILABLE sentinel and ZenohTransport/pZenohTransport inside a conditional block; RLock-guarded lazy start is correct, _started class-variable style is the only minor concern.
dimos/core/coordination/module_coordinator.py Transport branching in _get_transport_for and LCM configurator gating in _run_configurators are well-structured; dimos/ topic prefix correctly applied.
dimos/protocol/service/zenohservice.py Singleton session management via _sessions dict; stop() intentionally does not close the shared session. Module-level zenoh.init_log_from_env_or side-effect is a minor style concern.
dimos/protocol/pubsub/impl/zenohpubsub.py Clean raw-bytes Zenoh pubsub with encoder mixins; _topic_to_key_expr/_key_expr_to_topic round-trip logic and known limitations are documented.
dimos/visualization/rerun/bridge.py Adds _default_pubsubs and _resolve_pubsubs to route the bridge to Zenoh+LCM or LCM-only based on transport config; previous concern about silent pubsubs discard is now addressed via model_fields_set inspection.
dimos/core/test_zenoh_transport.py Comprehensive tests for transport branching, GlobalConfig validation, and ZenohTransport lifecycle; Zenoh-specific suites correctly guarded with skipif.
dimos/protocol/pubsub/impl/test_zenohpubsub.py Guarded with pytest.importorskip("zenoh"); test_multiple_subscribers uses a Barrier that could deadlock if Zenoh dispatches callbacks on a single thread.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    CLI["CLI --transport=lcm|zenoh"] --> GC["GlobalConfig.transport\n(default: zenoh on macOS+zenoh, else lcm)"]
    GC --> MC["_get_transport_for()"]
    MC -->|transport==lcm| LCM["LCMTransport / pLCMTransport"]
    MC -->|transport==zenoh| ZT["ZenohTransport / pZenohTransport"]
    ZT --> ZPS["ZenohPubSubBase\n(ZenohService session)"]
    ZPS --> ZS["_sessions singleton\n(ZenohService)"]
    GC --> RP["_resolve_pubsubs()\nin RerunBridgeModule.start()"]
    RP -->|transport==lcm| LCM2["[LCM()]"]
    RP -->|transport==zenoh| ZL["[Zenoh(), LCM()]\n(TF still on LCM)"]
    GC --> RC["_run_configurators()\nskips lcm_configurators\nwhen transport==zenoh"]
Loading

Reviews (5): Last reviewed commit: "Merge branch 'dev' into feat/integrate-z..." | Re-trigger Greptile

Comment thread dimos/core/test_zenoh_transport.py
Comment thread dimos/visualization/rerun/bridge.py Outdated
Comment thread dimos/protocol/service/test_zenohservice.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants