Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions core/Controller/TaskProcessingApiController.php
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,37 @@
}
}

/**
* Sets the task intermediate result while it is running
*
* @param int $taskId The id of the task
* @param array<string,mixed>|null $output The intermediate task output, files are represented by their IDs

Check failure on line 652 in core/Controller/TaskProcessingApiController.php

View workflow job for this annotation

GitHub Actions / static-code-analysis

MismatchingDocblockParamType

core/Controller/TaskProcessingApiController.php:652:12: MismatchingDocblockParamType: Parameter $output has wrong type 'array<string, mixed>|null', should be 'array<array-key, mixed>' (see https://psalm.dev/141)
* @return DataResponse<Http::STATUS_OK, array{task: CoreTaskProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
*
* 200: Result updated successfully
* 404: Task not found
*/
#[ExAppRequired]
#[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')]
public function setIntermediateResult(int $taskId, array $output): DataResponse {
try {
// set result
$this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output);
$task = $this->taskProcessingManager->getTask($taskId);

/** @var CoreTaskProcessingTask $json */
$json = $task->jsonSerialize();

return new DataResponse([
'task' => $json,
]);
} catch (NotFoundException) {
return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND);
} catch (Exception) {
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}

/**
* @return DataResponse<Http::STATUS_OK, array{task: CoreTaskProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
*/
Expand Down
42 changes: 42 additions & 0 deletions lib/private/TaskProcessing/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
use OCP\TaskProcessing\IInternalTaskType;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\IProvider;
use OCP\TaskProcessing\ISynchronousProgressiveProvider;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\ISynchronousWatermarkingProvider;
use OCP\TaskProcessing\ITaskType;
Expand Down Expand Up @@ -1135,6 +1136,13 @@
$this->setTaskStatus($task, Task::STATUS_RUNNING);
if ($provider instanceof ISynchronousWatermarkingProvider) {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark());
} elseif ($provider instanceof ISynchronousProgressiveProvider) {
$output = $provider->process(
$task->getUserId(),
$input,
fn (float $progress) => $this->setTaskProgress($task->getId(), $progress),
fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output)
);
} else {
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
}
Expand Down Expand Up @@ -1216,6 +1224,40 @@
return true;
}

public function setTaskIntermediateOutput(int $id, array $output): bool {

Check failure on line 1227 in lib/private/TaskProcessing/Manager.php

View workflow job for this annotation

GitHub Actions / static-code-analysis

MissingOverrideAttribute

lib/private/TaskProcessing/Manager.php:1227:2: MissingOverrideAttribute: Method OC\TaskProcessing\Manager::settaskintermediateoutput should have the "Override" attribute (see https://psalm.dev/358)
// TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently
$task = $this->getTask($id);
if ($task->getStatus() !== Task::STATUS_RUNNING) {
return false;
}
$userId = $task->getUserId();
if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) {
try {
// $this->appManager->loadApp('notify_push');
$queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class);

Check failure on line 1237 in lib/private/TaskProcessing/Manager.php

View workflow job for this annotation

GitHub Actions / static-code-analysis

UndefinedClass

lib/private/TaskProcessing/Manager.php:1237:26: UndefinedClass: Class, interface or enum named OCA\NotifyPush\Queue\IQueue does not exist (see https://psalm.dev/019)
// $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class);
$queue->push('notify_custom', [
'user' => $userId,
'message' => 'task_' . $task->getId(),
'body' => $output,
]);
error_log('sending to queue!!!!!!');
} catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) {
$this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue');
error_log('NOT sending to queue!!!!!! ' . $e->getMessage());
}
}
// no output shape validation for now
$task->setOutput($output);
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
}
return true;
}

#[\Override]
public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void {
// TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently
Expand Down
10 changes: 10 additions & 0 deletions lib/public/TaskProcessing/IManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public function cancelTask(int $id): void;
*/
public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void;

/**
* @param int $id The id of the task
* @param array $output
* @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime
* @throws Exception If the query failed
* @throws NotFoundException If the task could not be found
* @since 34.0.0
*/
public function setTaskIntermediateOutput(int $id, array $output): bool;

/**
* @param int $id
* @param float $progress
Expand Down
36 changes: 36 additions & 0 deletions lib/public/TaskProcessing/ISynchronousProgressiveProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

/**
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/


namespace OCP\TaskProcessing;

use OCP\Files\File;
use OCP\TaskProcessing\Exception\ProcessingException;

/**
* This is the interface that is implemented by apps that
* implement a task processing provider that supports updating the output during processing
* @since 34.0.0
*/
interface ISynchronousProgressiveProvider extends ISynchronousProvider {

/**
* Returns the shape of optional output parameters
*
* @param null|string $userId The user that created the current task
* @param array<string, list<numeric|string|File>|numeric|string|File> $input The task input
* @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped.
* @param null|callable(array):bool $reportOutput Set the task intermediate output
* @psalm-return array<string, list<numeric|string>|numeric|string>
* @throws ProcessingException
* @since 33.0.0
*/
#[\Override]
public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array;
}
Loading