Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
## New Features

- `Resampler`: The resampler can now be configured to have the resampling window closed to the right (default) or left, and to also set the resampler timestamp to the right (default) or left end of the window being resampled. You can configure setting the new options `closed` and `label` in the `ResamplerConfig`.
- `EventResampler`: A new event-driven resampler for cascaded resampling stages. Unlike the timer-based `Resampler`, `EventResampler` emits windows when sample timestamps exceed window boundaries, eliminating data loss at window boundaries in cascaded scenarios. See the class documentation for usage guidelines.
- `StreamingHelper`: Added callback mechanism via `register_sample_callback()` to notify external consumers when samples arrive, enabling event-driven resampling without polling internal buffers.
- `Resampler._emit_window()`: Extracted window emission logic into a dedicated method for code sharing between timer-based and event-driven resampler implementations.


## Bug Fixes

Expand Down
213 changes: 213 additions & 0 deletions src/frequenz/sdk/timeseries/_resampling/_event_resampler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Event-driven resampler for cascaded resampling stages."""

import asyncio
import logging
from datetime import datetime, timedelta, timezone

from frequenz.quantities import Quantity

from .._base_types import Sample
from ._base_types import Sink, Source
from ._config import ResamplerConfig
from ._resampler import Resampler, _ResamplingHelper, _StreamingHelper

_logger = logging.getLogger(__name__)


class EventResampler(Resampler):
"""Event-driven resampler for cascaded resampling stages.

Unlike the standard Timer-based Resampler which uses fixed wall-clock
intervals, EventResampler is triggered by incoming data. Windows are
emitted when a sample arrives that falls outside the current window,
not on a fixed timer schedule.

Problem Solved:
When cascading Timer-based resamplers (e.g., 1s → 10s) with
align_to=UNIX_EPOCH, samples can be lost at window boundaries due to
timing synchronization issues. EventResampler eliminates this by
opening/closing windows based on actual data arrival.

Important: This resampler is optimized for continuous data streams
where samples arrive at regular or semi-regular intervals. It is not
suitable for handling raw, irregular data directly from sources.

Best Used:
Stage 1: Timer-based Resampler (handles raw, irregular data)
Stage 2+: Event-based Resampler (handles continuous data from Stage 1)

Example:
config = ResamplerConfig(
resampling_period=timedelta(seconds=10),
resampling_function=...,
)
resampler = EventResampler(config)
resampler.add_timeseries("my_source", source, sink)
await resampler.resample()

Note: If a long gap occurs without incoming samples (no data for multiple periods),
the corresponding windows will be emitted all at once when data resumes. This is
acceptable for cascaded resampling since the input typically comes from another
Resampler with guaranteed continuous output.
"""

# pylint: disable=super-init-not-called
def __init__(self, config: ResamplerConfig) -> None:
"""Initialize EventResampler.

This does not call super().__init__() to avoid starting any timers

Args:
config: Resampler configuration
"""
self._config = config
"""The configuration for this resampler."""

self._resamplers: dict[Source, _StreamingHelper] = {}
"""A mapping between sources and the streaming helper handling that source."""

window_end, _ = self._calculate_window_end()
self._window_end: datetime = window_end
"""The time in which the current window ends.

This is used to make sure every resampling window is generated at
precise times. We can't rely on the timer timestamp because timers will
never fire at the exact requested time, so if we don't use a precise
time for the end of the window, the resampling windows we produce will
have different sizes.

The window end will also be aligned to the `config.align_to` time, so
the window end is deterministic.
"""

self._window_lock = asyncio.Lock()
"""Lock protecting access to `_window_end` during window state transitions."""

self._sample_queue: asyncio.Queue[Sample[Quantity]] = asyncio.Queue()
"""Queue for samples awaiting processing. Filled by `_StreamingHelper` callbacks,
consumed by the event loop in `resample()`.
"""

# OVERRIDDEN: Register callback to receive samples asynchronously for
# event-driven window management.
def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool:
"""Start resampling a new timeseries.

Registers the timeseries and sets up a sample callback to enqueue
incoming samples for event-driven processing.

Args:
name: The name of the timeseries (for logging purposes).
source: The source of the timeseries to resample.
sink: The sink to use to send the resampled data.

Returns:
`True` if the timeseries was added, `False` if the timeseries was
not added because there already a timeseries using the provided
receiver.
"""
if source in self._resamplers:
return False

resampler = _StreamingHelper(
_ResamplingHelper(name, self._config), source, sink
)

# Register the callback to receive samples from the streaming helper.
resampler.register_sample_callback(self._enqueue_sample)

self._resamplers[source] = resampler
return True

async def _enqueue_sample(self, sample: Sample[Quantity]) -> None:
"""Add a sample to the processing queue.

Args:
sample: The sample to enqueue.
"""
await self._sample_queue.put(sample)

# OVERRIDDEN: no warm-up period needed for event-driven sample accumulation.
def _calculate_window_end(self) -> tuple[datetime, timedelta]:
"""Calculate the end of the first resampling window.

Calculates the next multiple of resampling_period after the current time,
respecting align_to configuration.

Returns:
A tuple (window_end, delay_time) where:
- window_end: datetime when the first window should end
- delay_time: always timedelta(0) for EventResampler
"""
now = datetime.now(timezone.utc)
period = self._config.resampling_period
align_to = self._config.align_to

if align_to is None:
return (now + period, timedelta(0))

elapsed = (now - align_to) % period

return (
(now + (period - elapsed), timedelta(0))
if elapsed > timedelta(0)
else (now, timedelta(0))
)

async def resample(self, *, one_shot: bool = False) -> None:
"""Start event-driven resampling.

Processes incoming samples from the queue continuously. Windows are
emitted when a sample arrives with a timestamp >= current window_end.
This is in contrast to Timer-based resampling which emits windows at
fixed intervals regardless of data arrival.

Args:
one_shot: If True, waits for the first window to be emitted, then exits.

Raises:
asyncio.CancelledError: If the task is cancelled.
"""
try:
while True:
sample = await self._sample_queue.get()
emmitted = await self._process_sample(sample)

if one_shot and emmitted:
return

except asyncio.CancelledError:
_logger.info("EventResampler task cancelled")
raise

async def _process_sample(
self,
sample: Sample[Quantity],
) -> bool:
"""Process an incoming sample and manage window state.

This method checks if the incoming sample falls outside the current
window and emits completed windows as needed. Returns True if any
windows were emitted.

Args:
sample: Incoming sample to process

Returns:
True if at least one window was emitted, False otherwise.
"""
async with self._window_lock:
emmitted = False
while sample.timestamp >= self._window_end:
_logger.debug(
"EventResampler: Sample at %s >= window end %s, closing window",
sample.timestamp,
self._window_end,
)
await self._emit_window(self._window_end)
self._window_end += self._config.resampling_period
emmitted = True
return emmitted
85 changes: 58 additions & 27 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from bisect import bisect, bisect_left
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import assert_never
from typing import Awaitable, Callable, assert_never

from frequenz.channels.timer import Timer, TriggerAllMissed, _to_microseconds
from frequenz.quantities import Quantity
Expand Down Expand Up @@ -160,14 +160,6 @@ async def resample(self, *, one_shot: bool = False) -> None:
Args:
one_shot: Wether the resampling should run only for one resampling
period.

Raises:
ResamplingError: If some timeseries source or sink encounters any
errors while receiving or sending samples. In this case the
timer still runs and the timeseries will keep receiving data.
The user should remove (and re-add if desired) the faulty
timeseries from the resampler before calling this method
again).
"""
# We use a tolerance of 10% of the resampling period
tolerance = timedelta(
Expand Down Expand Up @@ -200,28 +192,45 @@ async def resample(self, *, one_shot: bool = False) -> None:
case unexpected:
assert_never(unexpected)

# We need to make a copy here because we need to match the results to the
# current resamplers, and since we await here, new resamplers could be added
# or removed from the dict while we awaiting the resampling, which would
# cause the results to be out of sync.
resampler_sources = list(self._resamplers)
results = await asyncio.gather(
*[r.resample(next_tick_time) for r in self._resamplers.values()],
return_exceptions=True,
)
await self._emit_window(next_tick_time)

exceptions = {
source: result
for source, result in zip(resampler_sources, results)
# CancelledError inherits from BaseException, but we don't want
# to catch *all* BaseExceptions here.
if isinstance(result, (Exception, asyncio.CancelledError))
}
if exceptions:
raise ResamplingError(exceptions)
if one_shot:
break

async def _emit_window(self, window_end: datetime) -> None:
"""Emit resampled samples for all timeseries at the given window boundary.

Args:
window_end: The timestamp marking the end of the resampling window.

Raises:
ResamplingError: If some timeseries source or sink encounters any
errors while receiving or sending samples. In this case the
timer still runs and the timeseries will keep receiving data.
The user should remove (and re-add if desired) the faulty
timeseries from the resampler before calling this method
again).
"""
# We need to make a copy here because we need to match the results to the
# current resamplers, and since we await here, new resamplers could be added
# or removed from the dict while we awaiting the resampling, which would
# cause the results to be out of sync.
resampler_sources = list(self._resamplers)
results = await asyncio.gather(
*[r.resample(window_end) for r in self._resamplers.values()],
return_exceptions=True,
)

exceptions = {
source: result
for source, result in zip(resampler_sources, results)
# CancelledError inherits from BaseException, but we don't want
# to catch *all* BaseExceptions here.
if isinstance(result, (Exception, asyncio.CancelledError))
}
if exceptions:
raise ResamplingError(exceptions)

def _calculate_window_end(self) -> tuple[datetime, timedelta]:
"""Calculate the end of the current resampling window.

Expand Down Expand Up @@ -528,6 +537,9 @@ def __init__(
self._helper: _ResamplingHelper = helper
self._source: Source = source
self._sink: Sink = sink
self._sample_callback: Callable[[Sample[Quantity]], Awaitable[None]] | None = (
None
)
self._receiving_task: asyncio.Task[None] = asyncio.create_task(
self._receive_samples()
)
Expand All @@ -545,6 +557,22 @@ async def stop(self) -> None:
"""Cancel the receiving task."""
await cancel_and_await(self._receiving_task)

def register_sample_callback(
self,
callback: Callable[[Sample[Quantity]], Awaitable[None]] | None,
) -> None:
"""Register a callback to be invoked when a sample arrives.

The callback is called asynchronously each time a sample is received
from the source. This allows consumers (like EventResampler) to be
notified of incoming samples without polling internal buffers.

Args:
callback: An async function to call when a sample arrives.
If `None`, no callback will be called on new samples.
"""
self._sample_callback = callback

async def _receive_samples(self) -> None:
"""Pass received samples to the helper.

Expand All @@ -555,6 +583,9 @@ async def _receive_samples(self) -> None:
if sample.value is not None and not sample.value.isnan():
self._helper.add_sample((sample.timestamp, sample.value.base_value))

if self._sample_callback:
await self._sample_callback(sample)

# We need the noqa because pydoclint can't figure out that `recv_exception` is an
# `Exception` instance.
async def resample(self, timestamp: datetime) -> None: # noqa: DOC503
Expand Down
Loading