diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py
new file mode 100644
index 0000000000..4a6ee07e67
--- /dev/null
+++ b/sentry_sdk/_batcher.py
@@ -0,0 +1,139 @@
+import os
+import random
+import threading
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING, TypeVar, Generic
+
+from sentry_sdk.utils import format_timestamp
+from sentry_sdk.envelope import Envelope, Item, PayloadRef
+
+if TYPE_CHECKING:
+    from typing import Optional, Callable, Any
+
+T = TypeVar("T")
+
+
+class Batcher(Generic[T]):
+    MAX_BEFORE_FLUSH = 100
+    MAX_BEFORE_DROP = 1_000
+    FLUSH_WAIT_TIME = 5.0
+
+    TYPE = ""
+    CONTENT_TYPE = ""
+
+    def __init__(
+        self,
+        capture_func: "Callable[[Envelope], None]",
+        record_lost_func: "Callable[..., None]",
+    ) -> None:
+        self._buffer: "list[T]" = []
+        self._capture_func = capture_func
+        self._record_lost_func = record_lost_func
+        self._running = True
+        self._lock = threading.Lock()
+
+        self._flush_event: "threading.Event" = threading.Event()
+
+        self._flusher: "Optional[threading.Thread]" = None
+        self._flusher_pid: "Optional[int]" = None
+
+    def _ensure_thread(self) -> bool:
+        """For forking processes we might need to restart this thread.
+        This ensures that our process actually has that thread running.
+        """
+        if not self._running:
+            return False
+
+        pid = os.getpid()
+        if self._flusher_pid == pid:
+            return True
+
+        with self._lock:
+            # Recheck to make sure another thread didn't get here and start
+            # the flusher in the meantime
+            if self._flusher_pid == pid:
+                return True
+
+            self._flusher_pid = pid
+
+            self._flusher = threading.Thread(target=self._flush_loop)
+            self._flusher.daemon = True
+
+            try:
+                self._flusher.start()
+            except RuntimeError:
+                # Unfortunately at this point the interpreter is in a state that no
+                # longer allows us to spawn a thread and we have to bail.
+                self._running = False
+                return False
+
+        return True
+
+    def _flush_loop(self) -> None:
+        while self._running:
+            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
+            self._flush_event.clear()
+            self._flush()
+
+    def add(self, item: "T") -> None:
+        if not self._ensure_thread() or self._flusher is None:
+            return None
+
+        with self._lock:
+            if len(self._buffer) >= self.MAX_BEFORE_DROP:
+                self._record_lost(item)
+                return None
+
+            self._buffer.append(item)
+            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
+                self._flush_event.set()
+
+    def kill(self) -> None:
+        if self._flusher is None:
+            return
+
+        self._running = False
+        self._flush_event.set()
+        self._flusher = None
+
+    def flush(self) -> None:
+        self._flush()
+
+    def _add_to_envelope(self, envelope: "Envelope") -> None:
+        envelope.add_item(
+            Item(
+                type=self.TYPE,
+                content_type=self.CONTENT_TYPE,
+                headers={
+                    "item_count": len(self._buffer),
+                },
+                payload=PayloadRef(
+                    json={
+                        "items": [
+                            self._to_transport_format(item) for item in self._buffer
+                        ]
+                    }
+                ),
+            )
+        )
+
+    def _flush(self) -> "Optional[Envelope]":
+        envelope = Envelope(
+            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
+        )
+        with self._lock:
+            if len(self._buffer) == 0:
+                return None
+
+            self._add_to_envelope(envelope)
+            self._buffer.clear()
+
+        self._capture_func(envelope)
+        return envelope
+
+    def _record_lost(self, item: "T") -> None:
+        raise NotImplementedError
+
+    @staticmethod
+    def _to_transport_format(item: "T") -> "Any":
+        raise NotImplementedError
diff --git a/sentry_sdk/_log_batcher.py b/sentry_sdk/_log_batcher.py
index 51886f48f9..1c59f7379c 100644
--- a/sentry_sdk/_log_batcher.py
+++ b/sentry_sdk/_log_batcher.py
@@ -1,164 +1,56 @@
-import os
-import random
-import threading
-from datetime import datetime, timezone
-from typing import Optional, List, Callable, TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
-from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
+from sentry_sdk._batcher import Batcher
+from sentry_sdk.utils import serialize_attribute
 from sentry_sdk.envelope import Envelope, Item, PayloadRef
 
 if TYPE_CHECKING:
+    from typing import Any
     from sentry_sdk._types import Log
 
 
-class LogBatcher:
-    MAX_LOGS_BEFORE_FLUSH = 100
-    MAX_LOGS_BEFORE_DROP = 1_000
+class LogBatcher(Batcher["Log"]):
+    MAX_BEFORE_FLUSH = 100
+    MAX_BEFORE_DROP = 1_000
     FLUSH_WAIT_TIME = 5.0
 
-    def __init__(
-        self,
-        capture_func: "Callable[[Envelope], None]",
-        record_lost_func: "Callable[..., None]",
-    ) -> None:
-        self._log_buffer: "List[Log]" = []
-        self._capture_func = capture_func
-        self._record_lost_func = record_lost_func
-        self._running = True
-        self._lock = threading.Lock()
-
-        self._flush_event: "threading.Event" = threading.Event()
-
-        self._flusher: "Optional[threading.Thread]" = None
-        self._flusher_pid: "Optional[int]" = None
-
-    def _ensure_thread(self) -> bool:
-        """For forking processes we might need to restart this thread.
-        This ensures that our process actually has that thread running.
- """ - if not self._running: - return False - - pid = os.getpid() - if self._flusher_pid == pid: - return True - - with self._lock: - # Recheck to make sure another thread didn't get here and start the - # the flusher in the meantime - if self._flusher_pid == pid: - return True - - self._flusher_pid = pid - - self._flusher = threading.Thread(target=self._flush_loop) - self._flusher.daemon = True - - try: - self._flusher.start() - except RuntimeError: - # Unfortunately at this point the interpreter is in a state that no - # longer allows us to spawn a thread and we have to bail. - self._running = False - return False - - return True - - def _flush_loop(self) -> None: - while self._running: - self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random()) - self._flush_event.clear() - self._flush() - - def add( - self, - log: "Log", - ) -> None: - if not self._ensure_thread() or self._flusher is None: - return None - - with self._lock: - if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_DROP: - # Construct log envelope item without sending it to report lost bytes - log_item = Item( - type="log", - content_type="application/vnd.sentry.items.log+json", - headers={ - "item_count": 1, - }, - payload=PayloadRef( - json={"items": [LogBatcher._log_to_transport_format(log)]} - ), - ) - self._record_lost_func( - reason="queue_overflow", - data_category="log_item", - item=log_item, - quantity=1, - ) - return None - - self._log_buffer.append(log) - if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH: - self._flush_event.set() - - def kill(self) -> None: - if self._flusher is None: - return - - self._running = False - self._flush_event.set() - self._flusher = None - - def flush(self) -> None: - self._flush() + TYPE = "log" + CONTENT_TYPE = "application/vnd.sentry.items.log+json" @staticmethod - def _log_to_transport_format(log: "Log") -> "Any": - if "sentry.severity_number" not in log["attributes"]: - log["attributes"]["sentry.severity_number"] = log["severity_number"] - if "sentry.severity_text" not in log["attributes"]: - log["attributes"]["sentry.severity_text"] = log["severity_text"] + def _to_transport_format(item: "Log") -> "Any": + if "sentry.severity_number" not in item["attributes"]: + item["attributes"]["sentry.severity_number"] = item["severity_number"] + if "sentry.severity_text" not in item["attributes"]: + item["attributes"]["sentry.severity_text"] = item["severity_text"] res = { - "timestamp": int(log["time_unix_nano"]) / 1.0e9, - "trace_id": log.get("trace_id", "00000000-0000-0000-0000-000000000000"), - "span_id": log.get("span_id"), - "level": str(log["severity_text"]), - "body": str(log["body"]), + "timestamp": int(item["time_unix_nano"]) / 1.0e9, + "trace_id": item.get("trace_id", "00000000-0000-0000-0000-000000000000"), + "span_id": item.get("span_id"), + "level": str(item["severity_text"]), + "body": str(item["body"]), "attributes": { - k: serialize_attribute(v) for (k, v) in log["attributes"].items() + k: serialize_attribute(v) for (k, v) in item["attributes"].items() }, } return res - def _flush(self) -> "Optional[Envelope]": - envelope = Envelope( - headers={"sent_at": format_timestamp(datetime.now(timezone.utc))} + def _record_lost(self, item: "Log") -> None: + # Construct log envelope item without sending it to report lost bytes + log_item = Item( + type=self.TYPE, + content_type=self.CONTENT_TYPE, + headers={ + "item_count": 1, + }, + payload=PayloadRef(json={"items": [self._to_transport_format(item)]}), ) - with self._lock: - if len(self._log_buffer) == 0: - return None - 
envelope.add_item( - Item( - type="log", - content_type="application/vnd.sentry.items.log+json", - headers={ - "item_count": len(self._log_buffer), - }, - payload=PayloadRef( - json={ - "items": [ - self._log_to_transport_format(log) - for log in self._log_buffer - ] - } - ), - ) - ) - self._log_buffer.clear() - - self._capture_func(envelope) - return envelope + self._record_lost_func( + reason="queue_overflow", + data_category="log_item", + item=log_item, + quantity=1, + ) diff --git a/sentry_sdk/_metrics_batcher.py b/sentry_sdk/_metrics_batcher.py index 6cbac0cbce..2a9f6deffc 100644 --- a/sentry_sdk/_metrics_batcher.py +++ b/sentry_sdk/_metrics_batcher.py @@ -1,101 +1,24 @@ -import os -import random -import threading -from datetime import datetime, timezone -from typing import Optional, List, Callable, TYPE_CHECKING, Any, Union +from typing import TYPE_CHECKING -from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute -from sentry_sdk.envelope import Envelope, Item, PayloadRef +from sentry_sdk._batcher import Batcher +from sentry_sdk.utils import serialize_attribute +from sentry_sdk.envelope import Item if TYPE_CHECKING: + from typing import Any from sentry_sdk._types import Metric -class MetricsBatcher: - MAX_METRICS_BEFORE_FLUSH = 1000 - MAX_METRICS_BEFORE_DROP = 10_000 +class MetricsBatcher(Batcher["Metric"]): + MAX_BEFORE_FLUSH = 1000 + MAX_BEFORE_DROP = 10_000 FLUSH_WAIT_TIME = 5.0 - def __init__( - self, - capture_func: "Callable[[Envelope], None]", - record_lost_func: "Callable[..., None]", - ) -> None: - self._metric_buffer: "List[Metric]" = [] - self._capture_func = capture_func - self._record_lost_func = record_lost_func - self._running = True - self._lock = threading.Lock() - - self._flush_event: "threading.Event" = threading.Event() - - self._flusher: "Optional[threading.Thread]" = None - self._flusher_pid: "Optional[int]" = None - - def _ensure_thread(self) -> bool: - if not self._running: - return False - - pid = os.getpid() - if self._flusher_pid == pid: - return True - - with self._lock: - if self._flusher_pid == pid: - return True - - self._flusher_pid = pid - - self._flusher = threading.Thread(target=self._flush_loop) - self._flusher.daemon = True - - try: - self._flusher.start() - except RuntimeError: - self._running = False - return False - - return True - - def _flush_loop(self) -> None: - while self._running: - self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random()) - self._flush_event.clear() - self._flush() - - def add( - self, - metric: "Metric", - ) -> None: - if not self._ensure_thread() or self._flusher is None: - return None - - with self._lock: - if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_DROP: - self._record_lost_func( - reason="queue_overflow", - data_category="trace_metric", - quantity=1, - ) - return None - - self._metric_buffer.append(metric) - if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH: - self._flush_event.set() - - def kill(self) -> None: - if self._flusher is None: - return - - self._running = False - self._flush_event.set() - self._flusher = None - - def flush(self) -> None: - self._flush() + TYPE = "trace_metric" + CONTENT_TYPE = "application/vnd.sentry.items.trace-metric+json" @staticmethod - def _metric_to_transport_format(metric: "Metric") -> "Any": + def _to_transport_format(metric: "Metric") -> "Any": res = { "timestamp": metric["timestamp"], "trace_id": metric["trace_id"], @@ -115,32 +38,9 @@ def _metric_to_transport_format(metric: "Metric") -> "Any": return res - def _flush(self) -> 
"Optional[Envelope]": - envelope = Envelope( - headers={"sent_at": format_timestamp(datetime.now(timezone.utc))} + def _record_lost(self, item: "Metric") -> None: + self._record_lost_func( + reason="queue_overflow", + data_category="trace_metric", + quantity=1, ) - with self._lock: - if len(self._metric_buffer) == 0: - return None - - envelope.add_item( - Item( - type="trace_metric", - content_type="application/vnd.sentry.items.trace-metric+json", - headers={ - "item_count": len(self._metric_buffer), - }, - payload=PayloadRef( - json={ - "items": [ - self._metric_to_transport_format(metric) - for metric in self._metric_buffer - ] - } - ), - ) - ) - self._metric_buffer.clear() - - self._capture_func(envelope) - return envelope