Remove old _ts fields and normalize event result updating process #25
pirate wants to merge 244 commits into browser-use:main
Add support for middlewares to hook into event bus handler lifecycle
implement swappable EventHistory storage backend
Updated the description to clarify the library's functionality and similarities to JS event systems.
Revise README description for bubus library
…ead of process-until-event
…, enforce reserved event fields
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
add middlewares to ts and context managers to python
26 issues found across 230 files
Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="bubus/bridges.py">
<violation number="1" location="bubus/bridges.py:263">
P2: Unbounded request body size: Content-Length is accepted without a maximum limit before readexactly, allowing a client to trigger memory exhaustion by advertising a huge body size.</violation>
</file>
<file name="bubus/bridge_sqlite.py">
<violation number="1" location="bubus/bridge_sqlite.py:262">
P2: _ensure_columns is not thread-safe; concurrent emit calls can both attempt the same ALTER TABLE, leading to sqlite3.OperationalError (duplicate column) and a failed emit.</violation>
</file>
<file name="docs/api/eventhandler.mdx">
<violation number="1" location="docs/api/eventhandler.mdx:59">
P3: Python docs reference `EventHandler.from_json_dict(...)`, but no such factory exists in the implementation, so the example will fail.</violation>
</file>
<file name="docs/api/eventresult.mdx">
<violation number="1" location="docs/api/eventresult.mdx:22">
P3: The new await semantics statement is misleading for TypeScript: `EventResult` is not awaitable in the TS implementation and the TS example uses `entry.result`. Consider scoping the await semantics to Python or clarifying the TS behavior to avoid incorrect expectations.</violation>
</file>
<file name="bubus-ts/src/event_handler.ts">
<violation number="1" location="bubus-ts/src/event_handler.ts:212">
P2: Path sanitization only handles POSIX separators; Windows backslashes in stack paths won’t match these regexes, so usernames can be persisted in handler_file_path.</violation>
</file>
<file name="bubus-ts/src/bridge_nats.ts">
<violation number="1" location="bubus-ts/src/bridge_nats.ts:68">
P2: Race condition: `running` is set only after awaiting async setup, so concurrent `start()` calls can both pass the guard and create duplicate NATS connections/subscriptions.</violation>
</file>
<file name="bubus-ts/src/helpers.ts">
<violation number="1" location="bubus-ts/src/helpers.ts:57">
P2: When performance.now is unavailable, elapsed_ms is always 0, so epoch_ns stays at the start-time anchor and only advances by +1ns per call. This makes monotonicDatetime return nearly static timestamps rather than current time in runtimes without performance.</violation>
</file>
<file name="bubus-ts/src/bridge_jsonl.ts">
<violation number="1" location="bubus-ts/src/bridge_jsonl.ts:74">
P2: `start()` sets `running` only after several awaits, so rapid consecutive `ensureStarted()` calls can start multiple `listenLoop` tasks and reset offsets concurrently.</violation>
<violation number="2" location="bubus-ts/src/bridge_jsonl.ts:156">
P2: Decoding file chunks without a streaming TextDecoder can corrupt multibyte UTF‑8 characters split across reads, leading to data corruption for non‑ASCII JSONL payloads.</violation>
<violation number="3" location="bubus-ts/src/bridge_jsonl.ts:164">
P2: Custom dirname only handles '/' separators, so Windows-style backslash paths resolve to '.', causing mkdir to target the current directory instead of the file’s parent.</violation>
</file>
<file name="bubus-ts/src/bridges.ts">
<violation number="1" location="bubus-ts/src/bridges.ts:252">
P2: HTTP listener accumulates request bodies without any size limit, allowing a large POST to exhaust memory and crash the process (DoS).</violation>
<violation number="2" location="bubus-ts/src/bridges.ts:265">
P2: Async handleIncomingPayload promise is discarded in the unix listener without a catch, which can lead to unhandled promise rejections if parsing/emitting fails.</violation>
<violation number="3" location="bubus-ts/src/bridges.ts:306">
P2: Unbounded accumulation of socket data can lead to memory exhaustion if a client sends a long stream without newlines.</violation>
</file>
<file name="bubus-ts/src/logging.ts">
<violation number="1" location="bubus-ts/src/logging.ts:116">
P3: printed_child_ids is initialized from event_results keys (handler IDs) but compared against child.event_id, so the set never matches child events and the intended de-duplication is ineffective.</violation>
</file>
<file name="bubus/events_suck.py">
<violation number="1" location="bubus/events_suck.py:162">
P2: _build_event_method always adds a VAR_KEYWORD parameter named 'extra'. If an event class defines a field named 'extra', this creates duplicate parameter names and inspect.Signature will raise ValueError, breaking wrapper generation.</violation>
</file>
<file name="bubus-ts/src/retry.ts">
<violation number="1" location="bubus-ts/src/retry.ts:360">
P2: _runWithTimeout doesn’t clear the timeout if `fn()` throws synchronously, leaving a pending timer until it expires and potentially accumulating timers under repeated sync failures.</violation>
</file>
<file name="README.md">
<violation number="1" location="README.md:22">
P3: The README intro example registers an undefined handler (`some_function`) and constructs `SomeEvent` with invalid Python syntax, so the snippet will not run as shown.</violation>
</file>
<file name="bubus/retry.py">
<violation number="1" location="bubus/retry.py:134">
P2: The 5‑minute hardcoded lock-file cleanup can delete an in-use semaphore file for long-running operations, allowing another process to acquire a new lock and violate the multiprocess concurrency limit.</violation>
</file>
<file name="bubus-ts/src/base_event.ts">
<violation number="1" location="bubus-ts/src/base_event.ts:237">
P2: Payload fields can shadow BaseEvent instance methods because parsed data from a loose schema is assigned directly onto `this` without reserving method names. If a user defines an event field like `done` or `eventCompleted`, the method is overwritten and calling it will throw. Add a guard to reject keys that collide with prototype methods or expand the reserved list.</violation>
</file>
<file name="bubus-ts/src/bridge_redis.ts">
<violation number="1" location="bubus-ts/src/bridge_redis.ts:143">
P2: Async dispatchInboundPayload is fired without awaiting or catch; if it rejects (e.g., BaseEvent.fromJSON throws), this becomes an unhandled promise rejection in the Redis message handler.</violation>
</file>
<file name="bubus-ts/src/timing.ts">
<violation number="1" location="bubus-ts/src/timing.ts:30">
P2: Timeout callback can throw and bypass rejection, causing an uncaught exception in the timer handler.</violation>
</file>
<file name="bubus/event_handler.py">
<violation number="1" location="bubus/event_handler.py:273">
P2: Validation uses `assert`, which is stripped with Python -O, disabling the eventbus_name validation in optimized/production runs.</violation>
<violation number="2" location="bubus/event_handler.py:293">
P2: get_callable_handler_name asserts __name__ exists, but callable instances often lack __name__, causing debug-only failures despite the fallback that handles missing names.</violation>
</file>
<file name="bubus/jsonschema.py">
<violation number="1" location="bubus/jsonschema.py:19">
P2: JSON Schema string constraints (minLength/maxLength/pattern) are not mapped to Pydantic Field parameters, so string validations are silently ignored during schema conversion.</violation>
<violation number="2" location="bubus/jsonschema.py:278">
P2: `additionalProperties` is ignored when `properties` are present, so schemas that forbid or constrain extra keys are not enforced (Pydantic defaults to ignoring extra fields).</violation>
</file>
<file name="bubus-ts/src/bridge_postgres.ts">
<violation number="1" location="bubus-ts/src/bridge_postgres.ts:142">
P2: `start()` is not guarded against concurrent calls: `running` is only set true after awaited initialization, while `ensureStarted()` calls `start()` without awaiting. Concurrent calls can create multiple clients and overwrite `this.client`, leaking connections and duplicating LISTEN setup.</violation>
</file>
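For the `bubus/events_suck.py` finding above, a minimal sketch of one way to avoid the duplicate parameter name. The helper names (`pick_var_keyword_name`, `build_event_signature`) are hypothetical and are not taken from the PR; only the standard-library `inspect` calls are real.

```python
import inspect


def pick_var_keyword_name(field_names: set[str], preferred: str = 'extra') -> str:
    """Return a **kwargs name that does not collide with any event field."""
    name = preferred
    while name in field_names:
        name = f'_{name}'  # hypothetical convention: prefix underscores until unique
    return name


def build_event_signature(field_names: list[str]) -> inspect.Signature:
    """Build a keyword-only signature plus a collision-free VAR_KEYWORD parameter."""
    params = [
        inspect.Parameter(name, inspect.Parameter.KEYWORD_ONLY)
        for name in field_names
    ]
    var_keyword_name = pick_var_keyword_name(set(field_names))
    params.append(inspect.Parameter(var_keyword_name, inspect.Parameter.VAR_KEYWORD))
    # inspect.Signature raises ValueError on duplicate names, so this only
    # succeeds because the VAR_KEYWORD name was made unique above.
    return inspect.Signature(params)
```

With a field list such as `['extra', 'x']`, the catch-all parameter becomes `_extra` instead of clashing with the `extra` field. Rejecting such event classes with a clear error would be an equally reasonable design choice.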
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
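For the two `bubus/event_handler.py` findings listed above, a minimal sketch of replacing `assert` with checks that survive `python -O`. The function names and the identifier rule for `eventbus_name` are assumptions made for illustration; the PR's actual validation rule and fallback are not shown on this page.

```python
from typing import Any


def validate_eventbus_name(name: Any) -> str:
    """Explicit raise instead of assert, so the check also runs under -O."""
    # Assumed rule: the name must be a valid Python identifier.
    if not isinstance(name, str) or not name.isidentifier():
        raise ValueError(f'invalid eventbus_name: {name!r}')
    return name


def get_callable_name(handler: Any) -> str:
    """Return a display name without asserting that __name__ exists."""
    name = getattr(handler, '__name__', None)
    if isinstance(name, str) and name:
        return name
    # Callable instances and functools.partial objects have no __name__;
    # fall back to the class name (hypothetical fallback).
    return type(handler).__name__


class CallableHandler:
    """Example handler implemented as a callable instance."""

    def __call__(self, event: Any) -> None:
        return None
```

Here `get_callable_name(CallableHandler())` falls back to the class name rather than failing only in debug runs, and `validate_eventbus_name` behaves the same with and without optimizations enabled.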
P2: Unbounded request body size: Content-Length is accepted without a maximum limit before readexactly, allowing a client to trigger memory exhaustion by advertising a huge body size.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus/bridges.py, line 263:
<comment>Unbounded request body size: Content-Length is accepted without a maximum limit before readexactly, allowing a client to trigger memory exhaustion by advertising a huge body size.</comment>
<file context>
@@ -0,0 +1,375 @@
+ return
+
+ try:
+ body = await reader.readexactly(body_size)
+ except asyncio.IncompleteReadError:
+ await self._write_http_response(writer, status=400, body='incomplete body')
</file context>
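One possible shape for the fix is to validate the advertised size before calling `readexactly`. This is a sketch only: `MAX_BODY_BYTES`, `BodyTooLarge`, `parse_content_length`, and `read_limited_body` are hypothetical names, and the 1 MiB cap is an assumed default rather than a value from the PR.

```python
import asyncio

MAX_BODY_BYTES = 1024 * 1024  # assumed 1 MiB cap; tune per deployment


class BodyTooLarge(Exception):
    """Raised when the advertised Content-Length exceeds the cap."""


def parse_content_length(raw: str, max_bytes: int = MAX_BODY_BYTES) -> int:
    """Validate a Content-Length value before any body bytes are read."""
    text = raw.strip()
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f'invalid Content-Length: {raw!r}')
    size = int(text)
    if size > max_bytes:
        raise BodyTooLarge(f'{size} bytes exceeds limit of {max_bytes}')
    return size


async def read_limited_body(
    reader: asyncio.StreamReader,
    raw_length: str,
    max_bytes: int = MAX_BODY_BYTES,
) -> bytes:
    """Reject oversized requests first, then read exactly the advertised size."""
    size = parse_content_length(raw_length, max_bytes)
    return await reader.readexactly(size)
```

A caller could catch `BodyTooLarge` and answer with status 413 through the same `_write_http_response(writer, status=..., body=...)` call used for the 400 case in the context above, so the oversized body is never buffered.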
P2: _ensure_columns is not thread-safe; concurrent emit calls can both attempt the same ALTER TABLE, leading to sqlite3.OperationalError (duplicate column) and a failed emit.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus/bridge_sqlite.py, line 262:
<comment>_ensure_columns is not thread-safe; concurrent emit calls can both attempt the same ALTER TABLE, leading to sqlite3.OperationalError (duplicate column) and a failed emit.</comment>
<file context>
@@ -0,0 +1,331 @@
+ with closing(self._connect()) as conn:
+ for key in missing_columns:
+ column_type = 'JSON' if key == _EVENT_PAYLOAD_COLUMN else 'TEXT'
+ conn.execute(f'ALTER TABLE "{self.table}" ADD COLUMN "{key}" {column_type}')
+ self._table_columns.add(key)
+ conn.commit()
</file context>
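A minimal sketch of one way to serialize the schema change, assuming a per-instance `threading.Lock` and a fresh read of the live schema under that lock. `ColumnEnsurer` is a hypothetical stand-in for the bridge class, and column names are assumed to be validated elsewhere before being interpolated into SQL.

```python
import sqlite3
import threading
from contextlib import closing


class ColumnEnsurer:
    """Hypothetical stand-in for the bridge; only the column logic is shown."""

    def __init__(self, path: str, table: str = 'events') -> None:
        self.path = path
        self.table = table
        self._lock = threading.Lock()  # serializes schema changes in this process
        with closing(self._connect()) as conn:
            conn.execute(f'CREATE TABLE IF NOT EXISTS "{self.table}" ("event_id" TEXT)')
            conn.commit()

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self.path, timeout=30)

    def ensure_columns(self, keys: list[str]) -> None:
        with self._lock, closing(self._connect()) as conn:
            # Re-read the live schema under the lock instead of trusting a cache.
            rows = conn.execute(f'PRAGMA table_info("{self.table}")')
            existing = {row[1] for row in rows}
            for key in keys:
                if key in existing:
                    continue
                try:
                    conn.execute(f'ALTER TABLE "{self.table}" ADD COLUMN "{key}" TEXT')
                except sqlite3.OperationalError as exc:
                    # Another process may have added the column first;
                    # swallow only that specific failure.
                    if 'duplicate column name' not in str(exc).lower():
                        raise
            conn.commit()
```

The lock covers threads within one process, while the `duplicate column name` check is a fallback for a second process racing on the same database file.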
P2: Path sanitization only handles POSIX separators; Windows backslashes in stack paths won’t match these regexes, so usernames can be persisted in handler_file_path.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/event_handler.ts, line 212:
<comment>Path sanitization only handles POSIX separators; Windows backslashes in stack paths won’t match these regexes, so usernames can be persisted in handler_file_path.</comment>
<file context>
@@ -0,0 +1,348 @@
+ normalized = path
+ }
+ }
+ normalized = normalized.replace(/\/users\/[^/]+\//i, '~/').replace(/\/home\/[^/]+\//i, '~/')
+ this.handler_file_path = line_number ? `${normalized}:${line_number}` : normalized
+ }
</file context>
P2: Race condition: running is set only after awaiting async setup, so concurrent start() calls can both pass the guard and create duplicate NATS connections/subscriptions.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridge_nats.ts, line 68:
<comment>Race condition: `running` is set only after awaiting async setup, so concurrent `start()` calls can both pass the guard and create duplicate NATS connections/subscriptions.</comment>
<file context>
@@ -0,0 +1,104 @@
+ this.nc = await connect({ servers: this.server })
+ const sub = this.nc.subscribe(this.subject)
+
+ this.running = true
+ this.sub_task = (async () => {
+ for await (const msg of sub) {
</file context>
P2: When performance.now is unavailable, elapsed_ms is always 0, so epoch_ns stays at the start-time anchor and only advances by +1ns per call. This makes monotonicDatetime return nearly static timestamps rather than current time in runtimes without performance.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/helpers.ts, line 57:
<comment>When performance.now is unavailable, elapsed_ms is always 0, so epoch_ns stays at the start-time anchor and only advances by +1ns per call. This makes monotonicDatetime return nearly static timestamps rather than current time in runtimes without performance.</comment>
<file context>
@@ -0,0 +1,65 @@
+ return normalized
+ }
+
+ const elapsed_ms = has_performance_now ? performance.now() - monotonic_clock_anchor_ms : 0
+ const elapsed_ns = BigInt(Math.max(0, Math.floor(elapsed_ms * 1_000_000)))
+ let epoch_ns = monotonic_epoch_anchor_ns + elapsed_ns
</file context>
P2: start() is not guarded against concurrent calls: running is only set true after awaited initialization, while ensureStarted() calls start() without awaiting. Concurrent calls can create multiple clients and overwrite this.client, leaking connections and duplicating LISTEN setup.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridge_postgres.ts, line 142:
<comment>`start()` is not guarded against concurrent calls: `running` is only set true after awaited initialization, while `ensureStarted()` calls `start()` without awaiting. Concurrent calls can create multiple clients and overwrite `this.client`, leaking connections and duplicating LISTEN setup.</comment>
<file context>
@@ -0,0 +1,277 @@
+ }
+
+ async start(): Promise<void> {
+ if (this.running) return
+ if (!isNodeRuntime()) {
+ throw new Error('PostgresEventBridge is only supported in Node.js runtimes')
</file context>
P3: Python docs reference EventHandler.from_json_dict(...), but no such factory exists in the implementation, so the example will fail.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/api/eventhandler.mdx, line 59:
<comment>Python docs reference `EventHandler.from_json_dict(...)`, but no such factory exists in the implementation, so the example will fail.</comment>
<file context>
@@ -0,0 +1,86 @@
+# "...": "..."
+# }
+
+restored = EventHandler.from_json_dict(payload, handler=real_handler)
+```
+
</file context>
P3: The new await semantics statement is misleading for TypeScript: EventResult is not awaitable in the TS implementation and the TS example uses entry.result. Consider scoping the await semantics to Python or clarifying the TS behavior to avoid incorrect expectations.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/api/eventresult.mdx, line 22:
<comment>The new await semantics statement is misleading for TypeScript: `EventResult` is not awaitable in the TS implementation and the TS example uses `entry.result`. Consider scoping the await semantics to Python or clarifying the TS behavior to avoid incorrect expectations.</comment>
<file context>
@@ -0,0 +1,113 @@
+
+## Await semantics
+
+Awaiting an `EventResult` resolves to handler return value or raises captured failure.
+
+<Tabs>
</file context>
P3: printed_child_ids is initialized from event_results keys (handler IDs) but compared against child.event_id, so the set never matches child events and the intended de-duplication is ineffective.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/logging.ts, line 116:
<comment>printed_child_ids is initialized from event_results keys (handler IDs) but compared against child.event_id, so the set never matches child events and the intended de-duplication is ineffective.</comment>
<file context>
@@ -0,0 +1,247 @@
+ result_items.push({ type: 'result', result })
+ }
+ const children = parent_to_children.get(event.event_id) ?? []
+ const printed_child_ids = new Set<string>(event.event_results.size > 0 ? event.event_results.keys() : [])
+ for (const child of children) {
+ if (!printed_child_ids.has(child.event_id) && !child.event_emitted_by_handler_id) {
</file context>
P3: The README intro example registers an undefined handler (some_function) and constructs SomeEvent with invalid Python syntax, so the snippet will not run as shown.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At README.md, line 22:
<comment>The README intro example registers an undefined handler (`some_function`) and constructs `SomeEvent` with invalid Python syntax, so the snippet will not run as shown.</comment>
<file context>
@@ -1,21 +1,53 @@
+def handle_some_event(event: SomeEvent):
+ print('hi!')
+
+bus.on(SomeEvent, some_function)
+await bus.emit(SomeEvent({some_data: 132}))
+# "hi!""
</file context>
Suggested change
- bus.on(SomeEvent, some_function)
- await bus.emit(SomeEvent({some_data: 132}))
+ bus.on(SomeEvent, handle_some_event)
+ await bus.emit(SomeEvent(some_data=132))
Summary by cubic
Removed legacy _ts fields and standardized how handler results are created and updated for clearer, consistent event processing. Also added a full TypeScript runtime with cross-runtime bridges, new handler APIs, and CI to test Python and TypeScript side by side.
New Features
Refactors
Written for commit 10d07a8. Summary will update on new commits.