Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion flagsmith/flagsmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ def _initialise_local_evaluation(self) -> None:
EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=self.environment_refresh_interval_seconds,
daemon=True,
)
)

Expand Down
59 changes: 49 additions & 10 deletions flagsmith/polling_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
import threading
import time
import typing
Expand All @@ -11,26 +12,64 @@
logger = logging.getLogger(__name__)


class EnvironmentDataPollingManager(threading.Thread):
class EnvironmentDataPollingManager:
"""Owns the worker thread that periodically refreshes the local
evaluation environment document.

Composes (rather than extends) :class:`threading.Thread` so the
worker can be replaced — most importantly after :func:`os.fork`,
where threads do not survive into the child. We register an at-fork
hook so a forked worker (gunicorn, uwsgi, multiprocessing) gets a
freshly-started polling thread for its own PID. See issue #77.
"""

def __init__(
self,
*args: typing.Any,
*,
main: Flagsmith,
refresh_interval_seconds: typing.Union[int, float] = 10,
**kwargs: typing.Any,
):
super(EnvironmentDataPollingManager, self).__init__(*args, **kwargs)
) -> None:
self._main = main
self._refresh_interval_seconds = refresh_interval_seconds
self._stop_event = threading.Event()
self.main = main
self.refresh_interval_seconds = refresh_interval_seconds
self._thread = self._build_thread()
self._at_fork_registered = False

def run(self) -> None:
def _build_thread(self) -> threading.Thread:
return threading.Thread(target=self._run, daemon=True)

def _run(self) -> None:
while not self._stop_event.is_set():
self.main.update_environment()
time.sleep(self.refresh_interval_seconds)
self._main.update_environment()
time.sleep(self._refresh_interval_seconds)

def start(self) -> None:
self._thread.start()
if not self._at_fork_registered and hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._restart_after_fork)
self._at_fork_registered = True

def _restart_after_fork(self) -> None:
if self._thread.is_alive():
return
# Sockets in the parent's connection pool are inherited as
# shared FDs across fork; reusing them would interleave bytes
# between processes. Drop them so the new thread opens fresh.
if session := getattr(self._main, "session", None):
session.close()
self._stop_event = threading.Event()
self._thread = self._build_thread()
self._thread.start()

def stop(self) -> None:
self._stop_event.set()

def is_alive(self) -> bool:
return self._thread.is_alive()

@property
def ident(self) -> typing.Optional[int]:
return self._thread.ident

def __del__(self) -> None:
self._stop_event.set()
137 changes: 136 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading