Make the Tamanu task queue resilient to failing tasks#157
Open
xispa wants to merge 3 commits into
Open
Conversation
A failing task could block the whole outbound queue. exec_tamanu_tasks popped
a task (del from the OOBTree), then ran task.process() with no exception
handling and only a savepoint per task, committing once at the end. When Tamanu
rejected a record (e.g. an empty result, raise_for_status), the exception
aborted the transaction, which rolled back the dequeue — so the same task
re-headed the queue every run and blocked everything behind it. A late failure
also rolled back already-completed dequeues whose Bundle POSTs had already
reached Tamanu, causing duplicate reports.
queue.py:
* get() returns a (task_id, task) tuple so the worker can act on a failed
task. (None, None) means nothing is ready; (task_id, None) means the head
was popped but cannot be resolved (invalid id or its context no longer
exists) and should be dropped — previously this made the worker stop the
whole batch.
* Add fail(task_id, error): increments the attempt count and reschedules the
task with a growing back-off (attempt * RETRY_BACKOFF) for the first
MAX_ATTEMPTS, then moves it to a dead-letter store (TAMANU_TASKS_DEADLETTER)
with the error and attempt count. Transient outages recover within the
retries; a persistently failing record is parked instead of blocking the
head, while staying visible for inspection/re-injection.
* Queue values are now {"when", "attempts"} dicts, but reads tolerate the
legacy bare-int values, so no migration is required.
exec_tamanu_tasks.py:
* Commit per task so a successful send durably removes the task and a later
failure can no longer roll back earlier sends (no more duplicate reports).
* Wrap task.process(): on ConflictError, abort and retry next run without
penalising the attempt count; on any other error, log it, abort the partial
writes, and retry-with-back-off or dead-letter the task in a fresh
transaction. A failing task no longer aborts the batch.
* Drop unresolvable tasks instead of stopping the batch.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Linked issue: #139
A single failing task could block the entire outbound queue. This adds per-task isolation, retry-with-back-off, and a dead-letter store so a bad record is parked instead of blocking everything behind it, and successful sends can no longer be rolled back into duplicates.
Changes:
tamanu/tasks/queue.py:get()returns a(task_id, task)tuple so the worker can act on a failed task.(None, None)= nothing ready;(task_id, None)= head popped but unresolvable (invalid id / context gone) and should be dropped.fail(task_id, error): increments the attempt count and reschedules with a growing back-off for the firstMAX_ATTEMPTS, then moves the task to a dead-letter store (TAMANU_TASKS_DEADLETTER) with the error and attempt count.{"when", "attempts"}dicts, but reads tolerate the legacy bare-int values, so no migration is required.scripts/exec_tamanu_tasks.py:task.process(): onConflictError, abort and retry next run without penalising the attempt count; on any other error, log it, abort the partial writes, and retry-with-back-off or dead-letter the task in a fresh transaction.Current behavior
exec_tamanu_taskspops a task (deletes it from the queue), then runstask.process()with no exception handling and only a savepoint per task, committing once at the end. When Tamanu rejects a record (e.g. an empty result,raise_for_status), the exception aborts the transaction, which rolls back the dequeue — so the same task re-heads the queue on every run and blocks everything behind it. A late failure also rolls back already-completed dequeues whose Bundle POSTs already reached Tamanu, producing duplicate reports.Desired behavior
A failing task is retried with back-off and, if it keeps failing, moved to a dead-letter store where it stays visible for inspection/re-injection — it never blocks the queue head. Successful tasks are committed individually, so no report is ever re-sent because of an unrelated later failure. Transient
ConflictErrors are retried on the next run without counting against the task.--
I confirm I have tested this PR thoroughly and coded it according to PEP8
and Plone's Python styleguide standards.