diff --git a/background_worker.go b/background_worker.go new file mode 100644 index 0000000000..fc6e7cf03a --- /dev/null +++ b/background_worker.go @@ -0,0 +1,569 @@ +package frankenphp + +// #include +// #include "frankenphp.h" +import "C" +import ( + "fmt" + "log/slog" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" +) + +// defaultMaxBackgroundWorkers is the default safety cap for catch-all +// background workers when the user doesn't set max_threads. Caps the +// number of distinct lazy-started instances from a single catch-all. +const defaultMaxBackgroundWorkers = 16 + +// BackgroundScope identifies an isolation boundary for background workers. +// Each php_server block uses a distinct scope so that two blocks can +// declare workers with the same name without conflict. The zero value is +// the global/embed scope (used when no per-block scope was assigned). +// Representation is opaque; obtain values via NextBackgroundWorkerScope. +type BackgroundScope int + +var backgroundScopeCounter atomic.Uint64 + +// NextBackgroundWorkerScope returns a unique scope for background worker +// isolation. Each php_server block should call this once during +// provisioning. +func NextBackgroundWorkerScope() BackgroundScope { + return BackgroundScope(backgroundScopeCounter.Add(1)) +} + +// backgroundLookups maps scopes to their background worker lookup. +// Scope 0 is the global/embed scope; each php_server block gets its own. +var backgroundLookups map[BackgroundScope]*backgroundWorkerLookup + +// backgroundWorkerLookup maps worker names to their registry, with a +// separate slot for the catch-all (name-less) declaration. +type backgroundWorkerLookup struct { + byName map[string]*backgroundWorkerRegistry + catchAll *backgroundWorkerRegistry +} + +func newBackgroundWorkerLookup() *backgroundWorkerLookup { + return &backgroundWorkerLookup{ + byName: make(map[string]*backgroundWorkerRegistry), + } +} + +// Resolve returns the registry for the given name, falling back to +// catch-all. Returns nil if neither matches. +func (l *backgroundWorkerLookup) Resolve(name string) *backgroundWorkerRegistry { + if r, ok := l.byName[name]; ok { + return r + } + return l.catchAll +} + +// backgroundWorkerRegistry tracks the template options from a single +// declaration plus the live instances started from it. Named declarations +// have at most one entry keyed by their declared name; the catch-all can +// have many, up to maxWorkers. +type backgroundWorkerRegistry struct { + entrypoint string + num int // threads per instance; 0 means lazy with 1 thread + maxWorkers int // cap for catch-all instances; 0 = unlimited + + mu sync.Mutex + workers map[string]*backgroundWorkerState + + // Template options preserved so lazy-started workers inherit the same + // scope/env/watch/failure policy as their eagerly-started siblings. + scope BackgroundScope + env PreparedEnv + watch []string + maxConsecutiveFailures int + requestOptions []RequestOption + + // declaredWorker is the pre-existing *worker struct for a named + // declaration (num=0 lazy or num>=1 eager). It lets the lazy-start + // path reuse this worker instead of scanning the global + // workersByName map, which is not scope-aware: scoped bg workers + // with the same user-facing name would otherwise collide into a + // single *worker and overwrite each other's live state pointers. + // nil for catch-all registries (each lazy-started name gets a + // fresh worker). + declaredWorker *worker +} + +func newBackgroundWorkerRegistry(entrypoint string) *backgroundWorkerRegistry { + return &backgroundWorkerRegistry{ + entrypoint: entrypoint, + workers: make(map[string]*backgroundWorkerState), + maxConsecutiveFailures: -1, + } +} + +func (registry *backgroundWorkerRegistry) maxThreads() int { + if registry.num > 0 { + return registry.num + } + return 1 +} + +// reserve atomically looks up or inserts a state for the given name. If +// the maxWorkers cap is reached for a catch-all, returns an error. +func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if bgw := registry.workers[name]; bgw != nil { + return bgw, true, nil + } + + if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers { + return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached (increase max_threads on the catch-all background worker or declare it as a named worker)", name, registry.maxWorkers) + } + + bgw := &backgroundWorkerState{ + ready: make(chan struct{}), + aborted: make(chan struct{}), + } + registry.workers[name] = bgw + + return bgw, false, nil +} + +// abortStart removes a reserved-but-never-started entry and wakes any +// concurrent ensure waiters that captured the same state before the start +// was abandoned. Without this, a waiter that raced reserve() against a +// failing start would block on sk.ready until its deadline. +func (registry *backgroundWorkerRegistry) abortStart(name string, bgw *backgroundWorkerState, err error) { + registry.mu.Lock() + if registry.workers[name] == bgw { + delete(registry.workers, name) + } + registry.mu.Unlock() + bgw.abort(err) +} + +// buildBackgroundWorkerLookups constructs a scope->lookup map from declared +// worker options. Each scope (php_server block, or 0 for global/embed) +// gets its own lookup so workers declared with the same name in different +// blocks don't collide. Each declaration gets its own registry so shared- +// entrypoint declarations keep their own template options. +func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[BackgroundScope]*backgroundWorkerLookup { + lookups := make(map[BackgroundScope]*backgroundWorkerLookup) + + for i, o := range opts { + if !o.isBackgroundWorker { + continue + } + + scope := o.backgroundScope + lookup, ok := lookups[scope] + if !ok { + lookup = newBackgroundWorkerLookup() + lookups[scope] = lookup + } + + w := workers[i] + // Use the worker's normalized filename (EvalSymlinks + FastAbs + // from newWorker) instead of the raw o.fileName so lazy-start + // from a catch-all resolves the same entrypoint even if cwd or + // the symlink target changes after init. + registry := newBackgroundWorkerRegistry(w.fileName) + registry.scope = scope + registry.env = o.env + registry.watch = o.watch + registry.maxConsecutiveFailures = o.maxConsecutiveFailures + registry.requestOptions = o.requestOptions + + w.backgroundScope = scope + phpName := strings.TrimPrefix(w.name, "m#") + if phpName != "" && phpName != w.fileName { + if o.num > 0 { + registry.num = o.num + } + lookup.byName[phpName] = registry + // Named declaration: remember the *worker so lazy-start can + // reuse it instead of scanning workersByName. + registry.declaredWorker = w + + // Pre-reserve the live state for eager (num >= 1) named + // declarations: the worker thread created by initWorkers + // will reserve it in setupScript, but any ensure_background_worker + // call from an HTTP worker bootstrap that races ahead of + // setupScript would otherwise see a missing entry and + // lazy-start a duplicate. Reserving here makes the race + // impossible; setupScript picks up the existing state. + if o.num > 0 { + bgw := &backgroundWorkerState{ + ready: make(chan struct{}), + aborted: make(chan struct{}), + } + registry.workers[phpName] = bgw + w.backgroundWorker = bgw + } + } else { + maxW := defaultMaxBackgroundWorkers + if o.maxThreads > 1 { + maxW = o.maxThreads + } + registry.maxWorkers = maxW + lookup.catchAll = registry + // Catch-all declarations are strictly lazy-started: each + // ensure() with an unmatched name spawns its own threads on + // demand. Force num to 0 so initWorkers does not create + // eager placeholder threads that would call reserve() under + // the catch-all's own filename and consume one of the cap + // slots before any real lazy-start happens. + w.num = 0 + } + + w.backgroundRegistry = registry + } + + if len(lookups) == 0 { + return nil + } + return lookups +} + +// getLookup returns the background-worker lookup for the given thread. +// The scope is resolved from the thread's handler (for worker threads +// inheriting their worker's scope) or from the request context (for +// regular HTTP threads with WithRequestBackgroundScope). +// +// If the resolved scope has no workers declared (its lookup is nil), the +// caller falls through to the global/embed scope (0) so that globally- +// declared workers remain reachable from scoped requests. Scopes that +// declared their own workers stay strictly isolated because their lookup +// is non-nil. +func getLookup(thread *phpThread) *backgroundWorkerLookup { + if backgroundLookups == nil { + return nil + } + var scope BackgroundScope + if handler, ok := thread.handler.(*workerThread); ok { + scope = handler.worker.backgroundScope + } else if handler, ok := thread.handler.(*backgroundWorkerThread); ok { + scope = handler.worker.backgroundScope + } else if fc, ok := fromContext(thread.context()); ok { + scope = fc.backgroundScope + } + if scope != 0 { + if l := backgroundLookups[scope]; l != nil { + return l + } + } + return backgroundLookups[0] +} + +// startBackgroundWorker lazy-starts the named worker if it is not already +// running. Safe to call concurrently; only the first caller actually +// starts the worker, the rest observe the existing state. +func startBackgroundWorker(thread *phpThread, bgWorkerName string) error { + if bgWorkerName == "" { + return fmt.Errorf("background worker name must not be empty") + } + lookup := getLookup(thread) + if lookup == nil { + return fmt.Errorf("no background worker configured") + } + registry := lookup.Resolve(bgWorkerName) + if registry == nil || registry.entrypoint == "" { + return fmt.Errorf("no background worker configured for name %q", bgWorkerName) + } + return startBackgroundWorkerWithRegistry(registry, bgWorkerName) +} + +func startBackgroundWorkerWithRegistry(registry *backgroundWorkerRegistry, bgWorkerName string) error { + bgw, exists, err := registry.reserve(bgWorkerName) + if err != nil { + return err + } + if exists { + return nil + } + + numThreads := registry.maxThreads() + + // Named declarations (num=0 lazy or num>=1 eager) already have a + // pre-existing *worker struct recorded on the registry. Reuse it so + // lazy-start doesn't create a duplicate and - crucially for per- + // php_server isolation - doesn't route through the global + // workersByName map, which is scope-agnostic and would make two + // scopes sharing a user-facing name collide into the same *worker. + // Catch-all registries leave declaredWorker nil so each lazy-started + // name gets a fresh worker struct of its own. + var w *worker + freshWorker := false + if registry.declaredWorker != nil { + w = registry.declaredWorker + } else { + freshWorker = true + // Clone env and slices: newWorker mutates env (writes + // FRANKENPHP_WORKER) and appends to requestOptions, so sharing + // these across lazy-started instances would race with HTTP + // threads reading the originals. + env := make(PreparedEnv, len(registry.env)+1) + for k, v := range registry.env { + env[k] = v + } + watch := append([]string(nil), registry.watch...) + requestOptions := append([]RequestOption(nil), registry.requestOptions...) + + w, err = newWorker(workerOpt{ + name: bgWorkerName, + fileName: registry.entrypoint, + num: numThreads, + isBackgroundWorker: true, + backgroundScope: registry.scope, + env: env, + watch: watch, + maxConsecutiveFailures: registry.maxConsecutiveFailures, + requestOptions: requestOptions, + }) + if err != nil { + startErr := fmt.Errorf("failed to create background worker: %w", err) + registry.abortStart(bgWorkerName, bgw, startErr) + + return startErr + } + } + + w.isBackgroundWorker = true + w.backgroundWorker = bgw + w.backgroundRegistry = registry + // Redundant with newWorker's backgroundScope opt for fresh workers, + // but necessary for declared workers whose scope is set on the + // registry rather than on the workerOpt struct. + w.backgroundScope = registry.scope + + for i := 0; i < numThreads; i++ { + t := getInactivePHPThread() + if t == nil { + if i == 0 { + startErr := fmt.Errorf("no available PHP thread for background worker (increase max_threads)") + registry.abortStart(bgWorkerName, bgw, startErr) + + return startErr + } + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker started with fewer threads than requested (increase max_threads)", + slog.String("worker", bgWorkerName), + slog.Int("requested", numThreads), + slog.Int("attached", i)) + break + } + if i == 0 && freshWorker { + // Freshly-created catch-all instance: add to the global list so + // RestartWorkers/DrainWorkers iterate it. Intentionally NOT + // registered in workersByName - bg workers are resolved per- + // scope via backgroundLookups, not via the global name map. + scalingMu.Lock() + workers = append(workers, w) + scalingMu.Unlock() + } + convertToBackgroundWorkerThread(t, w) + } + + if globalLogger.Enabled(globalCtx, slog.LevelInfo) { + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", + slog.String("name", bgWorkerName), slog.Int("threads", numThreads)) + } + + return nil +} + +// isBootstrapEnsure reports whether the calling thread is inside an HTTP +// worker's boot phase (before the first frankenphp_handle_request). That +// context takes the strict fail-fast path; everywhere else (bg worker +// runtime, classic request-per-process) uses the tolerant lazy-start path. +func isBootstrapEnsure(thread *phpThread) bool { + handler, ok := thread.handler.(*workerThread) + return ok && handler.isBootingScript +} + +// go_frankenphp_ensure_background_worker declares a dependency on one or +// more background workers by name. Each named worker is lazy-started if +// not already running; the call blocks until every one has reached ready +// (set_vars called at least once) or the shared deadline expires. +// +// Bootstrap mode (HTTP worker before frankenphp_handle_request): fail-fast. +// Any boot failure throws immediately with captured details, without +// waiting for the restart/backoff cycle. A broken dependency visibly fails +// the HTTP worker instead of letting it serve degraded traffic. +// +// Runtime mode (inside frankenphp_handle_request, classic request path): +// tolerant. Waits up to the timeout, letting the restart-with-backoff +// cycle recover from transient boot failures. +// +//export go_frankenphp_ensure_background_worker +func go_frankenphp_ensure_background_worker(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int) *C.char { + thread := phpThreads[threadIndex] + lookup := getLookup(thread) + if lookup == nil { + return C.CString("no background worker configured") + } + + n := int(nameCount) + nameSlice := unsafe.Slice(names, n) + nameLenSlice := unsafe.Slice(nameLens, n) + bootstrap := isBootstrapEnsure(thread) + + // Start each named worker first. Reserve their states so a shared + // deadline applies across the whole group (the caller gets one + // timeout value, not one per worker). + sks := make([]*backgroundWorkerState, n) + goNames := make([]string, n) + for i := 0; i < n; i++ { + goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i])) + if err := startBackgroundWorker(thread, goNames[i]); err != nil { + return C.CString(err.Error()) + } + registry := lookup.Resolve(goNames[i]) + if registry == nil { + return C.CString("background worker not found: " + goNames[i]) + } + registry.mu.Lock() + sks[i] = registry.workers[goNames[i]] + registry.mu.Unlock() + if sks[i] == nil { + return C.CString("background worker not found: " + goNames[i]) + } + } + + deadline := time.After(time.Duration(timeoutMs) * time.Millisecond) + if bootstrap { + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for i, sk := range sks { + wait: + for { + select { + case <-sk.ready: + break wait + case <-sk.aborted: + return C.CString(sk.abortErr) + case <-deadline: + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk)) + case <-globalCtx.Done(): + return C.CString("frankenphp is shutting down") + case <-ticker.C: + if sk.bootFailure.Load() != nil { + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk)) + } + } + } + } + return nil + } + + for i, sk := range sks { + select { + case <-sk.ready: + case <-sk.aborted: + return C.CString(sk.abortErr) + case <-deadline: + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk)) + case <-globalCtx.Done(): + return C.CString("frankenphp is shutting down") + } + } + return nil +} + +func formatBackgroundWorkerTimeoutError(name string, sk *backgroundWorkerState) string { + info := sk.bootFailure.Load() + if info == nil { + return fmt.Sprintf("timeout waiting for background worker %q", name) + } + msg := fmt.Sprintf( + "timeout waiting for background worker %q (entrypoint: %s); last boot failure after %d attempt(s), exit status %d", + name, info.entrypoint, info.failureCount, info.exitStatus, + ) + if info.phpError != "" { + msg += ": " + info.phpError + } + return msg +} + +// go_frankenphp_set_vars is called from PHP when a background worker +// publishes its shared vars. The caller has already deep-copied the vars +// into persistent memory; here we swap the pointer under the state lock +// and hand back the old pointer so the C side can free it after the call. +// +//export go_frankenphp_set_vars +func go_frankenphp_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char { + thread := phpThreads[threadIndex] + + bgHandler, ok := thread.handler.(*backgroundWorkerThread) + if !ok || bgHandler.worker.backgroundWorker == nil { + return C.CString("frankenphp_set_vars() can only be called from a background worker") + } + + sk := bgHandler.worker.backgroundWorker + + sk.mu.Lock() + *oldPtr = sk.varsPtr + sk.varsPtr = varsPtr + sk.varsVersion.Add(1) + sk.mu.Unlock() + + bgHandler.markBackgroundReady() + + return nil +} + +// go_frankenphp_get_vars resolves the named worker through the lookup +// (named or catch-all), waits on sk.ready without starting the worker, +// and copies its vars into the return value. +// +// callerVersion / outVersion implement a per-request cache: +// - If callerVersion is non-nil and equals the current varsVersion, +// the copy is skipped; outVersion is still set so the C side can +// reuse its cached zval with pointer equality. +// - Otherwise returnValue receives a fresh deep copy and outVersion +// reports the version that copy corresponds to. +// +//export go_frankenphp_get_vars +func go_frankenphp_get_vars(threadIndex C.uintptr_t, name *C.char, nameLen C.size_t, returnValue *C.zval, callerVersion *C.uint64_t, outVersion *C.uint64_t) *C.char { + thread := phpThreads[threadIndex] + lookup := getLookup(thread) + if lookup == nil { + return C.CString("no background worker configured") + } + + goName := C.GoStringN(name, C.int(nameLen)) + registry := lookup.Resolve(goName) + if registry == nil { + return C.CString("background worker not found: " + goName + " (call frankenphp_ensure_background_worker first)") + } + registry.mu.Lock() + sk := registry.workers[goName] + registry.mu.Unlock() + if sk == nil { + return C.CString("background worker not running: " + goName + " (call frankenphp_ensure_background_worker first)") + } + + select { + case <-sk.ready: + default: + return C.CString("background worker not ready: " + goName + " (no set_vars call yet)") + } + + // Fast path: caller's cached version matches current. Skip the copy; + // the caller will reuse its cached zval. + if callerVersion != nil && outVersion != nil { + v := sk.varsVersion.Load() + *outVersion = C.uint64_t(v) + if uint64(*callerVersion) == v { + return nil + } + } + + sk.mu.RLock() + C.frankenphp_copy_persistent_vars(returnValue, sk.varsPtr) + if outVersion != nil { + *outVersion = C.uint64_t(sk.varsVersion.Load()) + } + sk.mu.RUnlock() + + return nil +} diff --git a/background_worker_batch_test.go b/background_worker_batch_test.go new file mode 100644 index 0000000000..1958bc71d1 --- /dev/null +++ b/background_worker_batch_test.go @@ -0,0 +1,154 @@ +package frankenphp_test + +import ( + "errors" + "io" + "net/http/httptest" + "os" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestEnsureBackgroundWorkerBatch ensures multiple workers in one call, +// each publishing its own identity. Verifies the batch path (array arg) +// shares one deadline across all workers. +func TestEnsureBackgroundWorkerBatch(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("worker-a", testDataDir+"background-worker-named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithWorkers("worker-b", testDataDir+"background-worker-named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithWorkers("worker-c", testDataDir+"background-worker-named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(6), + )) + t.Cleanup(frankenphp.Shutdown) + + req := httptest.NewRequest("GET", "http://example.com/background-worker-batch-ensure.php", nil) + fr, err := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + require.NoError(t, err) + + w := httptest.NewRecorder() + if err := frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, &frankenphp.ErrRejected{}) { + t.Fatalf("serve: %v", err) + } + body, _ := io.ReadAll(w.Result().Body) + out := string(body) + + assert.NotContains(t, out, "MISSING", "batch ensure should have started and published all workers:\n"+out) + assert.Contains(t, out, "worker-a=worker-a") + assert.Contains(t, out, "worker-b=worker-b") + assert.Contains(t, out, "worker-c=worker-c") +} + +// TestEnsureBackgroundWorkerBatchEmpty verifies that an empty array is +// rejected with a clear error rather than silently succeeding. +func TestEnsureBackgroundWorkerBatchEmpty(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("bg", testDataDir+"background-worker-named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + )) + t.Cleanup(frankenphp.Shutdown) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-batch-empty.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + req := httptest.NewRequest("GET", "http://example.com/bg-batch-empty.php", nil) + fr, _ := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + w := httptest.NewRecorder() + _ = frankenphp.ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + assert.Contains(t, string(body), "OK ") + assert.Contains(t, string(body), "must not be empty") + assert.NotContains(t, string(body), "FAIL") +} + +// TestEnsureBackgroundWorkerBatchNonString verifies array-entry type +// validation: non-string elements produce a TypeError. +func TestEnsureBackgroundWorkerBatchNonString(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("bg", testDataDir+"background-worker-named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + )) + t.Cleanup(frankenphp.Shutdown) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-batch-nonstring.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + req := httptest.NewRequest("GET", "http://example.com/bg-batch-nonstring.php", nil) + fr, _ := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + w := httptest.NewRecorder() + _ = frankenphp.ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + assert.Contains(t, string(body), "OK ") + assert.Contains(t, string(body), "must contain only strings") + assert.NotContains(t, string(body), "FAIL") +} + +// TestBackgroundWorkerServerFlag confirms that a bg worker sees +// FRANKENPHP_WORKER_BACKGROUND=true alongside FRANKENPHP_WORKER_NAME in +// $_SERVER, so scripts can branch without checking every function +// independently. +func TestBackgroundWorkerServerFlag(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("flag-worker", testDataDir+"background-worker-bg-flag.php", 1, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + )) + t.Cleanup(frankenphp.Shutdown) + + // ensure() removes the race between Init returning and the eager + // bg-worker thread reaching its first set_vars. + php := `getMessage(); + return; + } catch (\RuntimeException $e) { + usleep(10000); + } +} +echo 'TIMEOUT'; +` + out := serveInlinePHP(t, testDataDir, "bg-enum-missing-reader.php", php) + + assert.NotContains(t, out, "NO_ERROR", "enum should not have materialized:\n"+out) + assert.NotContains(t, out, "TIMEOUT", "worker never published:\n"+out) + assert.Contains(t, out, "LogicException") + assert.Contains(t, out, "WorkerOnlyEnum", "missing class name must appear in the error:\n"+out) +} + +// TestBackgroundWorkerSignalingStreamResource confirms that the value +// returned by frankenphp_get_worker_handle() is a real PHP stream +// resource. Complements the bounded-wall-clock force-kill test: that +// one proves the pipe closes on shutdown, this one proves the handle +// is a proper resource in the first place (not null, not an int, not +// a user object). +func TestBackgroundWorkerSignalingStreamResource(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("stream-worker", testDataDir+"background-worker-stream-probe.php", 1, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + )) + t.Cleanup(frankenphp.Shutdown) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-ensure-undeclared.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + req := httptest.NewRequest("GET", "http://example.com/bg-ensure-undeclared.php", nil) + fr, _ := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + w := httptest.NewRecorder() + _ = frankenphp.ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + assert.Contains(t, string(body), "OK no background worker configured for name", "ensure of undeclared name should error:\n"+string(body)) + assert.NotContains(t, string(body), "FAIL") + _ = fmt.Sprintf // keep fmt imported for potential future asserts + + _ = strings.TrimSpace // keep strings imported +} + +// TestBackgroundWorkerBootFailureError confirms that an entrypoint which +// throws during boot surfaces the captured details through ensure()'s +// timeout error message: entrypoint path, attempt count, and the PHP +// RuntimeException message. Runs as a non-worker request so ensure uses +// the tolerant lazy-start path (no fail-fast). +func TestBackgroundWorkerBootFailureError(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("boot-fail-worker", testDataDir+"background-worker-boot-fail.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + )) + t.Cleanup(frankenphp.Shutdown) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-boot-fail.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + req := httptest.NewRequest("GET", "http://example.com/bg-boot-fail.php", nil) + fr, _ := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + w := httptest.NewRecorder() + _ = frankenphp.ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + out := string(body) + + assert.NotContains(t, out, "FAIL", "ensure should have thrown:\n"+out) + assert.Contains(t, out, `"boot-fail-worker"`) + assert.Contains(t, out, "background-worker-boot-fail.php", "entrypoint path must appear in the error:\n"+out) + assert.Contains(t, out, "attempt", "attempt count must appear:\n"+out) + assert.Contains(t, out, "intentional boot failure for test", "PHP exception message must be captured:\n"+out) +} diff --git a/background_worker_pool_test.go b/background_worker_pool_test.go new file mode 100644 index 0000000000..5357fe1399 --- /dev/null +++ b/background_worker_pool_test.go @@ -0,0 +1,98 @@ +package frankenphp_test + +import ( + "errors" + "io" + "net/http/httptest" + "os" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestBackgroundWorkerPool declares a named bg worker with num=3 (pool +// of three threads). All three threads should boot, share the same +// registered backgroundWorkerState, and the reader can see the pool's +// vars. This covers the lifted num>1 + max_threads>1 constraint. +func TestBackgroundWorkerPool(t *testing.T) { + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("pool-worker", testDataDir+"background-worker-pool.php", 3, + frankenphp.WithWorkerBackground(), + frankenphp.WithWorkerMaxThreads(3)), + frankenphp.WithNumThreads(6), + )) + t.Cleanup(frankenphp.Shutdown) + + // Read the pool worker's vars via get_vars; all three threads share + // the same state so we don't need to target a specific one. ensure() + // waits for at least one pool thread's first set_vars so the eager + // start can't race the reader. + php := `getMessage(); +}` + tmp := testDataDir + "bg-stuck-reader.php" + require.NoError(t, os.WriteFile(tmp, []byte(readerPHP), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + require.Eventually(t, func() bool { + req := httptest.NewRequest("GET", "http://example.com/bg-stuck-reader.php", nil) + fr, err := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + if err != nil { + return false + } + w := httptest.NewRecorder() + _ = frankenphp.ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + return strings.Contains(string(body), "ready=1") + }, 5*time.Second, 25*time.Millisecond, "bg worker never entered sleep()") + + start := time.Now() + err := frankenphp.RestartWorkers() + elapsed := time.Since(start) + + // Grace period is 5s; allow slack for signal dispatch and drain + // completion. On Linux/FreeBSD pthread_kill(SIGRTMIN+3) wakes the + // sleep, so the abandon path should never trigger - RestartWorkers + // returns nil. + const budget = 8 * time.Second + assert.Less(t, elapsed, budget, "drain must force-kill the stuck bg worker within the grace period") + assert.NoError(t, err, "force-kill should wake the stuck bg worker and let it restart fully") +} diff --git a/caddy/admin.go b/caddy/admin.go index 8515f11326..8d7b7f3df3 100644 --- a/caddy/admin.go +++ b/caddy/admin.go @@ -39,7 +39,13 @@ func (admin *FrankenPHPAdmin) restartWorkers(w http.ResponseWriter, r *http.Requ return admin.error(http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) } - frankenphp.RestartWorkers() + if err := frankenphp.RestartWorkers(); err != nil { + // Restart is incomplete: at least one worker thread was stuck in + // an uninterruptible blocking call and did not reload code. Do + // not let the admin endpoint lie to automation with a 200. + caddy.Log().Sugar().Errorf("workers restart incomplete: %v", err) + return admin.error(http.StatusInternalServerError, err) + } caddy.Log().Info("workers restarted from admin api") admin.success(w, "workers restarted successfully\n") diff --git a/caddy/app.go b/caddy/app.go index fbe72eb620..6ba3e77b98 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -167,6 +167,10 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerRequestOptions(w.requestOptions...), ) + if w.Background { + w.options = append(w.options, frankenphp.WithWorkerBackground()) + } + f.opts = append(f.opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.options...)) } diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index b8961b96d8..4eb86f830b 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -3,6 +3,7 @@ package caddy_test import ( "bytes" "fmt" + "io" "net/http" "os" "path/filepath" @@ -72,6 +73,57 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +// TestBackgroundWorker wires a named background worker through the +// Caddyfile `worker { background; name ...; file ... }` syntax and reads +// its vars from an HTTP request, proving the parser + app.Start wiring + +// PHP API all cooperate. +func TestBackgroundWorker(t *testing.T) { + tester := caddytest.NewTester(t) + initServer(t, tester, ` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + https_port 9443 + frankenphp { + worker { + name bg-basic + num 1 + file ../testdata/background-worker.php + background + } + } + } + + localhost:`+testPort+` { + route { + php { + root ../testdata + } + } + } + `, "caddyfile") + + // Background workers boot asynchronously with Init. Poll briefly for a + // non-MISSING response so we don't race against the first set_vars. + deadline := time.Now().Add(3 * time.Second) + for { + resp, err := http.Get("http://localhost:" + testPort + "/background-worker-reader.php") + require.NoError(t, err) + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if !strings.Contains(string(body), "MISSING") { + require.Contains(t, string(body), "message=hello from background worker") + require.Contains(t, string(body), "count=42") + return + } + if time.Now().After(deadline) { + t.Fatalf("background worker never published vars; last body: %q", body) + } + time.Sleep(50 * time.Millisecond) + } +} + func TestPHP(t *testing.T) { var wg sync.WaitGroup tester := caddytest.NewTester(t) diff --git a/caddy/module.go b/caddy/module.go index fe14818105..47350b38f4 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -51,6 +51,7 @@ type FrankenPHPModule struct { preparedEnvNeedsReplacement bool logger *slog.Logger requestOptions []frankenphp.RequestOption + backgroundScope frankenphp.BackgroundScope } // CaddyModule returns the Caddy module information. @@ -78,6 +79,14 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { f.assignMercureHub(ctx) + // Each php_server block gets its own background-worker scope so + // workers declared with the same name in different blocks don't + // collide. Provision can be called more than once for the same + // module; only assign once. + if f.backgroundScope == 0 { + f.backgroundScope = frankenphp.NextBackgroundWorkerScope() + } + loggerOpt := frankenphp.WithRequestLogger(f.logger) for i, wc := range f.Workers { // make the file path absolute from the public directory @@ -92,6 +101,7 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { } wc.requestOptions = append(wc.requestOptions, loggerOpt) + wc.options = append(wc.options, frankenphp.WithWorkerBackgroundScope(f.backgroundScope)) f.Workers[i] = wc } @@ -241,6 +251,7 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c opts, frankenphp.WithOriginalRequest(new(ctx.Value(caddyhttp.OriginalRequestCtxKey).(http.Request))), frankenphp.WithWorkerName(workerName), + frankenphp.WithRequestBackgroundScope(f.backgroundScope), )..., ) diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index c50f0d0688..d9d34bf456 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -41,6 +41,13 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` + // Background marks this worker as a background (non-HTTP) worker. + // No `omitempty`: Caddy can reuse module instances across config reloads, + // and json.Unmarshal only overwrites fields present in the JSON. With + // `omitempty` on a bool, a previous true value would persist into a new + // config that doesn't specify the field, silently turning an HTTP worker + // into a background worker. + Background bool `json:"background"` options []frankenphp.WorkerOption requestOptions []frankenphp.RequestOption @@ -145,8 +152,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { } wc.MaxConsecutiveFailures = v + case "background": + wc.Background = true default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, background", v) } } @@ -154,6 +163,16 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { return wc, d.Err(`the "file" argument must be specified`) } + if wc.Background { + // Named bg workers: num and max_threads mean "threads per worker" + // (pool size and pool cap). Catch-all bg workers: num is the pool + // size of the placeholder instance (usually 1), max_threads caps + // how many distinct names can be lazy-started. + if len(wc.MatchPath) != 0 { + return wc, d.Err(`"match" is not supported for background workers`) + } + } + if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) { wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName) } diff --git a/context.go b/context.go index add8eff7bd..410042de70 100644 --- a/context.go +++ b/context.go @@ -38,6 +38,10 @@ type frankenPHPContext struct { handlerParameters any handlerReturn any + // backgroundScope selects the per-php_server background worker lookup + // for ensure/get_vars calls made from this request. + backgroundScope BackgroundScope + done chan any startedAt time.Time } diff --git a/docs/background-workers.md b/docs/background-workers.md new file mode 100644 index 0000000000..4d66263273 --- /dev/null +++ b/docs/background-workers.md @@ -0,0 +1,254 @@ +# Background Workers + +Background workers are long-running PHP scripts that run outside the HTTP request cycle. +They observe their environment and publish variables that HTTP threads (both [workers](worker.md) and classic requests) can read in real time. + +## How It Works + +1. A background worker runs its own event loop (subscribe to Redis, watch files, poll an API, etc.). +2. It calls `frankenphp_set_vars()` to publish a snapshot of key-value pairs. +3. HTTP threads call `frankenphp_ensure_background_worker()` to declare a dependency and make sure the worker is running (lazy-started if needed, blocks until it has published at least once). +4. HTTP threads then call `frankenphp_get_vars()` to read the latest snapshot (pure read, no blocking, identical zval across repeated reads in one request). + +## Configuration + +Add `worker` directives with `background` to your [`php_server` or `php` block](config.md#caddyfile-config): + +```caddyfile +example.com { + php_server { + # Named background workers + worker /app/bin/console { + background + name config-watcher + } + worker /app/bin/console { + background + name feature-flags + } + + # Catch-all: handles any unlisted name via ensure_background_worker() + worker /app/bin/console { + background + } + } +} +``` + +- **Named** (with `name`): lazy-started on first `ensure_background_worker()` call. If `num` is set to a positive integer, that many threads start eagerly at boot (pool mode); with `num 0` (default) the first `ensure()` starts one thread. +- **Catch-all** (no `name`): lazy-started on demand for any name not matched by a `name` directive. `max_threads` caps the number of distinct names it can lazy-start (default 16). Without a catch-all, only declared names can be ensured. +- Each `php_server` block has its own isolated scope: two blocks can use the same worker names without conflict. +- `max_consecutive_failures`, `env`, and `watch` work the same as for HTTP workers. + +## PHP API + +### `frankenphp_ensure_background_worker(string|array $name, float $timeout = 30.0): void` + +Declares a dependency on one or more background workers. Pass a single name or an array of names for batch dependency declaration; the timeout applies across all names in one call. Behaviour depends on the caller context: + +- **In an HTTP worker script, before `frankenphp_handle_request()` (bootstrap)**: lazy-starts the worker (at-most-once) if not already running and blocks until it has called `set_vars()` at least once. Fails fast on boot failure (no exponential-backoff tolerance): if the first boot attempts fail, the exception is thrown right away with the captured details. Use this to declare dependencies up front so broken deps visibly fail the HTTP worker rather than let it serve degraded traffic. +- **Everywhere else (inside `frankenphp_handle_request()`, or classic request-per-process)**: lazy-starts the worker and waits up to `$timeout`, tolerating transient boot failures via exponential backoff. The first caller pays the startup cost; subsequent callers in the same FrankenPHP process see the worker already reserved and return almost immediately. This supports the common pattern of library code loaded after bootstrap declaring its own dependencies lazily. + +```php +// HTTP worker, bootstrap phase +frankenphp_ensure_background_worker('redis-watcher'); // fail-fast + +while (frankenphp_handle_request(function () { + $cfg = frankenphp_get_vars('redis-watcher'); // pure read +})) { gc_collect_cycles(); } + +// Non-worker mode, every request +frankenphp_ensure_background_worker('redis-watcher'); // tolerant +$cfg = frankenphp_get_vars('redis-watcher'); + +// Batch form, shared deadline across workers +frankenphp_ensure_background_worker(['redis-watcher', 'config-watcher'], 5.0); +``` + +- Throws `RuntimeException` on timeout, missing entrypoint, or boot failure. The exception contains the captured failure details when available: resolved entrypoint path, exit status, number of attempts, and the last PHP error (message, file, line). +- Pick a short `$timeout` (e.g. `1.0`) to fail fast; pick a longer one to tolerate slow/flaky startups. +- `ValueError` is raised for an empty names array; `TypeError` is raised if the array contains non-strings. + +### `frankenphp_get_vars(string $name): array` + +Pure read: returns the latest published vars from a running background worker. Does not start workers or wait for readiness. + +```php +$redis = frankenphp_get_vars('redis-watcher'); +// ['MASTER_HOST' => '10.0.0.1', 'MASTER_PORT' => 6379] +``` + +- Throws `RuntimeException` if the worker isn't running or hasn't called `set_vars()` yet. Call `frankenphp_ensure_background_worker()` first to ensure readiness. +- Within a single HTTP request, repeated calls with the same name return the **same** cached array: `$a === $b` holds, and the lookup is O(1) after the first call. +- Works in both worker and non-worker mode. + +### `frankenphp_set_vars(array $vars): void` + +Publishes vars from inside a background worker. Each call **replaces** the entire vars array atomically. + +Allowed value types: `null`, scalars (`bool`, `int`, `float`, `string`), nested `array`s whose values are also allowed types, and **enum** instances. Objects (other than enum cases), resources, and references are rejected. + +- Throws `RuntimeException` if not called from a background worker thread. +- Throws `ValueError` if values contain unsupported types. + +### `frankenphp_get_worker_handle(): resource` + +Returns a readable stream for receiving signals from FrankenPHP. On shutdown or restart the write end of the underlying pipe is closed, so `fgets()` returns `false` (EOF). Use `stream_select()` to wait between iterations instead of `sleep()`: + +```php +function background_worker_should_stop(float $timeout = 0): bool +{ + static $stream; + $stream ??= frankenphp_get_worker_handle(); + $s = (int) $timeout; + + return match (@stream_select(...[[$stream], [], [], $s, (int) (($timeout - $s) * 1e6)])) { + 0 => false, // timeout, keep going + false => true, // error, stop + default => false === fgets($stream), // EOF = stop + }; +} +``` + +> [!WARNING] +> Avoid `sleep()` or `usleep()` in background workers: they block at the C level and cannot be interrupted cleanly. Use `stream_select()` with the signaling stream instead. If a worker ignores the signal, FrankenPHP force-kills it on Linux, FreeBSD and Windows after a 5-second grace period (see `Runtime Behaviour`). + +## Examples + +### Simple polling worker + +```php + run_config_watcher(), + 'feature-flags' => run_feature_flags(), + default => throw new \RuntimeException("Unknown background worker: $command"), +}; + +function run_config_watcher(): void +{ + $redis = new Redis(); + $redis->pconnect('127.0.0.1'); + + do { + frankenphp_set_vars([ + 'maintenance' => (bool) $redis->get('maintenance_mode'), + 'feature_flags' => json_decode($redis->get('features'), true), + ]); + } while (!background_worker_should_stop(5.0)); // check every 5s +} +``` + +### Event-driven worker + +For real-time subscriptions (Redis pub/sub, SSE, WebSocket), use an async library and register the signaling stream on the event loop: + +```php +function run_redis_watcher(): void +{ + $signalingStream = frankenphp_get_worker_handle(); + $sentinel = Amp\Redis\createRedisClient('tcp://sentinel-host:26379'); + + $subscription = $sentinel->subscribe('+switch-master'); + + Amp\async(function () use ($subscription) { + foreach ($subscription as $message) { + [$name, $oldIp, $oldPort, $newIp, $newPort] = explode(' ', $message); + frankenphp_set_vars([ + 'MASTER_HOST' => $newIp, + 'MASTER_PORT' => (int) $newPort, + ]); + } + }); + + $master = $sentinel->rawCommand('SENTINEL', 'get-master-addr-by-name', 'mymaster'); + frankenphp_set_vars([ + 'MASTER_HOST' => $master[0], + 'MASTER_PORT' => (int) $master[1], + ]); + + Amp\EventLoop::onReadable($signalingStream, function ($id) use ($signalingStream) { + if (false === fgets($signalingStream)) { + Amp\EventLoop::cancel($id); // EOF = stop + } + }); + Amp\EventLoop::run(); +} +``` + +### HTTP worker depending on a background worker + +```php +boot(); + +// Declare dependencies once at bootstrap (fail-fast) +frankenphp_ensure_background_worker(['config-watcher', 'feature-flags']); + +while (frankenphp_handle_request(function () use ($app) { + $config = frankenphp_get_vars('config-watcher'); // pure read + + $_SERVER += [ + 'APP_REDIS_HOST' => $config['MASTER_HOST'], + 'APP_REDIS_PORT' => $config['MASTER_PORT'], + ]; + $app->handle($_GET, $_POST, $_COOKIE, $_FILES, $_SERVER); +})) { + gc_collect_cycles(); +} +``` + +### Non-worker mode + +```php + getenv('REDIS_HOST') ?: '127.0.0.1']; +} +``` + +## Runtime Behaviour + +- Background workers get dedicated threads: they do not reduce HTTP capacity. +- `max_execution_time` is automatically disabled for background workers. +- `$_SERVER['FRANKENPHP_WORKER_NAME']` carries the worker's declared (or catch-all-resolved) name. +- `$_SERVER['FRANKENPHP_WORKER_BACKGROUND']` is `true` for background workers. +- `$_SERVER['argv'] = [$entrypoint, $name]` in background workers (for `bin/console`-style dispatching). +- Crash recovery: workers are automatically restarted with exponential backoff. During the restart window, `get_vars()` returns the last published data (stale but available) because vars are held in persistent memory across crashes. A warning is logged on crash. +- On shutdown/restart the signaling stream is closed (EOF). Well-behaved workers that check the stream exit within the 5-second grace period. Stuck workers are force-killed on Linux, FreeBSD, and Windows. + +## Scoping + +Each `php_server` block gets its own isolated background-worker scope, so workers declared with the same `name` in different blocks do not collide. Resolution rules for `ensure()` / `get_vars()`: + +- A request inside a `php_server` block resolves first against that block's own declarations. If the block declares any background workers of its own, that lookup is authoritative and scope-isolated from every other block. +- A request inside a `php_server` block that declares **no** background workers falls back to the global/embed scope (workers declared at the top-level `frankenphp` directive or via the Go library). This makes a single globally-declared worker reachable from all otherwise-unconfigured blocks. +- Requests made outside any `php_server` block (e.g. when embedding FrankenPHP as a library) always resolve to the global/embed scope. + +## Limits + +- Named background workers with `num > 1` spin up a pool of threads that share the same published vars; `get_vars()` sees one consistent snapshot. +- Multiple named background workers in the same block can share the same entrypoint file. Each declaration keeps its own `env`, `watch`, and failure policy. +- Calling `ensure()` on a name that isn't declared and isn't covered by a catch-all raises `RuntimeException`. diff --git a/frankenphp.c b/frankenphp.c index 0cc294e397..e22a70dfe7 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -16,6 +16,7 @@ #else #include #endif +#include
#include #include #include @@ -37,6 +38,7 @@ #include "_cgo_export.h" #include "frankenphp_arginfo.h" +#include "persistent_zval.h" #if defined(PHP_WIN32) && defined(ZTS) ZEND_TSRMLS_CACHE_DEFINE() @@ -85,20 +87,274 @@ HashTable *main_thread_env = NULL; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread bool is_background_worker = false; +__thread char *worker_name = NULL; +__thread int worker_stop_fds[2] = {-1, -1}; +__thread php_stream *worker_signaling_stream = NULL; +__thread char *captured_last_php_error = NULL; __thread HashTable *sandboxed_env = NULL; +/* Per-thread cache for frankenphp_get_vars results. Maps worker name to + * { version, cached_zval }. When the Go side reports the version hasn't + * changed, the cached zval is returned with a refcount bump, giving the + * PHP caller the same HashTable pointer so repeated reads within a + * request run at O(1) without walking persistent memory every time. */ +typedef struct { + uint64_t version; + zval value; +} bg_vars_cache_entry; +__thread HashTable *bg_vars_cache = NULL; + +static void bg_vars_cache_dtor(zval *zv) { + bg_vars_cache_entry *entry = Z_PTR_P(zv); + zval_ptr_dtor(&entry->value); + free(entry); +} + +static void bg_vars_cache_reset(void) { + if (bg_vars_cache) { + zend_hash_destroy(bg_vars_cache); + free(bg_vars_cache); + bg_vars_cache = NULL; + } +} + #ifndef PHP_WIN32 static bool is_forked_child = false; static void frankenphp_fork_child(void) { is_forked_child = true; } #endif +/* Best-effort force-kill for stuck PHP threads. + * + * Each thread captures &EG(vm_interrupt) / &EG(timed_out) at boot and + * hands them to Go via go_frankenphp_store_force_kill_slot. To kill, + * Go passes the slot back to frankenphp_force_kill_thread, which stores + * true into both bools (the VM bails through zend_timeout() at the next + * opcode boundary) and then wakes any in-flight syscall: + * - Linux/FreeBSD: pthread_kill(SIGRTMIN+3) -> EINTR. + * - Windows: CancelSynchronousIo + QueueUserAPC for alertable I/O + + * SleepEx. Non-alertable Sleep (including PHP's usleep) stays stuck. + * - macOS: atomic-bool only; busy loops bail, blocking syscalls don't. + * + * Reserved signal: SIGRTMIN+3. PHP's pcntl_signal(SIGRTMIN+3, ...) + * clobbers it. glibc NPTL reserves SIGRTMIN..SIGRTMIN+2; embedders with + * their own Go signal usage may need to patch this constant. + * + * The slot lives Go-side on phpThread; the C side has no global table. + * The signal handler is installed once via pthread_once. */ +#ifdef PHP_WIN32 +static void CALLBACK frankenphp_noop_apc(ULONG_PTR param) { (void)param; } +#endif + +#ifdef FRANKENPHP_HAS_KILL_SIGNAL +/* No-op: delivery itself is what unblocks the syscall via EINTR. */ +static void frankenphp_kill_signal_handler(int sig) { (void)sig; } + +static pthread_once_t kill_signal_handler_installed = PTHREAD_ONCE_INIT; +static void install_kill_signal_handler(void) { + /* No SA_RESTART so syscalls return EINTR rather than being restarted. + * SA_ONSTACK guards against an accidental process-level delivery to a + * Go-managed thread, where Go requires the alternate signal stack. */ + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = frankenphp_kill_signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_ONSTACK; + sigaction(FRANKENPHP_KILL_SIGNAL, &sa, NULL); +} +#endif + +/* Set by frankenphp_set_shutdown_in_progress to gate the unhealthy-thread + * respawn loop off once Shutdown begins. */ +static zend_atomic_bool shutdown_in_progress; + +void frankenphp_set_shutdown_in_progress(bool v) { + zend_atomic_bool_store(&shutdown_in_progress, v); +} + +/* Must run on the PHP thread itself: EG() resolves to its own TSRM + * context and pthread_self() captures the right tid. */ +void frankenphp_register_thread_for_kill(uintptr_t idx) { + force_kill_slot slot; + memset(&slot, 0, sizeof(slot)); + slot.vm_interrupt = &EG(vm_interrupt); + slot.timed_out = &EG(timed_out); +#ifdef FRANKENPHP_HAS_KILL_SIGNAL + slot.tid = pthread_self(); + pthread_once(&kill_signal_handler_installed, install_kill_signal_handler); +#elif defined(PHP_WIN32) + if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &slot.thread_handle, 0, FALSE, + DUPLICATE_SAME_ACCESS)) { + /* On failure, force_kill falls back to atomic-bool only. */ + slot.thread_handle = NULL; + } +#endif + go_frankenphp_store_force_kill_slot(idx, slot); +} + +void frankenphp_force_kill_thread(force_kill_slot slot) { + if (slot.vm_interrupt == NULL) { + /* Boot aborted before register_thread_for_kill. */ + return; + } + /* Atomic stores first: by the time the thread wakes (signal-driven or + * natural) the VM sees them and bails through zend_timeout(). */ + zend_atomic_bool_store(slot.timed_out, true); + zend_atomic_bool_store(slot.vm_interrupt, true); + +#ifdef FRANKENPHP_HAS_KILL_SIGNAL + /* ESRCH (thread already exited) / EINVAL are both benign here. */ + pthread_kill(slot.tid, FRANKENPHP_KILL_SIGNAL); +#elif defined(PHP_WIN32) + if (slot.thread_handle != NULL) { + CancelSynchronousIo(slot.thread_handle); + QueueUserAPC((PAPCFUNC)frankenphp_noop_apc, slot.thread_handle, 0); + } +#endif +} + +/* CloseHandle on Windows; no-op on POSIX. */ +void frankenphp_release_thread_for_kill(force_kill_slot slot) { +#ifdef PHP_WIN32 + if (slot.thread_handle != NULL) { + CloseHandle(slot.thread_handle); + } +#else + (void)slot; +#endif +} + void frankenphp_update_local_thread_context(bool is_worker) { is_worker_thread = is_worker; + /* A thread that was previously a background worker can be recycled into an + * HTTP worker or a regular request thread; reset the background-worker TLS + * state so frankenphp_handle_request() and friends don't reject the caller + * based on a stale flag. frankenphp_set_worker_name() re-sets this for + * threads that actually become background workers. */ + is_background_worker = false; + free(worker_name); + worker_name = NULL; + /* workers should keep running if the user aborts the connection */ PG(ignore_user_abort) = is_worker ? 1 : original_user_abort_setting; } +/* Background worker stop-pipe: anonymous pipe whose read end is exposed to + * the PHP script via frankenphp_get_worker_handle. When the Go side closes + * the write end (on drain), the read end reaches EOF so the script can + * return from stream_select and exit its loop. */ +static int frankenphp_worker_open_stop_pipe(void) { +#ifdef PHP_WIN32 + return _pipe(worker_stop_fds, 4096, _O_BINARY); +#else + return pipe(worker_stop_fds); +#endif +} + +static void frankenphp_worker_close_stop_fds(void) { + for (int i = 0; i < 2; i++) { + if (worker_stop_fds[i] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[i]); +#else + close(worker_stop_fds[i]); +#endif + worker_stop_fds[i] = -1; + } + } +} + +void frankenphp_set_worker_name(char *name, bool background) { + free(worker_name); + if (name) { + size_t len = strlen(name) + 1; + worker_name = malloc(len); + memcpy(worker_name, name, len); + } else { + worker_name = NULL; + } + is_background_worker = background; + if (!background) { + return; + } + worker_signaling_stream = NULL; + /* Disarm any lingering max_execution_time timer from a previous request + * (background workers don't enforce it). zend_unset_timeout is always + * available and safe to call whether PHP was built with POSIX per-thread + * timers or the setitimer/SIGPROF fallback. */ + zend_unset_timeout(); + + frankenphp_worker_close_stop_fds(); + if (frankenphp_worker_open_stop_pipe() != 0) { + worker_stop_fds[0] = -1; + worker_stop_fds[1] = -1; + } +} + +int frankenphp_worker_get_stop_fd_write(void) { return worker_stop_fds[1]; } + +void frankenphp_worker_close_fd(int fd) { + if (fd < 0) { + return; + } + /* Closing the write end of the stop pipe lands as EOF on the read end, + * so the PHP side's stream_select returns promptly. */ +#ifdef PHP_WIN32 + _close(fd); +#else + close(fd); +#endif +} + +static int frankenphp_worker_dup_fd(int fd) { +#ifdef PHP_WIN32 + return _dup(fd); +#else + return dup(fd); +#endif +} + +void frankenphp_copy_persistent_vars(zval *dst, void *persistent_ht) { + zval src; + ZVAL_ARR(&src, (HashTable *)persistent_ht); + persistent_zval_to_request(dst, &src); +} + +/* Capture PG(last_error_*) into the thread-local captured_last_php_error. + * Called before php_request_shutdown, which clears PG(last_error_*). + * Format: " in on line ". */ +static void frankenphp_capture_last_php_error(void) { + if (captured_last_php_error != NULL) { + free(captured_last_php_error); + captured_last_php_error = NULL; + } + if (PG(last_error_message) == NULL) { + return; + } + const char *msg = ZSTR_VAL(PG(last_error_message)); + size_t msg_len = ZSTR_LEN(PG(last_error_message)); + const char *file = + PG(last_error_file) ? ZSTR_VAL(PG(last_error_file)) : "unknown"; + size_t file_len = PG(last_error_file) ? ZSTR_LEN(PG(last_error_file)) : 7; + int line = PG(last_error_lineno); + size_t buf_len = msg_len + file_len + 32; + captured_last_php_error = malloc(buf_len); + if (captured_last_php_error != NULL) { + snprintf(captured_last_php_error, buf_len, "%.*s in %.*s on line %d", + (int)msg_len, msg, (int)file_len, file, line); + } +} + +/* Return and take ownership of the captured error; caller frees with + * C.free. NULL if nothing was captured. */ +char *frankenphp_get_last_php_error(void) { + char *s = captured_last_php_error; + captured_last_php_error = NULL; + return s; +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -708,6 +964,221 @@ PHP_FUNCTION(frankenphp_log) { } } +PHP_FUNCTION(frankenphp_set_vars) { + zval *vars = NULL; + + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_ARRAY(vars); + ZEND_PARSE_PARAMETERS_END(); + + /* Validate every value up front so allocation only happens for a tree + * we can fully round-trip. */ + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(vars), val) { + if (!persistent_zval_validate(val)) { + zend_value_error( + "frankenphp_set_vars(): values must be null, scalars, arrays, or " + "enums; objects (other than enums) and resources are not allowed"); + RETURN_THROWS(); + } + } + ZEND_HASH_FOREACH_END(); + + zval persistent; + persistent_zval_persist(&persistent, vars); + + void *old = NULL; + char *error = + go_frankenphp_set_vars(thread_index, Z_ARRVAL(persistent), &old); + if (error) { + persistent_zval_free(&persistent); + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + if (old != NULL) { + zval old_zv; + ZVAL_ARR(&old_zv, (HashTable *)old); + persistent_zval_free(&old_zv); + } +} + +PHP_FUNCTION(frankenphp_get_vars) { + zend_string *name = NULL; + + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_STR(name); + ZEND_PARSE_PARAMETERS_END(); + + /* Look up a cached entry first so Go can short-circuit the copy when + * the background worker has not changed its vars since we last read + * them. On a cache hit we reuse the same zval, so $vars === $prev_vars + * holds across repeated reads within one request. */ + uint64_t caller_version = 0; + uint64_t out_version = 0; + bg_vars_cache_entry *cached = NULL; + if (bg_vars_cache) { + zval *entry_zv = zend_hash_find(bg_vars_cache, name); + if (entry_zv) { + cached = Z_PTR_P(entry_zv); + caller_version = cached->version; + } + } + + char *error = go_frankenphp_get_vars( + thread_index, (char *)ZSTR_VAL(name), ZSTR_LEN(name), return_value, + cached ? &caller_version : NULL, &out_version); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + + if (cached && out_version == caller_version) { + /* Cache hit: Go skipped the copy. Return the cached zval. */ + ZVAL_COPY(return_value, &cached->value); + return; + } + + /* Cache miss: store the fresh copy so subsequent reads within this + * request can be served without walking persistent memory. */ + if (!bg_vars_cache) { + bg_vars_cache = malloc(sizeof(HashTable)); + zend_hash_init(bg_vars_cache, 4, NULL, bg_vars_cache_dtor, 1); + } + bg_vars_cache_entry *entry = malloc(sizeof(*entry)); + entry->version = out_version; + ZVAL_COPY(&entry->value, return_value); + zval entry_zv; + ZVAL_PTR(&entry_zv, entry); + zend_hash_update(bg_vars_cache, name, &entry_zv); +} + +PHP_FUNCTION(frankenphp_ensure_background_worker) { + zval *names_zv; + double timeout = 30.0; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(names_zv); + Z_PARAM_OPTIONAL; + Z_PARAM_DOUBLE(timeout); + ZEND_PARSE_PARAMETERS_END(); + + if (timeout < 0) { + zend_value_error("frankenphp_ensure_background_worker(): timeout must be " + "non-negative"); + RETURN_THROWS(); + } + int timeout_ms = (int)(timeout * 1000.0); + + /* Accept either a single string or an array of strings. For a single + * string we avoid the allocation by pointing at ZSTR fields directly. */ + char **name_ptrs = NULL; + size_t *name_lens = NULL; + int name_count = 0; + char *single_name_ptr = NULL; + size_t single_name_len = 0; + + if (Z_TYPE_P(names_zv) == IS_STRING) { + single_name_ptr = Z_STRVAL_P(names_zv); + single_name_len = Z_STRLEN_P(names_zv); + name_ptrs = &single_name_ptr; + name_lens = &single_name_len; + name_count = 1; + } else if (Z_TYPE_P(names_zv) == IS_ARRAY) { + HashTable *ht = Z_ARRVAL_P(names_zv); + name_count = zend_hash_num_elements(ht); + if (name_count == 0) { + zend_value_error("frankenphp_ensure_background_worker(): names array " + "must not be empty"); + RETURN_THROWS(); + } + name_ptrs = emalloc(name_count * sizeof(*name_ptrs)); + name_lens = emalloc(name_count * sizeof(*name_lens)); + int idx = 0; + zval *v; + ZEND_HASH_FOREACH_VAL(ht, v) { + if (Z_TYPE_P(v) != IS_STRING) { + efree(name_ptrs); + efree(name_lens); + zend_type_error("frankenphp_ensure_background_worker(): names array " + "must contain only strings"); + RETURN_THROWS(); + } + name_ptrs[idx] = Z_STRVAL_P(v); + name_lens[idx] = Z_STRLEN_P(v); + idx++; + } + ZEND_HASH_FOREACH_END(); + } else { + zend_type_error("frankenphp_ensure_background_worker(): name must be a " + "string or an array of strings"); + RETURN_THROWS(); + } + + char *error = go_frankenphp_ensure_background_worker( + thread_index, name_ptrs, name_lens, name_count, timeout_ms); + + if (name_count > 1) { + efree(name_ptrs); + efree(name_lens); + } + + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } +} + +PHP_FUNCTION(frankenphp_get_worker_handle) { + ZEND_PARSE_PARAMETERS_NONE(); + + if (!is_background_worker) { + zend_throw_exception(spl_ce_RuntimeException, + "frankenphp_get_worker_handle() can only be called " + "from a background worker", + 0); + RETURN_THROWS(); + } + + /* Return the cached stream on repeat calls. */ + if (worker_signaling_stream != NULL) { + php_stream_to_zval(worker_signaling_stream, return_value); + GC_ADDREF(Z_COUNTED_P(return_value)); + return; + } + + if (worker_stop_fds[0] < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to create background worker stop pipe", 0); + RETURN_THROWS(); + } + + /* DUP so closing the PHP stream doesn't affect worker_stop_fds[0]; the + * original stays owned by the C side for cleanup at worker restart. */ + int fd = frankenphp_worker_dup_fd(worker_stop_fds[0]); + if (fd < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to dup background worker stop fd", 0); + RETURN_THROWS(); + } + + php_stream *stream = php_stream_fopen_from_fd(fd, "rb", NULL); + if (!stream) { + frankenphp_worker_close_fd(fd); + zend_throw_exception(spl_ce_RuntimeException, + "failed to create stream from stop fd", 0); + RETURN_THROWS(); + } + + worker_signaling_stream = stream; + php_stream_to_zval(stream, return_value); + + /* Extra ref so PHP can't destroy the stream while TLS caches the pointer. */ + GC_ADDREF(Z_COUNTED_P(return_value)); +} + PHP_MINIT_FUNCTION(frankenphp) { register_frankenphp_symbols(module_number); #ifndef PHP_WIN32 @@ -1065,6 +1536,20 @@ static void *php_thread(void *arg) { snprintf(thread_name, 16, "php-%" PRIxPTR, thread_index); set_thread_name(thread_name); + /* Paired with go_frankenphp_thread_exited at the exit: label below; + * lets initPHPThreads wait for prior-generation threads. */ + go_frankenphp_thread_spawned(); + +#ifdef FRANKENPHP_HAS_KILL_SIGNAL + /* The spawning Go-managed M may block realtime signals, which the + * new pthread inherits. Unblock FRANKENPHP_KILL_SIGNAL here so + * force-kill deliveries are not silently dropped. */ + sigset_t unblock; + sigemptyset(&unblock); + sigaddset(&unblock, FRANKENPHP_KILL_SIGNAL); + pthread_sigmask(SIG_UNBLOCK, &unblock, NULL); +#endif + /* Initial allocation of all global PHP memory for this thread */ #ifdef ZTS (void)ts_resource(0); @@ -1073,6 +1558,11 @@ static void *php_thread(void *arg) { #endif #endif + /* Register this thread's vm_interrupt/timed_out addresses so the Go side + * can force-kill it after the graceful-drain grace period if it gets stuck + * in a busy PHP loop. */ + frankenphp_register_thread_for_kill(thread_index); + bool thread_is_healthy = true; bool has_attempted_shutdown = false; @@ -1092,6 +1582,45 @@ static void *php_thread(void *arg) { zend_bailout(); } + /* Background workers run indefinitely; disable max_execution_time + * so PHP's default 30s timer doesn't interrupt their main loop. + * php_request_startup re-arms the timer from INI, so we have to + * disarm it after the call. Also surface the worker name in $_SERVER + * and $argv so catch-all workers can tell which instance they are. */ + if (is_background_worker) { + zend_unset_timeout(); + zend_is_auto_global_str("_SERVER", sizeof("_SERVER") - 1); + zval *server = &PG(http_globals)[TRACK_VARS_SERVER]; + if (server && Z_TYPE_P(server) == IS_ARRAY) { + zval bg_zval; + ZVAL_TRUE(&bg_zval); + zend_hash_str_update( + Z_ARRVAL_P(server), "FRANKENPHP_WORKER_BACKGROUND", + sizeof("FRANKENPHP_WORKER_BACKGROUND") - 1, &bg_zval); + + if (worker_name != NULL) { + zval name_zval; + ZVAL_STRING(&name_zval, worker_name); + zend_hash_str_update(Z_ARRVAL_P(server), "FRANKENPHP_WORKER_NAME", + sizeof("FRANKENPHP_WORKER_NAME") - 1, + &name_zval); + + zval argv_array; + array_init(&argv_array); + add_next_index_string(&argv_array, scriptName); + add_next_index_string(&argv_array, worker_name); + + zval argc_zval; + ZVAL_LONG(&argc_zval, 2); + + zend_hash_str_update(Z_ARRVAL_P(server), "argv", sizeof("argv") - 1, + &argv_array); + zend_hash_str_update(Z_ARRVAL_P(server), "argc", sizeof("argc") - 1, + &argc_zval); + } + } + } + zend_file_handle file_handle; zend_stream_init_filename(&file_handle, scriptName); @@ -1114,6 +1643,15 @@ static void *php_thread(void *arg) { has_attempted_shutdown = true; + /* Capture the last PHP error before php_request_shutdown clears it, + * so background-worker boot failures can surface the cause. */ + frankenphp_capture_last_php_error(); + + /* Invalidate the per-request get_vars cache before php_request_shutdown + * tears down request memory: the cached zvals live in request memory + * and can't be freed after shutdown runs. */ + bg_vars_cache_reset(); + /* shutdown the request, potential bailout to zend_catch */ php_request_shutdown((void *)0); frankenphp_free_request_context(); @@ -1133,6 +1671,7 @@ static void *php_thread(void *arg) { if (!has_attempted_shutdown) { /* php_request_shutdown() was not called, force a shutdown now */ reset_sandboxed_environment(); + bg_vars_cache_reset(); zend_try { php_request_shutdown((void *)0); } zend_catch {} zend_end_try(); @@ -1150,6 +1689,11 @@ static void *php_thread(void *arg) { } zend_end_try(); + /* Must precede ts_free_thread: that frees the TSRM storage backing + * the slot's &EG() pointers. Clearing first means any concurrent + * force-kill either ran before us or sees a zero slot. */ + go_frankenphp_clear_force_kill_slot(thread_index); + /* free all global PHP memory reserved for this thread */ #ifdef ZTS ts_free_thread(); @@ -1158,12 +1702,17 @@ static void *php_thread(void *arg) { /* Thread is healthy, signal to Go that the thread has shut down */ if (thread_is_healthy) { go_frankenphp_on_thread_shutdown(thread_index); - - return NULL; + goto exit; } - /* Thread is unhealthy, PHP globals might be in a bad state after a bailout, - * restart the entire thread */ + /* Unhealthy: respawn unless Shutdown is in progress; respawning then + * would hand a fresh pthread a phpThreads slice already untracked. */ + if (zend_atomic_bool_load(&shutdown_in_progress)) { + frankenphp_log_message( + "Unhealthy thread unwinding after Shutdown; not restarting", + LOG_WARNING); + goto exit; + } frankenphp_log_message("Restarting unhealthy thread", LOG_WARNING); if (!frankenphp_new_php_thread(thread_index)) { @@ -1171,6 +1720,10 @@ static void *php_thread(void *arg) { frankenphp_log_message("Failed to restart an unhealthy thread", LOG_ERR); } +exit: + /* Single exit so spawned()/exited() pairing can never drift if a new + * return is added above. */ + go_frankenphp_thread_exited(); return NULL; } @@ -1265,7 +1818,12 @@ static void *php_main(void *arg) { go_frankenphp_main_thread_is_ready(); - /* channel closed, shutdown gracefully */ + /* channel closed, shutdown gracefully. Abandoned threads (stuck in an + * uninterruptible syscall after the force-kill deadline) are by + * definition not expected to resume; we run SAPI/TSRM teardown here + * unconditionally rather than try to outlive them. If one ever does + * unwind after teardown, the embedder's right move is process exit - + * see RestartWorkers' doc. */ frankenphp_sapi_module.shutdown(&frankenphp_sapi_module); sapi_shutdown(); @@ -1470,6 +2028,10 @@ int frankenphp_reset_opcache(void) { int frankenphp_get_current_memory_limit() { return PG(memory_limit); } void frankenphp_init_thread_metrics(int max_threads) { + /* Frees any prior generation's allocation; Shutdown leaves it alive + * for abandoned threads. lingeringThreads.Wait() upstream guarantees + * those have all exited before we get here. free(NULL) is a no-op. */ + free(thread_metrics); thread_metrics = calloc(max_threads, sizeof(frankenphp_thread_metrics)); } diff --git a/frankenphp.go b/frankenphp.go index 52246d01c7..62d9bec52b 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -157,8 +157,46 @@ func Config() PHPConfig { func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { maxProcs := runtime.GOMAXPROCS(0) * 2 maxThreadsFromWorkers := 0 + reservedThreads := 0 for i, w := range opt.workers { + if w.isBackgroundWorker { + // Background workers default to one eager thread. For catch-all + // declarations, max_threads is the cap on lazy-started instances; + // reserve room for each so ensure() has a slot to schedule into. + // Bg workers are accounted for via reservedThreads only so they + // don't double-count against HTTP worker admission checks. + if w.maxThreads == 0 && w.num == 0 { + // Distinguish catch-all (no name / name == file) from a + // named lazy worker. Catch-all can host up to + // defaultMaxBackgroundWorkers distinct lazy-started names, + // so the thread budget must reserve that many slots; + // otherwise the second ensure() falls through to + // "no available PHP thread" long before hitting the + // advertised cap. + phpName := strings.TrimPrefix(w.name, "m#") + if phpName == "" || phpName == w.fileName { + opt.workers[i].maxThreads = defaultMaxBackgroundWorkers + } else { + opt.workers[i].maxThreads = 1 + } + } + extra := w.num + if extra < 1 { + extra = 1 + } + if w.maxThreads > extra { + extra = w.maxThreads + } + reservedThreads += extra + // Register the expected worker count for metrics too: without + // this, a bg-worker-only deployment never initialises + // totalWorkers, and every StartWorker/ReadyWorker call inside + // threadbackgroundworker.go becomes a silent no-op. + metrics.TotalWorkers(w.name, extra) + continue + } + if w.num <= 0 { // https://github.com/php/frankenphp/issues/126 opt.workers[i].num = maxProcs @@ -200,7 +238,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("num_threads (%d) must be greater than the number of worker threads (%d)", opt.numThreads, numWorkers) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } if maxThreadsIsSet && !numThreadsIsSet { @@ -209,7 +249,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than the number of worker threads (%d)", opt.maxThreads, numWorkers) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } if !numThreadsIsSet { @@ -221,7 +263,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { } opt.maxThreads = opt.numThreads - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } // both num_threads and max_threads are set @@ -233,7 +277,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than or equal to num_threads (%d)", opt.maxThreads, opt.numThreads) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } // Init starts the PHP runtime and the configured workers. diff --git a/frankenphp.h b/frankenphp.h index 0ea8c80f41..05238afd30 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -46,6 +46,28 @@ static inline HRESULT LongLongSub(LONGLONG llMinuend, LONGLONG llSubtrahend, #include #include +#ifndef PHP_WIN32 +#include +#include +#endif + +/* Platform capabilities for the force-kill primitive; declared in the + * header so Go (via CGo) gets the correct struct layout too. */ +#if !defined(PHP_WIN32) && defined(SIGRTMIN) +#define FRANKENPHP_HAS_KILL_SIGNAL 1 +#define FRANKENPHP_KILL_SIGNAL (SIGRTMIN + 3) +#endif + +typedef struct { + zend_atomic_bool *vm_interrupt; + zend_atomic_bool *timed_out; +#ifdef FRANKENPHP_HAS_KILL_SIGNAL + pthread_t tid; +#elif defined(PHP_WIN32) + HANDLE thread_handle; +#endif +} force_kill_slot; + #ifndef FRANKENPHP_VERSION #define FRANKENPHP_VERSION dev #endif @@ -193,6 +215,26 @@ void frankenphp_init_thread_metrics(int max_threads); void frankenphp_destroy_thread_metrics(void); size_t frankenphp_get_thread_memory_usage(uintptr_t thread_index); +/* Best-effort force-kill primitives. The slot is populated by each PHP + * thread at boot (frankenphp_register_thread_for_kill calls back into Go + * via go_frankenphp_store_force_kill_slot) and lives in the Go-side + * phpThread. force_kill_thread interrupts the Zend VM at the next opcode + * boundary; on POSIX it also delivers SIGRTMIN+3 to the target thread, + * on Windows it calls CancelSynchronousIo + QueueUserAPC. release_thread + * drops any OS-owned resource tied to the slot (currently the Windows + * thread handle). */ +void frankenphp_set_shutdown_in_progress(bool v); +void frankenphp_register_thread_for_kill(uintptr_t thread_index); +void frankenphp_force_kill_thread(force_kill_slot slot); +void frankenphp_release_thread_for_kill(force_kill_slot slot); + +/* Background worker primitives. */ +void frankenphp_set_worker_name(char *name, bool background); +int frankenphp_worker_get_stop_fd_write(void); +void frankenphp_worker_close_fd(int fd); +void frankenphp_copy_persistent_vars(zval *dst, void *persistent_ht); +char *frankenphp_get_last_php_error(void); + void register_extensions(zend_module_entry **m, int len); #endif diff --git a/frankenphp.stub.php b/frankenphp.stub.php index d6c85aa05f..25d2debfd6 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -54,3 +54,33 @@ function mercure_publish(string|array $topics, string $data = '', bool $private * array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr */ function frankenphp_log(string $message, int $level = 0, array $context = []): void {} + +/** + * Declare a dependency on one or more background workers. Lazy-starts each + * worker that isn't already running, then blocks until every one has + * published its vars (set_vars) or the shared deadline expires. Timeout + * is in seconds. + * + * @param string|string[] $name + */ +function frankenphp_ensure_background_worker(string|array $name, float $timeout = 30.0): void {} + +/** + * Publish the given vars from a background worker. Only callable from a + * worker started with the `background` flag. Values must be null, scalars, + * arrays of allowed values, or enum cases. + */ +function frankenphp_set_vars(array $vars): void {} + +/** + * Read the shared vars published by the named background worker. Throws if + * the worker is not declared, not running, or has not yet called set_vars. + */ +function frankenphp_get_vars(string $name): array {} + +/** + * Return the stop-signal stream for the current background worker. The + * stream closes when FrankenPHP is draining the worker so the script can + * exit its loop gracefully. Only callable from inside a background worker. + */ +function frankenphp_get_worker_handle() {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index 4f2707cbca..4ec0058d83 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 60f0d27c04f94d7b24c052e91ef294595a2bc421 */ + * Stub hash: stub-hash-placeholder-minimal-bg-worker */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, _IS_BOOL, 0) ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) @@ -41,6 +41,22 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_log, 0, 1, IS_VOID, 0 ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, context, IS_ARRAY, 0, "[]") ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_ensure_background_worker, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_MASK(0, name, MAY_BE_STRING|MAY_BE_ARRAY, NULL) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_DOUBLE, 0, "30.0") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_set_vars, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, vars, IS_ARRAY, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_get_vars, 0, 1, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_frankenphp_get_worker_handle, 0, 0, 0) +ZEND_END_ARG_INFO() + ZEND_FUNCTION(frankenphp_handle_request); ZEND_FUNCTION(headers_send); @@ -49,6 +65,10 @@ ZEND_FUNCTION(frankenphp_request_headers); ZEND_FUNCTION(frankenphp_response_headers); ZEND_FUNCTION(mercure_publish); ZEND_FUNCTION(frankenphp_log); +ZEND_FUNCTION(frankenphp_ensure_background_worker); +ZEND_FUNCTION(frankenphp_set_vars); +ZEND_FUNCTION(frankenphp_get_vars); +ZEND_FUNCTION(frankenphp_get_worker_handle); static const zend_function_entry ext_functions[] = { @@ -63,6 +83,10 @@ static const zend_function_entry ext_functions[] = { ZEND_FALIAS(apache_response_headers, frankenphp_response_headers, arginfo_apache_response_headers) ZEND_FE(mercure_publish, arginfo_mercure_publish) ZEND_FE(frankenphp_log, arginfo_frankenphp_log) + ZEND_FE(frankenphp_ensure_background_worker, arginfo_frankenphp_ensure_background_worker) + ZEND_FE(frankenphp_set_vars, arginfo_frankenphp_set_vars) + ZEND_FE(frankenphp_get_vars, arginfo_frankenphp_get_vars) + ZEND_FE(frankenphp_get_worker_handle, arginfo_frankenphp_get_worker_handle) ZEND_FE_END }; diff --git a/internal/state/state.go b/internal/state/state.go index f8d2b3acb7..a708ce205c 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -35,6 +35,13 @@ const ( Rebooting // C thread has exited and ZTS state is cleaned up, ready to spawn a new C thread RebootReady + + // Abandoned is set by RestartWorkers on threads that did not yield + // within the force-kill deadline. Handlers treat it like ShuttingDown + // on the next callback, so an abandoned thread that eventually + // unwinds exits cleanly instead of re-entering the serve loop with + // stale request state. + Abandoned ) func (s State) String() string { @@ -67,6 +74,8 @@ func (s State) String() string { return "rebooting" case RebootReady: return "reboot ready" + case Abandoned: + return "abandoned" default: return "unknown" } @@ -172,12 +181,12 @@ func (ts *ThreadState) WaitFor(states ...State) { func (ts *ThreadState) RequestSafeStateChange(nextState State) bool { ts.mu.Lock() switch ts.currentState { - // disallow state changes if shutting down or done - case ShuttingDown, Done, Reserved: + // Terminal: Abandoned never transitions to Ready/Inactive/ShuttingDown, + // so waiting would park forever. + case ShuttingDown, Done, Reserved, Abandoned: ts.mu.Unlock() return false - // ready and inactive are safe states to transition from case Ready, Inactive: ts.currentState = nextState ts.notifySubscribers(nextState) @@ -187,8 +196,9 @@ func (ts *ThreadState) RequestSafeStateChange(nextState State) bool { } ts.mu.Unlock() - // wait for the state to change to a safe state - ts.WaitFor(Ready, Inactive, ShuttingDown) + // Done and Abandoned in the set so a concurrent terminal transition + // wakes us; the recursive call below then hits the reject branch. + ts.WaitFor(Ready, Inactive, ShuttingDown, Done, Abandoned) return ts.RequestSafeStateChange(nextState) } diff --git a/options.go b/options.go index a9cd2a2630..34d454380a 100644 --- a/options.go +++ b/options.go @@ -50,6 +50,8 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() + isBackgroundWorker bool + backgroundScope BackgroundScope } // WithContext sets the main context to use. @@ -258,6 +260,31 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } } +// EXPERIMENTAL: WithWorkerBackground marks this worker as a background +// (non-HTTP) worker. Background workers run outside the request cycle and +// publish shared variables via frankenphp_set_vars for HTTP threads to read +// via frankenphp_get_vars. +func WithWorkerBackground() WorkerOption { + return func(w *workerOpt) error { + w.isBackgroundWorker = true + + return nil + } +} + +// EXPERIMENTAL: WithWorkerBackgroundScope assigns this worker to a given +// background-worker scope. Workers in the same scope share a background +// lookup; each php_server block gets its own scope so workers with the +// same name in different blocks don't collide. The zero value is the +// global/embed scope and is the default. +func WithWorkerBackgroundScope(scope BackgroundScope) WorkerOption { + return func(w *workerOpt) error { + w.backgroundScope = scope + + return nil + } +} + func withExtensionWorkers(w *extensionWorkers) WorkerOption { return func(wo *workerOpt) error { wo.extensionWorkers = w diff --git a/persistent_zval.h b/persistent_zval.h new file mode 100644 index 0000000000..204430194a --- /dev/null +++ b/persistent_zval.h @@ -0,0 +1,277 @@ +/* persistent_zval.h - Deep-copy zval trees to and from persistent memory. + * + * Provides a small, self-contained toolkit for moving zval trees across + * thread boundaries. The supported shape is a whitelist: scalars, arrays, + * and enums. Everything else is rejected by persistent_zval_validate so + * callers can fail fast before allocating. + * + * Fast paths: + * - Interned strings: shared memory, no copy. + * - Opcache-immutable arrays: shared pointer, no copy, no free. + * + * Included by frankenphp.c; not a standalone compilation unit. */ + +#ifndef PERSISTENT_ZVAL_H +#define PERSISTENT_ZVAL_H + +#include + +/* Enum payload stored in persistent memory: the class name + case name + * are kept as persistent zend_strings and the case object is re-resolved + * via zend_lookup_class + zend_enum_get_case_cstr on each read. */ +typedef struct { + zend_string *class_name; + zend_string *case_name; +} persistent_zval_enum_t; + +/* Whitelist check: only scalars, arrays of allowed values, and enum + * instances pass. Returns false for objects other than enums, resources, + * closures, references, etc. */ +static bool persistent_zval_validate(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + case IS_LONG: + case IS_DOUBLE: + case IS_STRING: + return true; + case IS_OBJECT: + return (Z_OBJCE_P(z)->ce_flags & ZEND_ACC_ENUM) != 0; + case IS_ARRAY: { + /* Opcache-immutable arrays are compile-time constants in shared + * memory; their leaves are guaranteed scalars or further immutable + * arrays. The copy/free paths below already trust this flag, so a + * recursive walk here would just be cycles. */ + if ((GC_FLAGS(Z_ARRVAL_P(z)) & IS_ARRAY_IMMUTABLE) != 0) + return true; + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) { + if (!persistent_zval_validate(val)) + return false; + } + ZEND_HASH_FOREACH_END(); + return true; + } + default: + return false; + } +} + +/* Deep-copy a zval from request memory into persistent (pemalloc) memory. + * Callers must have already passed persistent_zval_validate on src. + * + * Storage convention for enums: dst becomes IS_PTR holding a + * persistent_zval_enum_t. This is an internal representation; the caller + * should never expose a persistent zval to PHP directly, only via + * persistent_zval_to_request. */ +static void persistent_zval_persist(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: { + zend_string *s = Z_STR_P(src); + if (ZSTR_IS_INTERNED(s)) { + ZVAL_STR(dst, s); /* interned strings live process-wide */ + } else { + ZVAL_NEW_STR(dst, zend_string_init(ZSTR_VAL(s), ZSTR_LEN(s), 1)); + } + break; + } + case IS_OBJECT: { + /* Must be an enum (validated earlier). */ + zend_class_entry *ce = Z_OBJCE_P(src); + persistent_zval_enum_t *e = pemalloc(sizeof(*e), 1); + e->class_name = + ZSTR_IS_INTERNED(ce->name) + ? ce->name + : zend_string_init(ZSTR_VAL(ce->name), ZSTR_LEN(ce->name), 1); + zval *case_name_zval = zend_enum_fetch_case_name(Z_OBJ_P(src)); + zend_string *case_str = Z_STR_P(case_name_zval); + e->case_name = + ZSTR_IS_INTERNED(case_str) + ? case_str + : zend_string_init(ZSTR_VAL(case_str), ZSTR_LEN(case_str), 1); + ZVAL_PTR(dst, e); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + if ((GC_FLAGS(src_ht) & IS_ARRAY_IMMUTABLE) != 0) { + /* Opcache-immutable arrays live for the process lifetime and are + * safe to share across threads by pointer. Zero-copy, zero-free. */ + ZVAL_ARR(dst, src_ht); + break; + } + HashTable *dst_ht = pemalloc(sizeof(HashTable), 1); + zend_hash_init(dst_ht, zend_hash_num_elements(src_ht), NULL, NULL, 1); + ZVAL_ARR(dst, dst_ht); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval pval; + persistent_zval_persist(&pval, val); + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &pval); + } else { + zend_string *pkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 1); + /* Iteration guarantees the source key has its hash set. + * Propagating it lets zend_hash_add_new skip the re-hash. */ + ZSTR_H(pkey) = ZSTR_H(key); + zend_hash_add_new(dst_ht, pkey, &pval); + zend_string_release(pkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &pval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + /* Unreachable: persistent_zval_validate is the gatekeeper. */ + ZEND_UNREACHABLE(); + } +} + +/* Deep-free a persistent zval tree. Idempotent on scalars. Skips + * interned strings and immutable arrays (they are borrowed, not owned). */ +static void persistent_zval_free(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_STRING: + if (!ZSTR_IS_INTERNED(Z_STR_P(z))) { + zend_string_free(Z_STR_P(z)); + } + break; + case IS_PTR: { + persistent_zval_enum_t *e = (persistent_zval_enum_t *)Z_PTR_P(z); + if (!ZSTR_IS_INTERNED(e->class_name)) + zend_string_free(e->class_name); + if (!ZSTR_IS_INTERNED(e->case_name)) + zend_string_free(e->case_name); + pefree(e, 1); + break; + } + case IS_ARRAY: { + HashTable *ht = Z_ARRVAL_P(z); + if ((GC_FLAGS(ht) & IS_ARRAY_IMMUTABLE) != 0) { + /* Borrowed from opcache, do not touch. */ + break; + } + zval *val; + ZEND_HASH_FOREACH_VAL(ht, val) { persistent_zval_free(val); } + ZEND_HASH_FOREACH_END(); + zend_hash_destroy(ht); + pefree(ht, 1); + break; + } + default: + break; + } +} + +/* Deep-copy a persistent zval tree back into request memory. Enums are + * resolved from their class+case names on each call. If the enum class + * or case can't be found in the current thread's class table, an + * exception is thrown and dst is set to IS_NULL. */ +static void persistent_zval_to_request(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: + if (ZSTR_IS_INTERNED(Z_STR_P(src))) { + ZVAL_STR(dst, Z_STR_P(src)); + } else { + ZVAL_STRINGL(dst, Z_STRVAL_P(src), Z_STRLEN_P(src)); + } + break; + case IS_PTR: { + persistent_zval_enum_t *e = (persistent_zval_enum_t *)Z_PTR_P(src); + zend_class_entry *ce = zend_lookup_class(e->class_name); + if (EG(exception)) { + /* Autoloader threw; let that exception propagate untouched. */ + ZVAL_NULL(dst); + break; + } + if (!ce || !(ce->ce_flags & ZEND_ACC_ENUM)) { + zend_throw_exception_ex(spl_ce_LogicException, 0, + "persistent_zval: enum class \"%s\" not found", + ZSTR_VAL(e->class_name)); + ZVAL_NULL(dst); + break; + } + zend_object *enum_obj = zend_enum_get_case_cstr(ce, ZSTR_VAL(e->case_name)); + if (!enum_obj) { + zend_throw_exception_ex(spl_ce_LogicException, 0, + "persistent_zval: enum case \"%s::%s\" not found", + ZSTR_VAL(e->class_name), ZSTR_VAL(e->case_name)); + ZVAL_NULL(dst); + break; + } + ZVAL_OBJ_COPY(dst, enum_obj); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + if ((GC_FLAGS(src_ht) & IS_ARRAY_IMMUTABLE) != 0) { + /* Zero-copy: immutable arrays are safe to expose directly. */ + ZVAL_ARR(dst, src_ht); + break; + } + array_init_size(dst, zend_hash_num_elements(src_ht)); + HashTable *dst_ht = Z_ARRVAL_P(dst); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval rval; + persistent_zval_to_request(&rval, val); + if (EG(exception)) { + zval_ptr_dtor(&rval); + break; + } + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &rval); + } else { + zend_string *rkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 0); + ZSTR_H(rkey) = ZSTR_H(key); + zend_hash_add_new(dst_ht, rkey, &rval); + zend_string_release(rkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &rval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + /* Unreachable: only types produced by persistent_zval_persist land here. */ + ZEND_UNREACHABLE(); + } +} + +#endif /* PERSISTENT_ZVAL_H */ diff --git a/phpmainthread.go b/phpmainthread.go index 7f9b8fb947..5677d1833b 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -34,7 +34,19 @@ var ( // initPHPThreads starts the main PHP thread, // a fixed number of inactive PHP threads // and reserves a fixed number of possible PHP threads +// +// Precondition: every prior initPHPThreads must be paired with a +// drainPHPThreads first. Init without an intervening Shutdown blocks here +// forever because the prior generation's C threads still hold lingeringThreads. func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) { + // Wait for every prior C thread to fully exit before reassigning + // phpThreads / thread_metrics; otherwise a late callback from an + // abandoned thread would index the new generation's structures. + lingeringThreads.Wait() + + // Re-arm the unhealthy-restart respawn path for this Init cycle. + C.frankenphp_set_shutdown_in_progress(false) + mainThread = &phpMainThread{ state: state.NewThreadState(), done: make(chan struct{}), @@ -54,6 +66,9 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) return nil, err } + // Must follow start(): maxThreads is only final once + // setAutomaticMaxThreads runs on the main PHP thread (before Ready). + // Frees any prior allocation; first call sees zero-init (free(NULL)). C.frankenphp_init_thread_metrics(C.int(mainThread.maxThreads)) // initialize all other threads @@ -75,21 +90,35 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) return mainThread, nil } +// drainPHPThreads tears down the PHP thread pool. Abandoned threads (the +// force-kill deadline expired) stay alive until their syscall unwinds. +// +// Consequences: +// - Force-kill slots are cleared by each thread before its +// ts_free_thread, not here, so we don't double-CloseHandle on Windows. +// - phpThreads / thread_metrics stay allocated so a late callback from +// an abandoned thread indexes safely. The next initPHPThreads waits +// on lingeringThreads before reallocating. func drainPHPThreads() { if mainThread == nil { - return // mainThread was never initialized + return + } + // Idempotent: post-drain state is Reserved; a re-entry (e.g. a + // failed-Init cleanup) must not double-close mainThread.done. + if mainThread.state.Is(state.Reserved) { + return } + // Stop the unhealthy-restart respawn path before any thread exits. + C.frankenphp_set_shutdown_in_progress(true) doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) mainThread.state.Set(state.ShuttingDown) close(mainThread.done) for _, thread := range phpThreads { - // shut down all reserved threads if thread.state.CompareAndSwap(state.Reserved, state.Done) { doneWG.Done() continue } - // shut down all active threads go func(thread *phpThread) { thread.shutdown() doneWG.Done() @@ -99,8 +128,6 @@ func drainPHPThreads() { doneWG.Wait() mainThread.state.Set(state.Done) mainThread.state.WaitFor(state.Reserved) - C.frankenphp_destroy_thread_metrics() - phpThreads = nil } func (mainThread *phpMainThread) start() error { diff --git a/phpmainthread_test.go b/phpmainthread_test.go index d274991863..f244505cdd 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -35,7 +35,10 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { drainPHPThreads() - assert.Nil(t, phpThreads) + // phpThreads is intentionally kept allocated after drain so late + // callbacks from abandoned C threads can still index into it; see + // drainPHPThreads' doc. Verify each thread reached Done instead. + assert.True(t, phpThreads[0].state.Is(state.Done)) } func TestTransitionRegularThreadToWorkerThread(t *testing.T) { @@ -60,7 +63,7 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) { assert.Len(t, worker.threads, 0) drainPHPThreads() - assert.Nil(t, phpThreads) + assert.True(t, phpThreads[0].state.Is(state.Done)) } func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { @@ -86,7 +89,7 @@ func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { assert.Len(t, secondWorker.threads, 1) drainPHPThreads() - assert.Nil(t, phpThreads) + assert.True(t, phpThreads[0].state.Is(state.Done)) } // try all possible handler transitions @@ -180,7 +183,7 @@ func TestFinishBootingAWorkerScript(t *testing.T) { ) drainPHPThreads() - assert.Nil(t, phpThreads) + assert.True(t, phpThreads[0].state.Is(state.Done)) } func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) { diff --git a/phpthread.go b/phpthread.go index a941de9348..59b57277cc 100644 --- a/phpthread.go +++ b/phpthread.go @@ -5,9 +5,11 @@ package frankenphp import "C" import ( "context" + "log/slog" "runtime" "sync" "sync/atomic" + "time" "unsafe" "github.com/dunglas/frankenphp/internal/state" @@ -25,6 +27,12 @@ type phpThread struct { contextMu sync.RWMutex state *state.ThreadState requestCount atomic.Int64 + // forceKill holds &EG() pointers captured on the PHP thread itself. + // forceKillMu pairs with go_frankenphp_clear_force_kill_slot's write + // lock so a concurrent kill never dereferences pointers freed by + // ts_free_thread. + forceKillMu sync.RWMutex + forceKill C.force_kill_slot } // threadHandler defines how the callbacks from the C thread should be handled @@ -34,6 +42,12 @@ type threadHandler interface { afterScriptExecution(exitStatus int) context() context.Context frankenPHPContext() *frankenPHPContext + // drain is a hook called by drainWorkerThreads right before drainChan is + // closed. Handlers that need to wake up a thread parked in a blocking C + // call (e.g. by closing a stop pipe) plug their signal in here. All + // current handlers are no-ops; this is the seam later handler types use + // without having to modify drainWorkerThreads. + drain() } func newPHPThread(threadIndex int) *phpThread { @@ -86,14 +100,41 @@ func (thread *phpThread) reboot() bool { // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { if !thread.state.RequestSafeStateChange(state.ShuttingDown) { - // already shutting down or done, wait for the C thread to finish - thread.state.WaitFor(state.Done, state.Reserved) + // Already terminal. Abandoned counts as terminal: the C thread + // either never unwinds or eventually transitions to Done. + thread.state.WaitFor(state.Done, state.Reserved, state.Abandoned) return } + // Wake up handlers parked in a blocking C call (background workers' + // stream_select on the stop pipe). No-op for regular/worker handlers. + thread.handler.drain() close(thread.drainChan) - thread.state.WaitFor(state.Done) + + // Bounded grace period, then force-kill, then abandon - so a thread + // stuck in an uninterruptible syscall can't hang Shutdown forever. + done := make(chan struct{}) + go func() { + thread.state.WaitFor(state.Done) + close(done) + }() + select { + case <-done: + case <-time.After(drainGracePeriod): + thread.forceKillMu.RLock() + C.frankenphp_force_kill_thread(thread.forceKill) + thread.forceKillMu.RUnlock() + select { + case <-done: + case <-time.After(forceKillDeadline): + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, + "PHP thread did not exit after force-kill; abandoning to unblock Shutdown") + } + } + } + thread.drainChan = make(chan struct{}) // threads go back to the reserved state from which they can be booted again @@ -203,6 +244,45 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. thread.Unpin() } +// lingeringThreads counts live native PHP threads. initPHPThreads waits +// on it so re-Init can't reassign phpThreads / thread_metrics out from +// under an abandoned thread's pending callbacks. Each C thread does +// one Add at php_thread entry, one Done at exit. +var lingeringThreads sync.WaitGroup + +//export go_frankenphp_thread_spawned +func go_frankenphp_thread_spawned() { + lingeringThreads.Add(1) +} + +//export go_frankenphp_thread_exited +func go_frankenphp_thread_exited() { + lingeringThreads.Done() +} + +//export go_frankenphp_store_force_kill_slot +func go_frankenphp_store_force_kill_slot(threadIndex C.uintptr_t, slot C.force_kill_slot) { + thread := phpThreads[threadIndex] + thread.forceKillMu.Lock() + // Release any prior slot's OS resource (Windows HANDLE) before + // overwriting; a phpThread can reboot and re-register. + C.frankenphp_release_thread_for_kill(thread.forceKill) + thread.forceKill = slot + thread.forceKillMu.Unlock() +} + +//export go_frankenphp_clear_force_kill_slot +func go_frankenphp_clear_force_kill_slot(threadIndex C.uintptr_t) { + // Called from C before ts_free_thread on both exit paths. Zeroing + // the slot under the write lock guarantees any concurrent kill + // either completed before we got the lock or sees a zero slot. + thread := phpThreads[threadIndex] + thread.forceKillMu.Lock() + C.frankenphp_release_thread_for_kill(thread.forceKill) + thread.forceKill = C.force_kill_slot{} + thread.forceKillMu.Unlock() +} + //export go_frankenphp_on_thread_shutdown func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] diff --git a/requestoptions.go b/requestoptions.go index 42cc3cf7c0..2d85c13667 100644 --- a/requestoptions.go +++ b/requestoptions.go @@ -154,6 +154,17 @@ func WithRequestLogger(logger *slog.Logger) RequestOption { } } +// WithRequestBackgroundScope selects the background-worker scope for +// ensure/get_vars calls made from this request. Used by the Caddy module +// so each php_server block resolves its own set of background workers. +func WithRequestBackgroundScope(scope BackgroundScope) RequestOption { + return func(o *frankenPHPContext) error { + o.backgroundScope = scope + + return nil + } +} + // WithWorkerName sets the worker that should handle the request func WithWorkerName(name string) RequestOption { return func(o *frankenPHPContext) error { diff --git a/scaling.go b/scaling.go index c606925fdf..a08866ff24 100644 --- a/scaling.go +++ b/scaling.go @@ -213,8 +213,9 @@ func deactivateThreads() { for i := len(autoScaledThreads) - 1; i >= 0; i-- { thread := autoScaledThreads[i] - // the thread might have been stopped otherwise, remove it - if thread.state.Is(state.Reserved) { + // Drop terminal entries so an abandoned auto-scaled thread + // doesn't permanently occupy a global scaling slot. + if thread.state.Is(state.Reserved) || thread.state.Is(state.Done) || thread.state.Is(state.Abandoned) { autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) continue } diff --git a/testdata/background-worker-batch-ensure.php b/testdata/background-worker-batch-ensure.php new file mode 100644 index 0000000000..46c2b84de8 --- /dev/null +++ b/testdata/background-worker-batch-ensure.php @@ -0,0 +1,9 @@ + $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'MISSING', + 'is_background' => $_SERVER['FRANKENPHP_WORKER_BACKGROUND'] ?? 'MISSING', +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-binary.php b/testdata/background-worker-binary.php new file mode 100644 index 0000000000..b2545482ab --- /dev/null +++ b/testdata/background-worker-binary.php @@ -0,0 +1,23 @@ + "hello\x00world", + 'UTF8' => "héllo wörld 🚀", + 'EMPTY' => "", +]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/background-worker-boot-fail.php b/testdata/background-worker-boot-fail.php new file mode 100644 index 0000000000..ab6d43d0bb --- /dev/null +++ b/testdata/background-worker-boot-fail.php @@ -0,0 +1,6 @@ + 'cached-value', +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-cache-identity.php b/testdata/background-worker-cache-identity.php new file mode 100644 index 0000000000..6200dd2072 --- /dev/null +++ b/testdata/background-worker-cache-identity.php @@ -0,0 +1,14 @@ + 1, 'phase' => 'pre-crash']); + file_put_contents($marker, '1'); + exit(1); +} + +frankenphp_set_vars(['count' => 2, 'phase' => 'post-restart']); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); +@unlink($marker); diff --git a/testdata/background-worker-ensure-from-handler.php b/testdata/background-worker-ensure-from-handler.php new file mode 100644 index 0000000000..665cf8e0c1 --- /dev/null +++ b/testdata/background-worker-ensure-from-handler.php @@ -0,0 +1,7 @@ + WorkerOnlyEnum::Foo]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/background-worker-errors.php b/testdata/background-worker-errors.php new file mode 100644 index 0000000000..43b2b21bb2 --- /dev/null +++ b/testdata/background-worker-errors.php @@ -0,0 +1,25 @@ +getMessage(), "\n"; +} + +try { + frankenphp_set_vars(['foo' => 'bar']); + echo "FAIL set_vars from non-background\n"; +} catch (RuntimeException $e) { + echo "OK reject-non-bg: ", $e->getMessage(), "\n"; +} + +try { + frankenphp_get_worker_handle(); + echo "FAIL get_worker_handle from non-background\n"; +} catch (RuntimeException $e) { + echo "OK reject-handle: ", $e->getMessage(), "\n"; +} diff --git a/testdata/background-worker-named.php b/testdata/background-worker-named.php new file mode 100644 index 0000000000..a122b2f0d1 --- /dev/null +++ b/testdata/background-worker-named.php @@ -0,0 +1,24 @@ + $name, + 'count' => 1, +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-pool.php b/testdata/background-worker-pool.php new file mode 100644 index 0000000000..9caec354ab --- /dev/null +++ b/testdata/background-worker-pool.php @@ -0,0 +1,23 @@ + 1 means multiple threads share this worker name. +// Each thread is a separate instance. We publish the thread's $_SERVER +// identifying fields so the test can see both instances are live. +frankenphp_set_vars([ + 'name' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown', + 'pid' => getmypid(), +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-reader.php b/testdata/background-worker-reader.php new file mode 100644 index 0000000000..45cc1fe286 --- /dev/null +++ b/testdata/background-worker-reader.php @@ -0,0 +1,8 @@ + 'A', +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-scope-b.php b/testdata/background-worker-scope-b.php new file mode 100644 index 0000000000..05fa9679ef --- /dev/null +++ b/testdata/background-worker-scope-b.php @@ -0,0 +1,19 @@ + 'B', +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-scope-reader.php b/testdata/background-worker-scope-reader.php new file mode 100644 index 0000000000..7a75030dd1 --- /dev/null +++ b/testdata/background-worker-scope-reader.php @@ -0,0 +1,9 @@ + get_resource_type($stream), + 'is_resource' => is_resource($stream), +]); + +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/background-worker-stuck.php b/testdata/background-worker-stuck.php new file mode 100644 index 0000000000..28da9ce288 --- /dev/null +++ b/testdata/background-worker-stuck.php @@ -0,0 +1,17 @@ + 1]); + +// sleep() is interruptible by SIGRTMIN+3 on Linux/FreeBSD and by +// alertable-wait APCs on Windows; the test skips platforms where +// neither mechanism can interrupt it. +sleep(60); diff --git a/testdata/background-worker-types.php b/testdata/background-worker-types.php new file mode 100644 index 0000000000..29a8364cc0 --- /dev/null +++ b/testdata/background-worker-types.php @@ -0,0 +1,67 @@ + 123]); + $results[] = 'INT_VAL:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_VAL:blocked'; +} + +try { + frankenphp_set_vars([0 => 'val']); + $results[] = 'INT_KEY:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_KEY:blocked'; +} + +try { + frankenphp_set_vars(['nested' => ['a' => 1, 'b' => [true, null]]]); + $results[] = 'NESTED:allowed'; +} catch (\Throwable $e) { + $results[] = 'NESTED:blocked'; +} + +try { + frankenphp_set_vars(['KEY' => new \stdClass()]); + $results[] = 'OBJECT:allowed'; +} catch (\ValueError $e) { + $results[] = 'OBJECT:blocked'; +} + +try { + $ref = 'hello'; + $arr = ['KEY' => &$ref]; + frankenphp_set_vars($arr); + $results[] = 'REFERENCE:allowed'; +} catch (\ValueError $e) { + $results[] = 'REFERENCE:blocked'; +} + +frankenphp_set_vars([ + 'status' => BgTestStatus::Active, + 'RESULTS' => implode(',', $results), +]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/background-worker.php b/testdata/background-worker.php new file mode 100644 index 0000000000..67a68b17e7 --- /dev/null +++ b/testdata/background-worker.php @@ -0,0 +1,24 @@ + 'hello from background worker', + 'count' => 42, + 'ready_at' => microtime(true), +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + // Block until the stream closes (frankenphp's drain signals EOF). + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/worker-sleep.php b/testdata/worker-sleep.php new file mode 100644 index 0000000000..20eeb61bf8 --- /dev/null +++ b/testdata/worker-sleep.php @@ -0,0 +1,21 @@ + 1) can each be drained independently; the + // bg script uses the read end via frankenphp_get_worker_handle(). + stopFdWrite atomic.Int32 +} + +func convertToBackgroundWorkerThread(thread *phpThread, worker *worker) { + handler := &backgroundWorkerThread{ + state: thread.state, + thread: thread, + worker: worker, + } + thread.setHandler(handler) + worker.attachThread(thread) +} + +func (handler *backgroundWorkerThread) name() string { + return "Background Worker PHP Thread - " + handler.worker.fileName +} + +func (handler *backgroundWorkerThread) frankenPHPContext() *frankenPHPContext { + return handler.dummyFrankenPHPContext +} + +func (handler *backgroundWorkerThread) context() context.Context { + if handler.dummyContext != nil { + return handler.dummyContext + } + return globalCtx +} + +// drain is called by drainWorkerThreads (and thread.shutdown) right before +// drainChan is closed. We close the stop-pipe's write end so the PHP worker +// script, which is typically parked in stream_select on the read end, wakes +// up and can finish its loop gracefully. Per-thread fd so pool workers +// drain their threads independently. +func (handler *backgroundWorkerThread) drain() { + if fd := handler.stopFdWrite.Swap(-1); fd >= 0 { + C.frankenphp_worker_close_fd(C.int(fd)) + } +} + +func (handler *backgroundWorkerThread) beforeScriptExecution() string { + switch handler.state.Get() { + case state.TransitionRequested: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return handler.thread.transitionToNewHandler() + case state.Restarting: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.state.Set(state.Yielding) + // Abandoned covers the TOCTOU window where force-kill lands + // between drainWorkerThreads' classify and RestartWorkers' + // state write: we reach Yielding here after the classifier + // already wrote us off, and RestartWorkers then sets Abandoned + // instead of Ready. Without it, this thread would park forever. + handler.state.WaitFor(state.Ready, state.ShuttingDown, state.Abandoned) + return handler.beforeScriptExecution() + case state.Ready, state.TransitionComplete: + handler.thread.updateContext(true) + if handler.worker.onThreadReady != nil { + handler.worker.onThreadReady(handler.thread.threadIndex) + } + + handler.setupScript() + + return handler.worker.fileName + case state.ShuttingDown, state.Abandoned: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return "" + } + + panic("unexpected state: " + handler.state.Name()) +} + +func (handler *backgroundWorkerThread) setupScript() { + // Reserve the shared state from the registry on first setup. For lazy + // starts this has already been done by startBackgroundWorkerWithRegistry; + // for eager inits it runs here. sync.Once lets pool workers (num > 1) + // share the same reservation. + handler.worker.backgroundReserveOnce.Do(func() { + if handler.worker.backgroundWorker == nil && handler.worker.backgroundRegistry != nil { + bgw, _, err := handler.worker.backgroundRegistry.reserve(strings.TrimPrefix(handler.worker.name, "m#")) + if err == nil { + handler.worker.backgroundWorker = bgw + } + } + }) + + metrics.StartWorker(handler.worker.name) + + opts := append([]RequestOption(nil), handler.worker.requestOptions...) + C.frankenphp_set_worker_name(handler.thread.pinCString(strings.TrimPrefix(handler.worker.name, "m#")), C._Bool(true)) + handler.stopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write())) + + fc, err := newDummyContext( + filepath.Base(handler.worker.fileName), + opts..., + ) + if err != nil { + panic(err) + } + + ctx := context.WithValue(globalCtx, contextKey, fc) + + fc.worker = handler.worker + handler.dummyFrankenPHPContext = fc + handler.dummyContext = ctx + handler.isBootingScript = true + + if globalLogger.Enabled(ctx, slog.LevelDebug) { + globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting background worker", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) + } + + handler.thread.state.Set(state.Ready) + fc.scriptFilename = handler.worker.fileName +} + +func (handler *backgroundWorkerThread) afterScriptExecution(exitStatus int) { + handler.stopFdWrite.Store(-1) + worker := handler.worker + handler.dummyFrankenPHPContext = nil + handler.dummyContext = nil + + // on exit status 0 we just run the worker script again + if exitStatus == 0 && !handler.isBootingScript { + metrics.StopWorker(worker.name, StopReasonRestart) + + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting background worker", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) + } + + return + } + + if handler.isBootingScript { + metrics.StopWorker(worker.name, StopReasonBootFailure) + } else { + metrics.StopWorker(worker.name, StopReasonCrash) + } + + if !handler.isBootingScript { + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker crashed, restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) + } + + return + } + + // Boot failure: capture the failure info (including PG(last_error_*) + // via a C-side TLS grab done before php_request_shutdown cleared it) + // so ensure's bootstrap mode can surface the actionable cause. + if worker.backgroundWorker != nil { + var phpError string + if cErr := C.frankenphp_get_last_php_error(); cErr != nil { + phpError = C.GoString(cErr) + C.free(unsafe.Pointer(cErr)) + } + worker.backgroundWorker.bootFailure.Store(&bootFailureInfo{ + entrypoint: worker.fileName, + exitStatus: exitStatus, + failureCount: handler.failureCount + 1, + phpError: phpError, + }) + } + + if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures { + startupFailChan <- fmt.Errorf("too many consecutive failures: background worker %s has not reached frankenphp_set_vars()", worker.fileName) + handler.thread.state.Set(state.ShuttingDown) + return + } + + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker boot failed, restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount), slog.Int("exit_status", exitStatus)) + } + + backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond + if backoffDuration > time.Second { + backoffDuration = time.Second + } + handler.failureCount++ + time.Sleep(backoffDuration) +} + +// markBackgroundReady flips isBootingScript to false on the first set_vars +// call after each (re)boot and resets failure counters. Idempotent within +// a given boot: subsequent set_vars calls before the next crash-restart +// are no-ops here. +func (handler *backgroundWorkerThread) markBackgroundReady() { + if !handler.isBootingScript { + return + } + + handler.failureCount = 0 + handler.isBootingScript = false + if handler.worker.backgroundWorker != nil { + handler.worker.backgroundWorker.bootFailure.Store(nil) + } + + // Close the ready channel at-most-once; a second close would panic. + if handler.worker.backgroundWorker != nil { + handler.worker.backgroundWorker.readyOnce.Do(func() { + close(handler.worker.backgroundWorker.ready) + }) + } + + metrics.ReadyWorker(handler.worker.name) +} diff --git a/threadinactive.go b/threadinactive.go index b5d11fcdfc..b97998257a 100644 --- a/threadinactive.go +++ b/threadinactive.go @@ -35,7 +35,7 @@ func (handler *inactiveThread) beforeScriptExecution() string { return handler.beforeScriptExecution() - case state.ShuttingDown: + case state.ShuttingDown, state.Abandoned: // signal to stop return "" } @@ -58,3 +58,5 @@ func (handler *inactiveThread) context() context.Context { func (handler *inactiveThread) name() string { return "Inactive PHP Thread" } + +func (handler *inactiveThread) drain() {} diff --git a/threadregular.go b/threadregular.go index 49db71106b..61cd388789 100644 --- a/threadregular.go +++ b/threadregular.go @@ -57,7 +57,7 @@ func (handler *regularThread) beforeScriptExecution() string { handler.state.Set(state.Ready) return handler.waitForRequest() - case state.ShuttingDown: + case state.ShuttingDown, state.Abandoned: detachRegularThread(handler.thread) // signal to stop return "" @@ -83,6 +83,8 @@ func (handler *regularThread) name() string { return "Regular PHP Thread" } +func (handler *regularThread) drain() {} + func (handler *regularThread) waitForRequest() string { // max_requests reached: restart the thread to clean up all ZTS state if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread { diff --git a/threadtasks_test.go b/threadtasks_test.go index 2e74b12e93..b47fda1c75 100644 --- a/threadtasks_test.go +++ b/threadtasks_test.go @@ -56,7 +56,7 @@ func (handler *taskThread) beforeScriptExecution() string { handler.waitForTasks() return handler.beforeScriptExecution() - case state.ShuttingDown: + case state.ShuttingDown, state.Abandoned: // signal to stop return "" } @@ -79,6 +79,8 @@ func (handler *taskThread) name() string { return "Task PHP Thread" } +func (handler *taskThread) drain() {} + func (handler *taskThread) waitForTasks() { for { select { diff --git a/threadworker.go b/threadworker.go index 0fb315d1dc..d1eb7a17fc 100644 --- a/threadworker.go +++ b/threadworker.go @@ -52,7 +52,10 @@ func (handler *workerThread) beforeScriptExecution() string { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.state.Set(state.Yielding) - handler.state.WaitFor(state.Ready, state.ShuttingDown) + // Abandoned releases us if we yielded after drainWorkerThreads + // already classified us as not-yielded; otherwise we'd park + // waiting for a Ready that RestartWorkers won't set. + handler.state.WaitFor(state.Ready, state.ShuttingDown, state.Abandoned) return handler.beforeScriptExecution() case state.Ready, state.TransitionComplete: handler.thread.updateContext(true) @@ -69,13 +72,14 @@ func (handler *workerThread) beforeScriptExecution() string { handler.requestCount = 0 handler.state.Set(state.Ready) return handler.beforeScriptExecution() - case state.ShuttingDown: + case state.ShuttingDown, state.Abandoned: + // Abandoned threads exit instead of re-entering the serve loop + // with stale context when their syscall finally unwinds. if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) - // signal to stop return "" } @@ -105,6 +109,8 @@ func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } +func (handler *workerThread) drain() {} + func setupWorkerScript(handler *workerThread, worker *worker) { metrics.StartWorker(worker.name) diff --git a/watcher.go b/watcher.go index cfe133e5ab..f1997f21a5 100644 --- a/watcher.go +++ b/watcher.go @@ -3,6 +3,7 @@ package frankenphp import ( + "log/slog" "sync/atomic" "github.com/dunglas/frankenphp/internal/watcher" @@ -33,7 +34,9 @@ func initWatchers(o *opt) error { watchPatterns = append(watchPatterns, &watcher.PatternGroup{ Callback: func(_ []*watcherGo.Event) { if restartWorkers.Swap(false) { - RestartWorkers() + if err := RestartWorkers(); err != nil && globalLogger.Enabled(globalCtx, slog.LevelError) { + globalLogger.LogAttrs(globalCtx, slog.LevelError, "watcher-triggered restart incomplete", slog.String("err", err.Error())) + } } }, }) diff --git a/worker.go b/worker.go index c97cc4a3a7..990b065cd9 100644 --- a/worker.go +++ b/worker.go @@ -4,6 +4,7 @@ package frankenphp import "C" import ( "fmt" + "log/slog" "os" "path/filepath" "runtime" @@ -33,6 +34,11 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 + isBackgroundWorker bool + backgroundScope BackgroundScope + backgroundWorker *backgroundWorkerState + backgroundRegistry *backgroundWorkerRegistry + backgroundReserveOnce sync.Once } var ( @@ -65,18 +71,31 @@ func initWorkers(opt []workerOpt) error { totalThreadsToStart += w.num workers = append(workers, w) - workersByName[w.name] = w + // Background workers are resolved per-scope via backgroundLookups + // so the same user-facing name can appear in multiple scopes + // without colliding in the global workersByName map. + if !w.isBackgroundWorker { + workersByName[w.name] = w + } if w.allowPathMatching { workersByPath[w.fileName] = w } } + // Build the per-scope lookups (named + catch-all per scope). Each + // php_server block gets its own scope; the global/embed scope is 0. + backgroundLookups = buildBackgroundWorkerLookups(workers, opt) + startupFailChan = make(chan error, totalThreadsToStart) for _, w := range workers { for i := 0; i < w.num; i++ { thread := getInactivePHPThread() - convertToWorkerThread(thread, w) + if w.isBackgroundWorker { + convertToBackgroundWorkerThread(thread, w) + } else { + convertToWorkerThread(thread, w) + } workersReady.Go(func() { thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done) @@ -121,14 +140,21 @@ func newWorker(o workerOpt) (*worker, error) { } // workers that have a name starting with "m#" are module workers - // they can only be matched by their name, not by their path - allowPathMatching := !strings.HasPrefix(o.name, "m#") + // they can only be matched by their name, not by their path. + // Background workers are matched only by name, never by path, since + // they don't handle HTTP requests. + allowPathMatching := !strings.HasPrefix(o.name, "m#") && !o.isBackgroundWorker if w := workersByPath[absFileName]; w != nil && allowPathMatching { return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName) } - if w := workersByName[o.name]; w != nil { - return w, fmt.Errorf("two workers cannot have the same name: %q", o.name) + // Background workers are resolved through per-scope lookups, not the + // global workersByName map; the same user-facing name can appear in + // multiple php_server scopes without collision. + if !o.isBackgroundWorker { + if w := workersByName[o.name]; w != nil { + return w, fmt.Errorf("two workers cannot have the same name: %q", o.name) + } } if o.env == nil { @@ -148,8 +174,14 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, + isBackgroundWorker: o.isBackgroundWorker, } + // backgroundWorker state is reserved lazily via the registry at + // thread-setup time, not here; lazy-start callers set it directly + // and eager inits go through setupScript's sync.Once. The stop-pipe + // write fd is per-thread (handler field), not per-worker. + w.configureMercure(&o) w.requestOptions = append( @@ -165,16 +197,22 @@ func newWorker(o workerOpt) (*worker, error) { return w, nil } -// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown +// drainGracePeriod: time to wait for threads to yield before arming force-kill. +const drainGracePeriod = 5 * time.Second + +// forceKillDeadline: time to wait after arming force-kill before abandoning. +// Some syscalls (macOS, Windows non-alertable Sleep) can't be interrupted. +const forceKillDeadline = 5 * time.Second + +// EXPERIMENTAL: DrainWorkers initiates a graceful drain of all worker scripts. +// Best-effort: threads stuck in an uninterruptible blocking call are abandoned +// so the function cannot hang indefinitely. func DrainWorkers() { - _ = drainWorkerThreads() + _, _ = drainWorkerThreads() } -func drainWorkerThreads() []*phpThread { - var ( - ready sync.WaitGroup - drainedThreads []*phpThread - ) +func drainWorkerThreads() (drainedThreads []*phpThread, abandoned []*phpThread) { + var ready sync.WaitGroup for _, worker := range workers { worker.threadMutex.RLock() @@ -189,11 +227,14 @@ func drainWorkerThreads() []*phpThread { continue } + thread.handler.drain() close(thread.drainChan) drainedThreads = append(drainedThreads, thread) go func(thread *phpThread) { - thread.state.WaitFor(state.Yielding) + // Yielding = drained; Abandoned = force-kill timeout; + // ShuttingDown/Done = shutdown or late unwind. + thread.state.WaitFor(state.Yielding, state.Abandoned, state.ShuttingDown, state.Done) ready.Done() }(thread) } @@ -201,24 +242,121 @@ func drainWorkerThreads() []*phpThread { worker.threadMutex.RUnlock() } - ready.Wait() + done := make(chan struct{}) + go func() { + ready.Wait() + close(done) + }() - return drainedThreads + select { + case <-done: + case <-time.After(drainGracePeriod): + // Log before arming so operators see the signal even if the + // process crashes mid-delivery. + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker threads did not yield within grace period, force-killing stuck threads") + } + for _, thread := range drainedThreads { + if !thread.state.Is(state.Yielding) { + // RLock pairs with go_frankenphp_clear_force_kill_slot's + // write lock so the slot's EG() pointers don't race + // ts_free_thread on the target. + thread.forceKillMu.RLock() + C.frankenphp_force_kill_thread(thread.forceKill) + thread.forceKillMu.RUnlock() + } + } + select { + case <-done: + case <-time.After(forceKillDeadline): + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker threads did not yield after force-kill; abandoning to unblock drain") + } + } + } + + // Threads still not in Yielding never drained; surface them so + // RestartWorkers can wrap errIncompleteRestart for the caller. + for _, thread := range drainedThreads { + if !thread.state.Is(state.Yielding) { + abandoned = append(abandoned, thread) + } + } + + return drainedThreads, abandoned } -// RestartWorkers attempts to restart all workers gracefully -// All workers must be restarted at the same time to prevent issues with opcache resetting. -func RestartWorkers() { +// errIncompleteRestart indicates that RestartWorkers completed without +// actually reloading code on every worker thread: some threads were +// stuck in an uninterruptible blocking syscall that force-kill could not +// wake. The workers that did reload are already running the new code; +// the stuck threads will continue running the old code until they next +// yield. The error is wrapped with abandoned/restarted counts; callers +// that rely on the restart signal (watcher / admin endpoints / automation) +// should surface the wrapped error message. +var errIncompleteRestart = fmt.Errorf("workers restart incomplete: some threads were stuck in an uninterruptible blocking call and did not reload code") + +// RestartWorkers attempts to restart all workers gracefully. +// All workers must be restarted at the same time to prevent issues with +// opcache resetting. Returns errIncompleteRestart (wrapped with +// abandoned/restarted counts) if any worker thread could not be drained +// within the force-kill deadline; the function itself never hangs. +// +// When errIncompleteRestart is returned, abandoned C threads remain alive +// until their blocked syscall unwinds (if it ever does). Shutdown still +// runs SAPI/TSRM teardown unconditionally - by definition we already gave +// up on those threads. If one does unwind after teardown, it would touch +// torn-down state; embedders that observe errIncompleteRestart and want a +// fully clean process should terminate rather than re-Init. +func RestartWorkers() error { // disallow scaling threads while restarting workers scalingMu.Lock() defer scalingMu.Unlock() - threadsToRestart := drainWorkerThreads() + threadsToRestart, abandoned := drainWorkerThreads() + + abandonedSet := make(map[*phpThread]struct{}, len(abandoned)) + for _, thread := range abandoned { + abandonedSet[thread] = struct{}{} + } for _, thread := range threadsToRestart { + if _, ab := abandonedSet[thread]; ab { + // Abandoned threads must not re-enter the serve loop with + // stale context when their syscall finally unwinds. + thread.state.Set(state.Abandoned) + continue + } thread.drainChan = make(chan struct{}) thread.state.Set(state.Ready) } + + // Detach abandoned threads now so isAtThreadLimit and the scaler see + // accurate capacity. The handler's Abandoned arm calls detachThread + // again when the syscall unwinds; that's a no-op by then. + if len(abandoned) > 0 { + for _, w := range workers { + w.threadMutex.Lock() + next := w.threads[:0] + for _, t := range w.threads { + if _, ab := abandonedSet[t]; !ab { + next = append(next, t) + } + } + w.threads = next + w.threadMutex.Unlock() + } + + if globalLogger.Enabled(globalCtx, slog.LevelError) { + globalLogger.LogAttrs(globalCtx, slog.LevelError, + errIncompleteRestart.Error(), + slog.Int("abandoned", len(abandoned)), + slog.Int("restarted", len(threadsToRestart)-len(abandoned))) + } + return fmt.Errorf("%w (abandoned=%d, restarted=%d)", + errIncompleteRestart, len(abandoned), len(threadsToRestart)-len(abandoned)) + } + return nil } func (worker *worker) attachThread(thread *phpThread) { diff --git a/worker_test.go b/worker_test.go index 3fd2d63f94..bab73a7f2c 100644 --- a/worker_test.go +++ b/worker_test.go @@ -9,13 +9,18 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" + "path/filepath" + "runtime" "strconv" "strings" "sync" "testing" + "time" "github.com/dunglas/frankenphp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWorker(t *testing.T) { @@ -45,6 +50,68 @@ func TestWorker(t *testing.T) { }, &testOptions{workerScript: "worker.php", nbWorkers: 1, nbParallelRequests: 1}) } +// TestRestartWorkersForceKillsStuckThread verifies the drain path does +// not hang when a worker is stuck in a blocking PHP call (sleep, etc.). +// macOS has no realtime signals so we can't unblock sleep() there; skip. +func TestRestartWorkersForceKillsStuckThread(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "freebsd" && runtime.GOOS != "windows" { + t.Skipf("force-kill cannot interrupt blocking syscalls on %s", runtime.GOOS) + } + + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, frankenphp.Init( + frankenphp.WithWorkers("sleep-worker", testDataDir+"worker-sleep.php", 1), + frankenphp.WithNumThreads(2), + )) + t.Cleanup(frankenphp.Shutdown) + + // Marker file the worker touches right before sleep(); per-run path + // so a stale file from a prior test can't fool the poll below. + markerFile := filepath.Join(t.TempDir(), "sleep-worker-in-sleep") + + // Worker handles the request, then sleep(60). Recorder lets us + // assert post-sleep code never runs (would indicate the VM interrupt + // didn't fire and only drainChan got picked up). + recorder := httptest.NewRecorder() + served := make(chan struct{}) + go func() { + defer close(served) + req := httptest.NewRequest("GET", "http://example.com/worker-sleep.php", nil) + req.Header.Set("Sleep-Marker", markerFile) + fr, err := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false)) + if err != nil { + return + } + _ = frankenphp.ServeHTTP(recorder, fr) + }() + + // Confirm the worker is parked in sleep() before triggering the + // restart, so we exercise the force-kill path and not drainChan. + require.Eventually(t, func() bool { + _, err := os.Stat(markerFile) + return err == nil + }, 5*time.Second, 10*time.Millisecond, "worker never entered sleep()") + + start := time.Now() + err := frankenphp.RestartWorkers() + elapsed := time.Since(start) + + // 5s grace + 3s slack for signal dispatch, VM tick, restart loop. + const budget = 8 * time.Second + assert.Less(t, elapsed, budget, "drain must force-kill the stuck thread within the grace period") + assert.NoError(t, err, "force-kill should wake the stuck thread and let it restart fully") + + select { + case <-served: + case <-time.After(2 * time.Second): + t.Fatal("server request goroutine did not complete after drain") + } + assert.NotContains(t, recorder.Body.String(), "should not reach", + "VM interrupt was never observed; sleep returned naturally") +} + func TestWorkerDie(t *testing.T) { runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { req := httptest.NewRequest("GET", "http://example.com/die.php", nil)