Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions docs/guide/en/middleware-pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,3 @@ See [Configuration with yiisoft/config](configuration-with-config.md) for exampl
When configuring the component manually, you instantiate the middleware dispatchers and pass them to `Queue` / `Worker`.

See [Manual configuration](configuration-manual.md) for a full runnable example.

## Runtime overrides

You can override middleware stacks at runtime:

- `Queue::withMiddlewares(...)` replaces the whole push middleware stack for that queue instance.
- `Queue::withMiddlewaresAdded(...)` appends middlewares to the existing stack.

These methods affect only the push pipeline of the `Queue` instance they are called on.
10 changes: 0 additions & 10 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,4 @@ public function getName(): string
{
return $this->queue->getName();
}

public function withMiddlewares(mixed ...$middlewareDefinitions): self
{
return new self($this->queue->withMiddlewares(...$middlewareDefinitions), $this->collector);
}

public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self
{
return new self($this->queue->withMiddlewaresAdded(...$middlewareDefinitions), $this->collector);
}
}
47 changes: 5 additions & 42 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,14 @@

final class Queue implements QueueInterface
{
/**
* @var mixed[] Queue-specific middleware definitions.
*/
private array $middlewareDefinitions;

private string $name;

/**
* @var PushMiddlewareDispatcher The dispatcher used for push messages, combining base dispatcher middleware with
* queue-specific middleware.
* @var PushMiddlewareDispatcher The dispatcher used for push messages, combining common middleware from
* {@see PushMiddlewareConfig} with queue-specific middleware.
*/
private PushMiddlewareDispatcher $dispatcher;

/**
* @var PushMiddlewareDispatcher The base dispatcher built from {@see PushMiddlewareConfig}.
* Holds the common middleware applied to all queues.
*/
private PushMiddlewareDispatcher $baseDispatcher;

/**
* @param WorkerInterface $worker The worker that processes messages.
* @param LoopInterface $loop The loop for controlling message processing.
Expand All @@ -59,19 +48,16 @@ public function __construct(
mixed ...$middlewareDefinitions,
) {
$this->name = StringNormalizer::normalize($name);
$this->baseDispatcher = new PushMiddlewareDispatcher(
$this->dispatcher = (new PushMiddlewareDispatcher(
$middlewareConfig->middlewareFactory,
$middlewareConfig->commonMiddlewareDefinitions,
$this->createFinalPushHandler(),
);
$this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
))->withMiddlewaresAdded($middlewareDefinitions);
}

public function __clone()
{
$finalPushHandler = $this->createFinalPushHandler();
$this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler);
$this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler);
$this->dispatcher = $this->dispatcher->withFinishHandler($this->createFinalPushHandler());
}

public function getName(): string
Expand Down Expand Up @@ -164,29 +150,6 @@ public function status(string|int $id): MessageStatus
return $this->adapter->status($id);
}

public function withMiddlewares(mixed ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
return $instance;
}

public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]);
return $instance;
}

/**
* @param mixed[] $middlewareDefinitions
*/
private function setMiddlewaresAndPrepareDispatcher(array $middlewareDefinitions): void
{
$this->middlewareDefinitions = $middlewareDefinitions;
$this->dispatcher = $this->baseDispatcher->withMiddlewaresAdded($middlewareDefinitions);
}

private function handle(MessageInterface $message): bool
{
$this->worker->process($message, $this);
Expand Down
14 changes: 0 additions & 14 deletions src/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,4 @@ public function status(string|int $id): MessageStatus;
* Returns the logical name of the queue.
*/
public function getName(): string;

/**
* Creates a new instance with the specified middlewares. All the existing middlewares are replaced.
*
* @param mixed ...$middlewareDefinitions The middleware definitions.
*/
public function withMiddlewares(mixed ...$middlewareDefinitions): self;

/**
* Creates a new instance with the specified middlewares added after the existing ones.
*
* @param mixed ...$middlewareDefinitions The middleware definitions.
*/
public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self;
}
10 changes: 0 additions & 10 deletions stubs/StubQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,4 @@ public function getName(): string
{
return $this->name;
}

public function withMiddlewares(mixed ...$middlewareDefinitions): self
{
return $this;
}

public function withMiddlewaresAdded(mixed ...$middlewareDefinitions): self
{
return $this;
}
}
11 changes: 6 additions & 5 deletions tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ public function testFullStackPush(): void
$this->createMock(LoopInterface::class),
$this->createMock(LoggerInterface::class),
$pushMiddlewareConfig,
name: 'test',
null,
'test',
new TestMiddleware('channel 1'),
new TestMiddleware('channel 2'),
new TestMiddleware('channel 3'),
new TestMiddleware('channel 4'),
);
$queue = $queue
->withMiddlewares(new TestMiddleware('Won\'t be executed'))
->withMiddlewares(new TestMiddleware('channel 1'), new TestMiddleware('channel 2'))
->withMiddlewaresAdded(new TestMiddleware('channel 3'), new TestMiddleware('channel 4'));

$message = new GenericMessage('test', ['initial']);
$messagePushed = $queue->push($message);
Expand Down
Loading