From a7b1f2d781eb905bd9bc8ebb65712cc084356b24 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Mon, 11 May 2026 15:35:54 +0200 Subject: [PATCH 1/4] feat(task-streaming): allow the Php providers to set intermediate results with a callback in process Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 42 +++++++++++++++++++ .../ISynchronousProgressiveProvider.php | 36 ++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 lib/public/TaskProcessing/ISynchronousProgressiveProvider.php diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6821aee65a002..390b6b622ec16 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,6 +59,7 @@ use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousProgressiveProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; @@ -1135,6 +1136,13 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); + } elseif ($provider instanceof ISynchronousProgressiveProvider) { + $output = $provider->process( + $task->getUserId(), + $input, + fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output) + ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); } @@ -1216,6 +1224,40 @@ public function setTaskProgress(int $id, float $progress): bool { return true; } + public function setTaskIntermediateOutput(int $id, array $output): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() === Task::STATUS_CANCELLED) { + return false; + } + $userId = $task->getUserId(); + if ($userId !== null && $userId !== '') { + try { + // TODO figure out how to get the queue with DI + // $queue = Server::get(\OCA\NotifyPush\IQueue::class); + $queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class); + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'taskprocessing_task_results', + 'body' => $output, + ]); + error_log('sending to queue!!!!!!'); + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + error_log('NOT sending to queue!!!!!! ' . $e->getMessage()); + } + } + // no output shape validation for now + $task->setOutput($output); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + #[\Override] public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php new file mode 100644 index 0000000000000..102062d8d1ab3 --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php @@ -0,0 +1,36 @@ +|numeric|string|File> $input The task input + * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. + * @param null|callable(array):bool $reportOutput Set the task intermediate output + * @psalm-return array|numeric|string> + * @throws ProcessingException + * @since 33.0.0 + */ + #[\Override] + public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array; +} From 062b0762f5d1fc895c17af0b8a63ef45d995abed Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Mon, 11 May 2026 15:57:42 +0200 Subject: [PATCH 2/4] fix and test sending data via notify_push Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 390b6b622ec16..6cdf31c4f7ab4 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1231,11 +1231,11 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { return false; } $userId = $task->getUserId(); - if ($userId !== null && $userId !== '') { + if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) { try { - // TODO figure out how to get the queue with DI - // $queue = Server::get(\OCA\NotifyPush\IQueue::class); - $queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class); + // $this->appManager->loadApp('notify_push'); + $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); + // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); $queue->push('notify_custom', [ 'user' => $userId, 'message' => 'taskprocessing_task_results', From 9cb1f3f4ea85945a5a3c94157a34bf59423c34f8 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Tue, 12 May 2026 13:10:19 +0200 Subject: [PATCH 3/4] feat(task-streaming): add an endpoint to set a task intermediate output Signed-off-by: Julien Veyssier --- .../TaskProcessingApiController.php | 31 +++++++++++++++++++ lib/private/TaskProcessing/Manager.php | 2 +- lib/public/TaskProcessing/IManager.php | 10 ++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/core/Controller/TaskProcessingApiController.php b/core/Controller/TaskProcessingApiController.php index fd22d485af380..7b52c543d91a0 100644 --- a/core/Controller/TaskProcessingApiController.php +++ b/core/Controller/TaskProcessingApiController.php @@ -645,6 +645,37 @@ public function setResult(int $taskId, ?array $output = null, ?string $errorMess } } + /** + * Sets the task intermediate result while it is running + * + * @param int $taskId The id of the task + * @param array|null $output The intermediate task output, files are represented by their IDs + * @return DataResponse|DataResponse + * + * 200: Result updated successfully + * 404: Task not found + */ + #[ExAppRequired] + #[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')] + public function setIntermediateResult(int $taskId, array $output): DataResponse { + try { + // set result + $this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output); + $task = $this->taskProcessingManager->getTask($taskId); + + /** @var CoreTaskProcessingTask $json */ + $json = $task->jsonSerialize(); + + return new DataResponse([ + 'task' => $json, + ]); + } catch (NotFoundException) { + return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND); + } catch (Exception) { + return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR); + } + } + /** * @return DataResponse|DataResponse */ diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6cdf31c4f7ab4..e9e6fbdd084fd 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1227,7 +1227,7 @@ public function setTaskProgress(int $id, float $progress): bool { public function setTaskIntermediateOutput(int $id, array $output): bool { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently $task = $this->getTask($id); - if ($task->getStatus() === Task::STATUS_CANCELLED) { + if ($task->getStatus() !== Task::STATUS_RUNNING) { return false; } $userId = $task->getUserId(); diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 2cd0244b52e8d..f919410d7c4fe 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -143,6 +143,16 @@ public function cancelTask(int $id): void; */ public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void; + /** + * @param int $id The id of the task + * @param array $output + * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 34.0.0 + */ + public function setTaskIntermediateOutput(int $id, array $output): bool; + /** * @param int $id * @param float $progress From e9c84903882684fa27b6526e1c96342550596260 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Sat, 16 May 2026 20:23:16 +0200 Subject: [PATCH 4/4] make the notify_push message shorter Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index e9e6fbdd084fd..59d0c5dfca880 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1238,7 +1238,7 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); $queue->push('notify_custom', [ 'user' => $userId, - 'message' => 'taskprocessing_task_results', + 'message' => 'task_' . $task->getId(), 'body' => $output, ]); error_log('sending to queue!!!!!!');