Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- There's a new `Oneshot` channel, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received.

- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed.

## Bug Fixes

Expand Down
20 changes: 18 additions & 2 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"""

from ._anycast import Anycast
from ._broadcast import Broadcast
from ._broadcast import Broadcast, BroadcastChannel
from ._exceptions import ChannelClosedError, ChannelError, Error
from ._generic import (
ChannelMessageT,
Expand All @@ -92,6 +92,7 @@
)
from ._latest_value_cache import LatestValueCache
from ._merge import Merger, merge
from ._oneshot import OneshotChannel, OneshotReceiver, OneshotSender
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -100,29 +101,44 @@
select,
selected_from,
)
from ._sender import Sender, SenderError
from ._sender import (
ClonableSender,
ClonableSubscribableSender,
Sender,
SenderClosedError,
SenderError,
SubscribableSender,
)

__all__ = [
"Anycast",
"Broadcast",
"BroadcastChannel",
"ChannelClosedError",
"ChannelError",
"ChannelMessageT",
"ClonableSender",
"ClonableSubscribableSender",
"Error",
"ErroredChannelT_co",
"LatestValueCache",
"MappedMessageT_co",
"Merger",
"OneshotChannel",
"OneshotReceiver",
"OneshotSender",
"Receiver",
"ReceiverError",
"ReceiverMessageT_co",
"ReceiverStoppedError",
"SelectError",
"Selected",
"Sender",
"SenderClosedError",
"SenderError",
"SenderMessageT_co",
"SenderMessageT_contra",
"SubscribableSender",
"UnhandledSelectedError",
"merge",
"select",
Expand Down
19 changes: 18 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

self._closed: bool = False
"""Whether the sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.
Expand All @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)

# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After closing, the sender will not be able to send any more messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
129 changes: 119 additions & 10 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)


class Broadcast(Generic[ChannelMessageT]):
@deprecated("Please use BroadcastChannel channel instead.")
class Broadcast( # pylint: disable=too-many-instance-attributes
Generic[ChannelMessageT]
):
"""A channel that deliver all messages to all receivers.

# Description
Expand Down Expand Up @@ -184,7 +187,13 @@ async def main() -> None:
```
"""

def __init__(self, *, name: str, resend_latest: bool = False) -> None:
def __init__(
self,
*,
name: str,
resend_latest: bool = False,
auto_close: bool = False,
) -> None:
"""Initialize this channel.

Args:
Expand All @@ -197,6 +206,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
wait for the next message on the channel to arrive. It is safe to be
set in data/reporting channels, but is not recommended for use in
channels that stream control instructions.
auto_close: If True, the channel will be closed when all senders or all
receivers are closed.
"""
self._name: str = name
"""The name of the broadcast channel.
Expand All @@ -207,8 +218,11 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._recv_cv: Condition = Condition()
"""The condition to wait for data in the channel's buffer."""

self._sender_count: int = 0
"""The number of senders attached to this channel."""

self._receivers: dict[
int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
int, weakref.ReferenceType[BroadcastReceiver[ChannelMessageT]]
] = {}
"""The receivers attached to the channel, indexed by their hash()."""

Expand All @@ -218,6 +232,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._latest: ChannelMessageT | None = None
"""The latest message sent to the channel."""

self._auto_close_enabled: bool = auto_close
"""Whether to close the channel when all senders or all receivers are closed."""

self.resend_latest: bool = resend_latest
"""Whether to resend the latest message to new receivers.

Expand Down Expand Up @@ -269,13 +286,13 @@ async def close(self) -> None: # noqa: D402
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
return await self.aclose()

def new_sender(self) -> Sender[ChannelMessageT]:
def new_sender(self) -> BroadcastSender[ChannelMessageT]:
"""Return a new sender attached to this channel."""
return _Sender(self)
return BroadcastSender(self)

def new_receiver(
self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True
) -> Receiver[ChannelMessageT]:
) -> BroadcastReceiver[ChannelMessageT]:
"""Return a new receiver attached to this channel.

Broadcast receivers have their own buffer, and when messages are not
Expand All @@ -291,7 +308,7 @@ def new_receiver(
Returns:
A new receiver attached to this channel.
"""
recv: _Receiver[ChannelMessageT] = _Receiver(
recv: BroadcastReceiver[ChannelMessageT] = BroadcastReceiver(
self, name=name, limit=limit, warn_on_overflow=warn_on_overflow
)
self._receivers[hash(recv)] = weakref.ref(recv)
Expand All @@ -317,7 +334,7 @@ def __repr__(self) -> str:
_T = TypeVar("_T")


class _Sender(Sender[_T]):
class BroadcastSender(ClonableSubscribableSender[_T]):
"""A sender to send messages to the broadcast channel.

Should not be created directly, but through the
Expand All @@ -334,6 +351,16 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
self._channel: Broadcast[_T] = channel
"""The broadcast channel this sender belongs to."""

self._closed: bool = False
"""Whether this sender is closed."""

self._channel._sender_count += 1

@property
def sender_count(self) -> int:
"""Return the number of open senders attached to this sender's channel."""
return self._channel._sender_count # pylint: disable=protected-access

@override
async def send(self, message: _T, /) -> None:
"""Send a message to all broadcast receivers.
Expand All @@ -345,12 +372,19 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)
# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
self._channel
)
if self._channel._auto_close_enabled and len(self._channel._receivers) == 0:
raise SenderError("The channel was closed", self) from ChannelClosedError(
self._channel
)
self._channel._latest = message
stale_refs = []
for _hash, recv_ref in self._channel._receivers.items():
Expand All @@ -365,6 +399,47 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify_all()
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderClosedError][frequenz.channels.SenderClosedError].
"""
if self._closed:
return
self._closed = True
self._channel._sender_count -= 1

if (
self._channel._sender_count == 0 # pylint: disable=protected-access
and self._channel._auto_close_enabled # pylint: disable=protected-access
):
await self._channel.aclose()

def __del__(self) -> None:
"""Clean up this sender."""
if not self._closed:
self._channel._sender_count -= 1

@override
def clone(self) -> BroadcastSender[_T]:
"""Return a clone of this sender."""
return BroadcastSender(self._channel)

@override
def subscribe(
self,
name: str | None = None,
limit: int = 50,
warn_on_overflow: bool = True,
) -> BroadcastReceiver[_T]:
"""Return a new receiver attached to this sender's channel."""
return self._channel.new_receiver(
name=name, limit=limit, warn_on_overflow=warn_on_overflow
)

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand All @@ -374,7 +449,7 @@ def __repr__(self) -> str:
return f"{type(self).__name__}({self._channel!r})"


class _Receiver(Receiver[_T]):
class BroadcastReceiver(Receiver[_T]):
"""A receiver to receive messages from the broadcast channel.

Should not be created directly, but through the
Expand Down Expand Up @@ -476,6 +551,11 @@ async def ready(self) -> bool:
while len(self._q) == 0:
if self._channel._closed or self._closed:
return False
if self._channel._auto_close_enabled and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
return False
async with self._channel._recv_cv:
await self._channel._recv_cv.wait()
return True
Expand Down Expand Up @@ -525,3 +605,32 @@ def __repr__(self) -> str:
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
f"{self._channel!r}):<id={id(self)!r}, used={len(self._q)!r}>"
)


class BroadcastChannel(
tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]]
):
"""A broadcast channel, deprecated alias for Broadcast."""

def __new__(
cls,
name: str,
resend_latest: bool = False,
limit: int = 50,
warn_on_overflow: bool = True,
) -> BroadcastChannel[ChannelMessageT]:
"""Create a new broadcast channel, deprecated alias for Broadcast."""
channel = Broadcast[ChannelMessageT](
name=name, resend_latest=resend_latest, auto_close=True
)
return tuple.__new__(
cls,
(
channel.new_sender(),
channel.new_receiver(
name=f"{name}_receiver",
limit=limit,
warn_on_overflow=warn_on_overflow,
),
),
)
Loading