From c7f4ba60f28c7e7e135710965615b60443507452 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 26 Apr 2026 11:06:35 +0200 Subject: [PATCH] refactor subscription engine --- docker-compose.yaml | 13 +- src/Subscription/Engine/Command/Boot.php | 21 ++ src/Subscription/Engine/Command/Command.php | 18 ++ src/Subscription/Engine/Command/Pause.php | 9 + .../Engine/Command/Reactivate.php | 9 + src/Subscription/Engine/Command/Refresh.php | 9 + src/Subscription/Engine/Command/Remove.php | 9 + src/Subscription/Engine/Command/Run.php | 21 ++ src/Subscription/Engine/Command/Setup.php | 20 ++ src/Subscription/Engine/Command/Teardown.php | 9 + src/Subscription/Engine/Event/OnCommand.php | 15 ++ .../Engine/Event/OnHandleMessage.php | 17 ++ .../Engine/Event/OnHandleMessageError.php | 21 ++ .../Engine/Event/OnHandleMessageSuccess.php | 19 ++ .../Engine/Event/OnProcessingFinished.php | 21 ++ src/Subscription/Engine/Event/OnResult.php | 17 ++ .../Engine/Handler/BootHandler.php | 221 ++++++++++++++++++ src/Subscription/Engine/Handler/Handler.php | 16 ++ .../Engine/Handler/PauseHandler.php | 60 +++++ .../Engine/Handler/ReactivateHandler.php | 93 ++++++++ .../Engine/Handler/RefreshHandler.php | 116 +++++++++ .../Engine/Handler/RemoveHandler.php | 127 ++++++++++ .../Engine/Handler/RunHandler.php | 180 ++++++++++++++ .../Engine/Handler/SetupHandler.php | 133 +++++++++++ .../Engine/Handler/TeardownHandler.php | 130 +++++++++++ .../LegacyWrapperSubscriptionEngine.php | 135 +++++++++++ .../Engine/Listener/BatchSubscriber.php | 164 +++++++++++++ .../Engine/Listener/DetachListener.php | 63 +++++ .../Engine/Listener/DiscoverListener.php | 91 ++++++++ .../Engine/Listener/FailListener.php | 82 +++++++ .../Engine/Listener/RetrySubscriber.php | 137 +++++++++++ src/Subscription/Engine/MessageProcessor.php | 125 ++++++++++ .../Engine/NextSubscriptionEngine.php | 212 +++++++++++++++++ src/Subscription/Engine/ProcessedResult.php | 5 +- src/Subscription/Engine/Result.php | 2 +- tests/bootstrap.php | 5 + 36 files changed, 2341 insertions(+), 4 deletions(-) create mode 100644 src/Subscription/Engine/Command/Boot.php create mode 100644 src/Subscription/Engine/Command/Command.php create mode 100644 src/Subscription/Engine/Command/Pause.php create mode 100644 src/Subscription/Engine/Command/Reactivate.php create mode 100644 src/Subscription/Engine/Command/Refresh.php create mode 100644 src/Subscription/Engine/Command/Remove.php create mode 100644 src/Subscription/Engine/Command/Run.php create mode 100644 src/Subscription/Engine/Command/Setup.php create mode 100644 src/Subscription/Engine/Command/Teardown.php create mode 100644 src/Subscription/Engine/Event/OnCommand.php create mode 100644 src/Subscription/Engine/Event/OnHandleMessage.php create mode 100644 src/Subscription/Engine/Event/OnHandleMessageError.php create mode 100644 src/Subscription/Engine/Event/OnHandleMessageSuccess.php create mode 100644 src/Subscription/Engine/Event/OnProcessingFinished.php create mode 100644 src/Subscription/Engine/Event/OnResult.php create mode 100644 src/Subscription/Engine/Handler/BootHandler.php create mode 100644 src/Subscription/Engine/Handler/Handler.php create mode 100644 src/Subscription/Engine/Handler/PauseHandler.php create mode 100644 src/Subscription/Engine/Handler/ReactivateHandler.php create mode 100644 src/Subscription/Engine/Handler/RefreshHandler.php create mode 100644 src/Subscription/Engine/Handler/RemoveHandler.php create mode 100644 src/Subscription/Engine/Handler/RunHandler.php create mode 100644 src/Subscription/Engine/Handler/SetupHandler.php create mode 100644 src/Subscription/Engine/Handler/TeardownHandler.php create mode 100644 src/Subscription/Engine/LegacyWrapperSubscriptionEngine.php create mode 100644 src/Subscription/Engine/Listener/BatchSubscriber.php create mode 100644 src/Subscription/Engine/Listener/DetachListener.php create mode 100644 src/Subscription/Engine/Listener/DiscoverListener.php create mode 100644 src/Subscription/Engine/Listener/FailListener.php create mode 100644 src/Subscription/Engine/Listener/RetrySubscriber.php create mode 100644 src/Subscription/Engine/MessageProcessor.php create mode 100644 src/Subscription/Engine/NextSubscriptionEngine.php diff --git a/docker-compose.yaml b/docker-compose.yaml index 38249c8b5..2174d030d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,15 @@ +# docker run --rm -it --volume $PWD:/app --net="host" -w /app ghcr.io/patchlevel/php:8.5 services: + php: + image: ghcr.io/patchlevel/php:8.5 + volumes: + - .:/app + working_dir: /app + network_mode: host + tty: true + stdin_open: true + command: sleep infinity + postgres: image: postgres:alpine environment: @@ -13,4 +24,4 @@ services: - MYSQL_ALLOW_EMPTY_PASSWORD="yes" - MYSQL_DATABASE=eventstore ports: - - 3306:3306 \ No newline at end of file + - 3306:3306 diff --git a/src/Subscription/Engine/Command/Boot.php b/src/Subscription/Engine/Command/Boot.php new file mode 100644 index 000000000..acf6be1cf --- /dev/null +++ b/src/Subscription/Engine/Command/Boot.php @@ -0,0 +1,21 @@ +|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + array|null $ids = null, + array|null $groups = null, + public readonly int|null $limit = null, + ) { + parent::__construct($ids, $groups); + } +} diff --git a/src/Subscription/Engine/Command/Command.php b/src/Subscription/Engine/Command/Command.php new file mode 100644 index 000000000..032742561 --- /dev/null +++ b/src/Subscription/Engine/Command/Command.php @@ -0,0 +1,18 @@ +|null $ids + * @param list|null $groups + */ + public function __construct( + public readonly array|null $ids = null, + public readonly array|null $groups = null, + ) { + } +} diff --git a/src/Subscription/Engine/Command/Pause.php b/src/Subscription/Engine/Command/Pause.php new file mode 100644 index 000000000..28780d532 --- /dev/null +++ b/src/Subscription/Engine/Command/Pause.php @@ -0,0 +1,9 @@ +|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + array|null $ids = null, + array|null $groups = null, + public readonly int|null $limit = null, + ) { + parent::__construct($ids, $groups); + } +} diff --git a/src/Subscription/Engine/Command/Setup.php b/src/Subscription/Engine/Command/Setup.php new file mode 100644 index 000000000..83ef8617f --- /dev/null +++ b/src/Subscription/Engine/Command/Setup.php @@ -0,0 +1,20 @@ +|null $ids + * @param list|null $groups + */ + public function __construct( + array|null $ids = null, + array|null $groups = null, + public readonly bool $skipBooting = false, + ) { + parent::__construct($ids, $groups); + } +} diff --git a/src/Subscription/Engine/Command/Teardown.php b/src/Subscription/Engine/Command/Teardown.php new file mode 100644 index 000000000..e144b032c --- /dev/null +++ b/src/Subscription/Engine/Command/Teardown.php @@ -0,0 +1,9 @@ + + */ +final class BootHandler implements Handler +{ + public function __construct( + private readonly MessageLoader $messageLoader, + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly MessageProcessor $messageProcessor, + private readonly EventDispatcherInterface $eventDispatcher, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): ProcessedResult + { + $this->logger?->info( + 'Subscription Engine: Start booting.', + ); + + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::Booting], + ), + function (SubscriptionCollection $subscriptions) use ($command): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); + + return new ProcessedResult(0, true); + } + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if ($subscriber) { + continue; + } + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), + ); + + $subscriptions->remove($subscription); + } + + $startIndex = $subscriptions->lowestPosition(); + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed for booting from position %s.', + $startIndex, + ), + ); + + /** @var list $errors */ + $errors = []; + $stream = null; + $messageCounter = 0; + $lastIndex = null; + + try { + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); + + foreach ($stream as $index => $message) { + $messageCounter++; + $lastIndex = $index; + + foreach ($subscriptions as $subscription) { + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); + + continue; + } + + $error = $this->messageProcessor->process($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; + + $subscriptions->remove($subscription); + + if (count($subscriptions) === 0) { + $this->logger?->info( + 'Subscription Engine: No subscriptions in booting status, finish booting.', + ); + + break 2; + } + } + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Current event stream position for booting: %s', + $index, + ), + ); + + if ($command->limit !== null && $messageCounter >= $command->limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish booting.', + $command->limit, + ), + ); + + $this->eventDispatcher->dispatch( + new OnProcessingFinished( + $command, + OnProcessingFinished::REASON_LIMIT_REACHED, + $messageCounter, + ), + ); + + return new ProcessedResult( + $messageCounter, + false, + $errors, + ); + } + } + + $this->eventDispatcher->dispatch( + new OnProcessingFinished( + $command, + OnProcessingFinished::REASON_STREAM_ENDED, + $messageCounter, + ), + ); + } finally { + $stream?->close(); + + if ($lastIndex !== null && $messageCounter > 0) { + foreach ($subscriptions as $subscription) { + $this->subscriptionManager->update($subscription); + } + } + } + + $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); + + foreach ($subscriptions as $subscription) { + if ($subscription->status() !== Status::Booting) { + continue; + } + + if ($subscription->runMode() === RunMode::Once) { + $subscription->finished(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + + continue; + } + + $subscription->active(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" has been set to active after booting.', + $subscription->id(), + )); + } + + $this->logger?->info('Subscription Engine: Finish booting.'); + + return new ProcessedResult( + $messageCounter, + true, + $errors, + ); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/Handler.php b/src/Subscription/Engine/Handler/Handler.php new file mode 100644 index 000000000..cab82d44a --- /dev/null +++ b/src/Subscription/Engine/Handler/Handler.php @@ -0,0 +1,16 @@ + + */ +final class PauseHandler implements Handler +{ + public function __construct( + private readonly SubscriptionManager $subscriptionManager, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [ + Status::Active, + Status::Booting, + Status::Error, + ], + ), + function (SubscriptionCollection $subscriptions): Result { + /** @var Subscription $subscription */ + foreach ($subscriptions as $subscription) { + $subscription->pause(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" is paused.', + $subscription->id(), + )); + } + + return new Result(); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/ReactivateHandler.php b/src/Subscription/Engine/Handler/ReactivateHandler.php new file mode 100644 index 000000000..170ae3549 --- /dev/null +++ b/src/Subscription/Engine/Handler/ReactivateHandler.php @@ -0,0 +1,93 @@ + + */ +final class ReactivateHandler implements Handler +{ + public function __construct( + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [ + Status::Error, + Status::Failed, + Status::Detached, + Status::Paused, + Status::Finished, + ], + ), + function (SubscriptionCollection $subscriptions): Result { + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), + ); + + continue; + } + + $error = $subscription->subscriptionError(); + + if ($error) { + $subscription->doRetry(); + $subscription->resetRetry(); + + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" is reactivated.', + $subscriber::class, + $subscription->id(), + )); + + continue; + } + + $subscription->active(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" is reactivated.', + $subscriber::class, + $subscription->id(), + )); + } + + return new Result(); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/RefreshHandler.php b/src/Subscription/Engine/Handler/RefreshHandler.php new file mode 100644 index 000000000..5931a5524 --- /dev/null +++ b/src/Subscription/Engine/Handler/RefreshHandler.php @@ -0,0 +1,116 @@ + + */ +final class RefreshHandler implements Handler +{ + public function __construct( + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + $subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + )); + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + continue; + } + + $changed = false; + + if ($subscription->runMode() !== $subscriber->metadata()->runMode) { + $changed = true; + $oldRunMode = $subscription->runMode(); + $subscription->changeRunMode($subscriber->metadata()->runMode); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" run mode changed from "%s" to "%s".', + $subscription->id(), + $oldRunMode->value, + $subscription->runMode()->value, + ), + ); + } + + if ($subscription->group() !== $subscriber->metadata()->group) { + $changed = true; + $oldGroup = $subscription->group(); + $subscription->changeGroup($subscriber->metadata()->group); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" group changed from "%s" to "%s".', + $subscription->id(), + $oldGroup, + $subscription->group(), + ), + ); + } + + $cleanupTasks = $this->cleanupTasks($subscriber); + + if ($subscription->cleanupTasks() !== $cleanupTasks) { + $changed = true; + $subscription->replaceCleanupTasks($cleanupTasks); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" cleanup tasks changed.', + $subscription->id(), + ), + ); + } + + if (!$changed) { + continue; + } + + $this->subscriptionManager->update($subscription); + } + + $this->subscriptionManager->flush(); + + return new Result(); + } + + /** @return list|null */ + private function cleanupTasks(MetadataSubscriberAccessor $subscriber): array|null + { + $method = $subscriber->cleanupMethod(); + + if (!$method) { + return null; + } + + return array_values([...$method()]); + } +} diff --git a/src/Subscription/Engine/Handler/RemoveHandler.php b/src/Subscription/Engine/Handler/RemoveHandler.php new file mode 100644 index 000000000..f83e54734 --- /dev/null +++ b/src/Subscription/Engine/Handler/RemoveHandler.php @@ -0,0 +1,127 @@ + + */ +final class RemoveHandler implements Handler +{ + public function __construct( + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly Cleaner|null $cleaner = null, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + ), + function (SubscriptionCollection $subscriptions): Result { + /** @var list $errors */ + $errors = []; + + foreach ($subscriptions as $subscription) { + if ($subscription->isNew()) { + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" removed.', + $subscription->id(), + ), + ); + + continue; + } + + if ($subscription->hasCleanupTasks()) { + $error = $this->cleanup($subscription, true); + + if ($error) { + $errors[] = $error; + } + + continue; + } + + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" removed without a suitable subscriber.', + $subscription->id(), + ), + ); + + continue; + } + + $teardownMethod = $subscriber->teardownMethod(); + + if (!$teardownMethod) { + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), + ); + + continue; + } + + try { + $teardownMethod(); + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'Subscription Engine: Subscriber "%s" teardown method could not be executed: %s', + $subscriber::class, + $e->getMessage(), + ), + ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + } + + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), + ); + } + + return new Result($errors); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/RunHandler.php b/src/Subscription/Engine/Handler/RunHandler.php new file mode 100644 index 000000000..bde40408a --- /dev/null +++ b/src/Subscription/Engine/Handler/RunHandler.php @@ -0,0 +1,180 @@ + + */ +final class RunHandler implements Handler +{ + public function __construct( + private readonly MessageLoader $messageLoader, + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly MessageProcessor $messageProcessor, + private readonly EventDispatcherInterface $eventDispatcher, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): ProcessedResult + { + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::Active], + ), + function (SubscriptionCollection $subscriptions) use ($command): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + + return new ProcessedResult(0, true); + } + + $startIndex = $subscriptions->lowestPosition(); + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed from position %d.', + $startIndex, + ), + ); + + /** @var list $errors */ + $errors = []; + $stream = null; + $messageCounter = 0; + $lastIndex = null; + + try { + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); + + foreach ($stream as $index => $message) { + $messageCounter++; + $lastIndex = $index; + + foreach ($subscriptions as $subscription) { + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); + + continue; + } + + $error = $this->messageProcessor->process($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; + + $subscriptions->remove($subscription); + + if (count($subscriptions) === 0) { + $this->logger?->info( + 'Subscription Engine: No subscriptions in booting status, finish booting.', + ); + + break 2; + } + } + + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); + + if ($command->limit !== null && $messageCounter >= $command->limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish processing.', + $command->limit, + ), + ); + + $this->eventDispatcher->dispatch( + new OnProcessingFinished( + $command, + OnProcessingFinished::REASON_LIMIT_REACHED, + $messageCounter, + ), + ); + + return new ProcessedResult($messageCounter, false, $errors); + } + } + + $this->eventDispatcher->dispatch( + new OnProcessingFinished( + $command, + OnProcessingFinished::REASON_STREAM_ENDED, + $messageCounter, + ), + ); + } finally { + $stream?->close(); + + if ($lastIndex !== null && $messageCounter > 0) { + foreach ($subscriptions as $subscription) { + $this->subscriptionManager->update($subscription); + } + } + } + + foreach ($subscriptions as $subscription) { + if ($subscription->runMode() !== RunMode::Once) { + continue; + } + + $subscription->finished(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + } + + $this->logger?->info( + sprintf( + 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', + $lastIndex, + ), + ); + + return new ProcessedResult($messageCounter, true, $errors); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/SetupHandler.php b/src/Subscription/Engine/Handler/SetupHandler.php new file mode 100644 index 000000000..f931d850f --- /dev/null +++ b/src/Subscription/Engine/Handler/SetupHandler.php @@ -0,0 +1,133 @@ + + */ +final class SetupHandler implements Handler +{ + public function __construct( + private readonly MessageLoader $messageLoader, + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + $this->logger?->info( + 'Subscription Engine: Start to setup.', + ); + + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::New], + ), + function (SubscriptionCollection $subscriptions) use ($command): Result { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.'); + + return new Result(); + } + + /** @var list $errors */ + $errors = []; + + $latestIndex = $this->messageLoader->lastIndex(); + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + throw SubscriberNotFound::forSubscriptionId($subscription->id()); + } + + $setupMethod = $subscriber->setupMethod(); + + if (!$setupMethod) { + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $command->skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionManager->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has no setup method, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $command->skipBooting ? 'active' : 'booting', + )); + + continue; + } + + try { + $setupMethod(); + + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $command->skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionManager->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $command->skipBooting ? 'active' : 'booting', + )); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + )); + + $this->handleError($subscription, $e); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + } + } + + return new Result($errors); + }, + ); + } +} diff --git a/src/Subscription/Engine/Handler/TeardownHandler.php b/src/Subscription/Engine/Handler/TeardownHandler.php new file mode 100644 index 000000000..d7a40ef93 --- /dev/null +++ b/src/Subscription/Engine/Handler/TeardownHandler.php @@ -0,0 +1,130 @@ + + */ +final class TeardownHandler implements Handler +{ + public function __construct( + private readonly SubscriptionManager $subscriptionManager, + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function __invoke(Command $command): Result + { + $this->logger?->info('Subscription Engine: Start teardown detached subscriptions.'); + + return $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::Detached], + ), + function (SubscriptionCollection $subscriptions): Result { + /** @var list $errors */ + $errors = []; + + foreach ($subscriptions as $subscription) { + if ($subscription->hasCleanupTasks()) { + $error = $this->cleanup($subscription); + + if ($error) { + $errors[] = $error; + } + + continue; + } + + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + $this->logger?->warning( + sprintf( + 'Subscription Engine: Subscriber for "%s" to teardown or cleanup not found, skipped.', + $subscription->id(), + ), + ); + + continue; + } + + $teardownMethod = $subscriber->teardownMethod(); + + if (!$teardownMethod) { + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has no teardown method and was immediately removed.', + $subscriber::class, + $subscription->id(), + ), + ); + + continue; + } + + try { + $teardownMethod(); + + $this->logger?->debug(sprintf( + 'Subscription Engine: For Subscriber "%s" for "%s" the teardown method has been executed and is now prepared to be removed.', + $subscriber::class, + $subscription->id(), + )); + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'Subscription Engine: Subscription "%s" for "%s" has an error in the teardown method, skipped: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + ), + ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + + continue; + } + + $this->subscriptionManager->remove($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscription "%s" removed.', + $subscription->id(), + ), + ); + } + + $this->logger?->info('Subscription Engine: Finish teardown.'); + + return new Result($errors); + }, + ); + } +} diff --git a/src/Subscription/Engine/LegacyWrapperSubscriptionEngine.php b/src/Subscription/Engine/LegacyWrapperSubscriptionEngine.php new file mode 100644 index 000000000..b9d2a3793 --- /dev/null +++ b/src/Subscription/Engine/LegacyWrapperSubscriptionEngine.php @@ -0,0 +1,135 @@ +engine = new NextSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberRepository, + $retryStrategyRepository, + $logger, + $cleaner, + ); + } + + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Setup( + $criteria->ids, + $criteria->groups, + )); + } + + public function boot( + SubscriptionEngineCriteria|null $criteria = null, + int|null $limit = null, + ): ProcessedResult { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Boot( + $criteria->ids, + $criteria->groups, + $limit, + )); + } + + public function run( + SubscriptionEngineCriteria|null $criteria = null, + int|null $limit = null, + ): ProcessedResult { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Run( + $criteria->ids, + $criteria->groups, + $limit, + )); + } + + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Teardown( + $criteria->ids, + $criteria->groups, + )); + } + + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Remove( + $criteria->ids, + $criteria->groups, + )); + } + + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Reactivate( + $criteria->ids, + $criteria->groups, + )); + } + + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Pause( + $criteria->ids, + $criteria->groups, + )); + } + + public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result + { + $criteria ??= new SubscriptionEngineCriteria(); + + return $this->engine->run(new Refresh( + $criteria->ids, + $criteria->groups, + )); + } + + /** @return list */ + public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array + { + return $this->engine->subscriptions($criteria); + } +} diff --git a/src/Subscription/Engine/Listener/BatchSubscriber.php b/src/Subscription/Engine/Listener/BatchSubscriber.php new file mode 100644 index 000000000..298c55804 --- /dev/null +++ b/src/Subscription/Engine/Listener/BatchSubscriber.php @@ -0,0 +1,164 @@ + */ + private array $batching = []; + + public function __construct( + private readonly SubscriberAccessorRepository $subscriberRepository, + private readonly LoggerInterface|null $logger = null, + ) { + } + + public function onCommand(OnCommand $event): void + { + $this->batching = []; + } + + public function onHandleMessage(OnHandleMessage $event): void + { + $subscriberId = $event->subscription->id(); + + if (isset($this->batching[$subscriberId])) { + return; + } + + $subscriber = $this->subscriberRepository->get($subscriberId); + + if (!$subscriber) { + return; + } + + $realSubscriber = $subscriber->subscriber(); + + if (!$realSubscriber instanceof BatchableSubscriber) { + return; + } + + $this->batching[$subscriberId] = $realSubscriber; + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" starts a new batch.', + $subscriberId, + )); + + try { + $realSubscriber->beginBatch(); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" has an error in the begin batch method: %s', + $subscriberId, + $e->getMessage(), + )); + + throw $e; + } + } + + public function onHandleMessageSuccess(OnHandleMessageSuccess $event): void + { + $subscriberId = $event->subscription->id(); + + if (!isset($this->batching[$subscriberId])) { + return; + } + + if (!$this->shouldCommitBatch($event->subscription)) { + $event->shouldChangePosition = false; + + return; + } + + $subscriber = $this->batching[$subscriberId]; + unset($this->batching[$subscriberId]); + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" commits the batch.', + $subscriberId, + )); + + try { + $subscriber->commitBatch(); + $event->shouldChangePosition = true; + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" has an error in the commit batch method: %s', + $subscriberId, + $e->getMessage(), + )); + + throw $e; + } + } + + public function onResult(OnResult $event): void + { + } + + private function shouldCommitBatch(Subscription $subscription): bool + { + if (!isset($this->batching[$subscription->id()])) { + return false; + } + + return $this->batching[$subscription->id()]->forceCommit(); + } + + public function onError(OnHandleMessageError $event): void + { + $subscriptionId = $event->subscription->id(); + + if (!isset($this->batching[$subscriptionId])) { + return; + } + + $subscriber = $this->batching[$subscriptionId]; + + unset($this->batching[$subscriptionId]); + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" rollback the batch.', + $subscriptionId, + )); + + try { + $subscriber->rollbackBatch(); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" has an error in the rollback batch method: %s', + $subscriptionId, + $e->getMessage(), + )); + } + } + + public static function getSubscribedEvents(): array + { + return [ + OnCommand::class => 'onCommand', + OnHandleMessage::class => 'onHandleMessage', + OnHandleMessageSuccess::class => 'onHandleMessageSuccess', + OnHandleMessageError::class => 'onError', + ]; + } +} diff --git a/src/Subscription/Engine/Listener/DetachListener.php b/src/Subscription/Engine/Listener/DetachListener.php new file mode 100644 index 000000000..930db2c55 --- /dev/null +++ b/src/Subscription/Engine/Listener/DetachListener.php @@ -0,0 +1,63 @@ +command; + + if (!$command instanceof Run) { + return; + } + + $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::Active, Status::Paused, Status::Finished], + ), + function (SubscriptionCollection $subscriptions): void { + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if ($subscriber) { + continue; + } + + $subscription->detached(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', + $subscription->id(), + ), + ); + } + }, + ); + } +} diff --git a/src/Subscription/Engine/Listener/DiscoverListener.php b/src/Subscription/Engine/Listener/DiscoverListener.php new file mode 100644 index 000000000..56f9d16d3 --- /dev/null +++ b/src/Subscription/Engine/Listener/DiscoverListener.php @@ -0,0 +1,91 @@ +command; + + // todo define when to discover + + $subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria( + // ids: $command->ids, + // groups: $command->groups, + )); + + $latestIndex = null; + + foreach ($this->subscriberRepository->all() as $subscriber) { + foreach ($subscriptions as $subscription) { + if ($subscription->id() === $subscriber->metadata()->id) { + continue 2; + } + } + + $subscription = new Subscription( + $subscriber->metadata()->id, + $subscriber->metadata()->group, + $subscriber->metadata()->runMode, + cleanupTasks: $this->cleanupTasks($subscriber), + ); + + if ($subscriber->setupMethod() === null && $subscriber->metadata()->runMode === RunMode::FromNow) { + if ($latestIndex === null) { + $latestIndex = $this->messageLoader->lastIndex(); + } + + $subscription->changePosition($latestIndex); + $subscription->active(); + } + + $this->subscriptionManager->add($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: New Subscriber "%s" was found and added to the subscription store.', + $subscriber->metadata()->id, + ), + ); + } + + $this->subscriptionManager->flush(); + } + + /** @return list|null */ + private function cleanupTasks(MetadataSubscriberAccessor $subscriber): array|null + { + $method = $subscriber->cleanupMethod(); + + if (!$method) { + return null; + } + + return array_values([...$method()]); + } +} diff --git a/src/Subscription/Engine/Listener/FailListener.php b/src/Subscription/Engine/Listener/FailListener.php new file mode 100644 index 000000000..beff60b2b --- /dev/null +++ b/src/Subscription/Engine/Listener/FailListener.php @@ -0,0 +1,82 @@ +failed($throwable); + $this->subscriptionManager->update($subscription); + + return; + } + + $subscriber = $this->subscriber($subscription->id()); + + if (!$subscriber) { + $subscription->failed($throwable); + $this->subscriptionManager->update($subscription); + + return; + } + + if ($subscriber->subscriber() instanceof BatchableSubscriber) { + $subscription->failed($throwable); + $this->subscriptionManager->update($subscription); + + return; + } + + $failedMethod = $subscriber->failedMethod(); + + if (!$failedMethod) { + $subscription->failed($throwable); + $this->subscriptionManager->update($subscription); + + return; + } + + try { + $failedMethod($message, $throwable); + $subscription->changePosition($index); + $subscription->resetRetry(); + + $this->subscriptionManager->update($subscription); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" has an error in the failed method: %s', + $subscription->id(), + $e->getMessage(), + )); + + $subscription->failed($throwable); + $this->subscriptionManager->update($subscription); + } + } +} diff --git a/src/Subscription/Engine/Listener/RetrySubscriber.php b/src/Subscription/Engine/Listener/RetrySubscriber.php new file mode 100644 index 000000000..4c0c7400b --- /dev/null +++ b/src/Subscription/Engine/Listener/RetrySubscriber.php @@ -0,0 +1,137 @@ +command; + + $status = match ($command::class) { + Setup::class => Status::New, + Boot::class => Status::Booting, + Run::class => Status::Active, + default => null, + }; + + if ($status === null) { + return; + } + + $this->subscriptionManager->findForUpdate( + new SubscriptionCriteria( + ids: $command->ids, + groups: $command->groups, + status: [Status::Error], + ), + function (SubscriptionCollection $subscriptions) use ($status): void { + /** @var Subscription $subscription */ + foreach ($subscriptions as $subscription) { + $error = $subscription->subscriptionError(); + + if ($error === null) { + continue; + } + + if ($error->previousStatus !== $status) { + continue; + } + + if (!$this->retryStrategy($subscription)->shouldRetry($subscription)) { + continue; + } + + $subscription->doRetry(); + $this->subscriptionManager->update($subscription); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Retry subscription "%s" (%d) and set back to %s.', + $subscription->id(), + $subscription->retryAttempt(), + $subscription->status()->value, + ), + ); + } + }, + ); + } + + public function onHandleMessageError(OnHandleMessageError $event): void + { + $retryStrategy = $this->retryStrategy($event->subscription); + + if (!$retryStrategy instanceof ConditionalRetryStrategy || $retryStrategy->canRetry($event->subscription)) { + $event->subscription->error($event->throwable); + $this->subscriptionManager->update($event->subscription); + + return; + } + + $event->transitionToFailed = true; + } + + public function onSuccessHandleMessage(OnHandleMessageSuccess $event): void + { + $event->subscription->resetRetry(); + } + + private function retryStrategy(Subscription $subscription): RetryStrategy + { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return $this->retryStrategyRepository->getDefaultRetryStrategy(); + } + + $retryStrategy = $subscriber->metadata()->retryStrategy; + + if ($retryStrategy === null) { + return $this->retryStrategyRepository->getDefaultRetryStrategy(); + } + + return $this->retryStrategyRepository->get($retryStrategy); + } + + public static function getSubscribedEvents(): array + { + return [ + OnCommand::class => ['onCommand', 16], + OnHandleMessageError::class => 'onHandleMessageError', + OnHandleMessageSuccess::class => 'onSuccessHandleMessage', + ]; + } +} diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php new file mode 100644 index 000000000..6ef42739b --- /dev/null +++ b/src/Subscription/Engine/MessageProcessor.php @@ -0,0 +1,125 @@ +subscriberRepository->get($subscription->id()); + + if (!$subscriber) { + throw SubscriberNotFound::forSubscriptionId($subscription->id()); + } + + $subscribeMethods = $subscriber->subscribeMethods($message->event()::class); + + if ($subscribeMethods === []) { + if (!isset($this->batching[$subscription->id()])) { + $subscription->changePosition($index); + } + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has no subscribe methods for "%s", continue.', + $subscriber::class, + $subscription->id(), + $message->event()::class, + ), + ); + + return null; + } + + try { + $event = new OnHandleMessage( + $subscription, + $message, + ); + + $this->eventDispatcher->dispatch($event); + } catch (Throwable $e) { + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + } + + try { + foreach ($subscribeMethods as $subscribeMethod) { + $subscribeMethod($message); + } + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s": %s', + $subscriber::class, + $subscription->id(), + $message->event()::class, + $e->getMessage(), + ), + ); + + $this->eventDispatcher->dispatch( + new OnHandleMessageError( + $subscription, + $e, + $message, + $index, + ), + ); + + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + } + + $event = new OnHandleMessageSuccess( + $subscription, + $message, + $index, + ); + + $this->eventDispatcher->dispatch($event); + + if ($event->shouldChangePosition) { + $subscription->changePosition($index); + } + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" processed the event "%s".', + $subscriber::class, + $subscription->id(), + $message->event()::class, + ), + ); + + return null; + } +} diff --git a/src/Subscription/Engine/NextSubscriptionEngine.php b/src/Subscription/Engine/NextSubscriptionEngine.php new file mode 100644 index 000000000..ba08b86e5 --- /dev/null +++ b/src/Subscription/Engine/NextSubscriptionEngine.php @@ -0,0 +1,212 @@ +, Handler> */ + private readonly array $handlers; + + public function __construct( + private readonly MessageLoader $messageLoader, + SubscriptionStore $subscriptionStore, + private readonly SubscriberAccessorRepository $subscriberRepository, + RetryStrategyRepository|null $retryStrategyRepository = null, + private readonly LoggerInterface|null $logger = null, + private readonly Cleaner|null $cleaner = null, + private readonly EventDispatcherInterface $eventDispatcher = new EventDispatcher(), + ) { + $this->subscriptionManager = new SubscriptionManager($subscriptionStore); + + if ($retryStrategyRepository instanceof RetryStrategyRepository) { + $this->retryStrategyRepository = $retryStrategyRepository; + } else { + $this->retryStrategyRepository = new RetryStrategyRepository([ + RetryStrategyRepository::DEFAULT_STRATEGY_NAME => new ClockBasedRetryStrategy(), + 'no_retry' => new NoRetryStrategy(), + ]); + } + + $messageProcessor = new MessageProcessor( + $this->subscriberRepository, + $this->eventDispatcher, + $this->logger, + ); + + $this->handlers = [ + Boot::class => new BootHandler( + $this->messageLoader, + $this->subscriptionManager, + $this->subscriberRepository, + $messageProcessor, + $this->eventDispatcher, + $this->logger, + ), + Pause::class => new PauseHandler( + $this->subscriptionManager, + $this->logger, + ), + Reactivate::class => new ReactivateHandler( + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + Refresh::class => new RefreshHandler( + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + Remove::class => new RemoveHandler( + $this->subscriptionManager, + $this->subscriberRepository, + $this->cleaner, + $this->logger, + ), + Run::class => new RunHandler( + $this->messageLoader, + $this->subscriptionManager, + $this->subscriberRepository, + $messageProcessor, + $this->eventDispatcher, + $this->logger, + ), + Setup::class => new SetupHandler( + $this->messageLoader, + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + Teardown::class => new TeardownHandler( + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + ]; + + $this->eventDispatcher->addListener( + OnCommand::class, + new DiscoverListener( + $this->messageLoader, + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + 64, + ); + + $this->eventDispatcher->addSubscriber( + new RetrySubscriber( + $this->subscriptionManager, + $this->subscriberRepository, + $this->retryStrategyRepository, + $this->logger, + ), + ); + + $this->eventDispatcher->addSubscriber( + new BatchSubscriber( + $this->subscriberRepository, + $this->logger, + ), + ); + + $this->eventDispatcher->addListener( + OnCommand::class, + new DetachListener( + $this->subscriptionManager, + $this->subscriberRepository, + $this->logger, + ), + 32, + ); + } + + public function run(Command $command): Result + { + if ($this->processing) { + throw new AlreadyProcessing(); + } + + $this->processing = true; + + try { + $handler = $this->handlers[$command::class] ?? null; + + if ($handler === null) { + throw new InvalidArgumentException('No handler found for command: ' . $command::class); + } + + $event = new OnCommand($command); + $this->eventDispatcher->dispatch($event); + + $result = $handler($command); + + $event = new OnResult($command, $result); + $this->eventDispatcher->dispatch($event); + + return $result; + } finally { + $this->processing = false; + } + } + + /** @return list */ + public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array + { + $criteria ??= new SubscriptionEngineCriteria(); + + // todo dispatch event for discover + + return $this->subscriptionManager->find( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + ), + ); + } +} diff --git a/src/Subscription/Engine/ProcessedResult.php b/src/Subscription/Engine/ProcessedResult.php index d34f6c97d..be31a7f89 100644 --- a/src/Subscription/Engine/ProcessedResult.php +++ b/src/Subscription/Engine/ProcessedResult.php @@ -4,13 +4,14 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; -final class ProcessedResult +final class ProcessedResult extends Result { /** @param list $errors */ public function __construct( public readonly int $processedMessages, public readonly bool $finished = false, - public readonly array $errors = [], + array $errors = [], ) { + parent::__construct($errors); } } diff --git a/src/Subscription/Engine/Result.php b/src/Subscription/Engine/Result.php index d644bb17d..3b0207337 100644 --- a/src/Subscription/Engine/Result.php +++ b/src/Subscription/Engine/Result.php @@ -4,7 +4,7 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; -final class Result +class Result { /** @param list $errors */ public function __construct( diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 06fdcb453..72d9b9bca 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -2,4 +2,9 @@ declare(strict_types=1); +use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\LegacyWrapperSubscriptionEngine; + require __DIR__ . '/../vendor/autoload.php'; + +class_alias(LegacyWrapperSubscriptionEngine::class, DefaultSubscriptionEngine::class);