Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
3.4.0 (unreleased)
=====

* (improvement) Require Symfony 8.1+
* (feature) Use [Symfonys native message deduplication](https://symfony.com/doc/current/messenger.html#message-deduplication)
* (improvement) Use UUIDv7 instead of ULID for task ids
* (deprecation) Deprecate `TaskLog::$ulid`, use `TaskLog::$uuid` instead.


3.3.2
=====

Expand Down
1 change: 1 addition & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
==========

* `TaskLog::getTaskObject()` was removed. Use `TaskDetailsNormalizer::deserializeTask($log)` instead.
* `TaskLog::$ulid` was removed, use `TaskLog::$uuid` instead.


1.x to 2.0
Expand Down
36 changes: 18 additions & 18 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@
"require": {
"php": ">= 8.5",
"21torr/bundle-helpers": "^2.3.2",
"21torr/cli": "^1.2.3",
"21torr/hosting": "^4.1.1",
"21torr/snail": "^1.0.1",
"21torr/cli": "^1.3.0",
"21torr/hosting": "^4.1.2",
"21torr/snail": "^1.0.2",
"doctrine/collections": "^2.6",
"doctrine/doctrine-bundle": "^3.2.2",
"doctrine/orm": "^3.6",
"dragonmantank/cron-expression": "^3.6",
"psr/log": "^3.0",
"symfony/clock": "^7.4 || ^8.0",
"symfony/config": "^7.4 || ^8.0",
"symfony/console": "^7.4 || ^8.0",
"symfony/dependency-injection": "^7.4 || ^8.0",
"symfony/event-dispatcher": "^7.4 || ^8.0",
"symfony/http-kernel": "^7.4 || ^8.0",
"symfony/lock": "^7.4 || ^8.0",
"symfony/messenger": "^7.4 || ^8.0",
"symfony/scheduler": "^7.4 || ^8.0",
"symfony/serializer": "^7.4 || ^8.0",
"symfony/string": "^7.4 || ^8.0",
"symfony/uid": "^7.4 || ^8.0"
"symfony/clock": "^8.1",
"symfony/config": "^8.1",
"symfony/console": "^8.1",
"symfony/dependency-injection": "^8.1",
"symfony/event-dispatcher": "^8.1",
"symfony/http-kernel": "^8.1",
"symfony/lock": "^8.1",
"symfony/messenger": "^8.1",
"symfony/scheduler": "^8.1",
"symfony/serializer": "^8.1",
"symfony/string": "^8.1",
"symfony/uid": "^8.1"
},
"require-dev": {
"21torr/janus": "^2.0.3",
"21torr/janus": "^2.1.1",
"bamarni/composer-bin-plugin": "^1.9.1",
"phpunit/phpunit": "^13.0.5",
"phpunit/phpunit": "^13.2.1",
"roave/security-advisories": "dev-latest",
"symfony/translation-contracts": "^3.6.1"
"symfony/translation-contracts": "^3.7.0"
},
"autoload": {
"psr-4": {
Expand Down
4 changes: 2 additions & 2 deletions src/Entity/TaskLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TaskLog
public private(set) ?int $id = null;

/**
* ULIDs have only 22 characters, but just to be sure
* UUIDv7s have only 36 characters, but just to be sure we use 50 characters
*/
#[ORM\Column(type: Types::STRING, length: 50, unique: true)]
public private(set) string $taskId;
Expand Down Expand Up @@ -73,7 +73,7 @@ public function __construct (
)
{
$this->taskClass = $task::class;
$this->taskId = $task->ulid;
$this->taskId = $task->uuid;
$this->runs = new ArrayCollection();
$this->timeQueued = now();
}
Expand Down
79 changes: 8 additions & 71 deletions src/Manager/TaskManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
use Torr\TaskManager\Exception\Transport\InvalidMessageTransportException;
use Torr\TaskManager\Task\Task;
use Torr\TaskManager\Transport\TransportsHelper;

Expand All @@ -32,78 +29,18 @@ public function __construct (
*/
public function enqueue (Task $task, array $stamps = []) : bool
{
// if we find a message with the same unique task id, we don't queue it again
if ($this->isTaskWithSameTaskIdAlreadyQueued($task->getMetaData()->uniqueTaskId))
{
return false;
}

$envelope = new Envelope($task, $stamps);

$this->messageBus->dispatch($envelope);

return true;
}

/**
* Finds a queued message with the given job id
*/
private function isTaskWithSameTaskIdAlreadyQueued (?string $uniqueTaskId) : bool
{
// no task id, so this task is not deduplicated. No need to check anything, just enqueue it.
if (null === $uniqueTaskId)
{
return false;
}

foreach ($this->transportsHelper->getOrderedQueueNames() as $queueName)
{
foreach ($this->fetchTasksInQueue($queueName) as $envelope)
{
$message = $envelope->getMessage();

if ($message instanceof Task && $message->getMetaData()->uniqueTaskId === $uniqueTaskId)
{
return true;
}
}
}

return false;
}

/**
* Fetches all tasks for the given priority
*
* @return iterable<Envelope>
*
* @api
*/
public function fetchTasksInQueue (string $queueName) : iterable
{
$receiver = $this->transportsHelper->getTransport($queueName);

// ignore schedulers
if ($receiver instanceof SchedulerTransport)
{
return [];
}
$uniqueTaskId = $task->getMetaData()->uniqueTaskId;

// skip, as sync transports can't queue messages like regular transports
if ($receiver instanceof SyncTransport)
if (null !== $uniqueTaskId)
{
return [];
$stamps[] = new DeduplicateStamp($uniqueTaskId);
}

if (!$receiver instanceof ListableReceiverInterface)
{
throw new InvalidMessageTransportException(\sprintf(
"Transport for queue '%s' must implement ListableReceiverInterface",
$queueName,
));
}
$this->messageBus->dispatch(
new Envelope($task, $stamps),
);

return $receiver->all();
return true;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Model/TaskLogModel.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function findByTaskId (string $taskId) : ?TaskLog
*/
public function getLogForTask (Task $task) : TaskLog
{
$log = $this->findByTaskId($task->ulid);
$log = $this->findByTaskId($task->uuid);

if (null !== $log)
{
Expand Down
19 changes: 16 additions & 3 deletions src/Task/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@

namespace Torr\TaskManager\Task;

use Symfony\Component\Uid\Ulid;
use Symfony\Component\Uid\UuidV7;

/**
* A runnable task
*/
abstract readonly class Task
{
public string $uuid;

/**
* @deprecated use $taskId instead
*
* @todo remove in 4.0
*/
public string $ulid;

/**
*/
public function __construct ()
{
$this->ulid = new Ulid()->toBase58();
$uuid = new UuidV7()->toString();
$this->uuid = $uuid;
/** @phpstan-ignore-next-line property.deprecated (We still need to support the deprecated property) */
$this->ulid = $uuid;
}

/**
Expand All @@ -31,8 +41,11 @@ abstract public function getMetaData () : TaskMetaData;
*/
public function withNewTaskUlid () : static
{
$uuid = new UuidV7()->toString();

return clone($this, [
"ulid" => new Ulid()->toBase58(),
"uuid" => $uuid,
"ulid" => $uuid,
]);
}
}
2 changes: 1 addition & 1 deletion tests/Entity/TaskLogTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public function testTaskIdMatchesTaskUlid () : void
$task = $this->createTask();
$log = new TaskLog($task);

self::assertSame($task->ulid, $log->taskId);
self::assertSame($task->uuid, $log->taskId);
}

public function testTaskClassMatchesTaskClass () : void
Expand Down
8 changes: 6 additions & 2 deletions tests/Entity/TaskRunTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Clock\Test\ClockSensitiveTrait;
use Torr\TaskManager\Entity\TaskLog;
use Torr\TaskManager\Entity\TaskRun;
use Torr\TaskManager\Task\Task;
Expand All @@ -14,8 +15,9 @@
*/
final class TaskRunTest extends TestCase
{
// region Helpers
use ClockSensitiveTrait;

// region Helpers
private function createLog () : TaskLog
{
// @phpstan-ignore-next-line 21torr.custom.task.suffix
Expand Down Expand Up @@ -45,14 +47,16 @@ public function testInitialStateIsUnfinished () : void

public function testFinishMarksAsSuccessful () : void
{
self::mockTime("2026-06-16 12:00:00");
$run = new TaskRun($this->createLog());
self::mockTime("2026-06-16 12:00:05");
$run->finish(true, "some output");

self::assertTrue($run->isFinished);
self::assertTrue($run->success);
self::assertSame("some output", $run->output);
self::assertTrue($run->finishedProperly);
self::assertGreaterThan(0, $run->duration);
self::assertEquals(5e9, $run->duration);
}

public function testFinishMarksAsFailure () : void
Expand Down
51 changes: 11 additions & 40 deletions tests/Manager/TaskManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand All @@ -21,7 +22,6 @@
final class TaskManagerTest extends TestCase
{
// region Helpers

private function createTask (?string $uniqueTaskId = null) : Task
{
// @phpstan-ignore-next-line 21torr.custom.task.suffix
Expand All @@ -46,7 +46,7 @@ public function getMetaData () : TaskMetaData
*
* @param Envelope[] $envelopes
*/
private function createListableTransport (array $envelopes = []) : TransportInterface&ListableReceiverInterface
private function createListableTransport (array $envelopes = []) : ListableReceiverInterface|TransportInterface
{
return new class($envelopes) implements TransportInterface, ListableReceiverInterface {
/** @param Envelope[] $envelopes */
Expand Down Expand Up @@ -112,46 +112,17 @@ public function testEnqueueReturnsTrueWhenNoConflict () : void

public function testEnqueueReturnsFalseWhenDuplicateInQueue () : void
{
$existingTask = $this->createTask("test.task");
$transport = $this->createListableTransport([new Envelope($existingTask)]);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::never())->method("dispatch");

$manager = $this->createManager(["queue" => $transport], $bus);
$newTask = $this->createTask("test.task");

self::assertFalse($manager->enqueue($newTask));
}

public function testEnqueueScansMultipleQueuesForDuplicate () : void
{
$existingTask = $this->createTask("test.task");
$emptyTransport = $this->createListableTransport([]);
$fullTransport = $this->createListableTransport([new Envelope($existingTask)]);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::never())->method("dispatch");

$manager = $this->createManager([
"queue_a" => $emptyTransport,
"queue_b" => $fullTransport,
], $bus);

self::assertFalse($manager->enqueue($this->createTask("test.task")));
}

public function testEnqueueDispatchesWhenDifferentUniqueTaskIdInQueue () : void
{
$otherTask = $this->createTask("other.task");
$transport = $this->createListableTransport([new Envelope($otherTask)]);
$bus = self::createMock(MessageBusInterface::class);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects(self::once())->method("dispatch")->willReturnArgument(0);

$manager = $this->createManager(["queue" => $transport], $bus);
$bus->expects(self::once())
->method("dispatch")
->with(self::callback(
static fn (Envelope $envelope) : bool => "unique.key" === $envelope->last(DeduplicateStamp::class)?->getKey()->__toString(),
))
->willReturnArgument(0);

self::assertTrue($manager->enqueue($this->createTask("test.task")));
$manager = $this->createManager(["queue" => $this->createListableTransport()], $bus);
$manager->enqueue($this->createTask("unique.key"));
}

public function testEnqueueForwardsStampsToDispatchedEnvelope () : void
Expand Down
4 changes: 2 additions & 2 deletions tests/Task/TaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public function getMetaData () : TaskMetaData
}
};

$initialUlid = $task->ulid;
$initialUlid = $task->uuid;
$newTask = $task->withNewTaskUlid();
self::assertNotSame($initialUlid, $newTask->ulid, "Task ULID should change on PHP 8.4");
self::assertNotSame($initialUlid, $newTask->uuid, "Task ULID should change on PHP 8.4");
}
}
Loading