Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
## Bug Fixes

- Improved formula validation: Consistent error messages for invalid formulas and conventional span semantics.

- This fixes a rare power distributor bug where some battery inverters becoming unreachable because of network outages would lead to excess power values getting set. This is fixed by measuring the power of the unreachable inverters through their fallback meters and excluding that power from what is distributed to the other inverters.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid import ApiClientError, OperationOutOfRange
from frequenz.client.microgrid.component import Battery, Inverter
from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Power
from typing_extensions import override

from ....timeseries import Sample
from ... import connection_manager
from ..._old_component_data import BatteryData, InverterData
from .._component_pool_status_tracker import ComponentPoolStatusTracker
from .._component_status import BatteryStatusTracker, ComponentPoolStatus
from .._distribution_algorithm import (
AggregatedBatteryData,
BatteryComponentsData,
BatteryDistributionAlgorithm,
DistributionResult,
InvBatPair,
Expand Down Expand Up @@ -160,6 +163,20 @@ def __init__(
self._battery_caches: dict[ComponentId, LatestValueCache[BatteryData]] = {}
self._inverter_caches: dict[ComponentId, LatestValueCache[InverterData]] = {}

self._unreachable_battery_powers: dict[
frozenset[ComponentId],
tuple[collections.abc.Set[ComponentId], LatestValueCache[Sample[Power]]],
] = {}
"""Map from battery sets to data from inaccessible battery subset.

When batteries are inaccessible, for example, due to network issues and can't be
controlled, they might still be producing or consuming power. This power needs
to be considered when distributing power to the other batteries.

So for each battery set, we track of the inaccessible subset of this battery set
and the result of the power formulas for the inaccessible subset here.
"""

self._component_pool_status_tracker = ComponentPoolStatusTracker(
component_ids=set(self._battery_ids),
component_status_sender=component_pool_status_sender,
Expand Down Expand Up @@ -226,27 +243,24 @@ async def _get_distribution(self, request: Request) -> DistributionResult | Resu
Returns:
Distribution of the batteries.
"""
match self._get_components_data(request.component_ids):
case str() as err:
return Error(request=request, msg=err)
case list() as pairs_data:
pass
case unexpected:
typing.assert_never(unexpected)

if not pairs_data:
try:
components_data = await self._get_components_data(request.component_ids)
except RuntimeError as err:
return Error(request=request, msg=str(err))

if not components_data.inv_bat_pairs:
error_msg = (
"No data for at least one of the given batteries: "
+ self._str_ids(request.component_ids)
)
return Error(request=request, msg=str(error_msg))

error = self._check_request(request, pairs_data)
error = self._check_request(request, components_data.inv_bat_pairs)
if error:
return error

try:
distribution = self._get_power_distribution(request, pairs_data)
distribution = self._get_power_distribution(request, components_data)
except ValueError as err:
_logger.exception("Couldn't distribute power")
error_msg = f"Couldn't distribute power, error: {str(err)}"
Expand Down Expand Up @@ -522,17 +536,91 @@ def nan_metric_in_list(data: list[DataType], metrics: list[str]) -> bool:

return InvBatPair(AggregatedBatteryData(battery_data), inverter_data)

def _get_components_data(
async def _subscribe_to_unreachable_battery_power(
self,
requested_battery_ids: collections.abc.Set[ComponentId],
working_battery_ids: collections.abc.Set[ComponentId],
) -> None:
requested_battery_ids = frozenset(requested_battery_ids)
unreachable_battery_ids = requested_battery_ids - working_battery_ids

if not unreachable_battery_ids:
if unreachable_power := self._unreachable_battery_powers.pop(
requested_battery_ids, None
):
_logger.debug(
"All batteries are reachable, stopping unreachable battery power "
+ "subscription for batteries %s",
self._str_ids(requested_battery_ids),
)
await unreachable_power[1].stop()
return

if unreachable_power := self._unreachable_battery_powers.get(
requested_battery_ids
):
if unreachable_power[0] == unreachable_battery_ids:
return
_logger.debug(
"Unreachable battery set for batteries %s changed from %s to %s, "
+ "restarting subscription",
self._str_ids(requested_battery_ids),
self._str_ids(unreachable_power[0]),
self._str_ids(unreachable_battery_ids),
)
await unreachable_power[1].stop()

formula = connection_manager.get().component_graph.battery_formula(
unreachable_battery_ids
)
_logger.debug(
"Subscribing to unreachable battery power for batteries %s with formula: %s",
self._str_ids(unreachable_battery_ids),
formula,
)

# This is to avoid a circular import. Pylint doesn't detect that this
# is not a circular import, so we need to disable the warning also.
#
# pylint: disable-next=import-outside-toplevel,cyclic-import
from ... import (
logical_meter,
)

formula_cache = LatestValueCache(
logical_meter()
.start_formula(formula, Metric.AC_ACTIVE_POWER)
.new_receiver()
.map(
lambda sample: Sample[Power](
sample.timestamp,
(
Power.from_watts(sample.value.base_value)
if sample.value is not None
else None
),
)
)
)
self._unreachable_battery_powers[requested_battery_ids] = (
unreachable_battery_ids,
formula_cache,
)

async def _get_components_data(
self, batteries: collections.abc.Set[ComponentId]
) -> list[InvBatPair] | str:
) -> BatteryComponentsData:
"""Get data for the given batteries and adjacent inverters.

Args:
batteries: Batteries that needs data.

Returns:
Pairs of battery and adjacent inverter data or an error message if there was
an error while getting the data.
Battery component data including pairs of battery and adjacent inverter
data, and unreachable power.

Raises:
RuntimeError: If there was an error while getting the data.
"""
inverter_ids: collections.abc.Set[ComponentId]
pairs_data: list[InvBatPair] = []
Expand All @@ -541,9 +629,14 @@ def _get_components_data(
batteries
)

await self._subscribe_to_unreachable_battery_power(
batteries,
working_batteries,
)

for battery_id in working_batteries:
if battery_id not in self._battery_caches:
return (
raise RuntimeError(
f"No battery {battery_id}, "
f"available batteries: {self._str_ids(self._battery_caches.keys())}"
)
Expand All @@ -559,7 +652,7 @@ def _get_components_data(
if batteries_from_inverters != batteries:
extra_batteries = batteries_from_inverters - batteries
inverter_ids = _get_all_from_map(self._bat_invs_map, extra_batteries)
return (
raise RuntimeError(
f"Inverter(s) ({self._str_ids(inverter_ids)}) are connected to "
f"battery(ies) ({self._str_ids(extra_batteries)}) that were not requested"
)
Expand All @@ -582,23 +675,39 @@ def _get_components_data(

assert len(data.inverter) > 0
pairs_data.append(data)
return pairs_data

unreachable_power: Power | None = None
if unreachable_power_cache := self._unreachable_battery_powers.get(
frozenset(batteries)
):
if unreachable_power_cache[1].has_value():
unreachable_power = unreachable_power_cache[1].get().value

return BatteryComponentsData(
inv_bat_pairs=pairs_data,
unreachable_power=unreachable_power,
)

def _str_ids(self, ids: collections.abc.Set[ComponentId]) -> str:
return ", ".join(str(cid) for cid in sorted(ids))

def _get_power_distribution(
self, request: Request, inv_bat_pairs: list[InvBatPair]
self,
request: Request,
components: BatteryComponentsData,
) -> DistributionResult:
"""Get power distribution result for the batteries in the request.

Args:
request: the power request to process.
inv_bat_pairs: the battery and adjacent inverter data pairs.
components: the battery component data including pairs of battery
and adjacent inverter data, and unreachable power.

Returns:
the power distribution result.
"""
inv_bat_pairs = components.inv_bat_pairs

available_bat_ids = _get_all_from_map(
self._bat_bats_map, {pair.battery.component_id for pair in inv_bat_pairs}
)
Expand All @@ -611,8 +720,19 @@ def _get_power_distribution(
]:
unavailable_inv_ids = unavailable_inv_ids | inverter_ids

power_to_distribute = request.power
if components.unreachable_power is not None:
power_to_distribute = power_to_distribute - components.unreachable_power
_logger.debug(
"Excluding %s measured on unreachable batteries: %s, from power to "
+ "distribute on working batteries: %s",
components.unreachable_power,
unavailable_bat_ids,
available_bat_ids,
)

result = self._distribution_algorithm.distribute_power(
request.power, inv_bat_pairs
power_to_distribute, inv_bat_pairs
)

return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ._battery_distribution_algorithm import (
AggregatedBatteryData,
BatteryComponentsData,
BatteryDistributionAlgorithm,
DistributionResult,
InvBatPair,
Expand All @@ -15,4 +16,5 @@
"DistributionResult",
"InvBatPair",
"AggregatedBatteryData",
"BatteryComponentsData",
]
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ class InvBatPair(NamedTuple):
"""The inverter data."""


@dataclass
class BatteryComponentsData:
"""Container for battery component data and unreachable power."""

inv_bat_pairs: list[InvBatPair]
"""The battery and inverter data pairs."""

unreachable_power: Power | None
"""Power from batteries with unreachable inverters.

This power needs to be excluded from the distributed power, because the
inverters producing it are unreachable and cannot be controlled.
"""


@dataclass
class AvailabilityRatio:
"""Availability ratio for a battery-inverter pair."""
Expand Down