From 4e76339535941ddee5f334f4455fe7206acf855c Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 22 Apr 2026 11:47:18 +0200 Subject: [PATCH] refactor stream --- phpstan-baseline.neon | 72 +-- src/Console/Command/StoreMigrateCommand.php | 30 +- src/Console/InputHelper.php | 20 + src/Message/Pipe.php | 59 --- src/Message/Stream.php | 206 +++++++++ src/{Store => Message}/StreamClosed.php | 2 +- src/Message/StreamNotRewindable.php | 15 + src/Repository/DefaultRepository.php | 2 +- src/Store/AppendStore.php | 1 + src/Store/ArrayStream.php | 133 ------ src/Store/InMemoryStore.php | 5 +- src/Store/ReadOnlyStore.php | 1 + src/Store/Store.php | 1 + src/Store/Stream.php | 37 -- src/Store/StreamDoctrineDbalStore.php | 56 ++- src/Store/StreamDoctrineDbalStoreStream.php | 163 ------- src/Store/TaggableDoctrineDbalStore.php | 82 +++- src/Store/TaggableDoctrineDbalStoreStream.php | 179 ------- .../Engine/DefaultSubscriptionEngine.php | 32 +- .../EventFilteredStoreMessageLoader.php | 10 +- .../Engine/GapResolverStoreMessageLoader.php | 34 +- src/Subscription/Engine/GeneratorStream.php | 117 ----- src/Subscription/Engine/MessageLoader.php | 4 +- .../Engine/StoreMessageLoader.php | 12 +- .../Engine/SubscriptionCollection.php | 16 +- src/Subscription/Lookup/Lookup.php | 2 +- .../Store/DoctrineSubscriptionStore.php | 4 +- src/Subscription/Subscription.php | 4 +- .../BasicIntegrationTest.php | 8 +- .../Store/StreamDoctrineDbalStoreTest.php | 2 + .../Store/TaggableDoctrineDbalStoreTest.php | 2 +- tests/PhpunitHelper.php | 2 +- .../Command/ShowAggregateCommandTest.php | 8 +- .../Command/StoreMigrateCommandTest.php | 10 +- .../StoreDecisionModelBuilderTest.php | 8 +- .../Message/{PipeTest.php => StreamTest.php} | 112 ++++- .../Unit/Repository/DefaultRepositoryTest.php | 22 +- tests/Unit/Store/ArrayStreamTest.php | 187 -------- tests/Unit/Store/InMemoryStoreTest.php | 66 +-- tests/Unit/Store/ReadOnlyStoreTest.php | 9 +- .../Store/StreamDoctrineDbalStoreTest.php | 2 +- .../Store/StreamDoctrineDbalStreamTest.php | 437 ------------------ .../Engine/DefaultSubscriptionEngineTest.php | 88 ++-- .../GapResolverStoreMessageLoaderTest.php | 23 +- .../Engine/SubscriptionCollectionTest.php | 16 +- tests/Unit/Subscription/Lookup/LookupTest.php | 18 +- 46 files changed, 665 insertions(+), 1654 deletions(-) delete mode 100644 src/Message/Pipe.php create mode 100644 src/Message/Stream.php rename src/{Store => Message}/StreamClosed.php (83%) create mode 100644 src/Message/StreamNotRewindable.php delete mode 100644 src/Store/ArrayStream.php delete mode 100644 src/Store/Stream.php delete mode 100644 src/Store/StreamDoctrineDbalStoreStream.php delete mode 100644 src/Store/TaggableDoctrineDbalStoreStream.php delete mode 100644 src/Subscription/Engine/GeneratorStream.php rename tests/Unit/Message/{PipeTest.php => StreamTest.php} (61%) delete mode 100644 tests/Unit/Store/ArrayStreamTest.php delete mode 100644 tests/Unit/Store/StreamDoctrineDbalStreamTest.php diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index b1e9278c9..a4a436217 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -54,6 +54,12 @@ parameters: count: 1 path: src/Message/Serializer/DefaultHeadersSerializer.php + - + message: '#^Property Patchlevel\\EventSourcing\\Message\\Stream\:\:\$iterator \(Iterator\\|null\) does not accept IteratorIterator\\>\.$#' + identifier: assign.propertyType + count: 1 + path: src/Message/Stream.php + - message: '#^Property Patchlevel\\EventSourcing\\Serializer\\Normalizer\\IdNormalizer\:\:\$identifierClass \(class\-string\\|null\) does not accept string\.$#' identifier: assign.propertyType @@ -66,24 +72,6 @@ parameters: count: 1 path: src/Snapshot/DefaultSnapshotStore.php - - - message: '#^Method Patchlevel\\EventSourcing\\Store\\ArrayStream\:\:current\(\) never returns null so it can be removed from the return type\.$#' - identifier: return.unusedType - count: 1 - path: src/Store/ArrayStream.php - - - - message: '#^Property Patchlevel\\EventSourcing\\Store\\ArrayStream\:\:\$index \(int\<1, max\>\|null\) does not accept int\<0, max\>\.$#' - identifier: assign.propertyType - count: 1 - path: src/Store/ArrayStream.php - - - - message: '#^Ternary operator condition is always true\.$#' - identifier: ternary.alwaysTrue - count: 1 - path: src/Store/ArrayStream.php - - message: '#^Method Patchlevel\\EventSourcing\\Store\\Criteria\\Criteria\:\:get\(\) should return T of object but returns object\.$#' identifier: return.type @@ -96,59 +84,23 @@ parameters: count: 1 path: src/Store/Criteria/StreamCriterion.php - - - message: '#^Method Patchlevel\\EventSourcing\\Store\\StreamDoctrineDbalStoreStream\:\:current\(\) never returns null so it can be removed from the return type\.$#' - identifier: return.unusedType - count: 1 - path: src/Store/StreamDoctrineDbalStoreStream.php - - message: '#^Parameter \#1 \$playhead of class Patchlevel\\EventSourcing\\Store\\Header\\PlayheadHeader constructor expects int\<1, max\>, int given\.$#' identifier: argument.type count: 1 - path: src/Store/StreamDoctrineDbalStoreStream.php - - - - message: '#^Ternary operator condition is always true\.$#' - identifier: ternary.alwaysTrue - count: 1 - path: src/Store/StreamDoctrineDbalStoreStream.php - - - - message: '#^Method Patchlevel\\EventSourcing\\Store\\TaggableDoctrineDbalStoreStream\:\:current\(\) never returns null so it can be removed from the return type\.$#' - identifier: return.unusedType - count: 1 - path: src/Store/TaggableDoctrineDbalStoreStream.php + path: src/Store/StreamDoctrineDbalStore.php - message: '#^Parameter \#1 \$playhead of class Patchlevel\\EventSourcing\\Store\\Header\\PlayheadHeader constructor expects int\<1, max\>, int given\.$#' identifier: argument.type count: 1 - path: src/Store/TaggableDoctrineDbalStoreStream.php + path: src/Store/TaggableDoctrineDbalStore.php - message: '#^Parameter \#1 \$tags of class Patchlevel\\EventSourcing\\Store\\Header\\TagsHeader constructor expects list\, mixed given\.$#' identifier: argument.type count: 1 - path: src/Store/TaggableDoctrineDbalStoreStream.php - - - - message: '#^Ternary operator condition is always true\.$#' - identifier: ternary.alwaysTrue - count: 1 - path: src/Store/TaggableDoctrineDbalStoreStream.php - - - - message: '#^Generator expects key type int, int\<1, max\>\|null given\.$#' - identifier: generator.keyType - count: 1 - path: src/Subscription/Engine/GapResolverStoreMessageLoader.php - - - - message: '#^Property Patchlevel\\EventSourcing\\Subscription\\Engine\\GeneratorStream\:\:\$index \(int\<1, max\>\|null\) does not accept int\.$#' - identifier: assign.propertyType - count: 1 - path: src/Subscription/Engine/GeneratorStream.php + path: src/Store/TaggableDoctrineDbalStore.php - message: '#^Parameter \#1 \$eventClass of method Patchlevel\\EventSourcing\\Metadata\\Event\\EventRegistry\:\:eventName\(\) expects class\-string, string given\.$#' @@ -294,12 +246,6 @@ parameters: count: 1 path: tests/Integration/Store/Profile.php - - - message: '#^Using nullsafe method call on non\-nullable type Patchlevel\\EventSourcing\\Store\\Stream\. Use \-\> instead\.$#' - identifier: nullsafe.neverNull - count: 1 - path: tests/Integration/Store/StreamDoctrineDbalStoreTest.php - - message: '#^Parameter \#1 \$table of class Patchlevel\\EventSourcing\\Subscription\\Cleanup\\Dbal\\DropTableTask constructor expects non\-empty\-string, string given\.$#' identifier: argument.type diff --git a/src/Console/Command/StoreMigrateCommand.php b/src/Console/Command/StoreMigrateCommand.php index b91daa3d8..825e48469 100644 --- a/src/Console/Command/StoreMigrateCommand.php +++ b/src/Console/Command/StoreMigrateCommand.php @@ -6,7 +6,6 @@ use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; -use Patchlevel\EventSourcing\Message\Pipe; use Patchlevel\EventSourcing\Message\Translator\Translator; use Patchlevel\EventSourcing\Store\Store; use Symfony\Component\Console\Attribute\AsCommand; @@ -46,38 +45,23 @@ protected function configure(): void protected function execute(InputInterface $input, OutputInterface $output): int { - $buffer = InputHelper::positiveIntOrZero($input->getOption('buffer')); + $buffer = InputHelper::positiveInt($input->getOption('buffer')); $style = new OutputStyle($input, $output); $style->info('Migration initialization...'); $count = $this->store->count(); - $messages = $this->store->load(); + $stream = $this->store->load(); $style->progressStart($count); - $bufferedMessages = []; + $translatedStream = $stream->transform(...$this->translators); - $pipe = new Pipe( - $messages, - ...$this->translators, - ); + foreach ($translatedStream->chunk($buffer) as $chunk) { + $messages = $chunk->toList(); - foreach ($pipe as $message) { - $bufferedMessages[] = $message; - - if (count($bufferedMessages) < $buffer) { - continue; - } - - $this->newStore->save(...$bufferedMessages); - $bufferedMessages = []; - $style->progressAdvance($buffer); - } - - if (count($bufferedMessages) !== 0) { - $this->newStore->save(...$bufferedMessages); - $style->progressAdvance(count($bufferedMessages)); + $this->newStore->save(...$messages); + $style->progressAdvance(count($messages)); } $style->progressFinish(); diff --git a/src/Console/InputHelper.php b/src/Console/InputHelper.php index 83219589f..0fbdb203d 100644 --- a/src/Console/InputHelper.php +++ b/src/Console/InputHelper.php @@ -66,6 +66,26 @@ public static function nullableInt(mixed $value): int|null return (int)$value; } + /** @return positive-int */ + public static function positiveInt(mixed $value): int + { + if (!is_string($value) && !is_int($value)) { + throw new InvalidArgumentGiven($value, 'positive-int'); + } + + if (!is_numeric($value)) { + throw new InvalidArgumentGiven($value, 'positive-int'); + } + + $value = (int)$value; + + if ($value <= 0) { + throw new InvalidArgumentGiven($value, 'positive-int'); + } + + return $value; + } + /** @return positive-int|null */ public static function nullablePositiveInt(mixed $value): int|null { diff --git a/src/Message/Pipe.php b/src/Message/Pipe.php deleted file mode 100644 index 2c38bcdc3..000000000 --- a/src/Message/Pipe.php +++ /dev/null @@ -1,59 +0,0 @@ - */ -final class Pipe implements IteratorAggregate -{ - private Translator $translator; - - /** @param iterable $messages */ - public function __construct( - private readonly iterable $messages, - Translator ...$translators, - ) { - $this->translator = new ChainTranslator($translators); - } - - /** @return Traversable */ - public function getIterator(): Traversable - { - return $this->createGenerator( - $this->messages, - $this->translator, - ); - } - - /** @return list */ - public function toArray(): array - { - return array_values( - iterator_to_array($this->getIterator()), - ); - } - - /** - * @param iterable $messages - * - * @return Generator - */ - private function createGenerator(iterable $messages, Translator $translator): Generator - { - foreach ($messages as $message) { - foreach ($translator($message) as $translatedMessage) { - yield $translatedMessage; - } - } - } -} diff --git a/src/Message/Stream.php b/src/Message/Stream.php new file mode 100644 index 000000000..4289715bd --- /dev/null +++ b/src/Message/Stream.php @@ -0,0 +1,206 @@ + */ +final class Stream implements Iterator +{ + /** @var Iterator|null */ + private Iterator|null $iterator; + + /** @var positive-int|0|null */ + private int|null $position = null; + + /** @param iterable $messages */ + public function __construct(iterable $messages = []) + { + if ($messages instanceof Iterator) { + $this->iterator = $messages; + } elseif ($messages instanceof Traversable) { + $this->iterator = new IteratorIterator($messages); + } else { + $this->iterator = new ArrayIterator($messages); + } + } + + public function close(): void + { + $this->iterator = null; + $this->position = null; + } + + public function current(): Message|null + { + $this->assertNotClosed(); + + return $this->iterator->valid() ? $this->iterator->current() : null; + } + + public function key(): int|null + { + $this->assertNotClosed(); + + return $this->iterator->valid() ? $this->iterator->key() : null; + } + + public function next(): void + { + $this->initialize(); + $this->assertNotClosed(); + + $this->iterator->next(); + + if (!$this->iterator->valid()) { + return; + } + + $this->position = $this->position === null ? 0 : $this->position + 1; + } + + public function rewind(): void + { + $this->assertNotClosed(); + + try { + $this->iterator->rewind(); + } catch (Throwable) { + throw new StreamNotRewindable(); + } + + $this->position = null; + } + + public function valid(): bool + { + $this->assertNotClosed(); + + return $this->iterator->valid(); + } + + /** @phpstan-assert !null $this->iterator */ + private function assertNotClosed(): void + { + if ($this->iterator === null) { + throw new StreamClosed(); + } + } + + public function end(): bool + { + $this->assertNotClosed(); + + return !$this->iterator->valid(); + } + + public function position(): int|null + { + $this->initialize(); + $this->assertNotClosed(); + + return $this->position; + } + + /** + * Alias for key(). + */ + public function index(): int|null + { + return $this->key(); + } + + private function initialize(): void + { + if (!$this->iterator?->valid() || $this->position !== null) { + return; + } + + $this->position = 0; + } + + /** + * Converts the stream to a list and consumes it from the current position. + * + * @return list + */ + public function toList(): array + { + $this->assertNotClosed(); + + return iterator_to_array($this, false); + } + + /** + * Converts the stream to an indexed array and consumes it from the current position. + * + * @return array + */ + public function toArray(): array + { + $this->assertNotClosed(); + + return iterator_to_array($this); + } + + /** + * If the underlying iterator is single-pass (for example a Generator), + * the transformed stream is single-pass as well. + */ + public function transform(Translator ...$translators): Stream + { + $chainTranslator = new ChainTranslator($translators); + + $generator = function () use ($chainTranslator) { + foreach ($this as $message) { + foreach ($chainTranslator($message) as $translatedMessage) { + yield $translatedMessage; + } + } + }; + + return new Stream($generator()); + } + + /** + * @param positive-int $size + * + * @return Generator + */ + public function chunk(int $size = 1000): Generator + { + $buffer = []; + $bufferSize = 0; + + foreach ($this as $message) { + $buffer[] = $message; + $bufferSize++; + + if ($bufferSize !== $size) { + continue; + } + + yield new Stream($buffer); + + $buffer = []; + $bufferSize = 0; + } + + if ($bufferSize <= 0) { + return; + } + + yield new Stream($buffer); + } +} diff --git a/src/Store/StreamClosed.php b/src/Message/StreamClosed.php similarity index 83% rename from src/Store/StreamClosed.php rename to src/Message/StreamClosed.php index ac68dd8ad..18c1460f8 100644 --- a/src/Store/StreamClosed.php +++ b/src/Message/StreamClosed.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Store; +namespace Patchlevel\EventSourcing\Message; use RuntimeException; diff --git a/src/Message/StreamNotRewindable.php b/src/Message/StreamNotRewindable.php new file mode 100644 index 000000000..1607ee2cf --- /dev/null +++ b/src/Message/StreamNotRewindable.php @@ -0,0 +1,15 @@ + */ -final class ArrayStream implements Stream, IteratorAggregate -{ - /** @var Iterator|null $iterator */ - private Iterator|null $iterator; - - /** @var positive-int|0|null */ - private int|null $position; - - /** @var positive-int|null */ - private int|null $index; - - /** @param array $messages The index is the key. An offset is not supported. */ - public function __construct(array $messages = []) - { - $this->iterator = $messages === [] ? new ArrayIterator() : $this->createGenerator($messages); - $this->position = null; - $this->index = null; - } - - public function close(): void - { - $this->iterator = null; - } - - /** @return Traversable */ - public function getIterator(): Traversable - { - $this->assertNotClosed(); - - return $this->iterator; - } - - /** @return positive-int|0|null */ - public function position(): int|null - { - $this->assertNotClosed(); - - if ($this->position === null) { - $this->iterator->key(); - } - - return $this->position; - } - - /** - * The index is based on position. An offset is not supported. - * - * @return positive-int|null - */ - public function index(): int|null - { - $this->assertNotClosed(); - - if ($this->index === null) { - $this->iterator->key(); - } - - return $this->index; - } - - public function next(): void - { - $this->assertNotClosed(); - - $this->iterator->next(); - } - - public function end(): bool - { - $this->assertNotClosed(); - - return !$this->iterator->valid(); - } - - public function current(): Message|null - { - $this->assertNotClosed(); - - return $this->iterator->current() ?: null; - } - - /** - * @param array $messages - * - * @return Generator - */ - private function createGenerator(array $messages): Generator - { - $hasIndex = true; - - foreach ($messages as $index => $message) { - if ($this->position === null) { - $this->position = 0; - } else { - $this->position++; - } - - if ($index === 0) { - $hasIndex = false; - } - - if ($hasIndex) { - $this->index = $index; - } else { - $this->index = $this->position + 1; - } - - yield $message; - } - } - - /** @phpstan-assert !null $this->iterator */ - private function assertNotClosed(): void - { - if ($this->iterator === null) { - throw new StreamClosed(); - } - } -} diff --git a/src/Store/InMemoryStore.php b/src/Store/InMemoryStore.php index d367169fe..625eb4202 100644 --- a/src/Store/InMemoryStore.php +++ b/src/Store/InMemoryStore.php @@ -8,6 +8,7 @@ use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; @@ -58,7 +59,7 @@ public function load( int|null $limit = null, int|null $offset = null, bool $backwards = false, - ): ArrayStream { + ): Stream { $messages = $this->filter($criteria); if ($backwards) { @@ -73,7 +74,7 @@ public function load( $messages = array_slice($messages, 0, $limit); } - return new ArrayStream($messages); + return new Stream($messages); } public function count(Criteria|null $criteria = null): int diff --git a/src/Store/ReadOnlyStore.php b/src/Store/ReadOnlyStore.php index f016b365a..95a57ef8c 100644 --- a/src/Store/ReadOnlyStore.php +++ b/src/Store/ReadOnlyStore.php @@ -6,6 +6,7 @@ use Closure; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Psr\Log\LoggerInterface; diff --git a/src/Store/Store.php b/src/Store/Store.php index 3fe568526..0238a2459 100644 --- a/src/Store/Store.php +++ b/src/Store/Store.php @@ -6,6 +6,7 @@ use Closure; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; interface Store diff --git a/src/Store/Stream.php b/src/Store/Stream.php deleted file mode 100644 index 3fc268d3d..000000000 --- a/src/Store/Stream.php +++ /dev/null @@ -1,37 +0,0 @@ - */ -interface Stream extends Traversable -{ - public function close(): void; - - /** @throws StreamClosed */ - public function next(): void; - - /** @throws StreamClosed */ - public function current(): Message|null; - - /** @throws StreamClosed */ - public function end(): bool; - - /** - * @return positive-int|0|null - * - * @throws StreamClosed - */ - public function position(): int|null; - - /** - * @return positive-int|null - * - * @throws StreamClosed - */ - public function index(): int|null; -} diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index 0a7eab6d2..615ddcf49 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -13,17 +13,22 @@ use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Platforms\SQLitePlatform; use Doctrine\DBAL\Query\QueryBuilder; +use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Types\DateTimeTzImmutableType; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; +use Generator; use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Schema\DoctrineHelper; use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Serializer\EventSerializer; +use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\EventIdCriterion; @@ -107,7 +112,7 @@ public function load( int|null $limit = null, int|null $offset = null, bool $backwards = false, - ): StreamDoctrineDbalStoreStream { + ): Stream { $builder = $this->connection->createQueryBuilder() ->select('*') ->from($this->config['table_name']) @@ -118,15 +123,14 @@ public function load( $builder->setMaxResults($limit); $builder->setFirstResult($offset ?? 0); - return new StreamDoctrineDbalStoreStream( - $this->connection->executeQuery( - $builder->getSQL(), - $builder->getParameters(), - $builder->getParameterTypes(), + return new Stream( + $this->buildGenerator( + $this->connection->executeQuery( + $builder->getSQL(), + $builder->getParameters(), + $builder->getParameterTypes(), + ), ), - $this->eventSerializer, - $this->headersSerializer, - $this->connection->getDatabasePlatform(), ); } @@ -617,4 +621,38 @@ private function unlock(): void throw new LockingNotImplemented($platform::class); } + + /** @return Generator */ + private function buildGenerator(Result $result): Generator + { + /** @var DateTimeTzImmutableType $dateTimeType */ + $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); + $platform = $this->connection->getDatabasePlatform(); + + /** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, recorded_on: string, archived: int|string, custom_headers: string} $data */ + foreach ($result->iterateAssociative() as $data) { + $event = $this->eventSerializer->deserialize(new SerializedEvent( + $data['event_name'], + $data['event_payload'], + )); + + $message = Message::create($event) + ->withHeader(new IndexHeader($data['id'])) + ->withHeader(new StreamNameHeader($data['stream'])) + ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))) + ->withHeader(new EventIdHeader($data['event_id'])); + + if ($data['playhead'] !== null) { + $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); + } + + if ($data['archived']) { + $message = $message->withHeader(new ArchivedHeader()); + } + + $customHeaders = $this->headersSerializer->deserialize($data['custom_headers']); + + yield $data['id'] => $message->withHeaders($customHeaders); + } + } } diff --git a/src/Store/StreamDoctrineDbalStoreStream.php b/src/Store/StreamDoctrineDbalStoreStream.php deleted file mode 100644 index e507996ed..000000000 --- a/src/Store/StreamDoctrineDbalStoreStream.php +++ /dev/null @@ -1,163 +0,0 @@ - */ -final class StreamDoctrineDbalStoreStream implements Stream, IteratorAggregate -{ - private Result|null $result; - - /** @var Generator */ - private Generator|null $generator; - - /** @var positive-int|0|null */ - private int|null $position; - - /** @var positive-int|null */ - private int|null $index; - - public function __construct( - Result $result, - EventSerializer $eventSerializer, - HeadersSerializer $headersSerializer, - AbstractPlatform $platform, - ) { - $this->result = $result; - $this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer, $platform); - $this->position = null; - $this->index = null; - } - - public function close(): void - { - $this->result?->free(); - - $this->result = null; - $this->generator = null; - } - - public function next(): void - { - $this->assertNotClosed(); - - $this->generator->next(); - } - - public function end(): bool - { - $this->assertNotClosed(); - - return !$this->generator->valid(); - } - - public function current(): Message|null - { - $this->assertNotClosed(); - - return $this->generator->current() ?: null; - } - - /** @return positive-int|0|null */ - public function position(): int|null - { - $this->assertNotClosed(); - - if ($this->position === null) { - $this->generator->key(); - } - - return $this->position; - } - - /** @return positive-int|null */ - public function index(): int|null - { - $this->assertNotClosed(); - - if ($this->index === null) { - $this->generator->key(); - } - - return $this->index; - } - - /** @return Traversable */ - public function getIterator(): Traversable - { - $this->assertNotClosed(); - - return $this->generator; - } - - /** @return Generator */ - private function buildGenerator( - Result $result, - EventSerializer $eventSerializer, - HeadersSerializer $headersSerializer, - AbstractPlatform $platform, - ): Generator { - /** @var DateTimeTzImmutableType $dateTimeType */ - $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); - - /** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, recorded_on: string, archived: int|string, custom_headers: string} $data */ - foreach ($result->iterateAssociative() as $data) { - if ($this->position === null) { - $this->position = 0; - } else { - ++$this->position; - } - - $this->index = $data['id']; - $event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload'])); - - $message = Message::create($event) - ->withHeader(new IndexHeader($data['id'])) - ->withHeader(new StreamNameHeader($data['stream'])) - ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))) - ->withHeader(new EventIdHeader($data['event_id'])); - - if ($data['playhead'] !== null) { - $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); - } - - if ($data['archived']) { - $message = $message->withHeader(new ArchivedHeader()); - } - - $customHeaders = $headersSerializer->deserialize($data['custom_headers']); - - yield $message->withHeaders($customHeaders); - } - } - - /** - * @phpstan-assert !null $this->result - * @phpstan-assert !null $this->generator - */ - private function assertNotClosed(): void - { - if ($this->result === null || $this->generator === null) { - throw new StreamClosed(); - } - } -} diff --git a/src/Store/TaggableDoctrineDbalStore.php b/src/Store/TaggableDoctrineDbalStore.php index 163fb85b9..0db57009e 100644 --- a/src/Store/TaggableDoctrineDbalStore.php +++ b/src/Store/TaggableDoctrineDbalStore.php @@ -13,19 +13,24 @@ use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Platforms\SQLitePlatform; use Doctrine\DBAL\Query\QueryBuilder; +use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Name\UnqualifiedName; use Doctrine\DBAL\Schema\PrimaryKeyConstraint; use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Types\DateTimeTzImmutableType; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; +use Generator; use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Serializer\EventSerializer; +use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\EventIdCriterion; @@ -60,11 +65,14 @@ use function in_array; use function is_int; use function is_string; +use function json_decode; use function json_encode; use function sprintf; use function str_contains; use function str_replace; +use const JSON_THROW_ON_ERROR; + /** @experimental */ final class TaggableDoctrineDbalStore implements Store, AppendStore, SubscriptionStore, DoctrineSchemaConfigurator { @@ -129,7 +137,7 @@ public function load( int|null $limit = null, int|null $offset = null, bool $backwards = false, - ): TaggableDoctrineDbalStoreStream { + ): Stream { $builder = $this->connection->createQueryBuilder() ->select('*') ->from($this->config['table_name']) @@ -140,15 +148,14 @@ public function load( $builder->setMaxResults($limit); $builder->setFirstResult($offset ?? 0); - return new TaggableDoctrineDbalStoreStream( - $this->connection->executeQuery( - $builder->getSQL(), - $builder->getParameters(), - $builder->getParameterTypes(), + return new Stream( + $this->buildGenerator( + $this->connection->executeQuery( + $builder->getSQL(), + $builder->getParameters(), + $builder->getParameterTypes(), + ), ), - $this->eventSerializer, - $this->headersSerializer, - $this->connection->getDatabasePlatform(), ); } @@ -501,15 +508,14 @@ public function query(Query $query): Stream $this->queryCondition($builder, $query); - return new TaggableDoctrineDbalStoreStream( - $this->connection->executeQuery( - $builder->getSQL(), - $builder->getParameters(), - $builder->getParameterTypes(), + return new Stream( + $this->buildGenerator( + $this->connection->executeQuery( + $builder->getSQL(), + $builder->getParameters(), + $builder->getParameterTypes(), + ), ), - $this->eventSerializer, - $this->headersSerializer, - $this->connection->getDatabasePlatform(), ); } @@ -890,4 +896,46 @@ private function queryCondition(QueryBuilder $builder, Query $query): void 'ej.id = events.id', ); } + + /** @return Generator */ + private function buildGenerator(Result $result): Generator + { + /** @var DateTimeTzImmutableType $dateTimeType */ + $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); + $platform = $this->connection->getDatabasePlatform(); + + /** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, tags: string, recorded_on: string, archived: int|string, custom_headers: string} $data */ + foreach ($result->iterateAssociative() as $data) { + $event = $this->eventSerializer->deserialize(new SerializedEvent( + $data['event_name'], + $data['event_payload'], + )); + + $message = Message::create($event) + ->withHeader(new IndexHeader($data['id'])) + ->withHeader(new StreamNameHeader($data['stream'])) + ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))) + ->withHeader(new EventIdHeader($data['event_id'])); + + if ($data['playhead'] !== null) { + $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); + } + + if ($data['archived']) { + $message = $message->withHeader(new ArchivedHeader()); + } + + if ($data['tags'] !== '[]') { + $message = $message->withHeader( + new TagsHeader( + json_decode($data['tags'], true, 512, JSON_THROW_ON_ERROR), + ), + ); + } + + $customHeaders = $this->headersSerializer->deserialize($data['custom_headers']); + + yield $data['id'] => $message->withHeaders($customHeaders); + } + } } diff --git a/src/Store/TaggableDoctrineDbalStoreStream.php b/src/Store/TaggableDoctrineDbalStoreStream.php deleted file mode 100644 index aa79d5c37..000000000 --- a/src/Store/TaggableDoctrineDbalStoreStream.php +++ /dev/null @@ -1,179 +0,0 @@ - - */ -final class TaggableDoctrineDbalStoreStream implements Stream, IteratorAggregate -{ - private Result|null $result; - - /** @var Generator */ - private Generator|null $generator; - - /** @var positive-int|0|null */ - private int|null $position; - - /** @var positive-int|null */ - private int|null $index; - - public function __construct( - Result $result, - EventSerializer $eventSerializer, - HeadersSerializer $headersSerializer, - AbstractPlatform $platform, - ) { - $this->result = $result; - $this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer, $platform); - $this->position = null; - $this->index = null; - } - - public function close(): void - { - $this->result?->free(); - - $this->result = null; - $this->generator = null; - } - - public function next(): void - { - $this->assertNotClosed(); - - $this->generator->next(); - } - - public function end(): bool - { - $this->assertNotClosed(); - - return !$this->generator->valid(); - } - - public function current(): Message|null - { - $this->assertNotClosed(); - - return $this->generator->current() ?: null; - } - - /** @return positive-int|0|null */ - public function position(): int|null - { - $this->assertNotClosed(); - - if ($this->position === null) { - $this->generator->key(); - } - - return $this->position; - } - - /** @return positive-int|null */ - public function index(): int|null - { - $this->assertNotClosed(); - - if ($this->index === null) { - $this->generator->key(); - } - - return $this->index; - } - - /** @return Traversable */ - public function getIterator(): Traversable - { - $this->assertNotClosed(); - - return $this->generator; - } - - /** @return Generator */ - private function buildGenerator( - Result $result, - EventSerializer $eventSerializer, - HeadersSerializer $headersSerializer, - AbstractPlatform $platform, - ): Generator { - /** @var DateTimeTzImmutableType $dateTimeType */ - $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); - - /** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, tags: string, recorded_on: string, archived: int|string, custom_headers: string} $data */ - foreach ($result->iterateAssociative() as $data) { - if ($this->position === null) { - $this->position = 0; - } else { - ++$this->position; - } - - $this->index = $data['id']; - $event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload'])); - - $message = Message::create($event) - ->withHeader(new IndexHeader($data['id'])) - ->withHeader(new StreamNameHeader($data['stream'])) - ->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform))) - ->withHeader(new EventIdHeader($data['event_id'])); - - if ($data['playhead'] !== null) { - $message = $message->withHeader(new PlayheadHeader((int)$data['playhead'])); - } - - if ($data['archived']) { - $message = $message->withHeader(new ArchivedHeader()); - } - - if ($data['tags'] !== '[]') { - $message = $message->withHeader( - new TagsHeader( - json_decode($data['tags'], true, 512, JSON_THROW_ON_ERROR), - ), - ); - } - - $customHeaders = $headersSerializer->deserialize($data['custom_headers']); - - yield $message->withHeaders($customHeaders); - } - } - - /** - * @phpstan-assert !null $this->result - * @phpstan-assert !null $this->generator - */ - private function assertNotClosed(): void - { - if ($this->result === null || $this->generator === null) { - throw new StreamClosed(); - } - } -} diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 0b668f407..b82e63d47 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -220,18 +220,14 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $errors = []; $stream = null; $messageCounter = 0; + $lastIndex = null; try { $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); - foreach ($stream as $message) { + foreach ($stream as $index => $message) { $messageCounter++; - - $index = $stream->index(); - - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + $lastIndex = $index; foreach ($subscriptions as $subscription) { if ($subscription->position() >= $index) { @@ -289,12 +285,11 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { } } } finally { - $endIndex = $stream?->index() ?: $startIndex; $stream?->close(); - if ($messageCounter > 0) { + if ($lastIndex !== null && $messageCounter > 0) { foreach ($subscriptions as $subscription) { - $error = $this->ensureCommitBatch($subscription, $endIndex); + $error = $this->ensureCommitBatch($subscription, $lastIndex); if ($error) { $errors[] = $error; @@ -391,18 +386,14 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $errors = []; $stream = null; $messageCounter = 0; + $lastIndex = null; try { $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); - foreach ($stream as $message) { + foreach ($stream as $index => $message) { $messageCounter++; - - $index = $stream->index(); - - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + $lastIndex = $index; foreach ($subscriptions as $subscription) { if ($subscription->position() >= $index) { @@ -454,12 +445,11 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { } } } finally { - $endIndex = $stream?->index() ?: $startIndex; $stream?->close(); - if ($messageCounter > 0) { + if ($lastIndex !== null && $messageCounter > 0) { foreach ($subscriptions as $subscription) { - $error = $this->ensureCommitBatch($subscription, $endIndex); + $error = $this->ensureCommitBatch($subscription, $lastIndex); if ($error) { $errors[] = $error; @@ -489,7 +479,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $this->logger?->info( sprintf( 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', - $endIndex, + $lastIndex, ), ); diff --git a/src/Subscription/Engine/EventFilteredStoreMessageLoader.php b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php index 217588d40..b49580959 100644 --- a/src/Subscription/Engine/EventFilteredStoreMessageLoader.php +++ b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php @@ -4,12 +4,12 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\Stream; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; @@ -26,9 +26,13 @@ public function __construct( } /** @param list $subscriptions */ - public function load(int $startIndex, array $subscriptions): Stream + public function load(int|null $startIndex, array $subscriptions): Stream { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); + $criteria = new Criteria(); + + if ($startIndex !== null) { + $criteria = $criteria->add(new FromIndexCriterion($startIndex)); + } $events = $this->events($subscriptions); diff --git a/src/Subscription/Engine/GapResolverStoreMessageLoader.php b/src/Subscription/Engine/GapResolverStoreMessageLoader.php index 88dede9a1..89116ea2e 100644 --- a/src/Subscription/Engine/GapResolverStoreMessageLoader.php +++ b/src/Subscription/Engine/GapResolverStoreMessageLoader.php @@ -8,11 +8,11 @@ use Generator; use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\Stream; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Clock\ClockInterface; @@ -30,9 +30,9 @@ public function __construct( } /** @param list $subscriptions */ - public function load(int $startIndex, array $subscriptions): Stream + public function load(int|null $startIndex, array $subscriptions): Stream { - return new GeneratorStream( + return new Stream( $this->generator($startIndex), ); } @@ -45,29 +45,41 @@ public function lastIndex(): int } /** @return Generator */ - private function generator(int $startIndex, int $retry = 0): Generator + private function generator(int|null $startIndex, int $retry = 0): Generator { - $currentIndex = $startIndex + 1; - $stream = $this->store->load(new Criteria(new FromIndexCriterion($startIndex))); + $expectedNextIndex = $startIndex ? $startIndex + 1 : null; - foreach ($stream as $message) { - if ($currentIndex !== $stream->index() && $this->inDetectionWindow($message)) { + $criteria = new Criteria(); + + if ($startIndex !== null) { + $criteria = $criteria->add(new FromIndexCriterion($startIndex)); + } + + $stream = $this->store->load($criteria); + + foreach ($stream as $currentIndex => $message) { + if ($expectedNextIndex !== null && $expectedNextIndex !== $currentIndex && $this->inDetectionWindow($message)) { $sleep = $this->retriesInMs[$retry] ?? null; if ($sleep !== null) { $stream->close(); usleep($sleep * 1000); - yield from $this->generator($currentIndex - 1, $retry + 1); + yield from $this->generator($expectedNextIndex - 1, $retry + 1); break; } } + if ($expectedNextIndex === null) { + $expectedNextIndex = $currentIndex + 1; + } else { + $expectedNextIndex++; + } + $retry = 0; - $currentIndex++; - yield $stream->index() => $message; + yield $currentIndex => $message; } $stream->close(); diff --git a/src/Subscription/Engine/GeneratorStream.php b/src/Subscription/Engine/GeneratorStream.php deleted file mode 100644 index 216809635..000000000 --- a/src/Subscription/Engine/GeneratorStream.php +++ /dev/null @@ -1,117 +0,0 @@ - - * @interal - */ -final class GeneratorStream implements Stream, IteratorAggregate -{ - /** @var Generator|null */ - private Generator|null $iterator; - - private Message|null $current = null; - - /** @var positive-int|null */ - private int|null $index = null; - - /** @var positive-int|0|null */ - private int|null $position = null; - - /** @param Generator $iterator */ - public function __construct(Generator $iterator) - { - $this->iterator = $this->generator($iterator); - } - - public function close(): void - { - $this->iterator = null; - } - - /** @return Generator */ - public function getIterator(): Generator - { - $this->assertNotClosed(); - - return $this->iterator; - } - - /** @return positive-int|0|null */ - public function position(): int|null - { - $this->assertNotClosed(); - - return $this->position; - } - - /** - * The index is based on position. An offset is not supported. - * - * @return positive-int|null - */ - public function index(): int|null - { - $this->assertNotClosed(); - - return $this->index; - } - - public function next(): void - { - $this->assertNotClosed(); - - $this->iterator->next(); - } - - public function end(): bool - { - $this->assertNotClosed(); - - return !$this->iterator->valid(); - } - - public function current(): Message|null - { - $this->assertNotClosed(); - - return $this->current; - } - - /** - * @param Generator $messages - * - * @return Generator - */ - private function generator(Generator $messages): Generator - { - foreach ($messages as $index => $message) { - if ($this->position === null) { - $this->position = 0; - } - - $this->index = $index; - $this->position++; - $this->current = $message; - - yield $message; - } - } - - /** @phpstan-assert !null $this->iterator */ - private function assertNotClosed(): void - { - if ($this->iterator === null) { - throw new StreamClosed(); - } - } -} diff --git a/src/Subscription/Engine/MessageLoader.php b/src/Subscription/Engine/MessageLoader.php index 1bdf5a655..01f5a860f 100644 --- a/src/Subscription/Engine/MessageLoader.php +++ b/src/Subscription/Engine/MessageLoader.php @@ -4,13 +4,13 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; -use Patchlevel\EventSourcing\Store\Stream; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Subscription\Subscription; interface MessageLoader { /** @param list $subscriptions */ - public function load(int $startIndex, array $subscriptions): Stream; + public function load(int|null $startIndex, array $subscriptions): Stream; public function lastIndex(): int; } diff --git a/src/Subscription/Engine/StoreMessageLoader.php b/src/Subscription/Engine/StoreMessageLoader.php index eae336dd1..44fc508da 100644 --- a/src/Subscription/Engine/StoreMessageLoader.php +++ b/src/Subscription/Engine/StoreMessageLoader.php @@ -4,10 +4,10 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\Stream; use Patchlevel\EventSourcing\Subscription\Subscription; final class StoreMessageLoader implements MessageLoader @@ -18,9 +18,15 @@ public function __construct( } /** @param list $subscriptions */ - public function load(int $startIndex, array $subscriptions): Stream + public function load(int|null $startIndex, array $subscriptions): Stream { - return $this->store->load(new Criteria(new FromIndexCriterion($startIndex))); + $criteria = new Criteria(); + + if ($startIndex !== null) { + $criteria = $criteria->add(new FromIndexCriterion($startIndex)); + } + + return $this->store->load($criteria); } public function lastIndex(): int diff --git a/src/Subscription/Engine/SubscriptionCollection.php b/src/Subscription/Engine/SubscriptionCollection.php index a8fba233c..0dabda83c 100644 --- a/src/Subscription/Engine/SubscriptionCollection.php +++ b/src/Subscription/Engine/SubscriptionCollection.php @@ -46,20 +46,22 @@ public function count(): int return count($this->subscriptions); } - public function lowestPosition(): int + public function lowestPosition(): int|null { $min = null; foreach ($this->subscriptions as $subscription) { - if ($min !== null && $subscription->position() >= $min) { - continue; + $position = $subscription->position(); + + if ($position === null) { + return null; } - $min = $subscription->position(); - } + if ($min !== null && $position >= $min) { + continue; + } - if ($min === null) { - return 0; + $min = $position; } return $min; diff --git a/src/Subscription/Lookup/Lookup.php b/src/Subscription/Lookup/Lookup.php index 4e761b6de..d4d902e5c 100644 --- a/src/Subscription/Lookup/Lookup.php +++ b/src/Subscription/Lookup/Lookup.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Subscription\Lookup; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; @@ -13,7 +14,6 @@ use Patchlevel\EventSourcing\Store\Header\IndexHeader; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\Stream; use function array_map; use function array_values; diff --git a/src/Subscription/Store/DoctrineSubscriptionStore.php b/src/Subscription/Store/DoctrineSubscriptionStore.php index 34e86ae09..5b1759a49 100644 --- a/src/Subscription/Store/DoctrineSubscriptionStore.php +++ b/src/Subscription/Store/DoctrineSubscriptionStore.php @@ -36,7 +36,7 @@ * id: string, * group_name: string, * run_mode: string, - * position: int, + * position: int|null, * status: string, * error_message: string|null, * error_previous_status: string|null, @@ -230,7 +230,7 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setLength(16) ->setNotnull(true); $table->addColumn('position', Types::INTEGER) - ->setNotnull(true); + ->setNotnull(false); $table->addColumn('status', Types::STRING) ->setLength(32) ->setNotnull(true); diff --git a/src/Subscription/Subscription.php b/src/Subscription/Subscription.php index 0dd056de8..fb06a2a52 100644 --- a/src/Subscription/Subscription.php +++ b/src/Subscription/Subscription.php @@ -17,7 +17,7 @@ public function __construct( private string $group = self::DEFAULT_GROUP, private RunMode $runMode = RunMode::FromBeginning, private Status $status = Status::New, - private int $position = 0, + private int|null $position = null, private SubscriptionError|null $error = null, private int $retryAttempt = 0, private DateTimeImmutable|null $lastSavedAt = null, @@ -55,7 +55,7 @@ public function status(): Status return $this->status; } - public function position(): int + public function position(): int|null { return $this->position; } diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index 486a1e6cb..45a1f1b1c 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -10,7 +10,6 @@ use Patchlevel\EventSourcing\CommandBus\ServiceLocator; use Patchlevel\EventSourcing\CommandBus\SyncCommandBus; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Message\Pipe; use Patchlevel\EventSourcing\Message\Reducer; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Message\Translator\UntilEventTranslator; @@ -240,10 +239,9 @@ public function testTempProjection(): void }, ]) ->reduce( - new Pipe( - $store->load(new Criteria( - new StreamCriterion('profile-' . $profileId->toString()), - )), + $store->load(new Criteria( + new StreamCriterion('profile-' . $profileId->toString()), + ))->transform( new UntilEventTranslator(new DateTimeImmutable()), ), ); diff --git a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php index f81ef7ddc..d9798e8b5 100644 --- a/tests/Integration/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Integration/Store/StreamDoctrineDbalStoreTest.php @@ -469,6 +469,8 @@ public function testLoadWithWildcard(): void $stream?->close(); } + $stream = null; + try { $stream = $this->store->load(new Criteria(new StreamCriterion('*-*'))); diff --git a/tests/Integration/Store/TaggableDoctrineDbalStoreTest.php b/tests/Integration/Store/TaggableDoctrineDbalStoreTest.php index f61a0df3d..ad9d732a2 100644 --- a/tests/Integration/Store/TaggableDoctrineDbalStoreTest.php +++ b/tests/Integration/Store/TaggableDoctrineDbalStoreTest.php @@ -526,7 +526,7 @@ public function testTags(): void ), ); - $messages = iterator_to_array($stream); + $messages = $stream->toList(); self::assertCount(2, $messages); diff --git a/tests/PhpunitHelper.php b/tests/PhpunitHelper.php index acbbb84d8..c45ddb1d4 100644 --- a/tests/PhpunitHelper.php +++ b/tests/PhpunitHelper.php @@ -5,8 +5,8 @@ namespace Patchlevel\EventSourcing\Tests; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; -use Patchlevel\EventSourcing\Store\Stream; use RuntimeException; use function count; diff --git a/tests/Unit/Console/Command/ShowAggregateCommandTest.php b/tests/Unit/Console/Command/ShowAggregateCommandTest.php index b360bf9f1..6f4c965d5 100644 --- a/tests/Unit/Console/Command/ShowAggregateCommandTest.php +++ b/tests/Unit/Console/Command/ShowAggregateCommandTest.php @@ -9,11 +9,11 @@ use Patchlevel\EventSourcing\Console\Command\ShowAggregateCommand; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Serializer\Encoder\Encoder; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Serializer\SerializedEvent; -use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; @@ -47,7 +47,7 @@ public function testSuccessful(): void ->with(new Criteria( new StreamCriterion('profile-1'), )) - ->willReturn(new ArrayStream([$message])); + ->willReturn(new Stream([$message])); $serializer = $this->createMock(EventSerializer::class); $serializer @@ -176,7 +176,7 @@ public function testNotFound(): void $store = $this->createMock(Store::class); $store->method('load')->with(new Criteria( new StreamCriterion('profile-test'), - ))->willReturn(new ArrayStream()); + ))->willReturn(new Stream()); $serializer = $this->createMock(EventSerializer::class); @@ -251,7 +251,7 @@ public function testInteractiveSuccessful(): void $store->method('load')->with(new Criteria( new StreamCriterion('profile-1'), ))->willReturn( - new ArrayStream([$message]), + new Stream([$message]), ); $eventSerializer = $this->createMock(EventSerializer::class); diff --git a/tests/Unit/Console/Command/StoreMigrateCommandTest.php b/tests/Unit/Console/Command/StoreMigrateCommandTest.php index 15e497f6c..4fcc76c36 100644 --- a/tests/Unit/Console/Command/StoreMigrateCommandTest.php +++ b/tests/Unit/Console/Command/StoreMigrateCommandTest.php @@ -17,8 +17,6 @@ use Symfony\Component\Console\Input\ArrayInput; use Symfony\Component\Console\Output\BufferedOutput; -use function iterator_to_array; - #[CoversClass(StoreMigrateCommand::class)] final class StoreMigrateCommandTest extends TestCase { @@ -70,7 +68,7 @@ public function testOneMessage(): void self::assertStringContainsString('1', $content); self::assertStringContainsString('Migration finished', $content); - self::assertCount(1, iterator_to_array($toStore->load()->getIterator())); + self::assertCount(1, $toStore->load()->toList()); } public function testTenMessages(): void @@ -154,7 +152,7 @@ public function testTenMessages(): void self::assertStringContainsString('10', $content); self::assertStringContainsString('Migration finished', $content); - self::assertCount(10, iterator_to_array($toStore->load()->getIterator())); + self::assertCount(10, $toStore->load()->toList()); } public function testTenMessagesWithBufferAt2(): void @@ -238,7 +236,7 @@ public function testTenMessagesWithBufferAt2(): void self::assertStringContainsString('10', $content); self::assertStringContainsString('Migration finished', $content); - self::assertCount(10, iterator_to_array($toStore->load()->getIterator())); + self::assertCount(10, $toStore->load()->toList()); } public function testTenMessagesWithDroppingTranslator(): void @@ -325,6 +323,6 @@ public function testTenMessagesWithDroppingTranslator(): void self::assertStringContainsString('10', $content); self::assertStringContainsString('Migration finished', $content); - self::assertCount(1, iterator_to_array($toStore->load()->getIterator())); + self::assertCount(1, $toStore->load()->toList()); } } diff --git a/tests/Unit/DecisionModel/StoreDecisionModelBuilderTest.php b/tests/Unit/DecisionModel/StoreDecisionModelBuilderTest.php index ceb4ba743..fe379c3ca 100644 --- a/tests/Unit/DecisionModel/StoreDecisionModelBuilderTest.php +++ b/tests/Unit/DecisionModel/StoreDecisionModelBuilderTest.php @@ -6,9 +6,9 @@ use Patchlevel\EventSourcing\DecisionModel\StoreDecisionModelBuilder; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\AppendCondition; use Patchlevel\EventSourcing\Store\AppendStore; -use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Header\TagsHeader; use Patchlevel\EventSourcing\Store\Query; use Patchlevel\EventSourcing\Store\SubQuery; @@ -25,7 +25,7 @@ final class StoreDecisionModelBuilderTest extends TestCase public function testEmpty(): void { $store = $this->createMock(AppendStore::class); - $store->expects($this->once())->method('query')->with(new Query())->willReturn(new ArrayStream([])); + $store->expects($this->once())->method('query')->with(new Query())->willReturn(new Stream()); $builder = new StoreDecisionModelBuilder($store); @@ -54,7 +54,7 @@ public function testWithProjections(): void ), ); - $store->expects($this->once())->method('query')->with($expectedQuery)->willReturn(new ArrayStream([$message])); + $store->expects($this->once())->method('query')->with($expectedQuery)->willReturn(new Stream([1 => $message])); $builder = new StoreDecisionModelBuilder($store); @@ -78,7 +78,7 @@ public function testEmptyStream(): void ), ); - $store->expects($this->once())->method('query')->with($expectedQuery)->willReturn(new ArrayStream()); + $store->expects($this->once())->method('query')->with($expectedQuery)->willReturn(new Stream()); $builder = new StoreDecisionModelBuilder($store); diff --git a/tests/Unit/Message/PipeTest.php b/tests/Unit/Message/StreamTest.php similarity index 61% rename from tests/Unit/Message/PipeTest.php rename to tests/Unit/Message/StreamTest.php index a678ab36d..22be44322 100644 --- a/tests/Unit/Message/PipeTest.php +++ b/tests/Unit/Message/StreamTest.php @@ -6,7 +6,9 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Message\Pipe; +use Patchlevel\EventSourcing\Message\Stream; +use Patchlevel\EventSourcing\Message\StreamClosed; +use Patchlevel\EventSourcing\Message\StreamNotRewindable; use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; @@ -19,50 +21,94 @@ use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; -use function iterator_to_array; - -#[CoversClass(Pipe::class)] -final class PipeTest extends TestCase +#[CoversClass(Stream::class)] +final class StreamTest extends TestCase { public function testEmpty(): void { - $stream = new Pipe([]); + $stream = new Stream(); + + self::assertSame(null, $stream->position()); + self::assertSame(null, $stream->current()); + self::assertSame(null, $stream->index()); + self::assertSame(true, $stream->end()); + } + + public function testOneMessageInList(): void + { + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('foo'), + Email::fromString('info@patchlevel.de'), + ), + ); - $result = iterator_to_array($stream); + $stream = new Stream([$message]); - self::assertSame([], $result); + self::assertSame(0, $stream->position()); + self::assertSame(0, $stream->index()); + self::assertSame($message, $stream->current()); + self::assertSame(false, $stream->end()); + + $stream->next(); + + self::assertSame(0, $stream->position()); + self::assertSame(null, $stream->index()); + self::assertSame(null, $stream->current()); + self::assertSame(true, $stream->end()); } - public function testWithMessages(): void + public function testWithOffsetKey(): void { - $messages = $this->messages(); + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('foo'), + Email::fromString('info@patchlevel.de'), + ), + ); + + $stream = new Stream([5 => $message]); - $stream = new Pipe($messages); + self::assertSame(0, $stream->position()); + self::assertSame(5, $stream->index()); + self::assertSame($message, $stream->current()); + self::assertSame(false, $stream->end()); - $resultMessages = iterator_to_array($stream); + $stream->next(); - self::assertSame($messages, $resultMessages); + self::assertSame(0, $stream->position()); + self::assertSame(null, $stream->index()); + self::assertSame(null, $stream->current()); + self::assertSame(true, $stream->end()); } - public function testToArray(): void + public function testClose(): void { - $messages = $this->messages(); + $this->expectException(StreamClosed::class); + + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('foo'), + Email::fromString('info@patchlevel.de'), + ), + ); - $stream = new Pipe($messages); + $stream = new Stream([$message]); - self::assertSame($messages, $stream->toArray()); + $stream->close(); + $stream->index(); } - public function testWithOneMiddleware(): void + public function testWithOneTranslator(): void { $messages = $this->messages(); - $stream = new Pipe( - $messages, + $stream = new Stream($messages); + $stream = $stream->transform( new ExcludeEventTranslator([ProfileCreated::class]), ); - $resultMessages = iterator_to_array($stream); + $resultMessages = $stream->toList(); self::assertCount(3, $resultMessages); @@ -83,13 +129,13 @@ public function testWithMiddlewares(): void { $messages = $this->messages(); - $stream = new Pipe( - $messages, + $stream = new Stream($messages); + $stream = $stream->transform( new ExcludeEventTranslator([ProfileCreated::class]), new RecalculatePlayheadTranslator(), ); - $resultMessages = iterator_to_array($stream); + $resultMessages = $stream->toList(); self::assertCount(3, $resultMessages); @@ -106,6 +152,24 @@ public function testWithMiddlewares(): void self::assertSame(1, $resultMessages[2]->header(PlayheadHeader::class)->playhead); } + public function testTransformIsSinglePass(): void + { + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('foo'), + Email::fromString('info@patchlevel.de'), + ), + ); + + $stream = new Stream([$message]); + $transformedStream = $stream->transform(); + + self::assertCount(1, $transformedStream->toList()); + + $this->expectException(StreamNotRewindable::class); + $transformedStream->toList(); + } + /** @return list */ private function messages(): array { diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index c3f4f8dcd..772ceb1dd 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -7,6 +7,7 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\AggregateAlreadyExists; use Patchlevel\EventSourcing\Repository\AggregateDetached; @@ -20,7 +21,6 @@ use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound; use Patchlevel\EventSourcing\Snapshot\SnapshotStore; use Patchlevel\EventSourcing\Store\ArchivedHeader; -use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; @@ -355,7 +355,7 @@ public function testLoadAggregate(): void ->with(new Criteria( new StreamCriterion('profile-1'), new ArchivedCriterion(false), - ))->willReturn(new ArrayStream([ + ))->willReturn(new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -387,7 +387,7 @@ public function testLoadAggregateTwice(): void new StreamCriterion('profile-1'), new ArchivedCriterion(false), ))->willReturn( - new ArrayStream([ + new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -398,7 +398,7 @@ public function testLoadAggregateTwice(): void ->withHeader(new PlayheadHeader(1)) ->withHeader(new RecordedOnHeader(new DateTimeImmutable())), ]), - new ArrayStream([ + new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -432,7 +432,7 @@ public function testAggregateNotFound(): void new StreamCriterion('profile-1'), new ArchivedCriterion(false), )) - ->willReturn(new ArrayStream()); + ->willReturn(new Stream()); $repository = new DefaultRepository($store, Profile::metadata()); @@ -488,7 +488,7 @@ public function testLoadAggregateWithSnapshot(): void new StreamCriterion('profile_with_snapshot-1'), new FromPlayheadCriterion(1), )) - ->willReturn(new ArrayStream()); + ->willReturn(new Stream()); $snapshotStore = $this->createMock(SnapshotStore::class); $snapshotStore->method('load')->with(ProfileWithSnapshot::class, $id)->willReturn($profile); @@ -519,7 +519,7 @@ public function testLoadAggregateWithSnapshotFirstTime(): void new ArchivedCriterion(false), )) ->willReturn( - new ArrayStream([ + new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -596,7 +596,7 @@ public function testLoadAggregateWithSnapshotAndSaveNewVersion(): void new StreamCriterion('profile_with_snapshot-1'), new FromPlayheadCriterion(1), )) - ->willReturn(new ArrayStream([ + ->willReturn(new Stream([ Message::create( new ProfileVisited( ProfileId::fromString('1'), @@ -660,7 +660,7 @@ public function testLoadAggregateWithoutSnapshot(): void new StreamCriterion('profile_with_snapshot-1'), new ArchivedCriterion(false), )) - ->willReturn(new ArrayStream([ + ->willReturn(new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -729,7 +729,7 @@ public function testLoadAggregateFromOtherStream(): void new StreamCriterion('other-1'), new ArchivedCriterion(false), )) - ->willReturn(new ArrayStream([ + ->willReturn(new Stream([ Message::create( new ProfileCreated( ProfileId::fromString('1'), @@ -755,7 +755,7 @@ public function testLoadInitializableAggregate(): void $store ->expects($this->once()) ->method('load') - ->willReturn(new ArrayStream([])); + ->willReturn(new Stream([])); $repository = new DefaultRepository($store, AutoInitializableProfile::metadata()); $aggregate = $repository->load(ProfileId::fromString('1')); diff --git a/tests/Unit/Store/ArrayStreamTest.php b/tests/Unit/Store/ArrayStreamTest.php deleted file mode 100644 index 35286bd29..000000000 --- a/tests/Unit/Store/ArrayStreamTest.php +++ /dev/null @@ -1,187 +0,0 @@ -position()); - self::assertSame(null, $stream->current()); - self::assertSame(null, $stream->index()); - self::assertSame(true, $stream->end()); - - $array = iterator_to_array($stream); - - self::assertSame([], $array); - } - - public function testOneMessage(): void - { - $message = Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $messages = [$message]; - - $stream = new ArrayStream($messages); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testMultipleMessages(): void - { - $messages = [ - Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ), - Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ), - Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ), - ]; - - $stream = new ArrayStream($messages); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame($messages[0], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(2, $stream->index()); - self::assertSame(1, $stream->position()); - self::assertSame($messages[1], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(3, $stream->index()); - self::assertSame(2, $stream->position()); - self::assertSame($messages[2], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(3, $stream->index()); - self::assertSame(2, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testWithNoList(): void - { - $message = Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $messages = [5 => $message]; - - $stream = new ArrayStream($messages); - - self::assertSame(5, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(5, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testClose(): void - { - $this->expectException(StreamClosed::class); - - $message = Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $messages = [$message]; - - $stream = new ArrayStream($messages); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->close(); - $stream->index(); - } - - public function testPositionEmpty(): void - { - $stream = new ArrayStream([]); - $position = $stream->position(); - - self::assertNull($position); - } - - public function testPosition(): void - { - $message = Message::create( - new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $messages = [$message]; - - $stream = new ArrayStream($messages); - $position = $stream->position(); - - self::assertSame(0, $position); - } -} diff --git a/tests/Unit/Store/InMemoryStoreTest.php b/tests/Unit/Store/InMemoryStoreTest.php index 89d84f619..913ece46c 100644 --- a/tests/Unit/Store/InMemoryStoreTest.php +++ b/tests/Unit/Store/InMemoryStoreTest.php @@ -62,9 +62,7 @@ public function testLoadMessages(): void $stream = $store->load(); - $messages = iterator_to_array($stream); - - self::assertSame($expected, $messages); + self::assertSame($expected, $stream->toList()); } public function testLoadByStreamName(): void @@ -88,9 +86,7 @@ public function testLoadByStreamName(): void $stream = $store->load(new Criteria(new StreamCriterion('bar'))); - $messages = iterator_to_array($stream); - - self::assertSame([$message2], $messages); + self::assertSame([$message2], $stream->toList()); } public function testLoadByStreamNameWithLike(): void @@ -115,9 +111,7 @@ public function testLoadByStreamNameWithLike(): void $stream = $store->load(new Criteria(new StreamCriterion('bar-*'))); - $messages = iterator_to_array($stream); - - self::assertSame([$message2, $message3], $messages); + self::assertSame([$message2, $message3], $stream->toList()); } public function testLoadFromPlayhead(): void @@ -147,9 +141,7 @@ public function testLoadFromPlayhead(): void $stream = $store->load(new Criteria(new FromPlayheadCriterion(2))); - $messages = iterator_to_array($stream); - - self::assertSame([$message2, $message3], $messages); + self::assertSame([$message2, $message3], $stream->toList()); } public function testLoadFromIndex(): void @@ -179,7 +171,7 @@ public function testLoadFromIndex(): void $stream = $store->load(new Criteria(new FromIndexCriterion(2))); - $messages = iterator_to_array($stream); + $messages = $stream->toList(); self::assertCount(2, $messages); self::assertSame( @@ -226,7 +218,7 @@ public function testLoadToIndex(): void $stream = $store->load(new Criteria(new ToIndexCriterion(3))); - $messages = iterator_to_array($stream); + $messages = $stream->toList(); self::assertCount(2, $messages); self::assertSame( @@ -269,9 +261,7 @@ public function testLoadByStreamNameWithLikeAll(): void $stream = $store->load(new Criteria(new StreamCriterion('*'))); - $messages = iterator_to_array($stream); - - self::assertSame([$message1, $message2, $message3], $messages); + self::assertSame([$message1, $message2, $message3], $stream->toList()); } public function testLoadArchived(): void @@ -290,9 +280,7 @@ public function testLoadArchived(): void $stream = $store->load(new Criteria(new ArchivedCriterion(true))); - $messages = iterator_to_array($stream); - - self::assertSame([$message1], $messages); + self::assertSame([$message1], $stream->toList()); } public function testLoadByEventName(): void @@ -321,15 +309,13 @@ public function testLoadByEventName(): void ); $stream = $store->load(new Criteria(new EventsCriterion(['profile_created']))); - $messages = iterator_to_array($stream); - self::assertSame([$message1], $messages); + self::assertSame([$message1], $stream->toList()); $stream = $store->load(new Criteria(new EventsCriterion(['profile_visited']))); - $messages = iterator_to_array($stream); - self::assertSame([$message2, $message3], $messages); - self::assertSame([], iterator_to_array($store->load(new Criteria(new EventsCriterion(['profile_deleted']))))); + self::assertSame([$message2, $message3], $stream->toList()); + self::assertSame([], $store->load(new Criteria(new EventsCriterion(['profile_deleted'])))->toList()); } public function testLoadByEventNameWithoutRegistry(): void @@ -388,9 +374,7 @@ public function testLoadLimit(): void $stream = $store->load(null, 1); - $messages = iterator_to_array($stream); - - self::assertSame([$message1], $messages); + self::assertSame([$message1], $stream->toList()); } public function testLoadOffset(): void @@ -408,9 +392,7 @@ public function testLoadOffset(): void $stream = $store->load(null, null, 1); - $messages = iterator_to_array($stream); - - self::assertSame([$message2], $messages); + self::assertSame([$message2], $stream->toList()); } public function testLoadBackwards(): void @@ -428,9 +410,7 @@ public function testLoadBackwards(): void $stream = $store->load(null, null, null, true); - $messages = iterator_to_array($stream); - - self::assertSame([$message2, $message1], $messages); + self::assertSame([$message2, $message1], $stream->toList()); } public function testCount(): void @@ -469,9 +449,7 @@ public function testSaveEmpty(): void $stream = $store->load(); - $messages = iterator_to_array($stream); - - self::assertSame($expected, $messages); + self::assertSame($expected, $stream->toList()); } public function testSaveWithExistingMessages(): void @@ -498,9 +476,7 @@ public function testSaveWithExistingMessages(): void $stream = $store->load(); - $messages = iterator_to_array($stream); - - self::assertSame([...$startMessages, $message1], $messages); + self::assertSame([...$startMessages, $message1], $stream->toList()); } public function testSaveWithoutHeaders(): void @@ -509,7 +485,7 @@ public function testSaveWithoutHeaders(): void $store->save(new Message(new ProfileVisited(ProfileId::fromString('1')))); $stream = $store->load(); - $messages = iterator_to_array($stream); + $messages = $stream->toList(); self::assertCount(2, $messages); @@ -575,9 +551,7 @@ public function testRemove(): void $stream = $store->load(); - $messages = iterator_to_array($stream); - - self::assertSame([$message1, $message4], $messages); + self::assertSame([$message1, $message4], $stream->toList()); } public function testTransactional(): void @@ -658,8 +632,6 @@ public function testClear(): void $stream = $store->load(); - $messages = iterator_to_array($stream); - - self::assertSame([], $messages); + self::assertSame([], $stream->toList()); } } diff --git a/tests/Unit/Store/ReadOnlyStoreTest.php b/tests/Unit/Store/ReadOnlyStoreTest.php index 8adf4bd77..1112195b6 100644 --- a/tests/Unit/Store/ReadOnlyStoreTest.php +++ b/tests/Unit/Store/ReadOnlyStoreTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Store; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\ReadOnlyStore; use Patchlevel\EventSourcing\Store\Store; @@ -19,11 +20,15 @@ public function testLoad(): void { $criteria = new Criteria(); + $stream = new Stream(); + $parentStore = $this->createMock(Store::class); - $parentStore->expects($this->atLeastOnce())->method('load')->with($criteria, 8, 42, true); + $parentStore->expects($this->atLeastOnce())->method('load')->with($criteria, 8, 42, true)->willReturn($stream); $store = new ReadOnlyStore($parentStore); - $store->load($criteria, 8, 42, true); + $result = $store->load($criteria, 8, 42, true); + + self::assertSame($stream, $result); } public function testCount(): void diff --git a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php index 35d50426c..0c0501e46 100644 --- a/tests/Unit/Store/StreamDoctrineDbalStoreTest.php +++ b/tests/Unit/Store/StreamDoctrineDbalStoreTest.php @@ -459,7 +459,7 @@ public function testLoadWithOneEvent(): void iterator_to_array($stream); - self::assertSame(1, $stream->index()); + self::assertSame(null, $stream->index()); self::assertSame(0, $stream->position()); } diff --git a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php b/tests/Unit/Store/StreamDoctrineDbalStreamTest.php deleted file mode 100644 index e15c3fac5..000000000 --- a/tests/Unit/Store/StreamDoctrineDbalStreamTest.php +++ /dev/null @@ -1,437 +0,0 @@ -createMock(EventSerializer::class); - $headersSerializer = $this->createMock(HeadersSerializer::class); - $platform = $this->createMock(AbstractPlatform::class); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator()); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - self::assertSame(null, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(null, $stream->index()); - self::assertSame(true, $stream->end()); - - $this->expectException(Throwable::class); - iterator_to_array($stream); - } - - public function testOneMessage(): void - { - $messages = [ - [ - 'id' => 1, - 'event_id' => '1', - 'event_name' => 'profile_created', - 'event_payload' => '{}', - 'stream' => 'profile-1', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - ]; - - $event = new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ); - $message = Message::create($event) - ->withHeader(new IndexHeader(1)) - ->withHeader(new StreamNameHeader('profile-1')) - ->withHeader(new PlayheadHeader(1)) - ->withHeader(new EventIdHeader('1')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); - - $eventSerializer = $this->createMock(EventSerializer::class); - $eventSerializer->expects($this->once())->method('deserialize')->with(new SerializedEvent('profile_created', '{}')) - ->willReturn($event); - - $headersSerializer = $this->createMock(HeadersSerializer::class); - $headersSerializer->expects($this->once())->method('deserialize')->with('{}')->willReturn([]); - - $platform = $this->createMock(AbstractPlatform::class); - $platform->expects($this->once())->method('getDateTimeTzFormatString')->willReturn('Y-m-d H:i:s'); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator($messages)); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertEquals($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testMultipleMessages(): void - { - $messagesArray = [ - [ - 'id' => 1, - 'event_id' => '1', - 'event_name' => 'profile_created', - 'event_payload' => '{}', - 'stream' => 'profile-1', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - [ - 'id' => 2, - 'event_id' => '2', - 'event_name' => 'profile_created2', - 'event_payload' => '{}', - 'stream' => 'profile-2', - 'playhead' => null, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - [ - 'id' => 3, - 'event_id' => '3', - 'event_name' => 'profile_created3', - 'event_payload' => '{}', - 'stream' => 'profile-3', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - ]; - - $event = new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ); - - $messages = [ - Message::create($event) - ->withHeader(new IndexHeader(1)) - ->withHeader(new StreamNameHeader('profile-1')) - ->withHeader(new PlayheadHeader(1)) - ->withHeader(new EventIdHeader('1')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), - Message::create($event) - ->withHeader(new IndexHeader(2)) - ->withHeader(new StreamNameHeader('profile-2')) - ->withHeader(new EventIdHeader('2')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), - Message::create($event) - ->withHeader(new IndexHeader(3)) - ->withHeader(new StreamNameHeader('profile-3')) - ->withHeader(new PlayheadHeader(1)) - ->withHeader(new EventIdHeader('3')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))), - ]; - - $eventSerializer = $this->createMock(EventSerializer::class); - $eventSerializer - ->expects($this->exactly(3)) - ->method('deserialize') - ->willReturnCallback(new ReturnCallback([ - [ - [new SerializedEvent('profile_created', '{}'), []], - $event, - ], - [ - [new SerializedEvent('profile_created2', '{}'), []], - $event, - ], - [ - [new SerializedEvent('profile_created3', '{}'), []], - $event, - ], - ])); - - $headersSerializer = $this->createMock(HeadersSerializer::class); - $headersSerializer->expects($this->exactly(3))->method('deserialize')->with('{}')->willReturn([]); - - $platform = $this->createMock(AbstractPlatform::class); - $platform->expects($this->exactly(3))->method('getDateTimeTzFormatString')->willReturn('Y-m-d H:i:s'); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator($messagesArray)); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertEquals($messages[0], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(2, $stream->index()); - self::assertSame(1, $stream->position()); - self::assertEquals($messages[1], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(3, $stream->index()); - self::assertSame(2, $stream->position()); - self::assertEquals($messages[2], $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(3, $stream->index()); - self::assertSame(2, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testWithNoList(): void - { - $messages = [ - 5 => [ - 'id' => 5, - 'event_id' => '1', - 'event_name' => 'profile_created', - 'event_payload' => '{}', - 'stream' => 'profile-1', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - ]; - - $event = new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ); - $message = Message::create($event) - ->withHeader(new IndexHeader(5)) - ->withHeader(new StreamNameHeader('profile-1')) - ->withHeader(new PlayheadHeader(1)) - ->withHeader(new EventIdHeader('1')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); - - $eventSerializer = $this->createMock(EventSerializer::class); - $eventSerializer->expects($this->once())->method('deserialize')->with(new SerializedEvent('profile_created', '{}')) - ->willReturn($event); - - $headersSerializer = $this->createMock(HeadersSerializer::class); - $headersSerializer->expects($this->once())->method('deserialize')->with('{}')->willReturn([]); - - $platform = $this->createMock(AbstractPlatform::class); - $platform->expects($this->once())->method('getDateTimeTzFormatString')->willReturn('Y-m-d H:i:s'); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator($messages)); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - self::assertSame(5, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertEquals($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->next(); - - self::assertSame(5, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame(null, $stream->current()); - self::assertSame(true, $stream->end()); - } - - public function testClose(): void - { - $messages = [ - [ - 'id' => 1, - 'event_id' => '1', - 'event_name' => 'profile_created', - 'event_payload' => '{}', - 'stream' => 'profile-1', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - ]; - - $event = new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ); - $message = Message::create($event) - ->withHeader(new IndexHeader(1)) - ->withHeader(new StreamNameHeader('profile-1')) - ->withHeader(new PlayheadHeader(1)) - ->withHeader(new EventIdHeader('1')) - ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2022-10-10 10:10:10'))); - - $eventSerializer = $this->createMock(EventSerializer::class); - $eventSerializer->expects($this->once())->method('deserialize')->with(new SerializedEvent('profile_created', '{}')) - ->willReturn($event); - - $headersSerializer = $this->createMock(HeadersSerializer::class); - $headersSerializer->expects($this->once())->method('deserialize')->with('{}')->willReturn([]); - - $platform = $this->createMock(AbstractPlatform::class); - $platform->expects($this->once())->method('getDateTimeTzFormatString')->willReturn('Y-m-d H:i:s'); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator($messages)); - $result->expects($this->once())->method('free'); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertEquals($message, $stream->current()); - self::assertSame(false, $stream->end()); - - $stream->close(); - - $this->expectException(StreamClosed::class); - $stream->index(); - } - - public function testPositionEmpty(): void - { - $eventSerializer = $this->createMock(EventSerializer::class); - $headersSerializer = $this->createMock(HeadersSerializer::class); - $platform = $this->createMock(AbstractPlatform::class); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator()); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - $position = $stream->position(); - - self::assertNull($position); - } - - public function testPosition(): void - { - $messages = [ - [ - 'id' => 1, - 'event_id' => '1', - 'event_name' => 'profile_created', - 'event_payload' => '{}', - 'stream' => 'profile-1', - 'playhead' => 1, - 'recorded_on' => '2022-10-10 10:10:10', - 'archived' => '0', - 'new_stream_start' => '0', - 'custom_headers' => '{}', - ], - ]; - - $event = new ProfileCreated( - ProfileId::fromString('foo'), - Email::fromString('info@patchlevel.de'), - ); - - $eventSerializer = $this->createMock(EventSerializer::class); - $eventSerializer->expects($this->once())->method('deserialize')->with(new SerializedEvent('profile_created', '{}')) - ->willReturn($event); - - $headersSerializer = $this->createMock(HeadersSerializer::class); - $headersSerializer->expects($this->once())->method('deserialize')->with('{}')->willReturn([]); - - $platform = $this->createMock(AbstractPlatform::class); - $platform->expects($this->once())->method('getDateTimeTzFormatString')->willReturn('Y-m-d H:i:s'); - - $result = $this->createMock(Result::class); - $result->expects($this->once())->method('iterateAssociative')->willReturn(new ArrayIterator($messages)); - - $stream = new StreamDoctrineDbalStoreStream( - $result, - $eventSerializer, - $headersSerializer, - $platform, - ); - - $position = $stream->position(); - - self::assertSame(0, $position); - } -} diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index 18a69148a..45f01aec2 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -13,7 +13,7 @@ use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Attribute\Teardown; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\ArrayStream; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Subscription\Cleanup\Cleaner; use Patchlevel\EventSourcing\Subscription\Cleanup\CleanupFailed; use Patchlevel\EventSourcing\Subscription\Cleanup\CleanupTaskHandler; @@ -594,7 +594,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -653,7 +653,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -720,7 +720,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -792,7 +792,7 @@ public function onFailed(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -860,7 +860,7 @@ public function onFailed(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -952,7 +952,7 @@ public function forceCommit(): bool $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1015,7 +1015,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1090,7 +1090,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1154,7 +1154,7 @@ public function handle(Message $message): void $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([ + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([ 1 => $message1, 3 => $message2, ])); @@ -1211,7 +1211,7 @@ public function handle(Message $message): void $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message1])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message1])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1265,7 +1265,7 @@ public function handle(): void $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message1])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message1])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1314,11 +1314,11 @@ public function handle(Message $message): void ->willReturnCallback(new ReturnCallback([ [ [0, [$subscription]], - new ArrayStream([$message]), + new Stream([1 => $message]), ], [ [1, [$subscription]], - new ArrayStream([]), + new Stream([]), ], ])); @@ -1383,7 +1383,7 @@ public function testBootWithoutSubscriber(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1417,7 +1417,7 @@ public function testBootBatchingSuccess(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1469,9 +1469,9 @@ public function testBootBatchingSuccessForceCommit(): void $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([ - $message1, - $message2, + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([ + 1 => $message1, + 2 => $message2, ])); $engine = new DefaultSubscriptionEngine( @@ -1523,7 +1523,7 @@ public function testBootBatchingWithHandleError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1582,7 +1582,7 @@ public function testBootBatchingWithBeginBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1641,7 +1641,7 @@ public function testBootBatchingWithCommitBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1701,7 +1701,7 @@ public function testBootBatchingWithRollbackBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1846,7 +1846,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1902,7 +1902,7 @@ public function handle(Message $message): void $streamableStore = $this->createMock(MessageLoader::class); $streamableStore->expects($this->once())->method('load')->with(0) - ->willReturn(new ArrayStream([$message1, $message2])); + ->willReturn(new Stream([1 => $message1, 2 => $message2])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -1975,7 +1975,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2040,7 +2040,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2107,7 +2107,7 @@ public function handle(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2179,7 +2179,7 @@ public function onFailed(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2247,7 +2247,7 @@ public function onFailed(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2384,7 +2384,7 @@ public function handle(Message $message): void $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([ + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([ 1 => $message1, 3 => $message2, ])); @@ -2441,7 +2441,7 @@ public function handle(Message $message): void $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message1])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message1])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2495,7 +2495,7 @@ public function handle(): void $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message1])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message1])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2544,11 +2544,11 @@ public function handle(Message $message): void ->willReturnCallback(new ReturnCallback([ [ [0, [$subscription]], - new ArrayStream([$message]), + new Stream([1 => $message]), ], [ [1, [$subscription]], - new ArrayStream([]), + new Stream([]), ], ])); @@ -2604,7 +2604,7 @@ public function testRunningBatchingSuccess(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2655,9 +2655,9 @@ public function testRunningBatchingSuccessForceCommit(): void $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([ - $message1, - $message2, + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([ + 1 => $message1, + 2 => $message2, ])); $engine = new DefaultSubscriptionEngine( @@ -2708,7 +2708,7 @@ public function testRunningBatchingWithHandleError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2767,7 +2767,7 @@ public function testRunningBatchingWithBeginBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2826,7 +2826,7 @@ public function testRunningBatchingWithCommitBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -2886,7 +2886,7 @@ public function testRunningBatchingWithRollbackBatchError(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $engine = new DefaultSubscriptionEngine( $streamableStore, @@ -4305,7 +4305,7 @@ public function subscribe(): void $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->createMock(MessageLoader::class); - $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new ArrayStream([$message])); + $streamableStore->expects($this->once())->method('load')->with(0)->willReturn(new Stream([1 => $message])); $subscription = new Subscription( $subscriptionId, diff --git a/tests/Unit/Subscription/Engine/GapResolverStoreMessageLoaderTest.php b/tests/Unit/Subscription/Engine/GapResolverStoreMessageLoaderTest.php index 783d90495..e18e166d6 100644 --- a/tests/Unit/Subscription/Engine/GapResolverStoreMessageLoaderTest.php +++ b/tests/Unit/Subscription/Engine/GapResolverStoreMessageLoaderTest.php @@ -6,12 +6,11 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Store\ArrayStream; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\Stream; use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader; use PHPUnit\Framework\TestCase; use Psr\Clock\ClockInterface; @@ -23,7 +22,7 @@ final class GapResolverStoreMessageLoaderTest extends TestCase public function testEmpty(): void { $store = $this->createMock(Store::class); - $store->method('load')->with(new Criteria(new FromIndexCriterion(0)))->willReturn(new ArrayStream([])); + $store->method('load')->with(new Criteria(new FromIndexCriterion(0)))->willReturn(new Stream([])); $loader = new GapResolverStoreMessageLoader($store); @@ -40,7 +39,7 @@ public function testNoGap(): void ->expects($this->once()) ->method('load') ->with(new Criteria(new FromIndexCriterion(0))) - ->willReturn(new ArrayStream([ + ->willReturn(new Stream([ 1 => new Message(new stdClass()), 2 => new Message(new stdClass()), 3 => new Message(new stdClass()), @@ -67,7 +66,7 @@ public function testNoGapFromHigherIndex(): void ->expects($this->once()) ->method('load') ->with(new Criteria(new FromIndexCriterion(5))) - ->willReturn(new ArrayStream([ + ->willReturn(new Stream([ 6 => new Message(new stdClass()), 7 => new Message(new stdClass()), 8 => new Message(new stdClass()), @@ -98,12 +97,12 @@ public function testWithGapAndFill(): void ->method('load') ->willReturnCallback( static fn (Criteria $criteria) => match ($criteria->get(FromIndexCriterion::class)->fromIndex) { - 5 => new ArrayStream([ + 5 => new Stream([ 6 => new Message(new stdClass()), 7 => new Message(new stdClass()), 9 => new Message(new stdClass()), ]), - 7 => new ArrayStream([ + 7 => new Stream([ 8 => new Message(new stdClass()), 9 => new Message(new stdClass()), ]), @@ -131,12 +130,12 @@ public function testWithGapWithoutFill(): void ->expects($this->exactly(5)) ->method('load') ->willReturnCallback(static fn (Criteria $criteria) => match ($criteria->get(FromIndexCriterion::class)->fromIndex) { - 5 => new ArrayStream([ + 5 => new Stream([ 6 => new Message(new stdClass()), 7 => new Message(new stdClass()), 9 => new Message(new stdClass()), ]), - 7 => new ArrayStream([ + 7 => new Stream([ 9 => new Message(new stdClass()), ]), default => new RuntimeException('Unmatched case!') @@ -166,12 +165,12 @@ public function testGapAndInDetectionWindowForRecordedOnHeader(): void ->expects($this->exactly(5)) ->method('load') ->willReturnCallback(fn (Criteria $criteria) => match ($criteria->get(FromIndexCriterion::class)->fromIndex) { - 5 => new ArrayStream([ + 5 => new Stream([ 6 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:00')), 7 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:01')), 9 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:02')), ]), - 7 => new ArrayStream([ + 7 => new Stream([ 9 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:02')), ]), default => new RuntimeException('Unmatched case!') @@ -197,7 +196,7 @@ public function testGapAndNotInDetectionWindowForRecordedOnHeader(): void $clock->expects($this->exactly(1))->method('now')->willReturn(new DateTimeImmutable('2023-12-01 00:00:00')); - $store->expects($this->once())->method('load')->with(new Criteria(new FromIndexCriterion(5)))->willReturn(new ArrayStream([ + $store->expects($this->once())->method('load')->with(new Criteria(new FromIndexCriterion(5)))->willReturn(new Stream([ 6 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:00')), 7 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:01')), 9 => $this->createMessageWithRecordedOn(new DateTimeImmutable('2023-10-01 00:00:02')), diff --git a/tests/Unit/Subscription/Engine/SubscriptionCollectionTest.php b/tests/Unit/Subscription/Engine/SubscriptionCollectionTest.php index 5c7d2bdf5..fb3e2e715 100644 --- a/tests/Unit/Subscription/Engine/SubscriptionCollectionTest.php +++ b/tests/Unit/Subscription/Engine/SubscriptionCollectionTest.php @@ -18,7 +18,7 @@ public function testEmpty(): void self::assertCount(0, $collection); self::assertEquals([], iterator_to_array($collection)); - self::assertEquals(0, $collection->lowestPosition()); + self::assertSame(null, $collection->lowestPosition()); } public function testSomeSubscription(): void @@ -30,7 +30,7 @@ public function testSomeSubscription(): void self::assertCount(2, $collection); self::assertEquals([$subscription1, $subscription2], iterator_to_array($collection)); - self::assertEquals(5, $collection->lowestPosition()); + self::assertSame(5, $collection->lowestPosition()); } public function testRemove(): void @@ -43,6 +43,16 @@ public function testRemove(): void self::assertCount(1, $collection); self::assertEquals([$subscription2], iterator_to_array($collection)); - self::assertEquals(10, $collection->lowestPosition()); + self::assertSame(10, $collection->lowestPosition()); + } + + public function testNullPositionStaysNullAsLowestPosition(): void + { + $subscription1 = new Subscription('foo', position: null); + $subscription2 = new Subscription('bar', position: 10); + + $collection = new SubscriptionCollection([$subscription1, $subscription2]); + + self::assertSame(null, $collection->lowestPosition()); } } diff --git a/tests/Unit/Subscription/Lookup/LookupTest.php b/tests/Unit/Subscription/Lookup/LookupTest.php index 7fe108fde..9f8dacdc0 100644 --- a/tests/Unit/Subscription/Lookup/LookupTest.php +++ b/tests/Unit/Subscription/Lookup/LookupTest.php @@ -6,8 +6,8 @@ use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; -use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; @@ -42,7 +42,7 @@ public function testMissingIndexHeader(): void public function testEmpty(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $store = $this->createMock(Store::class); $expectedCriteria = new Criteria(new ToIndexCriterion(1)); @@ -68,7 +68,7 @@ public function testEmpty(): void public function testEvents(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $expectedCriteria = new Criteria( new EventsCriterion(['foo']), new ToIndexCriterion(1), @@ -96,7 +96,7 @@ public function testEvents(): void public function testEventClasses(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $expectedCriteria = new Criteria( new EventsCriterion(['foo', 'profile_created']), new ToIndexCriterion(1), @@ -125,7 +125,7 @@ public function testEventClasses(): void public function testBackwards(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $expectedCriteria = new Criteria( new ToIndexCriterion(1), ); @@ -152,7 +152,7 @@ public function testBackwards(): void public function testStream(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $expectedCriteria = new Criteria( new StreamCriterion('foo'), new ToIndexCriterion(1), @@ -180,7 +180,7 @@ public function testStream(): void public function testCurrentStream(): void { - $expectedResult = new ArrayStream([]); + $expectedResult = new Stream([]); $expectedCriteria = new Criteria( new ToIndexCriterion(1), new StreamCriterion('foo'), @@ -215,7 +215,7 @@ public function testFetchFirst(): void $message2 = new Message(new class () { }); - $expectedResult = new ArrayStream([ + $expectedResult = new Stream([ $message1, $message2, ]); @@ -252,7 +252,7 @@ public function testFetchLast(): void $message2 = new Message(new class () { }); - $expectedResult = new ArrayStream([ + $expectedResult = new Stream([ $message2, $message1, ]);