Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions routes/web.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Dotenv\Dotenv;
use Lion\Bundle\Enums\LogTypeEnum;
use Lion\Bundle\Helpers\Commands\Queue\TaskQueue;
use Lion\Bundle\Support\Task;
use Lion\Database\Driver;
use Lion\Files\Store;
use Lion\Route\Route;
Expand Down Expand Up @@ -79,23 +80,24 @@
Route::addMiddleware(Routes::getMiddleware());
// -----------------------------------------------------------------------------
Route::get('/', function (): stdClass {
/** @phpstan-ignore-next-line */
$taskQueue = new TaskQueue([
$data = [
'scheme' => env('REDIS_SCHEME'),
'host' => env('REDIS_HOST'),
'port' => env('REDIS_PORT'),
'database' => TaskQueue::LION_DATABASE,
'parameters' => [
'password' => env('REDIS_PASSWORD'),
'database' => TaskQueue::LION_DATABASE,
],
]);
];

/** @phpstan-ignore-next-line */
$taskQueue = new TaskQueue($data);

$taskQueue
->push(
new \Lion\Bundle\Support\Task(ExampleProvider::class, 'getArrExample', [
'name' => 'root',
]),
new \Lion\Bundle\Support\Task(ExampleProvider::class, 'getResult')
new Task(ExampleProvider::class, 'getArrExample', ['name' => 'root']),
new Task(ExampleProvider::class, 'getResult'),
new Task(ExampleProvider::class, 'generateError')
);

return info('[index]');
Expand Down
212 changes: 136 additions & 76 deletions src/LionBundle/Commands/Lion/Queue/RunQueuedTasksCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
namespace Lion\Bundle\Commands\Lion\Queue;

use DI\Attribute\Inject;
use JsonException;
use Lion\Bundle\Enums\LogTypeEnum;
use Lion\Bundle\Helpers\Commands\Queue\TaskQueue;
use Lion\Bundle\Helpers\Commands\Selection\MenuCommand;
use Lion\Bundle\Support\Task;
use Lion\Dependency\Injection\Container;
use LogicException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;

/**
* Allows queued tasks to run in the background.
Expand All @@ -33,6 +36,13 @@ class RunQueuedTasksCommand extends MenuCommand
*/
private TaskQueue $taskQueue;

/**
* Database in use.
*
* @var int $database
*/
private int $database;

#[Inject]
public function setContainer(Container $container): RunQueuedTasksCommand
{
Expand All @@ -50,34 +60,59 @@ protected function configure(): void
{
$this
->setName('queue:run')
->setDescription('Run queued tasks')
->addOption(
'pause',
'p',
InputOption::VALUE_OPTIONAL,
'Defines the time to wait before retrieving tasks if all have been executed.',
60
);
->setDescription('Run queued tasks.')
->addOption('database', 'd', InputOption::VALUE_OPTIONAL, 'Redis database, default value 0 for internal operations.', TaskQueue::LION_DATABASE) // phpcs:ignore
->addOption('pause', 'p', InputOption::VALUE_OPTIONAL, 'Defines the time to wait before retrieving tasks if all have been executed.', 60); // phpcs:ignore
}

/**
* Initializes the command after the input has been bound and before the
* input is validated
* input is validated.
*
* This is mainly useful when a lot of commands extends one main command
* where some things need to be initialized based on the input arguments and
* options
* options.
*
* @param InputInterface $input [InputInterface is the interface implemented
* by all input classes]
* @param OutputInterface $output [OutputInterface is the interface
* implemented by all Output classes]
* @param InputInterface $input InputInterface is the interface implemented
* by all input classes.
* @param OutputInterface $output OutputInterface is the interface
* implemented by all Output classes.
*
* @return void
*/
protected function initialize(InputInterface $input, OutputInterface $output): void
{
parent::initialize($input, $output);
}

/**
* Executes the current command.
*
* This method is not abstract because you can use this class as a concrete
* class. In this case, instead of defining the execute() method, you set
* the code to execute by passing a Closure to the setCode() method.
*
* @param InputInterface $input InputInterface is the interface implemented
* by all input classes.
* @param OutputInterface $output OutputInterface is the interface
* implemented by all Output classes.
*
* @return int
*
* @throws JsonException If encoding to JSON fails.
* @throws LogicException When this abstract method is not implemented.
*
* @codeCoverageIgnore
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var string $pause */
$pause = $input->getOption('pause');

/** @var string $database */
$database = $input->getOption('database');

$this->database = (int) $database;

/** @var string $redisScheme */
$redisScheme = env('REDIS_SCHEME');
Expand All @@ -91,47 +126,22 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
/** @var string $password */
$password = env('REDIS_PASSWORD');

$this->taskQueue = new TaskQueue([
'scheme' => $redisScheme,
'host' => $host,
'port' => $port,
'parameters' => [
'password' => $password,
'database' => TaskQueue::LION_DATABASE,
$this->taskQueue = new TaskQueue(parameters: [
TaskQueue::SCHEME => $redisScheme,
TaskQueue::HOST => $host,
TaskQueue::PORT => $port,
TaskQueue::DATABASE => $this->database,
TaskQueue::PARAMETERS => [
TaskQueue::PASSWORD => $password,
],
]);
}

/**
* Executes the current command
*
* This method is not abstract because you can use this class
* as a concrete class. In this case, instead of defining the
* execute() method, you set the code to execute by passing
* a Closure to the setCode() method
*
* @param InputInterface $input [InputInterface is the interface implemented
* by all input classes]
* @param OutputInterface $output [OutputInterface is the interface
* implemented by all Output classes]
*
* @return int
*
* @throws LogicException [When this abstract method is not implemented]
*
* @codeCoverageIgnore
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var int|string $pause */
$pause = $input->getOption('pause');

/** @phpstan-ignore-next-line */
while (true) {
$json = $this->taskQueue->get();

if (null === $json) {
$output->writeln($this->infoOutput("\t>> SCHEDULE: no queued tasks available"));
$output->writeln($this->infoOutput("\t>> TASK [DATABASE: {$this->database}]: There are no queued tasks available. [OMITTED]")); // phpcs:ignore

$this->taskQueue->pause((int) $pause);

Expand All @@ -146,38 +156,88 @@ protected function execute(InputInterface $input, OutputInterface $output): int
* } $queue */
$queue = json_decode($json, true);

$output->writeln(
$this->warningOutput(
"\t>> SCHEDULE: {$queue['id']} / {$queue['namespace']}::{$queue['method']} [PROCESSING]"
)
);

$return = $this->container->callMethod(
$this->container->resolve($queue['namespace']),
$queue['method'],
[
'queue' => $queue,
...$queue['data'],
],
);

if (is_object($return)) {
$return = (array) $return;
}
$output->writeln($this->warningOutput($this->getOutput('PROCESSING', $queue)));

try {
/** @var string $id */
$id = $queue[Task::ID];

/** @var string $namespace */
$namespace = $queue[Task::NAMESPACE];

/** @var string $method */
$method = $queue['method'];

/**
* @var array $data
*
* @phpstan-ignore-next-line
*/
$data = $queue[Task::DATA];

$instance = $this->container->resolve($namespace);

$instanceParams = ['queue' => $queue, ...$data];

/** @phpstan-ignore-next-line */
$return = $this->container->callMethod($instance, $method, $instanceParams);

if (is_object($return)) {
$return = (array) $return;
}

$json = [
'class' => "{$queue['namespace']}::{$queue['method']}",
'params' => $queue['data'],
'return' => $return,
];
$log = [
'class' => "{$namespace}::{$method}",
'params' => $data,
'return' => $return,
];

logger("TASK: {$queue['id']}", LogTypeEnum::INFO, $json);
logger("TASK: {$id}", LogTypeEnum::INFO, $log);

$output->writeln(
$this->successOutput(
"\t>> SCHEDULE: {$queue['id']} / {$queue['namespace']}::{$queue['method']} [COMPLETED]"
)
);
$output->writeln($this->successOutput($this->getOutput('COMPLETED', $queue)));
} catch (Throwable $exception) {
$loggerData = [
'class' => "{$namespace}::{$method}",
'params' => $data,
'error' => [
'message' => $exception->getMessage(),
'file' => $exception->getFile(),
'line' => $exception->getLine(),
'trace' => $exception->getTraceAsString(),
],
];

logger("TASK [DATABASE: {$this->database}]: {$id}", LogTypeEnum::ERROR, $loggerData);

$output->writeln($this->errorOutput($this->getOutput('ERROR', $queue)));
}
}
}

/**
* @param string $type
* @param array{
* id: string,
* namespace: string,
* method: string,
* data: array<string, mixed>
* } $queue
*
* @return string
*
* @codeCoverageIgnore
*/
private function getOutput(string $type, array $queue): string
{
/** @var string $id */
$id = $queue['id'];

/** @var string $namespace */
$namespace = $queue['namespace'];

/** @var string $method */
$method = $queue['method'];

return "\t>> TASK [DATABASE: {$this->database}]: {$id} / {$namespace}::{$method} [{$type}]";
}
}
23 changes: 17 additions & 6 deletions src/LionBundle/Helpers/Commands/Queue/TaskQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,28 @@
*/
class TaskQueue
{
/**
* Reference constants for Redis initialization.
*/
public const string SCHEME = 'scheme';

public const string HOST = 'host';

public const string PORT = 'port';

public const string DATABASE = 'database';

public const string PARAMETERS = 'parameters';

public const string PASSWORD = 'password';

/**
* Defines the property that contains the queued task data.
*
* @const LION_TASKS
*/
public const string LION_TASKS = 'lion-tasks';

/**
* Defines the database to connect to and manipulate tasks.
*
* @const LION_DATABASE
*/
public const int LION_DATABASE = 0;

Expand All @@ -41,9 +52,9 @@ class TaskQueue
* scheme: string,
* host: string,
* port: int,
* database: int,
* parameters: array{
* password: string,
* database: int
* password: string
* }
* } $parameters
*/
Expand Down
Loading
Loading