This document describes the implementation plan for the extensibility hook system specified in docs/dev/hook_system.md. The implementation uses CPEX as an optional external dependency for core plumbing, while all Mellea-specific types — hook enums, payload models, and the plugin base class — are owned by Mellea under a new mellea/plugins/ subpackage.
The primary developer-facing API is the @hook decorator, the Plugin base class, and programmatic registration (register(), PluginSet). YAML configuration is supported as a secondary mechanism for deployment-time overrides. Plugins work identically whether invoked through a session or via the functional API (instruct(backend, context, ...)).
mellea/plugins/
├── __init__.py # Public API: Plugin, hook, block, PluginSet, register, plugin_scope
├── manager.py # Singleton wrapper + session-tag filtering
├── base.py # Plugin base class, MelleaBasePayload, MelleaPlugin (advanced)
├── types.py # HookType enum, PluginMode enum + hook registration
├── policies.py # HookPayloadPolicy table + DefaultHookPolicy for Mellea hooks
├── context.py # Plugin context factory helper
├── decorators.py # @hook decorator implementation
├── pluginset.py # PluginSet class
├── registry.py # register(), block(), plugin_scope() + global/session dispatch logic
└── hooks/
├── __init__.py # Re-exports all payload classes
├── session.py # session lifecycle payloads
├── component.py # component lifecycle payloads
├── generation.py # generation pipeline payloads
├── validation.py # validation payloads
├── sampling.py # sampling pipeline payloads
├── tool.py # tool execution payloads
├── adapter.py # adapter operation payloads
├── context_ops.py # context operation payloads
└── error.py # error handling payload
The following types from cpex.framework form the plumbing layer. Mellea uses these but does not import any CPEX-specific hook types (prompts, tools, resources, agents, http).
| Type | Role |
|---|---|
Plugin |
ABC base class. __init__(config: PluginConfig), initialize(), shutdown(). Hook methods discovered by convention (method name = hook type) or @hook() decorator. Signature: async def hook_name(self, payload, context) -> PluginResult. |
PluginManager |
Borg singleton. __init__(config_path, timeout, observability, hook_policies). Key methods: invoke_hook(hook_type, payload, global_context, ...) -> (PluginResult, PluginContextTable), has_hooks_for(hook_type) -> bool, initialize(), shutdown(). The hook_policies parameter accepts a dict[str, HookPayloadPolicy] mapping hook types to their writable-field policies. |
PluginPayload |
Base type for all hook payloads. Frozen Pydantic BaseModel (ConfigDict(frozen=True)). Plugins use model_copy(update={...}) to propose modifications. |
PluginResult[T] |
Generic result: continue_processing: bool, `modified_payload: T |
PluginViolation |
reason, description, code, details. |
PluginConfig |
name, kind, hooks, mode, priority, conditions, config, ... |
PluginMode |
SEQUENTIAL, CONCURRENT, AUDIT, FIRE_AND_FORGET, DISABLED. |
PluginContext |
state: dict, global_context: GlobalContext, metadata: dict. |
HookPayloadPolicy |
Frozen dataclass with writable_fields: frozenset[str]. Defines which payload fields plugins may modify for a given hook type. |
DefaultHookPolicy |
Enum: ALLOW (accept all modifications), DENY (reject all modifications). Controls behavior for hooks without an explicit policy. |
apply_policy() |
apply_policy(original, modified, policy) -> BaseModel | None. Accepts only changes to writable fields via model_copy(update=...), discarding unauthorized changes. Returns None if no effective changes. |
HookRegistry |
get_hook_registry(), register_hook(hook_type, payload_class, result_class), is_registered(hook_type). |
@hook decorator |
@hook("hook_type") or @hook("hook_type", PayloadType, ResultType) for custom method names. |
classDiagram
direction TB
%% Core Plugin Classes
class Plugin {
<<abstract>>
+__init__(config: PluginConfig)
+initialize()* async
+shutdown()* async
+hook_name(payload, context)* async PluginResult
}
class PluginManager {
<<singleton>>
-config_path: str
-timeout: int
-observability: Any
-hook_registry: HookRegistry
-hook_policies: dict~str, HookPayloadPolicy~
+__init__(config_path, timeout, observability, hook_policies)
+invoke_hook(hook_type, payload, global_context, ...) tuple~PluginResult, PluginContextTable~
+has_hooks_for(hook_type: str) bool
+initialize() async
+shutdown() async
}
%% Configuration
class PluginConfig {
+name: str
+kind: str
+hooks: list~str~
+mode: PluginMode
+priority: int
+conditions: dict
+config: dict
}
class PluginMode {
<<enumeration>>
SEQUENTIAL
CONCURRENT
AUDIT
FIRE_AND_FORGET
DISABLED
}
%% Policy
class HookPayloadPolicy {
<<frozen dataclass>>
+writable_fields: frozenset~str~
}
class DefaultHookPolicy {
<<enumeration>>
ALLOW
DENY
}
%% Payload & Result
class PluginPayload {
<<type alias>>
pydantic.BaseModel
model_config: frozen=True
}
class PluginResult~T~ {
+continue_processing: bool
+modified_payload: T | None
+violation: PluginViolation | None
+metadata: dict
}
class PluginViolation {
+reason: str
+description: str
+code: str
+details: dict
}
%% Context
class PluginContext {
+state: dict
+global_context: GlobalContext
+metadata: dict
}
class hook {
<<decorator>>
+__call__(hook_type: str)
+__call__(hook_type, PayloadType, ResultType)
}
%% Relationships
Plugin --> PluginConfig : configured by
Plugin ..> PluginPayload : receives
Plugin ..> PluginResult : returns
Plugin ..> PluginContext : receives
Plugin ..> hook : decorated by
PluginManager --> Plugin : manages 0..*
PluginManager --> HookPayloadPolicy : enforces per hook
PluginConfig --> PluginMode : has
PluginResult --> PluginViolation : may contain
PluginResult --> PluginPayload : wraps modified
Plugins can also be configured via YAML as a secondary mechanism. Programmatic registration via @hook, Plugin subclasses, and register() is the primary approach.
plugins:
- name: content-policy
kind: mellea.plugins.examples.ContentPolicyPlugin
hooks:
- component_pre_create
- generation_post_call
mode: sequential
priority: 10
config:
blocked_terms: ["term1", "term2"]
- name: telemetry
kind: mellea.plugins.examples.TelemetryPlugin
hooks:
- component_post_success
- sampling_loop_end
mode: fire_and_forget
priority: 100
config:
endpoint: "https://telemetry.example.com"Mellea exposes its own @hook decorator and Plugin base class that translate to CPEX registrations internally. This serves two purposes:
- Mellea-aligned API: The
@hookdecorator accepts amodeparameter using Mellea's ownPluginModeenum (PluginMode.SEQUENTIAL,PluginMode.CONCURRENT,PluginMode.AUDIT,PluginMode.FIRE_AND_FORGET), which maps to CPEX's internalPluginModeenum. ThePluginbase class uses__init_subclass__keyword arguments (name=,priority=) for ergonomic metadata declaration. - Session tagging: Mellea's wrapper adds session-scoping metadata that CPEX's
PluginManagerdoes not natively support. Themanager.pylayer filters hooks at dispatch time based on session tags.
Users never import from cpex.framework directly.
A single HookType(str, Enum) containing all hook types. String-based values for compatibility with CPEX's invoke_hook(hook_type: str, ...).
class HookType(str, Enum):
# Session Lifecycle
SESSION_PRE_INIT = "session_pre_init"
SESSION_POST_INIT = "session_post_init"
SESSION_RESET = "session_reset"
SESSION_CLEANUP = "session_cleanup"
# Component Lifecycle
COMPONENT_PRE_CREATE = "component_pre_create"
COMPONENT_POST_CREATE = "component_post_create"
COMPONENT_PRE_EXECUTE = "component_pre_execute"
COMPONENT_POST_SUCCESS = "component_post_success"
COMPONENT_POST_ERROR = "component_post_error"
# Generation Pipeline
GENERATION_PRE_CALL = "generation_pre_call"
GENERATION_POST_CALL = "generation_post_call"
GENERATION_STREAM_CHUNK = "generation_stream_chunk"
# Validation
VALIDATION_PRE_CHECK = "validation_pre_check"
VALIDATION_POST_CHECK = "validation_post_check"
# Sampling Pipeline
SAMPLING_LOOP_START = "sampling_loop_start"
SAMPLING_ITERATION = "sampling_iteration"
SAMPLING_REPAIR = "sampling_repair"
SAMPLING_LOOP_END = "sampling_loop_end"
# Tool Execution
TOOL_PRE_INVOKE = "tool_pre_invoke"
TOOL_POST_INVOKE = "tool_post_invoke"
# Backend Adapter Ops
ADAPTER_PRE_LOAD = "adapter_pre_load"
ADAPTER_POST_LOAD = "adapter_post_load"
ADAPTER_PRE_UNLOAD = "adapter_pre_unload"
ADAPTER_POST_UNLOAD = "adapter_post_unload"
# Context Operations
CONTEXT_UPDATE = "context_update"
CONTEXT_PRUNE = "context_prune"
# Error Handling
ERROR_OCCURRED = "error_occurred"All Mellea hook payloads inherit from this base, which extends PluginPayload with the common fields from the hook system spec (Section 2):
class MelleaBasePayload(PluginPayload):
"""Frozen base — all payloads are immutable by design.
Plugins must use ``model_copy(update={...})`` to propose modifications
and return the copy via ``PluginResult.modified_payload``. The plugin
manager applies the hook's ``HookPayloadPolicy`` to filter changes to
writable fields only.
"""
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
session_id: str | None = None
request_id: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
hook: str
user_metadata: dict[str, Any] = Field(default_factory=dict)frozen=True prevents in-place mutations — attribute assignment on a payload instance raises FrozenModelError. arbitrary_types_allowed=True is required because payloads include non-serializable Mellea objects (Backend, Context, Component, ModelOutputThunk). This means external plugins cannot receive these payloads directly; they are designed for native in-process plugins.
A register_mellea_hooks() function registers all hook types with CPEX HookRegistry. Called once during plugin initialization. Idempotent via is_registered() check.
def register_mellea_hooks() -> None:
registry = get_hook_registry()
for hook_type, (payload_cls, result_cls) in _HOOK_REGISTRY.items():
if not registry.is_registered(hook_type):
registry.register_hook(hook_type, payload_cls, result_cls)CPEX provides a generic GlobalContext with a state: dict. The build_global_context factory populates it with lightweight, cross-cutting ambient metadata. Hook-specific data (context, session, action, etc.) belongs on the typed payload, not here.
def build_global_context(
*,
backend: Backend | None = None,
**extra_fields,
) -> GlobalContext:
state: dict[str, Any] = {}
if backend is not None:
state["backend_name"] = getattr(backend, "model_id", "unknown")
state.update(extra_fields)
return GlobalContext(request_id="", state=state)Design note: Previously,
context,session, and the fullbackendobject were stored inGlobalContext.state, duplicating data already present on typed payloads. This was removed to eliminate the duplication footgun — plugins should access domain objects through the typed, policy-controlled payload fields.
Plugin is the primary base class for multi-hook Mellea plugins. It is one of two ways to define plugins, alongside @hook on standalone functions (simplest). Plugin provides automatic context-manager support and plugin metadata via __init_subclass__ keyword arguments.
class Plugin:
"""Base class for multi-hook Mellea plugins.
Subclasses get automatic context-manager support and plugin metadata::
class PIIRedactor(Plugin, name="pii-redactor", priority=5):
@hook(HookType.COMPONENT_PRE_CREATE, mode=PluginMode.SEQUENTIAL)
async def redact_input(self, payload, ctx):
...
with PIIRedactor():
result = session.instruct("...")
"""
def __init_subclass__(
cls, *, name: str = "", priority: int = 50, **kwargs: Any
) -> None:
"""Set plugin metadata on subclasses that provide a ``name``."""
super().__init_subclass__(**kwargs)
if name:
cls._mellea_plugin_meta = PluginMeta(name=name, priority=priority)
def __enter__(self) -> Any: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
async def __aenter__(self) -> Any: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...Plugin has no cpex dependency and does not require lifecycle hooks. All context-manager methods delegate to _plugin_cm_enter/_plugin_cm_exit module-level helpers that handle UUID scope generation and registration/deregistration.
MelleaPlugin extends CPEX's Plugin with typed context accessor helpers. Use this only when you need cpex lifecycle hooks (initialize/shutdown) or typed context accessors. For all other cases, prefer the lighter-weight Plugin base class.
class MelleaPlugin(_CpexPlugin):
"""Advanced base class for Mellea plugins with lifecycle hooks and typed accessors."""
def get_backend_name(self, context: PluginContext) -> str | None:
return context.global_context.state.get("backend_name")
@property
def plugin_config(self) -> dict[str, Any]:
return self._config.config or {}MelleaPlugin is not part of the primary public API and is not exported from mellea.plugins. Users who need it import it directly from mellea.plugins.base. CPEX's initialize() and shutdown() suffice — no new abstract methods are added.
The @hook decorator works on both standalone async functions and Plugin subclass methods:
@dataclass(frozen=True)
class HookMeta:
hook_type: str
mode: PluginMode = PluginMode.SEQUENTIAL
priority: int = 50
def hook(
hook_type: str,
*,
mode: PluginMode = PluginMode.SEQUENTIAL,
priority: int = 50,
) -> Callable:
"""Register an async function or method as a hook handler."""
def decorator(fn):
import inspect
if not inspect.iscoroutinefunction(fn):
raise TypeError(
f"@hook-decorated function {fn.__qualname__!r} must be async "
f"(defined with 'async def'), got a regular function."
)
fn._mellea_hook_meta = HookMeta(
hook_type=hook_type,
mode=mode,
priority=priority,
)
return fn
return decoratorThe mode parameter uses Mellea's PluginMode enum, which maps to CPEX's internal PluginMode:
PluginMode.SEQUENTIAL: Hook is awaited inline, executed serially. Violations halt the chain.PluginMode.CONCURRENT: Hook is awaited inline, but run concurrently with otherconcurrenthooks. Violations are honored.PluginMode.AUDIT: Hook is awaited inline. Violations are logged but execution continues.PluginMode.FIRE_AND_FORGET: Hook is dispatched as a backgroundasyncio.create_task(). Result is ignored. This is handled by CPEX'sPluginManagerdispatch logic.
When used on a standalone function, the metadata is read at register() time or when passed to start_session(plugins=[...]). When used on a Plugin subclass method, it is discovered during class registration via _ClassPluginAdapter introspection.
Plugin metadata is attached to Plugin subclasses automatically via __init_subclass__:
@dataclass(frozen=True)
class PluginMeta:
name: str
priority: int = 50When a class inherits from Plugin with name= and/or priority= keyword arguments, __init_subclass__ stores a PluginMeta instance as cls._mellea_plugin_meta. On registration, all methods with _mellea_hook_meta are discovered and registered as hook handlers bound to the instance. Methods without @hook are ignored.
A named, composable group of hook functions and plugin instances:
class PluginSet:
def __init__(
self,
name: str,
items: list[Callable | Any | "PluginSet"],
*,
priority: int | None = None,
):
self.name = name
self.items = items
self.priority = priority
def flatten(self) -> list[tuple[Callable | Any, int | None]]:
"""Recursively flatten nested PluginSets into (item, priority_override) pairs."""
result = []
for item in self.items:
if isinstance(item, PluginSet):
result.extend(item.flatten())
else:
result.append((item, self.priority))
return resultPluginSets are inert containers — they do not register anything themselves. Registration happens when they are passed to register() or start_session(plugins=[...]).
Convenience function for returning a blocking result from a hook:
def block(
reason: str,
*,
code: str = "",
description: str = "",
details: dict[str, Any] | None = None,
) -> PluginResult:
return PluginResult(
continue_processing=False,
violation=PluginViolation(
reason=reason,
description=description or reason,
code=code,
details=details or {},
),
)Defines the concrete per-hook-type policies for Mellea hooks. These are injected into the PluginManager at initialization time via the hook_policies parameter.
from cpex.framework.hooks.policies import HookPayloadPolicy
MELLEA_HOOK_PAYLOAD_POLICIES: dict[str, HookPayloadPolicy] = {
# Session Lifecycle
"session_pre_init": HookPayloadPolicy(
writable_fields=frozenset({"model_id", "model_options"}),
),
# session_post_init, session_reset, session_cleanup: observe-only (no entry)
# Component Lifecycle
"component_pre_create": HookPayloadPolicy(
writable_fields=frozenset({
"description", "images", "requirements", "icl_examples",
"grounding_context", "user_variables", "prefix", "template_id",
}),
),
"component_post_create": HookPayloadPolicy(
writable_fields=frozenset({"component"}),
),
"component_pre_execute": HookPayloadPolicy(
writable_fields=frozenset({
"requirements", "model_options", "format",
"strategy", "tool_calls_enabled",
}),
),
# component_post_success, component_post_error: observe-only
# Generation Pipeline
"generation_pre_call": HookPayloadPolicy(
writable_fields=frozenset({"model_options", "format", "tool_calls"}),
),
# generation_post_call: observe-only
"generation_stream_chunk": HookPayloadPolicy(
writable_fields=frozenset({"chunk", "accumulated"}),
),
# Validation
"validation_pre_check": HookPayloadPolicy(
writable_fields=frozenset({"requirements", "model_options"}),
),
"validation_post_check": HookPayloadPolicy(
writable_fields=frozenset({"results", "all_validations_passed"}),
),
# Sampling Pipeline
"sampling_loop_start": HookPayloadPolicy(
writable_fields=frozenset({"loop_budget"}),
),
# sampling_iteration, sampling_repair, sampling_loop_end: observe-only
# Tool Execution
"tool_pre_invoke": HookPayloadPolicy(
writable_fields=frozenset({"model_tool_call"}),
),
"tool_post_invoke": HookPayloadPolicy(
writable_fields=frozenset({"tool_output"}),
),
# adapter_*, context_*, error_occurred: observe-only (no entry)
}Hooks absent from this table are observe-only. With DefaultHookPolicy.DENY (the Mellea default), any modification attempt on an observe-only hook is rejected with a warning log.
The PluginManager is lazily initialized on first use (either via register() or start_session(plugins=[...])). A config path is no longer required — code-first registration may be the only path.
_plugin_manager: PluginManager | None = None
_plugins_enabled: bool = False
_session_tags: dict[str, set[str]] = {} # session_id -> set of plugin keys
def has_plugins() -> bool:
"""Fast check: are plugins configured and available?"""
return _plugins_enabled
def get_plugin_manager() -> PluginManager | None:
"""Returns the initialized PluginManager, or None if plugins are not configured."""
return _plugin_manager
def ensure_plugin_manager() -> PluginManager:
"""Lazily initialize the PluginManager if not already created."""
global _plugin_manager, _plugins_enabled
if _plugin_manager is None:
register_mellea_hooks()
pm = PluginManager(
"",
timeout=5,
hook_policies=MELLEA_HOOK_PAYLOAD_POLICIES,
)
_run_async_in_thread(pm.initialize())
_plugin_manager = pm
_plugins_enabled = True
return _plugin_manager
async def initialize_plugins(
config_path: str | None = None, *, timeout: float = 5.0
) -> PluginManager:
"""Initialize the PluginManager with Mellea hook registrations and optional YAML config."""
global _plugin_manager, _plugins_enabled
register_mellea_hooks()
pm = PluginManager(
config_path or "",
timeout=int(timeout),
hook_policies=MELLEA_HOOK_PAYLOAD_POLICIES,
)
await pm.initialize()
_plugin_manager = pm
_plugins_enabled = True
return pm
async def shutdown_plugins() -> None:
"""Shut down the PluginManager."""
global _plugin_manager, _plugins_enabled, _session_tags
if _plugin_manager is not None:
await _plugin_manager.shutdown()
_plugin_manager = None
_plugins_enabled = False
_session_tags.clear()All hook call sites use this single function. Three layers of no-op guards ensure zero overhead when plugins are not configured:
_plugins_enabledboolean — module-level, a single pointer dereferencehas_hooks_for(hook_type)— skips invocation when no plugin subscribes to this hook- Returns
(None, original_payload)immediately when either guard fails
The wrapper accepts only backend and optional **context_fields as kwargs. Domain-specific data (context, session, action, etc.) belongs on the typed payload, not on the invocation kwargs.
async def invoke_hook(
hook_type: HookType,
payload: MelleaBasePayload,
*,
backend: Backend | None = None,
**context_fields,
) -> tuple[PluginResult | None, MelleaBasePayload]:
"""Invoke a hook if plugins are configured.
Returns (result, possibly-modified-payload).
If plugins are not configured, returns (None, original_payload) immediately.
"""
if not _plugins_enabled or _plugin_manager is None:
return None, payload
if not _plugin_manager.has_hooks_for(hook_type.value):
return None, payload
# Payloads are frozen — use model_copy to set dispatch-time fields
updates: dict[str, Any] = {"hook": hook_type.value}
payload = payload.model_copy(update=updates)
global_ctx = build_global_context(backend=backend, **context_fields)
result, _ = await _plugin_manager.invoke_hook(
hook_type=hook_type.value,
payload=payload,
global_context=global_ctx,
violations_as_exceptions=False,
)
modified = result.modified_payload if result and result.modified_payload else payload
return result, modifiedstart_session() in mellea/stdlib/session.py gains an optional plugins keyword parameter:
def start_session(
...,
plugins: list[Callable | Any | PluginSet] | None = None,
) -> MelleaSession:When plugins is provided, start_session() registers each item with the session's ID via register(items, session_id=session.id). These plugins fire only within this session, in addition to any globally registered plugins. They are automatically deregistered when the session is cleaned up (at session_cleanup).
Both plugin forms — standalone @hook functions and Plugin subclass instances — plus PluginSet support the Python context manager protocol for block-scoped activation. This is a fourth registration scope complementing global, session-scoped, and YAML-configured plugins.
With-block scopes reuse the existing session_id tagging infrastructure from section 4.3. Each with entry generates a fresh UUID scope ID, registers plugins with that scope ID, and deregisters them by scope ID on exit. The _session_tags dict in manager.py tracks these scope IDs alongside session IDs — the manager makes no distinction between them at dispatch time.
A _PluginScope internal class and plugin_scope() public factory serve as the universal entry point, accepting any mix of standalone functions, Plugin instances, and PluginSets:
class _PluginScope:
"""Context manager that activates a set of plugins for a block of code."""
def __init__(self, items: list[Callable | Any | PluginSet]) -> None:
self._items = items
self._scope_id: str | None = None
def _activate(self) -> None:
self._scope_id = str(uuid.uuid4())
register(self._items, session_id=self._scope_id)
def _deactivate(self) -> None:
if self._scope_id is not None:
deregister_session_plugins(self._scope_id)
self._scope_id = None
def __enter__(self) -> _PluginScope:
self._activate()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self._deactivate()
async def __aenter__(self) -> _PluginScope:
self._activate()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self._deactivate()
def plugin_scope(*items: Callable | Any | PluginSet) -> _PluginScope:
"""Create a context manager that activates the given plugins for a block of code."""
return _PluginScope(list(items))The Plugin base class provides __enter__, __exit__, __aenter__, __aexit__ directly. The context-manager methods delegate to module-level helpers:
def _plugin_cm_enter(self: Any) -> Any:
if getattr(self, "_scope_id", None) is not None:
meta = getattr(type(self), "_mellea_plugin_meta", None)
plugin_name = meta.name if meta else type(self).__name__
raise RuntimeError(
f"Plugin {plugin_name!r} is already active as a context manager. "
"Concurrent or nested reuse of the same instance is not supported; "
"create a new instance instead."
)
self._scope_id = str(uuid.uuid4())
register(self, session_id=self._scope_id)
return self
def _plugin_cm_exit(self: Any, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
scope_id = getattr(self, "_scope_id", None)
if scope_id is not None:
deregister_session_plugins(scope_id)
self._scope_id = None
class Plugin:
def __init_subclass__(cls, *, name: str = "", priority: int = 50, **kwargs):
super().__init_subclass__(**kwargs)
if name:
cls._mellea_plugin_meta = PluginMeta(name=name, priority=priority)
def __enter__(self) -> Any:
return _plugin_cm_enter(self)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
_plugin_cm_exit(self, exc_type, exc_val, exc_tb)
async def __aenter__(self) -> Any:
return self.__enter__()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.__exit__(exc_type, exc_val, exc_tb)PluginSet gains the same context manager protocol using the same UUID scope ID pattern:
def __enter__(self) -> PluginSet:
if self._scope_id is not None:
raise RuntimeError(
f"PluginSet {self.name!r} is already active as a context manager. "
"Create a new instance to use in a separate scope."
)
self._scope_id = str(uuid.uuid4())
register(self, session_id=self._scope_id)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
if self._scope_id is not None:
deregister_session_plugins(self._scope_id)
self._scope_id = None
async def __aenter__(self) -> PluginSet:
return self.__enter__()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.__exit__(exc_type, exc_val, exc_tb)MelleaPlugin also supports the context manager protocol. Because MelleaPlugin subclasses CPEX Plugin (which owns __init__), the scope ID is stored as an instance attribute accessed via getattr with a default rather than declared in __init__:
def __enter__(self) -> MelleaPlugin:
if getattr(self, "_scope_id", None) is not None:
raise RuntimeError(
f"MelleaPlugin {self.name!r} is already active as a context manager. "
"Create a new instance to use in a separate scope."
)
self._scope_id = str(uuid.uuid4())
register(self, session_id=self._scope_id)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
scope_id = getattr(self, "_scope_id", None)
if scope_id is not None:
deregister_session_plugins(scope_id)
self._scope_id = None
async def __aenter__(self) -> MelleaPlugin:
return self.__enter__()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.__exit__(exc_type, exc_val, exc_tb)A deregister_session_plugins(scope_id) function removes all plugins tagged with a given scope ID from the PluginManager and cleans up the _session_tags entry. This is the same function used by session_cleanup to deregister session-scoped plugins:
def deregister_session_plugins(session_id: str) -> None:
"""Deregister all plugins associated with a given session or scope ID."""
pm = _plugin_manager
if pm is None:
return
plugin_keys = _session_tags.pop(session_id, set())
for key in plugin_keys:
pm.deregister(key)plugin_scope is exported from mellea.plugins:
from mellea.plugins import plugin_scopeAll three forms (plugin_scope, Plugin subclass instance, PluginSet) support both with and async with. MelleaPlugin instances also support the protocol for advanced use cases. The same-instance re-entrant restriction applies to all forms: attempting to re-enter an already-active instance raises RuntimeError. Create separate instances to activate the same plugin logic in nested or concurrent scopes.
Add to pyproject.toml under [project.optional-dependencies]:
plugins = ["contextforge-plugin-framework>=0.1.0"]All imports in mellea/plugins/ are guarded with try/except ImportError.
Global registration happens via register() at application startup:
def register(
items: Callable | Any | PluginSet | list[Callable | Any | PluginSet],
*,
session_id: str | None = None,
) -> None:
"""Register plugins globally or for a specific session.
When session_id is None, plugins are global (fire for all invocations).
When session_id is provided, plugins fire only within that session.
Accepts standalone @hook functions, Plugin subclass instances,
MelleaPlugin instances, PluginSets, or lists thereof.
"""
pm = ensure_plugin_manager()
if not isinstance(items, list):
items = [items]
for item in items:
if isinstance(item, PluginSet):
for flattened_item, priority_override in item.flatten():
_register_single(pm, flattened_item, session_id, priority_override)
else:
_register_single(pm, item, session_id, None)
def _register_single(
pm: PluginManager,
item: Callable | Any,
session_id: str | None,
priority_override: int | None,
) -> None:
"""Register a single hook function or plugin instance.
- Standalone functions with _mellea_hook_meta: wrapped in _FunctionHookAdapter
- Plugin subclass instances (with _mellea_plugin_meta): methods with _mellea_hook_meta discovered and wrapped in _ClassPluginAdapter
- MelleaPlugin instances: registered directly with CPEX
"""
...A _FunctionHookAdapter internal class wraps a standalone @hook-decorated function into a CPEX Plugin for the PluginManager. A _ClassPluginAdapter wraps Plugin subclass instances similarly, discovering all @hook-decorated methods:
class _FunctionHookAdapter(Plugin):
"""Adapts a standalone @hook-decorated function into a CPEX Plugin."""
def __init__(self, fn: Callable, session_id: str | None = None):
meta = fn._mellea_hook_meta
config = PluginConfig(
name=fn.__qualname__,
kind=fn.__module__ + "." + fn.__qualname__,
hooks=[meta.hook_type],
mode=_map_mode(meta.mode),
priority=meta.priority,
)
super().__init__(config)
self._fn = fn
self._session_id = session_id
async def initialize(self):
pass
async def shutdown(self):
passSession context threading: All invoke_hook calls pass session_id when operating within a session. For the functional API path, session_id is None and only globally registered plugins are dispatched. Session-scoped plugins (registered via start_session(plugins=[...])) fire only when the dispatch context matches their session ID.
File: mellea/stdlib/session.py
start_session() gains the plugins parameter for session-scoped registration:
def start_session(
backend_name: ... = "ollama",
model_id: ... = IBM_GRANITE_4_MICRO_3B,
ctx: Context | None = None,
*,
model_options: dict | None = None,
plugins: list[Callable | Any | PluginSet] | None = None,
**backend_kwargs,
) -> MelleaSession:Session-scoped plugins passed via plugins=[...] are registered with this session's ID and deregistered at session_cleanup.
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
session_pre_init |
start_session(), before backend_class(model_id, ...) (~L163) |
Before backend instantiation | Supports payload modification: updated model_options, model_id. Violation blocks session creation. |
session_post_init |
start_session(), after MelleaSession(backend, ctx) (~L191) |
Session fully created | Observability-only. |
session_reset |
MelleaSession.reset(), before self.ctx.reset_to_new() (~L269) |
Context about to reset | Observability-only. |
session_cleanup |
MelleaSession.cleanup(), at top of method (~L272) |
Before teardown | Observability-only. Must not raise. Deregisters session-scoped plugins. |
Sync/async bridge: These are sync methods. Use _run_async_in_thread(invoke_hook(...)) from mellea/helpers/__init__.py.
Payload examples:
# session_pre_init
SessionPreInitPayload(
backend_name=backend_name,
model_id=str(model_id),
model_options=model_options,
context_type=type(ctx).__name__ if ctx else "SimpleContext",
)
# session_post_init
SessionPostInitPayload(session=session)
# session_cleanup
SessionCleanupPayload(
context=self.ctx,
interaction_count=len(self.ctx.as_list()),
)Files: mellea/stdlib/components/instruction.py (Instruction.__init__), mellea/stdlib/components/chat.py (Message.__init__), mellea/stdlib/functional.py (aact())
component_pre_createandcomponent_post_createare deferred. These hooks are not implemented in this iteration. See the discussion note at the end of this section.
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
component_pre_create |
(deferred — not implemented) | — | — |
component_post_create |
(deferred — not implemented) | — | — |
component_pre_execute |
aact(), at top before strategy branch (~L492) |
Before generation begins | Supports model_options, requirements, strategy, format, tool_calls_enabled modification. Violation blocks execution. |
component_post_success |
aact(), after result in both branches (~L506, ~L534) |
Successful execution | Observability-only. |
component_post_error |
aact(), in new try/except Exception wrapping the body |
Exception during execution | Observability-only. Always re-raises after hook. |
Key changes to aact():
- Add
time.monotonic()at entry for latency measurement - Wrap body (lines ~492–546) in
try/except Exception excepthandler: firecomponent_post_errorthenerror_occurred, then re-raise- Insert
component_post_successbefore eachreturnpath
Discussion — why
component_pre_createandcomponent_post_createare deferred:
Componentis currently aProtocol, not an abstract base class. Mellea has no ownership over component initialization: there are no guarantees about when or how subclass__init__methods run, and there is no single interception point that covers allComponentimplementations. Calling hooks manually insideInstruction.__init__andMessage.__init__works for those classes, but it is fragile (any user-definedComponentsubclass is invisible) and requires per-class boilerplate.If
Componentwere refactored to an abstract base class, the ABC could wrap__init__and fire these hooks generically for every subclass. Until that refactoring is decided, these hook types are excluded from the active hook system: they are not inHookType, have no payload classes, no registered call sites, and no tests. The hook names and payload designs remain documented here for when the refactoring occurs. Usecomponent_pre_executefor pre-execution interception in the meantime.
Sync/async bridge for PRE/POST_CREATE (relevant if/when these hooks are implemented): Instruction.__init__ and Message.__init__ are synchronous. Use _run_async_in_thread(invoke_hook(...)) from mellea/helpers/__init__.py. backend and context are None.
Payload examples:
# component_pre_create (Instruction case — inside Instruction.__init__, before attrs)
ComponentPreCreatePayload(
component_type="Instruction",
description=description,
images=images,
requirements=requirements,
icl_examples=icl_examples,
grounding_context=grounding_context,
user_variables=user_variables,
prefix=prefix,
)
# component_pre_execute
ComponentPreExecutePayload(
component_type=type(action).__name__,
action=action,
context=context,
requirements=requirements or [],
model_options=model_options or {},
format=format,
strategy_name=type(strategy).__name__ if strategy else None,
tool_calls_enabled=tool_calls,
)
# component_post_success
ComponentPostSuccessPayload(
component_type=type(action).__name__,
action=action,
result=result,
context_before=context,
context_after=new_ctx,
generate_log=result._generate_log,
sampling_results=sampling_result if strategy else None,
latency_ms=int((time.monotonic() - t0) * 1000),
)Approach: Backend.__init_subclass__ in mellea/core/backend.py automatically wraps generate_from_context in every concrete backend subclass that defines it. This avoids modifying all backend implementations (Ollama, OpenAI, HuggingFace, vLLM, Watsonx, LiteLLM) individually. No separate generate_from_context_with_hooks() method is added.
The wrapper is injected at class definition time and applies to all current and future Backend subclasses transparently.
generation_post_call timing: The post-call hook fires via an _on_computed callback set on the returned ModelOutputThunk:
- Lazy MOTs (normal path):
_on_computedfires insideModelOutputThunk.astreamafterpost_processcompletes, when the value is fully materialized.latency_msreflects the full wall-clock time from thegenerate_from_contextcall to value availability.model_outputreplacement is supported — the original MOT's output fields are updated in-place via_copy_from. - Already-computed MOTs (e.g. cached responses or test mocks):
astreamreturns early without firing_on_computed, so the hook is fired inline before the wrapper returns.model_outputreplacement is supported on this path.
Writable fields applied from generation_pre_call: model_options, format, tool_calls.
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
generation_pre_call |
Backend.__init_subclass__ wrapper, before generate_from_context delegate |
Before LLM API call | Supports model_options, format, tool_calls modification. Violation blocks (e.g., token budget exceeded). |
generation_post_call |
Via _on_computed callback on ModelOutputThunk (lazy path), or inline before return (already-computed path) |
After output fully materialized | Observability-only. |
generation_stream_chunk |
Deferred — requires hooks in ModelOutputThunk.astream() streaming path |
Per streaming chunk | Fire-and-forget to avoid slowing streaming. |
File: mellea/stdlib/functional.py, in avalidate() (~L699–753)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
validation_pre_check |
After reqs prepared (~L713), before validation loop |
Before validation | Supports requirements list modification (inject/filter). |
validation_post_check |
After all validations, before return rvs (~L753) |
After validation | Supports results override. Primarily observability. |
Payload examples:
# validation_pre_check
ValidationPreCheckPayload(
requirements=reqs,
target=output,
context=context,
model_options=model_options or {},
)
# validation_post_check
ValidationPostCheckPayload(
requirements=reqs,
results=rvs,
all_validations_passed=all(bool(r) for r in rvs),
passed_count=sum(1 for r in rvs if bool(r)),
failed_count=sum(1 for r in rvs if not bool(r)),
)File: mellea/stdlib/sampling/base.py, in BaseSamplingStrategy.sample() (~L94–256)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
sampling_loop_start |
Before for loop (~L157) |
Loop begins | Supports loop_budget modification. |
sampling_iteration |
Inside loop, after validation (~L192) | Each iteration | Observability. Violation can force early termination. |
sampling_repair |
After self.repair() call (~L224) |
Repair invoked | Observability-only. |
sampling_loop_end |
Before return in success (~L209) and failure (~L249) paths | Loop ends | Observability-only. |
Additional change: Add _get_repair_type() -> str method to each sampling strategy subclass:
| Strategy Class | Repair Type |
|---|---|
RejectionSamplingStrategy |
"identity" |
RepairTemplateStrategy |
"template_repair" |
MultiTurnStrategy |
"multi_turn_message" |
SOFAISamplingStrategy |
"sofai_feedback" |
Payload examples:
# sampling_loop_start
SamplingLoopStartPayload(
strategy_name=type(self).__name__,
action=action,
context=context,
requirements=reqs,
loop_budget=self.loop_budget,
)
# sampling_repair
SamplingRepairPayload(
repair_type=self._get_repair_type(),
failed_action=sampled_actions[-1],
failed_result=sampled_results[-1],
failed_validations=sampled_scores[-1],
repair_action=next_action,
repair_context=next_context,
repair_iteration=loop_count,
)File: mellea/stdlib/functional.py, in the _call_tools() helper (~L904)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
tool_pre_invoke |
Before tool.call_func() (~L917) |
Before tool call | Supports model_tool_call replacement. Violation blocks tool call. |
tool_post_invoke |
After tool.call_func() (~L919) |
After tool call | Supports tool_output modification. Primarily observability. |
Files: mellea/backends/openai.py (load_adapter ~L907, unload_adapter ~L944), mellea/backends/huggingface.py (load_adapter ~L1192, unload_adapter ~L1224)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
adapter_pre_load |
Start of load_adapter() |
Before adapter load | Violation prevents loading. |
adapter_post_load |
End of load_adapter() |
After adapter loaded | Observability. |
adapter_pre_unload |
Start of unload_adapter() |
Before adapter unload | Violation prevents unloading. |
adapter_post_unload |
End of unload_adapter() |
After adapter unloaded | Observability. |
Sync/async bridge: Adapter methods are synchronous. Use _run_async_in_thread(invoke_hook(...)).
Files: mellea/stdlib/context.py (ChatContext.add() ~L17, SimpleContext.add() ~L31)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
context_update |
After from_previous() in add() |
Context appended | Observability-only (context is immutable). |
context_prune |
ChatContext.view_for_generation() when window truncates |
Context windowed | Observability-only. |
Performance note: context_update fires on every context addition, which is frequent. The has_hooks_for() guard is critical — when no plugin subscribes to context_update, the overhead is a single boolean check.
File: mellea/stdlib/functional.py (utility function callable from any error path)
| Hook | Location | Trigger | Result Handling |
|---|---|---|---|
error_occurred |
aact() except block + utility fire_error_hook() |
Unrecoverable error | Observability-only. Must never raise from own execution. |
Fires for: ComponentParseError, backend communication errors, assertion violations, unhandled exceptions during component execution, validation, or tool invocation.
Does NOT fire for: Validation failures within sampling loops (handled by sampling_iteration/sampling_repair), controlled PluginViolation blocks (those are policy decisions, not errors).
Utility function:
async def fire_error_hook(
error: Exception,
location: str,
*,
session=None, backend=None, context=None, action=None,
) -> None:
"""Fire the error_occurred hook. Never raises."""
try:
payload = ErrorOccurredPayload(
error=error,
error_type=type(error).__name__,
error_location=location,
stack_trace=traceback.format_exc(),
recoverable=False,
action=action,
)
await invoke_hook(
HookType.ERROR_OCCURRED, payload,
session=session, backend=backend, context=context,
violations_as_exceptions=False,
)
except Exception:
pass # Never propagate errors from error hook| File | Changes |
|---|---|
mellea/stdlib/components/instruction.py |
COMPONENT_PRE_CREATE (before attr assignment) and COMPONENT_POST_CREATE (after attr assignment) hooks in Instruction.__init__ |
mellea/stdlib/components/chat.py |
Same PRE/POST_CREATE hooks in Message.__init__ |
mellea/stdlib/functional.py |
Component execute/success/error hooks in aact(); validation and tool hooks |
mellea/stdlib/session.py |
4 session hooks + plugins param on start_session() + session-scoped plugin registration/deregistration |
mellea/stdlib/sampling/base.py |
4 sampling hooks |
mellea/core/backend.py |
Backend.__init_subclass__ wraps generate_from_context in all subclasses, injecting GENERATION_PRE_CALL and GENERATION_POST_CALL hooks; post-call hook fires via _on_computed callback on the returned ModelOutputThunk |
mellea/stdlib/context.py |
2 context operation hooks in ChatContext.add(), SimpleContext.add() |
mellea/backends/openai.py |
4 adapter hooks in load_adapter() / unload_adapter() |
mellea/backends/huggingface.py |
4 adapter hooks in load_adapter() / unload_adapter() |
pyproject.toml |
Add plugins optional dependency + plugins test marker |
mellea/plugins/__init__.py (new) |
Public API: Plugin, hook, block, PluginSet, register, plugin_scope, PluginMode, HookType, PluginResult, PluginViolationError |
mellea/plugins/decorators.py (new) |
@hook decorator implementation, HookMeta |
mellea/plugins/pluginset.py (new) |
PluginSet class with flatten() for recursive expansion; context manager protocol for with-block scoping |
mellea/plugins/registry.py (new) |
register(), block(), _FunctionHookAdapter, _ClassPluginAdapter, _register_single(); _PluginScope class and plugin_scope() factory for with-block scoping |
mellea/plugins/manager.py (new) |
Singleton wrapper, invoke_hook() with session-tag filtering, ensure_plugin_manager(); deregister_session_plugins() for scope cleanup (used by both session and with-block exit) |
mellea/plugins/base.py (new) |
Plugin base class (lightweight, no cpex dep) with __init_subclass__ and context manager protocol; PluginMeta; MelleaBasePayload (frozen); MelleaPlugin (advanced cpex-based class); PluginViolationError |
mellea/plugins/types.py (new) |
HookType enum, PluginMode enum, register_mellea_hooks() |
mellea/plugins/policies.py (new) |
MELLEA_HOOK_PAYLOAD_POLICIES table, injected into PluginManager at init |
mellea/plugins/context.py (new) |
build_global_context() factory |
mellea/plugins/hooks/ (new) |
Hook payload dataclasses (session, component, generation, etc.) |
test/plugins/ (new) |
Tests for plugins subpackage |
Note: + update docs and add examples.