diff --git a/CHANGELOG.md b/CHANGELOG.md index 21a5f1d..7d6d9d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +3.4.0 (unreleased) +===== + +* (improvement) Require Symfony 8.1+ +* (feature) Use [Symfonys native message deduplication](https://symfony.com/doc/current/messenger.html#message-deduplication) + + 3.3.2 ===== diff --git a/composer.json b/composer.json index 64e09f8..a975ad1 100644 --- a/composer.json +++ b/composer.json @@ -13,33 +13,33 @@ "require": { "php": ">= 8.5", "21torr/bundle-helpers": "^2.3.2", - "21torr/cli": "^1.2.3", - "21torr/hosting": "^4.1.1", - "21torr/snail": "^1.0.1", + "21torr/cli": "^1.3.0", + "21torr/hosting": "^4.1.2", + "21torr/snail": "^1.0.2", "doctrine/collections": "^2.6", "doctrine/doctrine-bundle": "^3.2.2", "doctrine/orm": "^3.6", "dragonmantank/cron-expression": "^3.6", "psr/log": "^3.0", - "symfony/clock": "^7.4 || ^8.0", - "symfony/config": "^7.4 || ^8.0", - "symfony/console": "^7.4 || ^8.0", - "symfony/dependency-injection": "^7.4 || ^8.0", - "symfony/event-dispatcher": "^7.4 || ^8.0", - "symfony/http-kernel": "^7.4 || ^8.0", - "symfony/lock": "^7.4 || ^8.0", - "symfony/messenger": "^7.4 || ^8.0", - "symfony/scheduler": "^7.4 || ^8.0", - "symfony/serializer": "^7.4 || ^8.0", - "symfony/string": "^7.4 || ^8.0", - "symfony/uid": "^7.4 || ^8.0" + "symfony/clock": "^8.1", + "symfony/config": "^8.1", + "symfony/console": "^8.1", + "symfony/dependency-injection": "^8.1", + "symfony/event-dispatcher": "^8.1", + "symfony/http-kernel": "^8.1", + "symfony/lock": "^8.1", + "symfony/messenger": "^8.1", + "symfony/scheduler": "^8.1", + "symfony/serializer": "^8.1", + "symfony/string": "^8.1", + "symfony/uid": "^8.1" }, "require-dev": { - "21torr/janus": "^2.0.3", + "21torr/janus": "^2.1.1", "bamarni/composer-bin-plugin": "^1.9.1", - "phpunit/phpunit": "^13.0.5", + "phpunit/phpunit": "^13.2.1", "roave/security-advisories": "dev-latest", - "symfony/translation-contracts": "^3.6.1" + "symfony/translation-contracts": "^3.7.0" }, "autoload": { "psr-4": { diff --git a/src/Manager/TaskManager.php b/src/Manager/TaskManager.php index b1be0f3..59e2023 100644 --- a/src/Manager/TaskManager.php +++ b/src/Manager/TaskManager.php @@ -4,11 +4,8 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DeduplicateStamp; use Symfony\Component\Messenger\Stamp\StampInterface; -use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; -use Symfony\Component\Messenger\Transport\Sync\SyncTransport; -use Symfony\Component\Scheduler\Messenger\SchedulerTransport; -use Torr\TaskManager\Exception\Transport\InvalidMessageTransportException; use Torr\TaskManager\Task\Task; use Torr\TaskManager\Transport\TransportsHelper; @@ -32,78 +29,18 @@ public function __construct ( */ public function enqueue (Task $task, array $stamps = []) : bool { - // if we find a message with the same unique task id, we don't queue it again - if ($this->isTaskWithSameTaskIdAlreadyQueued($task->getMetaData()->uniqueTaskId)) - { - return false; - } - - $envelope = new Envelope($task, $stamps); - - $this->messageBus->dispatch($envelope); - - return true; - } - - /** - * Finds a queued message with the given job id - */ - private function isTaskWithSameTaskIdAlreadyQueued (?string $uniqueTaskId) : bool - { - // no task id, so this task is not deduplicated. No need to check anything, just enqueue it. - if (null === $uniqueTaskId) - { - return false; - } - - foreach ($this->transportsHelper->getOrderedQueueNames() as $queueName) - { - foreach ($this->fetchTasksInQueue($queueName) as $envelope) - { - $message = $envelope->getMessage(); - - if ($message instanceof Task && $message->getMetaData()->uniqueTaskId === $uniqueTaskId) - { - return true; - } - } - } - - return false; - } - - /** - * Fetches all tasks for the given priority - * - * @return iterable - * - * @api - */ - public function fetchTasksInQueue (string $queueName) : iterable - { - $receiver = $this->transportsHelper->getTransport($queueName); - - // ignore schedulers - if ($receiver instanceof SchedulerTransport) - { - return []; - } + $uniqueTaskId = $task->getMetaData()->uniqueTaskId; - // skip, as sync transports can't queue messages like regular transports - if ($receiver instanceof SyncTransport) + if (null !== $uniqueTaskId) { - return []; + $stamps[] = new DeduplicateStamp($uniqueTaskId); } - if (!$receiver instanceof ListableReceiverInterface) - { - throw new InvalidMessageTransportException(\sprintf( - "Transport for queue '%s' must implement ListableReceiverInterface", - $queueName, - )); - } + $this->messageBus->dispatch( + new Envelope($task, $stamps), + ); - return $receiver->all(); + return true; } /** diff --git a/tests/Entity/TaskRunTest.php b/tests/Entity/TaskRunTest.php index a9d1aef..15c75ab 100644 --- a/tests/Entity/TaskRunTest.php +++ b/tests/Entity/TaskRunTest.php @@ -4,6 +4,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; +use Symfony\Component\Clock\Test\ClockSensitiveTrait; use Torr\TaskManager\Entity\TaskLog; use Torr\TaskManager\Entity\TaskRun; use Torr\TaskManager\Task\Task; @@ -14,8 +15,9 @@ */ final class TaskRunTest extends TestCase { - // region Helpers + use ClockSensitiveTrait; + // region Helpers private function createLog () : TaskLog { // @phpstan-ignore-next-line 21torr.custom.task.suffix @@ -45,14 +47,16 @@ public function testInitialStateIsUnfinished () : void public function testFinishMarksAsSuccessful () : void { + self::mockTime("2026-06-16 12:00:00"); $run = new TaskRun($this->createLog()); + self::mockTime("2026-06-16 12:00:05"); $run->finish(true, "some output"); self::assertTrue($run->isFinished); self::assertTrue($run->success); self::assertSame("some output", $run->output); self::assertTrue($run->finishedProperly); - self::assertGreaterThan(0, $run->duration); + self::assertEquals(5e9, $run->duration); } public function testFinishMarksAsFailure () : void diff --git a/tests/Manager/TaskManagerTest.php b/tests/Manager/TaskManagerTest.php index fba744c..b29c292 100644 --- a/tests/Manager/TaskManagerTest.php +++ b/tests/Manager/TaskManagerTest.php @@ -6,6 +6,7 @@ use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DeduplicateStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -21,7 +22,6 @@ final class TaskManagerTest extends TestCase { // region Helpers - private function createTask (?string $uniqueTaskId = null) : Task { // @phpstan-ignore-next-line 21torr.custom.task.suffix @@ -46,7 +46,7 @@ public function getMetaData () : TaskMetaData * * @param Envelope[] $envelopes */ - private function createListableTransport (array $envelopes = []) : TransportInterface&ListableReceiverInterface + private function createListableTransport (array $envelopes = []) : ListableReceiverInterface|TransportInterface { return new class($envelopes) implements TransportInterface, ListableReceiverInterface { /** @param Envelope[] $envelopes */ @@ -112,46 +112,17 @@ public function testEnqueueReturnsTrueWhenNoConflict () : void public function testEnqueueReturnsFalseWhenDuplicateInQueue () : void { - $existingTask = $this->createTask("test.task"); - $transport = $this->createListableTransport([new Envelope($existingTask)]); - - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects(self::never())->method("dispatch"); - - $manager = $this->createManager(["queue" => $transport], $bus); - $newTask = $this->createTask("test.task"); - - self::assertFalse($manager->enqueue($newTask)); - } - - public function testEnqueueScansMultipleQueuesForDuplicate () : void - { - $existingTask = $this->createTask("test.task"); - $emptyTransport = $this->createListableTransport([]); - $fullTransport = $this->createListableTransport([new Envelope($existingTask)]); - - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects(self::never())->method("dispatch"); - - $manager = $this->createManager([ - "queue_a" => $emptyTransport, - "queue_b" => $fullTransport, - ], $bus); - - self::assertFalse($manager->enqueue($this->createTask("test.task"))); - } - - public function testEnqueueDispatchesWhenDifferentUniqueTaskIdInQueue () : void - { - $otherTask = $this->createTask("other.task"); - $transport = $this->createListableTransport([new Envelope($otherTask)]); + $bus = self::createMock(MessageBusInterface::class); - $bus = $this->createMock(MessageBusInterface::class); - $bus->expects(self::once())->method("dispatch")->willReturnArgument(0); - - $manager = $this->createManager(["queue" => $transport], $bus); + $bus->expects(self::once()) + ->method("dispatch") + ->with(self::callback( + static fn (Envelope $envelope) : bool => "unique.key" === $envelope->last(DeduplicateStamp::class)?->getKey()->__toString(), + )) + ->willReturnArgument(0); - self::assertTrue($manager->enqueue($this->createTask("test.task"))); + $manager = $this->createManager(["queue" => $this->createListableTransport()], $bus); + $manager->enqueue($this->createTask("unique.key")); } public function testEnqueueForwardsStampsToDispatchedEnvelope () : void